backend/app/cron_analyst_ticker.py
MuslemRahimi a7374541dc bugfixing
2024-10-04 20:14:54 +02:00

257 lines
10 KiB
Python
Executable File

from benzinga import financial_data
import requests
from datetime import datetime, timedelta, date
from collections import defaultdict
import numpy as np
from scipy.stats import norm
import time
import sqlite3
import ujson
import math
import os
from dotenv import load_dotenv
# Load variables from a local .env file (if any) into the process environment
# before the API key is read below.
load_dotenv()
# Benzinga API key taken from the environment; os.getenv returns None when
# BENZINGA_API_KEY is not set.
api_key = os.getenv('BENZINGA_API_KEY')
# Module-wide Benzinga client; run() uses it to download analyst ratings.
fin = financial_data.Benzinga(api_key)
def remove_duplicates(data, key):
    """Return the records of *data* with duplicates (by ``record[key]``) dropped.

    The first record seen for each distinct value of ``record[key]`` is kept
    and the original relative order is preserved.

    Args:
        data: Iterable of mappings; every record must contain *key*.
        key: Name of the field whose value identifies a duplicate.

    Returns:
        A new list holding the first record for every distinct key value.

    Raises:
        KeyError: If a record does not contain *key*.
        TypeError: If a key value is not hashable.
    """
    # Dicts keep insertion order and setdefault never overwrites, so this
    # retains exactly the first record per value, in encounter order.
    first_by_value = {}
    for record in data:
        first_by_value.setdefault(record[key], record)
    return list(first_by_value.values())
def get_summary(res_list):
    """Summarise the analyst ratings dated within the last 365 days.

    Args:
        res_list: List of rating records (dicts). Every record must carry a
            ``date`` string in ``YYYY-MM-DD`` format and an ``analyst_name``.
            ``rating_current`` and ``adjusted_pt_current`` are optional.

    Returns:
        A dict with the keys:
            numOfAnalyst: number of distinct analyst names in the window.
            consensusRating: the most frequent recognised rating among the
                analysts (first one counted wins a tie), or ``None`` when no
                recognised rating exists in the window.
            priceTarget: average, rounded to 2 decimals, of each analyst's
                highest valid ``adjusted_pt_current``; 0 when there is none.
            Buy / Sell / Hold: analyst counts, where 'Strong Buy' is folded
                into Buy and 'Strong Sell' into Sell.

    Raises:
        KeyError: If a record lacks ``date`` or ``analyst_name``.
        ValueError: If a ``date`` string does not match ``YYYY-MM-DD``.
    """
    recognised_ratings = ('Strong Sell', 'Sell', 'Hold', 'Buy', 'Strong Buy')

    end_date = date.today()
    start_date = end_date - timedelta(days=365)
    recent = [
        item for item in res_list
        if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date
    ]

    # --- Average price target -------------------------------------------
    # Keep, per analyst, the highest valid target seen in the window. The
    # same field (adjusted_pt_current) is used for both the presence check
    # and the value, so the average is not mixed with unadjusted targets and
    # a record without 'pt_current' can no longer raise KeyError.
    highest_target = {}
    for item in recent:
        raw_target = item.get('adjusted_pt_current')
        if not raw_target:
            continue
        analyst_name = item['analyst_name']
        try:
            target = float(raw_target)
        except (ValueError, TypeError):
            print(f"Invalid adjusted_pt_current value for analyst '{analyst_name}': {raw_target}")
            continue
        # float() accepts 'nan' / 'inf'; those would poison the average.
        if not math.isfinite(target):
            continue
        highest_target[analyst_name] = max(highest_target.get(analyst_name, target), target)

    if highest_target:
        average_target = sum(highest_target.values()) / len(highest_target)
    else:
        average_target = 0

    # --- Consensus rating -----------------------------------------------
    # One rating per analyst: a later record in list order overwrites an
    # earlier one.
    # NOTE(review): this picks the analyst's most recent rating only if
    # res_list is ordered oldest-first - confirm the ordering of the feed.
    rating_by_analyst = {}
    for item in recent:
        analyst_name = item.get('analyst_name')
        rating = item.get('rating_current')
        if analyst_name and rating in recognised_ratings:
            rating_by_analyst[analyst_name] = rating

    rating_counts = {}
    for rating in rating_by_analyst.values():
        rating_counts[rating] = rating_counts.get(rating, 0) + 1

    # default=None avoids "max() arg is an empty sequence" when the window
    # holds no recognised rating at all.
    consensus_rating = max(rating_counts, key=rating_counts.get, default=None)

    # --- Totals for the Buy / Sell / Hold progress bar ---------------------
    buy_total = rating_counts.get('Strong Buy', 0) + rating_counts.get('Buy', 0)
    sell_total = rating_counts.get('Strong Sell', 0) + rating_counts.get('Sell', 0)
    hold_total = rating_counts.get('Hold', 0)

    num_of_analysts = len({item['analyst_name'] for item in recent})

    return {
        'numOfAnalyst': num_of_analysts,
        'consensusRating': consensus_rating,
        'priceTarget': round(average_target, 2),
        'Buy': buy_total,
        'Sell': sell_total,
        'Hold': hold_total,
    }
