From c8fe9fb2554974d5bf7364a1333f215802b0044c Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Fri, 21 Feb 2025 23:08:35 +0100
Subject: [PATCH] update top analyst summary

---
 app/cron_analyst_ticker.py | 204 ++++++++++++++++++++++++++++++++-----
 app/main.py                |  19 +++-
 2 files changed, 197 insertions(+), 26 deletions(-)

diff --git a/app/cron_analyst_ticker.py b/app/cron_analyst_ticker.py
index 772d7e7..1da24a4 100755
--- a/app/cron_analyst_ticker.py
+++ b/app/cron_analyst_ticker.py
@@ -8,6 +8,7 @@ from scipy.stats import norm
 import time
 import sqlite3
 import ujson
+import orjson
 import math
 import statistics
 import os
@@ -49,11 +50,6 @@ def filter_latest_entries(data):
     # Return only the latest entries
     return [entry for _, entry in latest_entries.values()]
 
-# Example usage
-# filtered_list = remove_duplicate_names(your_original_list)
-# Example usage
-# filtered_list = remove_duplicate_ids(your_original_list)
-
 
 # Define a function to remove duplicates based on a key
 def remove_duplicates(data, key):
@@ -67,18 +63,16 @@ def remove_duplicates(data, key):
 
 
 
-def get_summary(res_list):
+def get_all_analyst_summary(res_list):
     # Get the latest summary of ratings from the last 12 months
     end_date = date.today()
-
-    # Filter the data for the last 12 months and consider the last N ratings
     #Furthermore consider only the last rating of the analyst if he provided multiple in the last 12 months
     #filtered data is needed for the recommendation list
     filtered_data = [item for item in res_list if start_date_12m <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date]
     #unique list is needed for analyst summary rating
-    unique_filtered_data = filter_latest_entries(filtered_data)[:30]
+    unique_filtered_data = filter_latest_entries(filtered_data)[:60]
 
     # Initialize dictionary to store the latest price target for each analyst
     latest_pt_current = defaultdict(list)
@@ -199,6 +193,134 @@ def get_summary(res_list):
     res = {**stats, **categorical_ratings}
     return res
 
+def get_top_analyst_summary(res_list):
+    # Get the latest summary of ratings from the last 12 months,
+    # restricted to top-rated analysts (analystScore >= 4)
+    end_date = date.today()
+    res_list = [item for item in res_list if item['analystScore'] >= 4]
+    # Filter the data for the last 12 months and consider only the last rating
+    # of each analyst if they provided multiple in that window.
+    # filtered_data is needed for the recommendation list
+    filtered_data = [item for item in res_list if start_date_12m <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date]
+    # the unique list is needed for the analyst summary rating
+    unique_filtered_data = filter_latest_entries(filtered_data)
+
+    # Initialize dictionary to store the latest price target for each analyst
+    latest_pt_current = defaultdict(list)
+
+    # Iterate through the filtered data to collect pt_current for each analyst
+    for item in unique_filtered_data:
+        if 'adjusted_pt_current' in item and item['adjusted_pt_current']:
+            analyst_name = item['analyst_name']
+            try:
+                pt_current_value = float(item['adjusted_pt_current'])
+                # Collect all pt_current values for each analyst
+                latest_pt_current[analyst_name].append(pt_current_value)
+            except (ValueError, TypeError):
+                print(f"Invalid pt_current value for analyst '{analyst_name}': {item['adjusted_pt_current']}")
+
+    # Compute statistics for price targets
+    pt_current_values = [val for sublist in latest_pt_current.values() for val in sublist]
+    # Remove outliers (1.5x IQR fence) to keep the high and low price targets reasonable;
+    # the emptiness check also guards np.percentile, which raises on an empty list
+    if pt_current_values:
+        q1, q3 = np.percentile(pt_current_values, [25, 75])
+        iqr = q3 - q1
+        pt_current_values = [x for x in pt_current_values if (q1 - 1.5 * iqr) <= x <= (q3 + 1.5 * iqr)]
+
+    # Compute different price target metrics if there are values, otherwise set to 0
+    if pt_current_values:
+        median_pt_current = statistics.median(pt_current_values)
+        avg_pt_current = statistics.mean(pt_current_values)
+        low_pt_current = min(pt_current_values)
+        high_pt_current = max(pt_current_values)
+    else:
+        median_pt_current = avg_pt_current = low_pt_current = high_pt_current = 0
+
+    # Initialize recommendation tracking
+    rating_hierarchy = {'Strong Sell': 0, 'Sell': 1, 'Hold': 2, 'Buy': 3, 'Strong Buy': 4}
+
+    # Track monthly recommendations
+    monthly_recommendations = {}
+
+    # Iterate through the filtered data to track monthly recommendations
+    for item in filtered_data:
+        # Extract the month from the date
+        item_date = datetime.strptime(item['date'], '%Y-%m-%d')
+        month_key = item_date.strftime('%Y-%m-01')
+
+        # Initialize the month's recommendation counts if they don't exist yet
+        if month_key not in monthly_recommendations:
+            monthly_recommendations[month_key] = {
+                'Strong Sell': 0,
+                'Sell': 0,
+                'Hold': 0,
+                'Buy': 0,
+                'Strong Buy': 0
+            }
+
+        # Check and increment the recommendation count for the month
+        if 'rating_current' in item and item['rating_current'] in rating_hierarchy:
+            monthly_recommendations[month_key][item['rating_current']] += 1
+
+    # Convert monthly recommendations to a sorted list
+    recommendation_list = []
+    for month in sorted(monthly_recommendations.keys()):
+        month_data = monthly_recommendations[month]
+        recommendation_list.append({
+            'date': month,
+            'Strong Sell': month_data['Strong Sell'],
+            'Sell': month_data['Sell'],
+            'Hold': month_data['Hold'],
+            'Buy': month_data['Buy'],
+            'Strong Buy': month_data['Strong Buy']
+        })
+
+    # Compute consensus ratings (similar to the previous implementation)
+    consensus_ratings = defaultdict(str)
+    for item in unique_filtered_data:
+        if 'rating_current' in item and item['rating_current'] and 'analyst_name' in item and item['analyst_name']:
+            try:
+                analyst_name = item['analyst_name']
+                current_rating = item['rating_current']
+                if current_rating in rating_hierarchy:
+                    consensus_ratings[analyst_name] = current_rating
+            except:
+                pass
+
+    # Compute the consensus rating based on the most frequent rating among analysts;
+    # fall back to 'n/a' when no top-analyst ratings exist, since max() raises on an empty dict
+    consensus_rating_counts = defaultdict(int)
+    for rating in consensus_ratings.values():
+        consensus_rating_counts[rating] += 1
+    consensus_rating = max(consensus_rating_counts, key=consensus_rating_counts.get) if consensus_rating_counts else 'n/a'
+
+    # Sum up all Buy, Sell, Hold for the progress bar in sveltekit
+    data_dict = dict(consensus_rating_counts)
+    buy_total = data_dict.get('Strong Buy', 0) + data_dict.get('Buy', 0)
+    sell_total = data_dict.get('Strong Sell', 0) + data_dict.get('Sell', 0)
+    hold_total = data_dict.get('Hold', 0)
+
+    # unique_filtered_data holds one (latest) entry per analyst, so its length is the analyst count
+    numOfAnalyst = len(unique_filtered_data)
+
+    # Update the stats dictionary with the new keys, including recommendationList
+    stats = {
+        'numOfAnalyst': numOfAnalyst,
+        'consensusRating': consensus_rating,
+        'medianPriceTarget': round(median_pt_current, 2),
+        'avgPriceTarget': round(avg_pt_current, 2),
+        'lowPriceTarget': round(low_pt_current, 2),
+        'highPriceTarget': round(high_pt_current, 2),
+        'recommendationList': recommendation_list
+    }
+
+    categorical_ratings = {'Buy': buy_total, 'Sell': sell_total, 'Hold': hold_total}
+
+    res = {**stats, **categorical_ratings}
+    return res
+
+
 def run(chunk, analyst_list, con):
     start_date = datetime(2015, 1, 1)
     end_date_str = end_date.strftime('%Y-%m-%d')
@@ -216,6 +338,23 @@ def run(chunk, analyst_list, con):
             break
 
     res_list = [item for item in res_list if item.get('analyst_name')]
+    with open("json/analyst/all-analyst-data.json", 'r') as file:
+        raw_analyst_list = orjson.loads(file.read())
+
+    # Add analystScore to each rating entry; if the score is unavailable
+    # for some reason, replace it with 0.
+    # Build a mapping of analyst names to scores.
+    analyst_scores = {raw_item.get('analystName'): raw_item.get('analystScore', 0)
+                      for raw_item in raw_analyst_list}
+
+    # Update each item in res_list using the precomputed mapping;
+    # .get() returns 0 if the analyst name is missing from the mapping.
+    for item in res_list:
+        item['analystScore'] = analyst_scores.get(item.get('analyst_name'), 0)
+
     for ticker in chunk:
         try:
             ticker_filtered_data = [item for item in res_list if item['ticker'] == ticker]
@@ -269,8 +408,9 @@ def run(chunk, analyst_list, con):
             except:
                 pass
 
-            summary = get_summary(ticker_filtered_data)
-
+            all_analyst_summary = get_all_analyst_summary(ticker_filtered_data)
+            top_analyst_summary = get_top_analyst_summary(ticker_filtered_data)
+
             try:
                 # Add historical price for the last 12 months
                 query = query_template.format(ticker=ticker)
@@ -279,24 +419,38 @@ def run(chunk, analyst_list, con):
                 df_12m_last_per_month = df_12m.groupby(df_12m['date'].dt.to_period('M')).tail(1)
                 past_price_list = [{"date": row['date'].strftime('%Y-%m-%d'), "close": row['close']} for _, row in df_12m_last_per_month.iterrows()]
-                summary["pastPriceList"] = past_price_list
+                all_analyst_summary["pastPriceList"] = past_price_list
+                top_analyst_summary["pastPriceList"] = past_price_list
             except:
-                summary["pastPriceList"] = []
+                all_analyst_summary["pastPriceList"] = []
+                top_analyst_summary["pastPriceList"] = []
+
+            # Write each summary to its own per-ticker JSON file
+            file_path = f"json/analyst/summary/all_analyst/{ticker}.json"
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            with open(file_path, 'w') as file:
+                ujson.dump(all_analyst_summary, file)
+
+            file_path = f"json/analyst/summary/top_analyst/{ticker}.json"
+            os.makedirs(os.path.dirname(file_path), exist_ok=True)
+            with open(file_path, 'w') as file:
+                ujson.dump(top_analyst_summary, file)
+
-            # Get ratings of each analyst
-            with open(f"json/analyst/summary/{ticker}.json", 'w') as file:
-                ujson.dump(summary, file)
 
             for item1 in ticker_filtered_data:
                 for item2 in analyst_stats_list:
-                    if item1['analyst'] == item2['companyName'] and item1['analyst_name'] == item2['analystName']:
-                        item1['analystId'] = item2['analystId']
-                        item1['analystScore'] = item2['analystScore']
-                        break
-                    elif item1['analyst_name'] == item2['analystName']:
-                        item1['analystId'] = item2['analystId']
-                        item1['analystScore'] = item2['analystScore']
-                        break
+                    try:
+                        if item1['analyst'] == item2['companyName'] and item1['analyst_name'] == item2['analystName']:
+                            item1['analystId'] = item2['analystId']
+                            item1['analystScore'] = item2['analystScore']
+                            break
+                        elif item1['analyst_name'] == item2['analystName']:
+                            item1['analystId'] = item2['analystId']
+                            item1['analystScore'] = item2['analystScore']
+                            break
+                    except:
+                        pass
 
             desired_keys = ['date', 'action_company', 'rating_current', 'adjusted_pt_current', 'adjusted_pt_prior', 'analystId', 'analystScore', 'analyst', 'analyst_name']
@@ -325,7 +479,7 @@ try:
     chunk_size = len(stock_symbols) // 300  # Divide the list into N chunks
     chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)]
-    #chunks = [['NVDA']]
+    #chunks = [['AAPL']]  # single-ticker override for local debugging; keep commented out in production
 
     for chunk in chunks:
         run(chunk, analyst_stats_list, con)
diff --git a/app/main.py b/app/main.py
index 8edc216..a5cdcd9 100755
--- a/app/main.py
+++ b/app/main.py
@@ -1213,7 +1213,24 @@ async def get_analyst_rating(data: TickerData, api_key: str = Security(get_api_k
     if cached_result:
         return orjson.loads(cached_result)
     try:
-        with open(f"json/analyst/summary/{ticker}.json", 'rb') as file:
+        with open(f"json/analyst/summary/all_analyst/{ticker}.json", 'rb') as file:
             res = orjson.loads(file.read())
     except:
         res = {}
+
+    redis_client.set(cache_key, orjson.dumps(res))
+    redis_client.expire(cache_key, 60*60)  # Set cache expiration time to 1 hour
+    return res
+
+@app.post("/top-analyst-summary-rating")
+async def get_top_analyst_rating(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker.upper()
+    cache_key = f"top-analyst-summary-rating-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return orjson.loads(cached_result)
+    try:
+        with open(f"json/analyst/summary/top_analyst/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = {}
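
The price-target cleanup in get_top_analyst_summary is a standard 1.5x-IQR fence. A minimal standalone sketch of the technique, assuming a plain list of floats (iqr_filter and the sample values below are illustrative, not part of the patch):

    import numpy as np

    def iqr_filter(values, k=1.5):
        # Keep only values inside [q1 - k*IQR, q3 + k*IQR]; an empty input
        # passes through, mirroring the emptiness guard in the patch.
        if not values:
            return []
        q1, q3 = np.percentile(values, [25, 75])
        iqr = q3 - q1
        return [x for x in values if (q1 - k * iqr) <= x <= (q3 + k * iqr)]

    print(iqr_filter([180.0, 190.0, 200.0, 210.0, 950.0]))  # drops 950.0 as an outlier

With k=1.5 the fence for the sample above is [160.0, 240.0], so a single stray four-digit target no longer distorts lowPriceTarget and highPriceTarget.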
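
A quick smoke test for the new /top-analyst-summary-rating route; the base URL, auth header name, and key value are assumptions about the deployment rather than anything defined in this patch:

    import requests

    resp = requests.post(
        "http://localhost:8000/top-analyst-summary-rating",
        json={"ticker": "AAPL"},
        headers={"X-API-KEY": "<your-api-key>"},  # header name is an assumption
    )
    resp.raise_for_status()
    summary = resp.json()
    print(summary.get("consensusRating"), summary.get("numOfAnalyst"))

An empty dict in the response means no summary file exists yet for the ticker: the endpoint swallows the file-read error, returns {}, and caches it for an hour.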