backend/app/cron_analyst_db.py

from benzinga import financial_data
import requests
from datetime import datetime
import numpy as np
from scipy.stats import norm
import time
import sqlite3
import ujson
import os
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
from collections import Counter
load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key)
headers = {"accept": "application/json"}
# Define a function to remove duplicates based on a key
def remove_duplicates(data, key):
    seen = set()
    new_data = []
    for item in data:
        if item[key] not in seen:
            seen.add(item[key])
            new_data.append(item)
    return new_data
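
# Illustrative usage (made-up values, not real API data):
#   rows = [{'id': 1, 'v': 'a'}, {'id': 1, 'v': 'b'}, {'id': 2, 'v': 'c'}]
#   remove_duplicates(rows, 'id')  # -> [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]
# The first occurrence of each key wins and input order is preserved.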
def extract_sector(ticker, con):
    query_template = """
        SELECT
            sector
        FROM
            stocks
        WHERE
            symbol = ?
    """
    try:
        df = pd.read_sql_query(query_template, con, params=(ticker,))
        sector = df['sector'].iloc[0]
    except Exception:
        sector = None
    return sector
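
# Illustrative usage, assuming stocks.db holds a `stocks` table with
# `symbol` and `sector` columns (as the query above implies):
#   con = sqlite3.connect('stocks.db')
#   extract_sector('AAPL', con)  # -> e.g. 'Technology', or None if the symbol is unknown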
def calculate_rating(data):
    overall_average_return = float(data['avgReturn'])
    overall_success_rate = float(data['successRate'])
    total_ratings = int(data['totalRatings'])
    last_rating = data['lastRating']
    try:
        last_rating_date = datetime.strptime(last_rating, "%Y-%m-%d")
        difference = (datetime.now() - last_rating_date).days
    except (TypeError, ValueError):
        difference = 1000  # lastRating is None or malformed
    if total_ratings == 0 or difference >= 600:
        return 0
    else:
        # Define weights for each factor
        weight_return = 0.4
        weight_success_rate = 0.3
        weight_total_ratings = 0.1
        weight_difference = 0.2  # reduced weight for recency
        # Calculate weighted sum; the recency term decays with the age of the last rating
        weighted_sum = (weight_return * overall_average_return +
                        weight_success_rate * overall_success_rate +
                        weight_total_ratings * total_ratings +
                        weight_difference * (1 / (1 + difference)))
        # Normalize the weighted sum to get a rating between 0 and 5
        # (the weights sum to 1.0, so the division just clamps to [0, 5])
        min_rating = 0
        max_rating = 5
        normalized_rating = min(max(weighted_sum / (weight_return + weight_success_rate + weight_total_ratings + weight_difference), min_rating), max_rating)
        # Penalize high scores that rest on few ratings or weak returns
        if normalized_rating >= 4:
            if total_ratings < 10:
                normalized_rating -= 2.4
            elif total_ratings < 15:
                normalized_rating -= 2.5
            elif total_ratings < 20:
                normalized_rating -= 0.75
            elif total_ratings < 30:
                normalized_rating -= 1
            elif overall_average_return <= 10:
                normalized_rating -= 1.1
        # Alternative, more granular penalty (currently disabled):
        # if overall_average_return <= 0 and overall_average_return >= -5:
        #     normalized_rating = min(normalized_rating - 2, 0)
        # elif overall_average_return < -5 and overall_average_return >= -10:
        #     normalized_rating = min(normalized_rating - 3, 0)
        # else:
        #     normalized_rating = min(normalized_rating - 4, 0)
        if overall_average_return <= 0:
            normalized_rating = min(normalized_rating - 2, 0)
        normalized_rating = max(normalized_rating, 0)
        return round(normalized_rating, 2)
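
# Worked example with made-up inputs: avgReturn=12.5, successRate=0.8,
# totalRatings=40, lastRating dated 30 days ago (difference = 30):
#   weighted_sum = 0.4*12.5 + 0.3*0.8 + 0.1*40 + 0.2*(1/31) ~ 9.25
#   normalized   = min(max(9.25 / 1.0, 0), 5) = 5
# totalRatings >= 30 and avgReturn > 10, so no penalty fires -> score 5.0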
def get_analyst_ratings(analyst_id):
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
    res_list = []
    for page in range(0, 5):
        try:
            querystring = {"token": api_key, "parameters[analyst_id]": analyst_id, "page": str(page), "pagesize": "1000"}
            response = requests.get(url, headers=headers, params=querystring)
            data = ujson.loads(response.text)['ratings']
            res_list += data
            time.sleep(2)
        except Exception:
            break
    return res_list
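
# Illustrative shape of one rating entry (values made up; the keys are the
# ones consumed by get_top_stocks and the main block below):
#   {'date': '2024-05-01', 'ticker': 'AAPL', 'rating_current': 'Strong Buy', 'pt_current': '210.0'}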
def get_all_analyst_stats():
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
    res_list = []
    for _ in range(0, 20):  # query the API N times because not all analysts are returned in a single pass (Benzinga bug)
        for page in range(0, 100):
            try:
                querystring = {"token": api_key, "page": f"{page}", 'pagesize': "1000"}
                response = requests.get(url, headers=headers, params=querystring)
                data = ujson.loads(response.text)['analyst_ratings_analyst']
                res_list += data
            except Exception:
                break
        time.sleep(5)
    res_list = remove_duplicates(res_list, 'id')  # remove duplicate analysts
    res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
    final_list = []
    for item in res_list:
        analyst_dict = {
            'analystName': item['name_full'],
            'companyName': item['firm_name'],
            'analystId': item['id'],
            'firmId': item['firm_id']
        }
        stats_dict = {
            'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
            'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
            'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
        }
        final_list.append({**analyst_dict, **stats_dict})
    return final_list
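
# Illustrative shape of one returned element (values made up; field types
# depend on what the Benzinga response actually contains):
#   {'analystName': 'Jane Doe', 'companyName': 'Example Securities',
#    'analystId': 'abc123', 'firmId': 'f456',
#    'avgReturn': 12.5, 'successRate': 0.8, 'totalRatings': 40}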
def get_top_stocks():
    with open("json/analyst/all-analyst-data.json", 'r') as file:
        analyst_stats_list = ujson.load(file)
    # Keep only 5-star analysts
    filtered_data = [item for item in analyst_stats_list if item['analystScore'] >= 5]
    res_list = []
    for item in filtered_data:
        ticker_list = item['ratingsList']
        ticker_list = [{'ticker': i['ticker'], 'pt_current': i['pt_current']} for i in ticker_list if i['rating_current'] == 'Strong Buy']
        if len(ticker_list) > 0:
            res_list += ticker_list
    # Aggregate ticker occurrences and the corresponding pt_current values
    ticker_data = {}
    for item in res_list:
        ticker = item['ticker']
        pt_current_str = item['pt_current']
        if pt_current_str:  # skip empty price targets
            pt_current = float(pt_current_str)
            if ticker in ticker_data:
                ticker_data[ticker]['sum'] += pt_current
                ticker_data[ticker]['counter'] += 1
            else:
                ticker_data[ticker] = {'sum': pt_current, 'counter': 1}
    for ticker, info in ticker_data.items():
        try:
            with open(f"json/quote/{ticker}.json", 'r') as file:
                res = ujson.load(file)
            info['price'] = res.get('price', None)
            info['name'] = res.get('name', None)
            info['marketCap'] = res.get('marketCap', None)
        except Exception:
            info['price'] = None
            info['name'] = None
            info['marketCap'] = None
    # Calculate the average price target for each ticker
    for ticker, info in ticker_data.items():
        info['average'] = round(info['sum'] / info['counter'], 2)
    # Convert the dictionary back to a list format
    result = [{'ticker': ticker, 'upside': round((info['average'] / info.get('price') - 1) * 100, 2) if info.get('price') else None, 'priceTarget': info['average'], 'price': info['price'], 'counter': info['counter'], 'name': info['name'], 'marketCap': info['marketCap']} for ticker, info in ticker_data.items()]
    result = [item for item in result if item['upside'] is not None and 5 <= item['upside'] <= 250]  # filter outliers
    result_sorted = sorted(result, key=lambda x: x['counter'] if x['counter'] is not None else float('-inf'), reverse=True)
    for rank, item in enumerate(result_sorted):
        item['rank'] = rank + 1
    with open("json/analyst/top-stocks.json", 'w') as file:
        ujson.dump(result_sorted, file)
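
# Illustrative shape of one top-stocks.json entry (values made up):
#   {'ticker': 'AAPL', 'upside': 12.3, 'priceTarget': 210.0, 'price': 187.0,
#    'counter': 4, 'name': 'Apple Inc.', 'marketCap': 2900000000000, 'rank': 1}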
if __name__ == "__main__":
    # Step 1: get all analyst IDs and summary stats
    analyst_list = get_all_analyst_stats()
    print('Number of analysts:', len(analyst_list))
    # Step 2: get the rating history for each individual analyst and score the analyst
    for item in tqdm(analyst_list):
        data = get_analyst_ratings(item['analystId'])
        item['ratingsList'] = data
        item['totalRatings'] = len(data)  # true total ratings, which is important for the score
        item['lastRating'] = data[0]['date'] if len(data) > 0 else None
        item['numOfStocks'] = len({entry['ticker'] for entry in data})
        stats_dict = {
            'avgReturn': item.get('avgReturn', 0),
            'successRate': item.get('successRate', 0),
            'totalRatings': item.get('totalRatings', 0),
            'lastRating': item.get('lastRating', None),
        }
        item['analystScore'] = calculate_rating(stats_dict)
    try:
        con = sqlite3.connect('stocks.db')
        print('Start extracting main sectors')
        for item in tqdm(analyst_list):
            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
            sector_list = []
            for ticker in ticker_list:
                sector = extract_sector(ticker, con)
                sector_list.append(sector)
            sector_counts = Counter(sector_list)
            main_sectors = sector_counts.most_common(3)
            main_sectors = [sector for sector, _ in main_sectors if sector is not None]
            item['mainSectors'] = main_sectors
        con.close()
    except Exception as e:
        print(e)
    analyst_list = sorted(analyst_list, key=lambda x: float(x['analystScore']), reverse=True)
    number_of_all_analysts = len(analyst_list)
    for rank, item in enumerate(analyst_list):
        item['rank'] = rank + 1
        item['numOfAnalysts'] = number_of_all_analysts
        item['avgReturn'] = round(float(item['avgReturn']), 2)
        item['successRate'] = round(float(item['successRate']), 2)
        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
            ujson.dump(item, file)
    # Save the top 100 analysts, dropping the bulky ratingsList element
    top_analysts_list = []
    for item in analyst_list[0:100]:
        top_analysts_list.append({
            'analystName': item['analystName'],
            'analystId': item['analystId'],
            'rank': item['rank'],
            'analystScore': item['analystScore'],
            'companyName': item['companyName'],
            'successRate': item['successRate'],
            'avgReturn': item['avgReturn'],
            'totalRatings': item['totalRatings'],
            'lastRating': item['lastRating'],
            'mainSectors': item.get('mainSectors', [])  # may be missing if the sector step failed
        })
    with open("json/analyst/top-analysts.json", 'w') as file:
        ujson.dump(top_analysts_list, file)
    # Save all analyst data in raw form for the next step
    with open("json/analyst/all-analyst-data.json", 'w') as file:
        ujson.dump(analyst_list, file)
    # Save top stocks with Strong Buys from 5-star analysts
    get_top_stocks()