# Raw ratings that map to a normalised bucket regardless of the analyst action.
_DIRECT_RATING_MAP = {
    'Neutral': 'Hold',
    'Equal-Weight': 'Hold',
    'Sector Weight': 'Hold',
    'Sector Perform': 'Hold',
    'In-Line': 'Hold',
    'Market Perform': 'Hold',
    'Negative': 'Sell',
    'Underperform': 'Sell',
    'Underweight': 'Sell',
    'Positive': 'Buy',
    'Sector Outperform': 'Buy',
}


def _normalize_rating(item):
    """Return the normalised rating for one rating record.

    Maps the raw ``rating_current`` (in combination with ``action_company``,
    ``action_pt`` and ``rating_prior``) onto 'Strong Sell', 'Sell', 'Hold',
    'Buy' or 'Strong Buy'. A rating that matches no rule is returned as is.
    The order of the checks matters: the first matching rule wins.
    """
    rating = item.get('rating_current')
    action = item.get('action_company')
    pt_action = item.get('action_pt')
    prior = item.get('rating_prior')

    if rating in ('Strong Sell', 'Strong Buy'):
        return rating
    if rating in _DIRECT_RATING_MAP:
        return _DIRECT_RATING_MAP[rating]

    if rating == 'Outperform' and action == 'Downgrades':
        return 'Hold'
    if rating in ('Outperform', 'Overweight') and action in ('Reiterates', 'Initiates Coverage On'):
        # NOTE(review): the original also wrote 'Initiates' to a misspelled
        # key ('action_comapny') here, which nothing reads. The dead write is
        # dropped; confirm whether 'action_company' was meant to be changed.
        return 'Buy'
    if rating == 'Market Outperform' and action in ('Maintains', 'Reiterates'):
        return 'Buy'
    if rating == 'Outperform' and (action in ('Maintains', 'Upgrades') or pt_action == 'Announces'):
        return 'Buy'
    if rating == 'Buy' and 'Raises' in (action, pt_action):
        return 'Strong Buy'
    if rating == 'Overweight' and (action in ('Maintains', 'Upgrades', 'Reiterates') or pt_action == 'Raises'):
        return 'Buy'
    if rating == 'Reduce' and (action == 'Downgrades' or pt_action == 'Lowers'):
        return 'Sell'
    if rating == 'Sell' and pt_action == 'Announces':
        return 'Strong Sell'
    if prior == 'Outperform' and action == 'Downgrades':
        return 'Hold'
    # The prior-rating comparison used the misspelling 'Peer Perfrom' and
    # therefore never matched; it now compares against 'Peer Perform'.
    # NOTE(review): as above, a dead write to 'action_comapny' for the
    # 'Announces' case was dropped.
    if rating == 'Peer Perform' and (prior == 'Peer Perform' or pt_action == 'Announces'):
        return 'Hold'
    return rating


def _build_analyst_lookup(analyst_list):
    """Index analyst statistics by (company, name) and by name alone.

    The first entry of *analyst_list* wins for each key.
    """
    by_company_and_name = {}
    by_name = {}
    for stats in analyst_list:
        name = stats.get('analystName')
        by_company_and_name.setdefault((stats.get('companyName'), name), stats)
        by_name.setdefault(name, stats)
    return by_company_and_name, by_name


