diff --git a/app/cron_analyst_ticker.py b/app/cron_analyst_ticker.py index 907cce3..9274322 100755 --- a/app/cron_analyst_ticker.py +++ b/app/cron_analyst_ticker.py @@ -8,6 +8,7 @@ import time import sqlite3 import ujson import math +import statistics import os from dotenv import load_dotenv @@ -31,92 +32,84 @@ def remove_duplicates(data, key): def get_summary(res_list): - #Get Latest Summary of ratings from the last 12 months - # -Number of Analyst, -Price Target, -Consensus Rating - end_date = date.today() - start_date = end_date - timedelta(days=365) #end_date is today - filtered_data = [item for item in res_list if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date] + # Get the latest summary of ratings from the last 12 months + end_date = date.today() + start_date = end_date - timedelta(days=365) # end_date is today + + # Filter the data for the last 12 months + filtered_data = [item for item in res_list if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date] + + # Initialize dictionary to store the latest price target for each analyst + latest_pt_current = defaultdict(int) + + # Iterate through the filtered data to update the latest pt_current for each analyst + for item in filtered_data: + if 'adjusted_pt_current' in item and item['adjusted_pt_current']: + analyst_name = item['analyst_name'] + try: + pt_current_value = float(item['adjusted_pt_current']) + # Update with the maximum value for each analyst + if isinstance(pt_current_value, (float, int)): + latest_pt_current[analyst_name] = max(latest_pt_current.get(analyst_name, pt_current_value), pt_current_value) + except (ValueError, TypeError): + print(f"Invalid pt_current value for analyst '{analyst_name}': {item['pt_current']}") + + # Get the price target values + pt_current_values = list(latest_pt_current.values()) + + # Compute the median pt_current if there are values, otherwise set to 0 + median_pt_current = statistics.median(pt_current_values) if pt_current_values else 0 + #print("Median pt_current:", round(median_pt_current, 2)) - #Compute Average Price Target - latest_pt_current = defaultdict(int) - # Iterate through the data to update the latest pt_current for each analyst - for item in filtered_data: - if 'adjusted_pt_current' in item and item['adjusted_pt_current']: - analyst_name = item['analyst_name'] - # Convert pt_current to float and check if it's a valid number - try: - pt_current_value = float(item['pt_current']) - # Check if the value is float or int - if isinstance(pt_current_value, (float, int)): - # Initialize the analyst entry if it doesn't exist - if analyst_name not in latest_pt_current: - latest_pt_current[analyst_name] = pt_current_value - else: - # Update with the maximum value - latest_pt_current[analyst_name] = max(latest_pt_current[analyst_name], pt_current_value) - except (ValueError, TypeError): - print(f"Invalid pt_current value for analyst '{analyst_name}': {item['pt_current']}") + consensus_ratings = defaultdict(str) + # Define the rating hierarchy + rating_hierarchy = {'Strong Sell': 0, 'Sell': 1, 'Hold': 2, 'Buy': 3, 'Strong Buy': 4} + # Iterate through the data to update the consensus rating for each analyst + for item in filtered_data: + if 'rating_current' in item and item['rating_current'] and 'analyst_name' in item and item['analyst_name']: + try: + analyst_name = item['analyst_name'] + current_rating = item['rating_current'] + if current_rating in rating_hierarchy: + consensus_ratings[analyst_name] = current_rating + except: + pass - # Compute the average pt_current based on the latest values - pt_current_values = list(latest_pt_current.values()) - average_pt_current = sum(pt_current_values) / len(pt_current_values) if pt_current_values else 0 + # Compute the consensus rating based on the most frequent rating among analysts + consensus_rating_counts = defaultdict(int) + for rating in consensus_ratings.values(): + consensus_rating_counts[rating] += 1 - #print("Average pt_current:", round(average_pt_current, 2)) + consensus_rating = max(consensus_rating_counts, key=consensus_rating_counts.get) + #print("Consensus Rating:", consensus_rating) + # Sum up all Buy, Sell, Hold for the progress bar in sveltekit + # Convert defaultdict to regular dictionary + data_dict = dict(consensus_rating_counts) + # Sum up 'Strong Buy' and 'Buy' + buy_total = data_dict.get('Strong Buy', 0) + data_dict.get('Buy', 0) + # Sum up 'Strong Sell' and 'Sell' + sell_total = data_dict.get('Strong Sell', 0) + data_dict.get('Sell', 0) + hold_total = data_dict.get('Hold', 0) - # Compute Consensus Rating - consensus_ratings = defaultdict(str) - # Define the rating hierarchy - rating_hierarchy = {'Strong Sell': 0, 'Sell': 1, 'Hold': 2, 'Buy': 3, 'Strong Buy': 4} + unique_analyst_names = set() + numOfAnalyst = 0 - # Iterate through the data to update the consensus rating for each analyst - for item in filtered_data: - if 'rating_current' in item and item['rating_current'] and 'analyst_name' in item and item['analyst_name']: - try: - analyst_name = item['analyst_name'] - current_rating = item['rating_current'] - if current_rating in rating_hierarchy: - consensus_ratings[analyst_name] = current_rating - except: - pass + for item in filtered_data: + if item['analyst_name'] not in unique_analyst_names: + unique_analyst_names.add(item['analyst_name']) + numOfAnalyst += 1 + #print("Number of unique analyst names:", numOfAnalyst) - # Compute the consensus rating based on the most frequent rating among analysts - consensus_rating_counts = defaultdict(int) - for rating in consensus_ratings.values(): - consensus_rating_counts[rating] += 1 + stats = {'numOfAnalyst': numOfAnalyst, 'consensusRating': consensus_rating, 'priceTarget': round(median_pt_current, 2)} + categorical_ratings = {'Buy': buy_total, 'Sell': sell_total, 'Hold': hold_total} + + res = {**stats, **categorical_ratings} + return res - consensus_rating = max(consensus_rating_counts, key=consensus_rating_counts.get) - #print("Consensus Rating:", consensus_rating) - - #Sum up all Buy,Sell,Hold for the progress bar in sveltekit - # Convert defaultdict to regular dictionary - data_dict = dict(consensus_rating_counts) - - # Sum up 'Strong Buy' and 'Buy' - buy_total = data_dict.get('Strong Buy', 0) + data_dict.get('Buy', 0) - - # Sum up 'Strong Sell' and 'Sell' - sell_total = data_dict.get('Strong Sell', 0) + data_dict.get('Sell', 0) - hold_total = data_dict.get('Hold', 0) - - - unique_analyst_names = set() - numOfAnalyst = 0 - - for item in filtered_data: - if item['analyst_name'] not in unique_analyst_names: - unique_analyst_names.add(item['analyst_name']) - numOfAnalyst += 1 - #print("Number of unique analyst names:", numOfAnalyst) - - stats = {'numOfAnalyst': numOfAnalyst, 'consensusRating': consensus_rating, 'priceTarget': round(average_pt_current, 2)} - categorical_ratings = {'Buy': buy_total, 'Sell': sell_total, 'Hold': hold_total} - - res = {**stats, **categorical_ratings} - return res def run(chunk,analyst_list): end_date = date.today() @@ -247,7 +240,7 @@ try: chunk_size = len(stock_symbols) // 40 # Divide the list into N chunks chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)] - #chunks = [['NVDA']] + #chunks = [['CMG']] for chunk in chunks: run(chunk, analyst_stats_list)