from benzinga import financial_data
import requests
from datetime import datetime
import numpy as np
from scipy.stats import norm
import time
import sqlite3
import ujson
import os
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
from collections import Counter

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')

fin = financial_data.Benzinga(api_key)
headers = {"accept": "application/json"}


def remove_duplicates(data, key):
    """Remove duplicate dicts from a list based on a key."""
    seen = set()
    new_data = []
    for item in data:
        if item[key] not in seen:
            seen.add(item[key])
            new_data.append(item)
    return new_data


def extract_sector(ticker, con):
    """Look up the sector of a ticker in the local stocks.db; return None if not found."""
    query_template = """
        SELECT sector
        FROM stocks
        WHERE symbol = ?
    """
    try:
        df = pd.read_sql_query(query_template, con, params=(ticker,))
        sector = df['sector'].iloc[0]
    except Exception:
        sector = None
    return sector


def calculate_rating(data):
    """Score an analyst between 0 and 5 from average return, success rate, rating count and recency."""
    overall_average_return = float(data['avgReturn'])
    overall_success_rate = float(data['successRate'])
    total_ratings = int(data['totalRatings'])
    last_rating = data['lastRating']
    try:
        last_rating_date = datetime.strptime(last_rating, "%Y-%m-%d")
        difference = (datetime.now() - last_rating_date).days
    except Exception:
        difference = 1000  # In case of None

    # Analysts with no ratings or no activity in the last 600 days get a score of 0
    if total_ratings == 0 or difference >= 600:
        return 0
    else:
        # Define weights for each factor
        weight_return = 0.4
        weight_success_rate = 0.3
        weight_total_ratings = 0.1
        weight_difference = 0.2  # Reduced weight for recency

        # Calculate the weighted sum; recency enters as 1 / (1 + days since last rating)
        weighted_sum = (weight_return * overall_average_return +
                        weight_success_rate * overall_success_rate +
                        weight_total_ratings * total_ratings +
                        weight_difference * (1 / (1 + difference)))

        # Normalize the weighted sum to get a rating between 0 and 5
        min_rating = 0
        max_rating = 5
        normalized_rating = min(max(weighted_sum / (weight_return + weight_success_rate + weight_total_ratings + weight_difference), min_rating), max_rating)

        # Penalize top scores that rest on few ratings or a weak average return
        if normalized_rating >= 4:
            if total_ratings < 10:
                normalized_rating -= 2.4
            elif total_ratings < 15:
                normalized_rating -= 2.5
            elif total_ratings < 20:
                normalized_rating -= 0.75
            elif total_ratings < 30:
                normalized_rating -= 1
            elif overall_average_return <= 10:
                normalized_rating -= 1.1

        '''
        if overall_average_return <= 0 and overall_average_return >= -5:
            normalized_rating = min(normalized_rating - 2, 0)
        elif overall_average_return < -5 and overall_average_return >= -10:
            normalized_rating = min(normalized_rating - 3, 0)
        else:
            normalized_rating = min(normalized_rating - 4, 0)
        '''

        if overall_average_return <= 0:
            normalized_rating = min(normalized_rating - 2, 0)

        normalized_rating = max(normalized_rating, 0)

        return round(normalized_rating, 2)


def get_analyst_ratings(analyst_id):
    """Fetch the rating history of a single analyst from the Benzinga ratings endpoint."""
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
    res_list = []
    for page in range(0, 5):
        try:
            querystring = {"token": api_key, "parameters[analyst_id]": analyst_id, "page": str(page), "pagesize": "1000"}
            response = requests.request("GET", url, headers=headers, params=querystring)
            data = ujson.loads(response.text)['ratings']
            res_list += data
            time.sleep(2)
        except Exception:
            break
    return res_list


def get_all_analyst_stats():
    """Fetch all analysts with their accuracy stats from the Benzinga analysts endpoint."""
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
    res_list = []
    # Run the API several times because not every analyst is returned on a single pass (Benzinga bug)
    for _ in range(0, 20):
        for page in range(0, 100):
            try:
                querystring = {"token": api_key, "page": f"{page}", "pagesize": "1000"}
                response = requests.request("GET", url, headers=headers, params=querystring)
                data = ujson.loads(response.text)['analyst_ratings_analyst']
                res_list += data
            except Exception:
                break
        time.sleep(5)

    res_list = remove_duplicates(res_list, 'id')  # remove duplicate analysts
    res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]

    final_list = []
    for item in res_list:
        analyst_dict = {
            'analystName': item['name_full'],
            'companyName': item['firm_name'],
            'analystId': item['id'],
            'firmId': item['firm_id']
        }
        stats_dict = {
            'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
            'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
            'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
        }
        final_list.append({**analyst_dict, **stats_dict})
    return final_list


