# backend/app/cron_analyst_db.py
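"""Cron job that rebuilds the analyst database.

Fetches analyst profiles and rating histories from the Benzinga API,
scores each analyst on a 0-5 scale, and writes per-analyst JSON files
plus aggregated top-analyst and top-stock lists.
"""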
import asyncio
import os
import sqlite3
from collections import Counter
from datetime import datetime

import aiohttp
import pandas as pd
import ujson
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
headers = {"accept": "application/json"}

# Remove duplicates from a list of dicts based on a key
def remove_duplicates(data, key):
    seen = set()
    new_data = []
    for item in data:
        if item[key] not in seen:
            seen.add(item[key])
            new_data.append(item)
    return new_data
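
# For example, remove_duplicates([{'id': 1}, {'id': 1}, {'id': 2}], 'id')
# returns [{'id': 1}, {'id': 2}]: the first occurrence of each key wins.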

def extract_sector(ticker, con):
    query_template = """
        SELECT
            sector
        FROM
            stocks
        WHERE
            symbol = ?
    """
    try:
        df = pd.read_sql_query(query_template, con, params=(ticker,))
        sector = df['sector'].iloc[0]
    except Exception:
        sector = None
    return sector

def calculate_rating(data):
    overall_average_return = float(data['avgReturn'])
    overall_success_rate = float(data['successRate'])
    total_ratings = int(data['totalRatings'])
    last_rating = data['lastRating']
    try:
        last_rating_date = datetime.strptime(last_rating, "%Y-%m-%d")
        difference = (datetime.now() - last_rating_date).days
    except (TypeError, ValueError):
        difference = 1000  # In case of None or an invalid date
    if total_ratings == 0 or difference >= 600:
        return 0
    # Define weights for each factor
    weight_return = 0.4
    weight_success_rate = 0.3
    weight_total_ratings = 0.1
    weight_difference = 0.2  # Recency factor
    # Weighted sum; recency contributes via 1 / (1 + days since last rating)
    weighted_sum = (weight_return * overall_average_return +
                    weight_success_rate * overall_success_rate +
                    weight_total_ratings * total_ratings +
                    weight_difference * (1 / (1 + difference)))
    # Normalize the weighted sum to get a rating between 0 and 5
    min_rating = 0
    max_rating = 5
    normalized_rating = min(max(weighted_sum / (weight_return + weight_success_rate + weight_total_ratings + weight_difference), min_rating), max_rating)
    # Penalize high ratings that rest on few calls or weak returns
    if normalized_rating >= 4:
        if total_ratings < 10:
            normalized_rating -= 2.4
        elif total_ratings < 15:
            normalized_rating -= 2.5
        elif total_ratings < 20:
            normalized_rating -= 0.75
        elif total_ratings < 30:
            normalized_rating -= 1
        elif overall_average_return <= 10:
            normalized_rating -= 1.1
    if overall_average_return <= 0:
        normalized_rating = max(normalized_rating - 2, 0)
    # Cap the rating if the last rating is older than 30 days
    if difference > 30:
        normalized_rating = min(normalized_rating, 4.5)
    return round(normalized_rating, 2)
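
# Illustrative walk-through with hypothetical inputs (assuming successRate
# is on the scale the weights above expect):
#   avgReturn=4.0, successRate=0.9, totalRatings=20, lastRating 10 days ago
#   weighted_sum = 0.4*4.0 + 0.3*0.9 + 0.1*20 + 0.2*(1/11) ~= 3.89
#   Dividing by the weight sum (1.0) gives 3.89 -- below the >= 4 penalty
#   tiers and within 30 days, so the function returns 3.89.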

def get_top_stocks():
    with open("json/analyst/all-analyst-data.json", 'r') as file:
        analyst_stats_list = ujson.load(file)
    filtered_data = [item for item in analyst_stats_list if item['analystScore'] >= 5]
    res_list = []
    for item in filtered_data:
        ticker_list = item['ratingsList']
        ticker_list = [{'ticker': i['ticker'], 'pt_current': i['pt_current']} for i in ticker_list if i['rating_current'] == 'Strong Buy']
        if len(ticker_list) > 0:
            res_list += ticker_list
    # Aggregate ticker occurrences and the corresponding pt_current values
    ticker_data = {}
    for item in res_list:
        ticker = item['ticker']
        pt_current_str = item['pt_current']
        if pt_current_str:  # Skip missing or empty price targets
            pt_current = float(pt_current_str)
            if ticker in ticker_data:
                ticker_data[ticker]['sum'] += pt_current
                ticker_data[ticker]['counter'] += 1
            else:
                ticker_data[ticker] = {'sum': pt_current, 'counter': 1}
    for ticker, info in ticker_data.items():
        try:
            with open(f"json/quote/{ticker}.json", 'r') as file:
                res = ujson.load(file)
            info['price'] = res.get('price', None)
            info['name'] = res.get('name', None)
            info['marketCap'] = res.get('marketCap', None)
        except Exception:
            info['price'] = None
            info['name'] = None
            info['marketCap'] = None
    # Calculate the average price target for each ticker
    for ticker, info in ticker_data.items():
        info['average'] = round(info['sum'] / info['counter'], 2)
    # Convert the dictionary back to a list format
    result = [{'ticker': ticker, 'upside': round((info['average'] / info.get('price') - 1) * 100, 2) if info.get('price') else None, 'priceTarget': info['average'], 'price': info['price'], 'counter': info['counter'], 'name': info['name'], 'marketCap': info['marketCap']} for ticker, info in ticker_data.items()]
    result = [item for item in result if item['upside'] is not None and 5 <= item['upside'] <= 250]  # Filter outliers
    result_sorted = sorted(result, key=lambda x: x['counter'] if x['counter'] is not None else float('-inf'), reverse=True)
    for rank, item in enumerate(result_sorted):
        item['rank'] = rank + 1
    with open("json/analyst/top-stocks.json", 'w') as file:
        ujson.dump(result_sorted, file)
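
# A sketch of one resulting top-stocks entry (hypothetical values):
#   {'ticker': 'XYZ', 'upside': 25.0, 'priceTarget': 125.0, 'price': 100.0,
#    'counter': 3, 'name': 'Example Corp', 'marketCap': 5000000000, 'rank': 1}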

async def get_analyst_ratings(analyst_id, session):
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
    res_list = []
    for page in range(5):
        try:
            querystring = {
                "token": api_key,
                "parameters[analyst_id]": analyst_id,
                "page": str(page),
                "pagesize": "1000"
            }
            async with session.get(url, headers=headers, params=querystring) as response:
                data = await response.json()
                ratings = data.get('ratings', [])
                if not ratings:
                    break  # Stop fetching once no more ratings are returned
                res_list += ratings
        except Exception:
            break  # Stop on request or parse errors
    return res_list
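
# Note: at most 5 pages x 1000 ratings are fetched per analyst, so a
# rating history is effectively capped at 5000 entries.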

async def get_all_analyst_stats():
    url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
    res_list = []
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get(url, headers=headers, params={"token": api_key, "page": str(page), "pagesize": "1000"})
            for page in range(100)
        ]
        # Gather responses concurrently
        responses = await asyncio.gather(*tasks)
        # Process each response
        for response in responses:
            if response.status == 200:  # Check for a successful response
                try:
                    data = ujson.loads(await response.text())['analyst_ratings_analyst']
                    res_list += data
                except Exception:
                    pass  # Skip pages that fail to parse
    print('Fetched analyst records:', len(res_list))
    # Remove duplicate analysts and drop those without any rated calls
    res_list = remove_duplicates(res_list, 'id')
    res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
    # Construct the final result list
    final_list = [{
        'analystName': item['name_full'],
        'companyName': item['firm_name'],
        'analystId': item['id'],
        'firmId': item['firm_id'],
        'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
        'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
        'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
    } for item in res_list]
    return final_list

async def process_analyst(item, session):
    data = await get_analyst_ratings(item['analystId'], session)
    item['ratingsList'] = data
    item['totalRatings'] = len(data)  # True total ratings
    item['lastRating'] = data[0]['date'] if data else None
    item['numOfStocks'] = len({d['ticker'] for d in data})
    # Stats dictionary for calculating the score
    stats_dict = {
        'avgReturn': item.get('avgReturn', 0),
        'successRate': item.get('successRate', 0),
        'totalRatings': item['totalRatings'],
        'lastRating': item['lastRating'],
    }
    item['analystScore'] = calculate_rating(stats_dict)

async def get_single_analyst_data(analyst_list):
    async with aiohttp.ClientSession() as session:
        tasks = [process_analyst(item, session) for item in analyst_list]
        for task in tqdm(asyncio.as_completed(tasks), total=len(analyst_list)):
            await task
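
# asyncio.as_completed yields tasks in completion order, which keeps the
# tqdm progress bar moving; process_analyst mutates each item in place,
# so completion order does not affect the final analyst_list.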

async def run():
    # Step 1: get all analyst IDs and stats
    analyst_list = await get_all_analyst_stats()
    print('Number of analysts:', len(analyst_list))
    # Step 2: get the rating history for each analyst and score them
    await get_single_analyst_data(analyst_list)
    try:
        con = sqlite3.connect('stocks.db')
        print('Start extracting main sectors')
        for item in tqdm(analyst_list):
            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
            sector_list = []
            for ticker in ticker_list:
                sector = extract_sector(ticker, con)
                sector_list.append(sector)
            # Keep the three most common sectors the analyst covers
            sector_counts = Counter(sector_list)
            main_sectors = sector_counts.most_common(3)
            main_sectors = [sector for sector, count in main_sectors if sector is not None]
            item['mainSectors'] = main_sectors
        con.close()
    except Exception as e:
        print(e)
    analyst_list = sorted(analyst_list, key=lambda x: float(x['analystScore']), reverse=True)
    number_of_all_analysts = len(analyst_list)
    for rank, item in enumerate(analyst_list):
        item['rank'] = rank + 1
        item['numOfAnalysts'] = number_of_all_analysts
        item['avgReturn'] = round(float(item['avgReturn']), 2)
        item['successRate'] = round(float(item['successRate']), 2)
        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
            ujson.dump(item, file)
    # Save the top 100 analysts, dropping the bulky ratingsList element
    top_analysts_list = []
    for item in analyst_list[:100]:
        top_analysts_list.append({
            'analystName': item['analystName'],
            'analystId': item['analystId'],
            'rank': item['rank'],
            'analystScore': item['analystScore'],
            'companyName': item['companyName'],
            'successRate': item['successRate'],
            'avgReturn': item['avgReturn'],
            'totalRatings': item['totalRatings'],
            'lastRating': item['lastRating']
        })
    with open("json/analyst/top-analysts.json", 'w') as file:
        ujson.dump(top_analysts_list, file)
    # Save all analyst data in raw form for the next step
    with open("json/analyst/all-analyst-data.json", 'w') as file:
        ujson.dump(analyst_list, file)
    # Save top stocks with Strong Buy ratings from 5-star analysts
    get_top_stocks()

if __name__ == "__main__":
    asyncio.run(run())