def run(chunk, analyst_list):
    """Download, normalise and store the analyst ratings for a group of tickers.

    For every ticker in *chunk* that has at least one rating with an analyst
    name, two files are written:
        json/analyst/summary/{ticker}.json  - result of get_summary()
        json/analyst/history/{ticker}.json  - the ratings reduced to a fixed
                                              set of keys

    Args:
        chunk: List of ticker symbols requested from the API in one query.
        analyst_list: List of analyst statistics dicts with the keys
            'analystName', 'companyName', 'analystId' and 'analystScore';
            used to attach an id and score to each rating.

    Returns:
        None. Errors while processing one ticker are printed and do not stop
        the remaining tickers.
    """
    end_date_str = date.today().strftime('%Y-%m-%d')
    start_date_str = date(2015, 1, 1).strftime('%Y-%m-%d')
    company_tickers = ','.join(chunk)

    # Fetch page after page until the API fails or a page comes back empty.
    # NOTE(review): presumably the client raises (or omits 'ratings') once
    # the pages are exhausted - confirm against the Benzinga client.
    res_list = []
    for page in range(500):
        try:
            data = fin.ratings(company_tickers=company_tickers, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str)
            page_ratings = ujson.loads(fin.output(data))['ratings']
        except Exception:
            break
        if not page_ratings:
            break
        res_list += page_ratings

    # Group once by ticker (keeping API order) and drop ratings that carry no
    # analyst name, instead of rescanning the full list for every ticker.
    ratings_by_ticker = defaultdict(list)
    for item in res_list:
        if item.get('analyst_name'):
            ratings_by_ticker[item.get('ticker')].append(item)

    # Use the analyst_list argument (the original read a module global).
    by_company_and_name, by_name = _build_analyst_lookup(analyst_list)

    desired_keys = ['date', 'action_company', 'rating_current', 'adjusted_pt_current', 'adjusted_pt_prior', 'analystId', 'analystScore', 'analyst', 'analyst_name']

    for ticker in chunk:
        try:
            ticker_filtered_data = ratings_by_ticker.get(ticker)
            if not ticker_filtered_data:
                continue

            for item in ticker_filtered_data:
                normalized = _normalize_rating(item)
                if normalized != item.get('rating_current'):
                    item['rating_current'] = normalized

            summary = get_summary(ticker_filtered_data)
            with open(f"json/analyst/summary/{ticker}.json", 'w') as file:
                ujson.dump(summary, file)

            # Attach analyst id/score. An exact company + name match is
            # preferred; otherwise fall back to the first entry with the same
            # analyst name. Ratings without any match get no id/score (they
            # are emitted as None below) because the feed does not provide
            # reliable analyst names for every rating.
            for item in ticker_filtered_data:
                analyst_name = item['analyst_name']
                stats = by_company_and_name.get((item.get('analyst'), analyst_name))
                if stats is None:
                    stats = by_name.get(analyst_name)
                if stats is not None:
                    item['analystId'] = stats['analystId']
                    item['analystScore'] = stats['analystScore']

            ticker_filtered_data = [
                {key: item.get(key) for key in desired_keys}
                for item in ticker_filtered_data
            ]

            with open(f"json/analyst/history/{ticker}.json", 'w') as file:
                ujson.dump(ticker_filtered_data, file)
            print(ticker_filtered_data)
        except Exception as e:
            print(e)
# Script body: read all stock symbols, load the analyst statistics and process
# the symbols chunk by chunk. Any error is printed instead of propagating.
try:
    con = sqlite3.connect('stocks.db')
    try:
        stock_cursor = con.cursor()
        # Symbols containing a dot are excluded.
        stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stock_symbols = [row[0] for row in stock_cursor.fetchall()]
    finally:
        # Close the connection even when the query fails.
        con.close()

    # Load the raw analyst statistics used to attach ids and scores.
    with open("json/analyst/all-analyst-data.json", 'r') as file:
        analyst_stats_list = ujson.load(file)

    # Aim for roughly 40 chunks. With fewer than 40 symbols the integer
    # division yields 0, which is not a valid range() step, so use at least 1.
    chunk_size = max(1, len(stock_symbols) // 40)
    chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)]

    for chunk in chunks:
        run(chunk, analyst_stats_list)
except Exception as e:
    print(e)