# backend/app/cron_analyst_db.py
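"""Cron job that rebuilds the analyst database.

Pulls analyst stats and rating histories from the Benzinga API, scores each
analyst against historical close prices stored in stocks.db, and writes the
results to json/analyst/ (per-analyst files, top-analysts.json,
top-stocks.json and all-analyst-data.json).
"""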
import requests
from datetime import datetime, timedelta
import numpy as np
from scipy.stats import norm
import time
import sqlite3
import orjson
import os
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
from collections import Counter
import aiohttp
import asyncio
import statistics
load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
headers = {"accept": "application/json"}
# Load stock screener data
with open(f"json/stock-screener/data.json", 'rb') as file:
stock_screener_data = orjson.loads(file.read())
stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}
query_template = """
SELECT date, close
FROM "{ticker}"
WHERE date BETWEEN ? AND ?
"""
buy_ratings = ['Outperform', 'Overweight', 'Market Outperform', 'Buy', 'Positive', 'Sector Outperform']
sell_ratings = ['Negative', 'Underperform', 'Underweight', 'Reduce', 'Sell']
ticker_price_cache = {} # Dictionary to store cached price data for tickers
# Define a function to remove duplicates based on a key
def remove_duplicates(data, key):
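    """Return a new list keeping only the first occurrence of each value of `key`.

    Example: remove_duplicates([{'id': 1}, {'id': 1}, {'id': 2}], 'id')
    returns [{'id': 1}, {'id': 2}].
    """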
    seen = set()
    new_data = []
    for item in data:
        if item[key] not in seen:
            seen.add(item[key])
            new_data.append(item)
    return new_data

async def get_data(ticker_list, item):
"""Extract specified columns data for a given symbol."""
columns = ['sector', 'industry', 'price']
sector_list = []
industry_list = []
for ticker in ticker_list:
ticker_data = stock_screener_data_dict.get(ticker, {})
# Extract specified columns data for each ticker
sector = ticker_data.get('sector',None)
industry = ticker_data.get('industry',None)
price = ticker_data.get('price',None)
price = round(price, 2) if price is not None else None
# Append data to relevant lists if values are present
if sector:
sector_list.append(sector)
if industry:
industry_list.append(industry)
# Update ratingsList in item with price for the corresponding ticker
try:
if len(item.get('ratingsList')) > 0:
for rating in item.get('ratingsList', []):
try:
if rating.get('ticker') == ticker:
upside = round((float(rating['adjusted_pt_current'])/price-1)*100,2)
rating['price'] = price
rating['upside'] = upside
except:
rating['price'] = None
rating['upside'] = None
except:
pass
# Get the top 3 most common sectors and industries
sector_counts = Counter(sector_list)
industry_counts = Counter(industry_list)
main_sectors = [item[0] for item in sector_counts.most_common(3)]
main_industries = [item[0] for item in industry_counts.most_common(3)]
# Add main sectors and industries to the item dictionary
item['mainSectors'] = main_sectors
item['mainIndustries'] = main_industries
return item
def calculate_rating(data):
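    """Score an analyst from 0 to 5 based on average return, success rate,
    number of ratings and how recent the last rating is."""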
    overall_average_return = float(data['avgReturn'])
    overall_success_rate = float(data['successRate'])
    total_ratings = int(data['totalRatings'])
    last_rating = data['lastRating']
    try:
        last_rating_date = datetime.strptime(last_rating, "%Y-%m-%d")
        difference = (datetime.now() - last_rating_date).days
    except:
        difference = 1000  # In case of None or invalid date
    if total_ratings == 0 or difference >= 600:
        return 0
    else:
        # Define weights for each factor
        weight_return = 0.4
        weight_success_rate = 0.3
        weight_total_ratings = 0.1
        weight_difference = 0.2  # Reduced weight for difference
        # Calculate weighted sum
        weighted_sum = (weight_return * overall_average_return +
                        weight_success_rate * overall_success_rate +
                        weight_total_ratings * total_ratings +
                        weight_difference * (1 / (1 + difference)))  # Adjusted weight for difference
        # Normalize the weighted sum to get a rating between 0 and 5
        min_rating = 0
        max_rating = 5
        normalized_rating = min(max(weighted_sum / (weight_return + weight_success_rate + weight_total_ratings + weight_difference), min_rating), max_rating)
        # Apply additional conditions based on total ratings and average return
        if normalized_rating >= 4:
            if total_ratings < 10:
                normalized_rating -= 2.4
            elif total_ratings < 15:
                normalized_rating -= 2.5
            elif total_ratings < 20:
                normalized_rating -= 0.75
            elif total_ratings < 30:
                normalized_rating -= 1
            elif overall_average_return <= 10:
                normalized_rating -= 1.1
        if overall_average_return <= 0:
            normalized_rating = max(normalized_rating - 2, 0)
        # Cap the rating if the last rating is older than 30 days
        if difference > 30:
            normalized_rating = min(normalized_rating, 4.5)
        if overall_success_rate < 50:
            normalized_rating = min(normalized_rating, 4.6)
        if overall_average_return < 30:
            normalized_rating = min(normalized_rating, 4.6)
        return round(normalized_rating, 2)

def get_top_stocks():
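    """Build json/analyst/top-stocks.json from 'Strong Buy' ratings issued in the
    last 12 months by analysts with an analystScore of at least 4."""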
with open(f"json/analyst/all-analyst-data.json", 'r') as file:
analyst_stats_list = orjson.loads(file.read())
filtered_data = [item for item in analyst_stats_list if item['analystScore'] >= 4]
res_list = []
# Define the date range for the past 12 months
end_date = datetime.now().date()
start_date = end_date - timedelta(days=365)
res_list = []
for item in filtered_data:
ticker_list = item['ratingsList']
# Filter by 'Strong Buy' and ensure the rating is within the last 12 months
ticker_list = [{'ticker': i['ticker'], 'adjusted_pt_current': i['adjusted_pt_current'], 'date': i['date']}
for i in ticker_list
if i['rating_current'] == 'Strong Buy'
and start_date <= datetime.strptime(i['date'], '%Y-%m-%d').date() <= end_date]
if len(ticker_list) > 0:
res_list += ticker_list
# Create a dictionary to store ticker occurrences and corresponding pt_current values
ticker_data = {}
for item in res_list:
ticker = item['ticker']
pt_current_str = item['adjusted_pt_current']
if pt_current_str: # Skip empty strings
pt_current = float(pt_current_str)
if ticker in ticker_data:
ticker_data[ticker]['pt_list'].append(pt_current)
else:
ticker_data[ticker] = {'pt_list': [pt_current]}
for ticker, info in ticker_data.items():
try:
with open(f"json/quote/{ticker}.json", 'r') as file:
res = orjson.loads(file.read())
info['price'] = res.get('price', None)
info['name'] = res.get('name', None)
info['marketCap'] = res.get('marketCap', None)
except:
info['price'] = None
info['name'] = None
info['marketCap'] = None
# Calculate median pt_current for each ticker
for ticker, info in ticker_data.items():
if info['pt_list']:
info['median'] = round(statistics.median(info['pt_list']), 2)
# Convert the dictionary back to a list format
result = [{'symbol': ticker,
'upside': round((info['median']/info.get('price')-1)*100, 2) if info.get('price') else None,
'priceTarget': info['median'],
'price': info['price'],
'counter': len(info['pt_list']),
'name': info['name']}
for ticker, info in ticker_data.items()]
result = [item for item in result if item['upside'] is not None and item['upside'] >= 5 and item['upside'] <= 250] # Filter outliers
result_sorted = sorted(result, key=lambda x: x['counter'] if x['counter'] is not None else float('-inf'), reverse=True)
for rank, item in enumerate(result_sorted):
item['rank'] = rank + 1
with open(f"json/analyst/top-stocks.json", 'w') as file:
file.write(orjson.dumps(result_sorted).decode('utf-8'))
async def get_analyst_ratings(analyst_id, session):
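    """Fetch up to 5 pages of ratings for a single analyst from the Benzinga calendar API
    and return them filtered to ratings issued since 2015-01-01."""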
url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
res_list = []
for page in range(5):
try:
querystring = {
"token": api_key,
"parameters[analyst_id]": analyst_id,
"page": str(page),
"pagesize": "1000"
}
async with session.get(url, headers=headers, params=querystring) as response:
data = await response.json()
ratings = data.get('ratings', [])
if not ratings:
break # Stop fetching if no more ratings
res_list += ratings
except Exception as e:
#print(f"Error fetching page {page} for analyst {analyst_id}: {e}")
break
# Date filter: only include items with 'date' >= '2015-01-01'
filtered_data = [
{key: value for key, value in item.items() if key not in {'url_news', 'url', 'url_calendar', 'updated', 'time', 'currency'}}
for item in res_list
if datetime.strptime(item['date'], '%Y-%m-%d') >= datetime(2015, 1, 1)
]
# If prior rating and current rating is "Buy" we interpret it as "Strong Buy"
for item in filtered_data:
try:
if item.get("rating_prior",None) == "Buy" and item.get("rating_current",None) == "Buy":
item["rating_current"] = "Strong Buy"
except:
pass
return filtered_data
async def get_all_analyst_stats():
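    """Fetch the full list of analysts (with Benzinga's accuracy stats) and return the
    fields used downstream: name, firm, ids, average return, success rate and total ratings."""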
url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
res_list = []
async with aiohttp.ClientSession() as session:
tasks = [
session.get(url, headers=headers, params={"token": api_key, "page": str(page), 'pagesize': "1000"})
for page in range(100)
]
# Gather responses concurrently
responses = await asyncio.gather(*tasks)
# Process each response
for response in responses:
if response.status == 200: # Check for successful response
try:
data = orjson.loads(await response.text())['analyst_ratings_analyst']
res_list += data
except Exception as e:
pass
# Remove duplicates of analysts and filter based on ratings accuracy
res_list = remove_duplicates(res_list, 'id')
res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
# Construct the final result list
final_list = [{
'analystName': item['name_full'],
'companyName': item['firm_name'],
'analystId': item['id'],
'firmId': item['firm_id'],
'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
} for item in res_list]
return final_list
async def process_analyst(item, con, session, start_date, end_date):
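    """Compute avgReturn, successRate and analystScore for one analyst by comparing the
    close price on each rating date with the close price roughly 12 months later."""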
    # Fetch analyst ratings
    data = await get_analyst_ratings(item['analystId'], session)
    item['ratingsList'] = data
    item['totalRatings'] = len(data)
    item['lastRating'] = data[0]['date'] if data else None
    item['numOfStocks'] = len({d['ticker'] for d in data})
    total_return = 0
    valid_ratings_count = 0
    success_count = 0  # To track successful ratings for success rate calculation
    for stock in data:
        try:
            ticker = stock['ticker']
            rating_date = stock['date']
            rating_current = stock['rating_current']
            # Skip neutral or undefined ratings
            if rating_current not in buy_ratings and rating_current not in sell_ratings:
                continue
            # Check if the ticker data is already cached
            if ticker not in ticker_price_cache:
                # If not cached, query the stock data and cache it
                query = query_template.format(ticker=ticker)
                df = pd.read_sql_query(query, con, params=(start_date, end_date))
                ticker_price_cache[ticker] = df
            else:
                # Use cached data
                df = ticker_price_cache[ticker]
            # Ensure we have data for the rating date
            rating_date_data = df[df['date'] == rating_date]
            if rating_date_data.empty:
                # Try finding the closest date within a few days if the exact date is missing
                for days_offset in range(1, 5):
                    closest_date = (pd.to_datetime(rating_date) - pd.Timedelta(days=days_offset)).strftime('%Y-%m-%d')
                    rating_date_data = df[df['date'] == closest_date]
                    if not rating_date_data.empty:
                        break
            if rating_date_data.empty:
                continue  # Skip if no close price data found
            # Get close price on rating date
            close_price_on_rating = rating_date_data['close'].values[0]
            # Calculate the date 12 months later
            future_date = (pd.to_datetime(rating_date) + pd.DateOffset(months=12)).strftime('%Y-%m-%d')
            # Try to find the close price 12 months later
            future_date_data = df[df['date'] == future_date]
            if future_date_data.empty:
                # If the 12-month price isn't available, use the latest available price
                future_date_data = df.iloc[-1]  # Use the last available row (a Series)
            if future_date_data.empty:
                continue  # If no future data, skip this rating
            close_price_in_future = future_date_data['close'] if isinstance(future_date_data, pd.Series) else future_date_data['close'].values[0]
            # Calculate return
            stock_return = (close_price_in_future - close_price_on_rating) / close_price_on_rating
            total_return += stock_return
            valid_ratings_count += 1
            # Determine if the rating was successful
            if rating_current in buy_ratings:
                if close_price_in_future > close_price_on_rating:
                    success_count += 1  # Success for buy ratings
            elif rating_current in sell_ratings:
                if close_price_in_future < close_price_on_rating:
                    success_count += 1  # Success for sell ratings
        except:
            pass
    # Calculate average return if there are valid ratings
    if valid_ratings_count > 0:
        item['avgReturn'] = round(total_return / valid_ratings_count * 100, 2)  # Percentage format
    else:
        item['avgReturn'] = 0
    # Calculate success rate
    if valid_ratings_count > 0:
        item['successRate'] = round(success_count / valid_ratings_count * 100, 2)  # Success rate in percent
    else:
        item['successRate'] = 0
    # Populate other stats and the score
    stats_dict = {
        'avgReturn': item.get('avgReturn', 0),
        'successRate': item.get('successRate', 0),
        'totalRatings': item['totalRatings'],
        'lastRating': item['lastRating'],
    }
    item['analystScore'] = calculate_rating(stats_dict)

async def get_single_analyst_data(analyst_list, con):
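    """Fetch and score all analysts concurrently, sharing one aiohttp session and the
    SQLite connection `con` for historical prices."""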
    start_date = '2015-01-01'
    end_date = datetime.today().strftime("%Y-%m-%d")
    async with aiohttp.ClientSession() as session:
        tasks = [process_analyst(item, con, session, start_date, end_date) for item in analyst_list]
        for task in tqdm(asyncio.as_completed(tasks), total=len(analyst_list)):
            await task

async def run():
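    """Entry point: pull analyst stats, score every analyst, rank them and write the
    per-analyst, top-analysts and top-stocks JSON files."""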
    # Step 1: Get all analyst IDs and stats
    con = sqlite3.connect('stocks.db')
    analyst_list = await get_all_analyst_stats()
    print('Number of analysts:', len(analyst_list))
    # Test mode
    #analyst_list = [item for item in analyst_list if item['analystId'] == '5a02da51efacff00010633d2']
    # Step 2: Get the rating history for each individual analyst and score the analyst
    await get_single_analyst_data(analyst_list, con)
    try:
        print('Start extracting data')
        for item in tqdm(analyst_list):
            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
            if len(ticker_list) > 0:
                await get_data(ticker_list, item)
    except Exception as e:
        print(e)
    # Sort analysts by score
    analyst_list = sorted(analyst_list, key=lambda x: (float(x['analystScore']), float(x['avgReturn']), float(x['successRate'])), reverse=True)
    number_of_all_analysts = len(analyst_list)
    # Assign rank and other metrics to analysts and save each analyst's file
    for rank, item in enumerate(analyst_list):
        item['rank'] = rank + 1
        item['numOfAnalysts'] = number_of_all_analysts
        item['avgReturn'] = round(float(item['avgReturn']), 2)
        item['successRate'] = round(float(item['successRate']), 2)
        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
            file.write(orjson.dumps(item).decode('utf-8'))
    # Save the top 100 analysts
    top_analysts_list = []
    for item in analyst_list[0:100]:
        top_analysts_list.append({
            'analystName': item['analystName'],
            'analystId': item['analystId'],
            'rank': item['rank'],
            'analystScore': item['analystScore'],
            'companyName': item['companyName'],
            'successRate': item['successRate'],
            'avgReturn': item['avgReturn'],
            'totalRatings': item['totalRatings'],
            'lastRating': item['lastRating']
        })
    with open("json/analyst/top-analysts.json", 'w') as file:
        file.write(orjson.dumps(top_analysts_list).decode('utf-8'))
    # Save all analyst data in raw form for the next step
    with open("json/analyst/all-analyst-data.json", 'w') as file:
        file.write(orjson.dumps(analyst_list).decode('utf-8'))
    # Save top stocks with strong buys from 5-star analysts
    get_top_stocks()
    # Close the connection
    con.close()

if __name__ == "__main__":
    asyncio.run(run())