def get_top_stocks():
    """Build the top-stocks list from Strong Buy ratings issued by 5-star analysts."""
    with open("json/analyst/all-analyst-data.json", 'r') as file:
        analyst_stats_list = ujson.load(file)

    filtered_data = [item for item in analyst_stats_list if item['analystScore'] >= 5]

    res_list = []
    for item in filtered_data:
        ticker_list = item['ratingsList']
        ticker_list = [{'ticker': i['ticker'], 'pt_current': i['pt_current']} for i in ticker_list if i['rating_current'] == 'Strong Buy']
        if len(ticker_list) > 0:
            #res_list += list(set(ticker_list))
            res_list += ticker_list

    # Create a dictionary to store ticker occurrences and corresponding pt_current values
    ticker_data = {}
    for item in res_list:
        ticker = item['ticker']
        pt_current_str = item['pt_current']
        if pt_current_str:  # Skip empty strings
            pt_current = float(pt_current_str)
            if ticker in ticker_data:
                ticker_data[ticker]['sum'] += pt_current
                ticker_data[ticker]['counter'] += 1
            else:
                ticker_data[ticker] = {'sum': pt_current, 'counter': 1}

    # Attach the latest quote data for each ticker
    for ticker, info in ticker_data.items():
        try:
            with open(f"json/quote/{ticker}.json", 'r') as file:
                res = ujson.load(file)
            info['price'] = res.get('price', None)
            info['name'] = res.get('name', None)
            info['marketCap'] = res.get('marketCap', None)
        except Exception:
            info['price'] = None
            info['name'] = None
            info['marketCap'] = None

    # Calculate average pt_current for each ticker
    for ticker, info in ticker_data.items():
        info['average'] = round(info['sum'] / info['counter'], 2)

    # Convert the dictionary back to a list format
    result = [{'ticker': ticker,
               'upside': round((info['average'] / info.get('price') - 1) * 100, 2) if info.get('price') else None,
               'priceTarget': info['average'],
               'price': info['price'],
               'counter': info['counter'],
               'name': info['name'],
               'marketCap': info['marketCap']} for ticker, info in ticker_data.items()]

    # Filter outliers: keep only tickers with an upside between 5% and 250%
    result = [item for item in result if item['upside'] is not None and item['upside'] >= 5 and item['upside'] <= 250]

    result_sorted = sorted(result, key=lambda x: x['counter'] if x['counter'] is not None else float('-inf'), reverse=True)
    for rank, item in enumerate(result_sorted):
        item['rank'] = rank + 1

    with open("json/analyst/top-stocks.json", 'w') as file:
        ujson.dump(result_sorted, file)


if __name__ == "__main__":
    # Step 1: get all analyst ids and stats
    analyst_list = get_all_analyst_stats()
    print('Number of analysts:', len(analyst_list))

    # Step 2: get the rating history for each individual analyst and score the analyst
    for item in tqdm(analyst_list):
        data = get_analyst_ratings(item['analystId'])
        item['ratingsList'] = data
        item['totalRatings'] = len(data)  # true total ratings, which is important for the score
        item['lastRating'] = data[0]['date'] if len(data) > 0 else None  # ratings are returned newest first
        item['numOfStocks'] = len({rating['ticker'] for rating in data})
        stats_dict = {
            'avgReturn': item.get('avgReturn', 0),
            'successRate': item.get('successRate', 0),
            'totalRatings': item.get('totalRatings', 0),
            'lastRating': item.get('lastRating', None),
        }
        item['analystScore'] = calculate_rating(stats_dict)

    # Step 3: derive the three most common sectors each analyst covers
    try:
        con = sqlite3.connect('stocks.db')
        print('Start extracting main sectors')
        for item in tqdm(analyst_list):
            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
            sector_list = []
            for ticker in ticker_list:
                sector = extract_sector(ticker, con)
                sector_list.append(sector)
            sector_counts = Counter(sector_list)
            main_sectors = sector_counts.most_common(3)
            main_sectors = [sector for sector, _ in main_sectors if sector is not None]
            item['mainSectors'] = main_sectors
        con.close()
    except Exception as e:
        print(e)

    # Step 4: rank analysts by score and save one file per analyst
    analyst_list = sorted(analyst_list, key=lambda x: float(x['analystScore']), reverse=True)
    number_of_all_analysts = len(analyst_list)
    for rank, item in enumerate(analyst_list):
        item['rank'] = rank + 1
        item['numOfAnalysts'] = number_of_all_analysts
        item['avgReturn'] = round(float(item['avgReturn']), 2)
        item['successRate'] = round(float(item['successRate']), 2)
        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
            ujson.dump(item, file)

    # Save the top 100 analysts, dropping the ratingsList element
    top_analysts_list = []
    for item in analyst_list[0:100]:
        top_analysts_list.append({
            'analystName': item['analystName'],
            'analystId': item['analystId'],
            'rank': item['rank'],
            'analystScore': item['analystScore'],
            'companyName': item['companyName'],
            'successRate': item['successRate'],
            'avgReturn': item['avgReturn'],
            'totalRatings': item['totalRatings'],
            'lastRating': item['lastRating'],
            'mainSectors': item.get('mainSectors', [])  # may be missing if sector extraction failed
        })

    with open("json/analyst/top-analysts.json", 'w') as file:
        ujson.dump(top_analysts_list, file)

    # Save all analyst data in raw form for the next step
    with open("json/analyst/all-analyst-data.json", 'w') as file:
        ujson.dump(analyst_list, file)

    # Save top stocks with Strong Buys from 5-star analysts
    get_top_stocks()