This commit is contained in:
MuslemRahimi 2024-11-01 01:26:25 +01:00
parent d4ccc0ab99
commit 0b99b1a130
3 changed files with 102 additions and 39 deletions

View File

@ -3,6 +3,7 @@ import requests
from datetime import datetime, timedelta, date from datetime import datetime, timedelta, date
from collections import defaultdict from collections import defaultdict
import numpy as np import numpy as np
import pandas as pd
from scipy.stats import norm from scipy.stats import norm
import time import time
import sqlite3 import sqlite3
@ -17,6 +18,13 @@ api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key) fin = financial_data.Benzinga(api_key)
query_template = """
SELECT date,close
FROM "{ticker}"
WHERE date BETWEEN ? AND ?
"""
end_date = date.today()
start_date_12m = end_date - timedelta(days=365) # end_date is today
# Define a function to remove duplicates based on a key # Define a function to remove duplicates based on a key
@ -34,38 +42,79 @@ def remove_duplicates(data, key):
def get_summary(res_list): def get_summary(res_list):
# Get the latest summary of ratings from the last 12 months # Get the latest summary of ratings from the last 12 months
end_date = date.today() end_date = date.today()
start_date = end_date - timedelta(days=365) # end_date is today
# Filter the data for the last 12 months # Filter the data for the last 12 months
filtered_data = [item for item in res_list if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date] filtered_data = [item for item in res_list if start_date_12m <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date]
# Initialize dictionary to store the latest price target for each analyst # Initialize dictionary to store the latest price target for each analyst
latest_pt_current = defaultdict(int) latest_pt_current = defaultdict(list)
# Iterate through the filtered data to update the latest pt_current for each analyst # Iterate through the filtered data to collect pt_current for each analyst
for item in filtered_data: for item in filtered_data:
if 'adjusted_pt_current' in item and item['adjusted_pt_current']: if 'adjusted_pt_current' in item and item['adjusted_pt_current']:
analyst_name = item['analyst_name'] analyst_name = item['analyst_name']
try: try:
pt_current_value = float(item['adjusted_pt_current']) pt_current_value = float(item['adjusted_pt_current'])
# Update with the maximum value for each analyst # Collect all pt_current values for each analyst
if isinstance(pt_current_value, (float, int)): latest_pt_current[analyst_name].append(pt_current_value)
latest_pt_current[analyst_name] = max(latest_pt_current.get(analyst_name, pt_current_value), pt_current_value)
except (ValueError, TypeError): except (ValueError, TypeError):
print(f"Invalid pt_current value for analyst '{analyst_name}': {item['pt_current']}") print(f"Invalid pt_current value for analyst '{analyst_name}': {item['pt_current']}")
# Get the price target values # Compute statistics for price targets
pt_current_values = list(latest_pt_current.values()) pt_current_values = [val for sublist in latest_pt_current.values() for val in sublist]
# Compute the median pt_current if there are values, otherwise set to 0 # Compute different price target metrics if there are values, otherwise set to 0
median_pt_current = statistics.median(pt_current_values) if pt_current_values else 0 if pt_current_values:
#print("Median pt_current:", round(median_pt_current, 2)) median_pt_current = statistics.median(pt_current_values)
avg_pt_current = statistics.mean(pt_current_values)
consensus_ratings = defaultdict(str) low_pt_current = min(pt_current_values)
# Define the rating hierarchy high_pt_current = max(pt_current_values)
else:
median_pt_current = avg_pt_current = low_pt_current = high_pt_current = 0
# Initialize recommendation tracking
rating_hierarchy = {'Strong Sell': 0, 'Sell': 1, 'Hold': 2, 'Buy': 3, 'Strong Buy': 4} rating_hierarchy = {'Strong Sell': 0, 'Sell': 1, 'Hold': 2, 'Buy': 3, 'Strong Buy': 4}
# Iterate through the data to update the consensus rating for each analyst # Track monthly recommendations
monthly_recommendations = {}
# Iterate through the filtered data to track monthly recommendations
for item in filtered_data:
# Extract month from the date
item_date = datetime.strptime(item['date'], '%Y-%m-%d')
month_key = item_date.strftime('%Y-%m-01')
# Initialize month's recommendation counts if not exists
if month_key not in monthly_recommendations:
monthly_recommendations[month_key] = {
'Strong Sell': 0,
'Sell': 0,
'Hold': 0,
'Buy': 0,
'Strong Buy': 0
}
# Check and increment recommendation count for the month
if 'rating_current' in item and item['rating_current'] in rating_hierarchy:
monthly_recommendations[month_key][item['rating_current']] += 1
# Convert monthly recommendations to a sorted list
recommendation_list = []
for month in sorted(monthly_recommendations.keys()):
month_data = monthly_recommendations[month]
recommendation_list.append({
'date': month,
'Strong Sell': month_data['Strong Sell'],
'Sell': month_data['Sell'],
'Hold': month_data['Hold'],
'Buy': month_data['Buy'],
'Strong Buy': month_data['Strong Buy']
})
# Compute consensus ratings (similar to previous implementation)
consensus_ratings = defaultdict(str)
for item in filtered_data: for item in filtered_data:
if 'rating_current' in item and item['rating_current'] and 'analyst_name' in item and item['analyst_name']: if 'rating_current' in item and item['rating_current'] and 'analyst_name' in item and item['analyst_name']:
try: try:
@ -75,44 +124,44 @@ def get_summary(res_list):
consensus_ratings[analyst_name] = current_rating consensus_ratings[analyst_name] = current_rating
except: except:
pass pass
# Compute the consensus rating based on the most frequent rating among analysts # Compute the consensus rating based on the most frequent rating among analysts
consensus_rating_counts = defaultdict(int) consensus_rating_counts = defaultdict(int)
for rating in consensus_ratings.values(): for rating in consensus_ratings.values():
consensus_rating_counts[rating] += 1 consensus_rating_counts[rating] += 1
consensus_rating = max(consensus_rating_counts, key=consensus_rating_counts.get) consensus_rating = max(consensus_rating_counts, key=consensus_rating_counts.get)
#print("Consensus Rating:", consensus_rating)
# Sum up all Buy, Sell, Hold for the progress bar in sveltekit # Sum up all Buy, Sell, Hold for the progress bar in sveltekit
# Convert defaultdict to regular dictionary
data_dict = dict(consensus_rating_counts) data_dict = dict(consensus_rating_counts)
# Sum up 'Strong Buy' and 'Buy'
buy_total = data_dict.get('Strong Buy', 0) + data_dict.get('Buy', 0) buy_total = data_dict.get('Strong Buy', 0) + data_dict.get('Buy', 0)
# Sum up 'Strong Sell' and 'Sell'
sell_total = data_dict.get('Strong Sell', 0) + data_dict.get('Sell', 0) sell_total = data_dict.get('Strong Sell', 0) + data_dict.get('Sell', 0)
hold_total = data_dict.get('Hold', 0) hold_total = data_dict.get('Hold', 0)
# Count unique analysts
unique_analyst_names = set() unique_analyst_names = set()
numOfAnalyst = 0 numOfAnalyst = 0
for item in filtered_data: for item in filtered_data:
if item['analyst_name'] not in unique_analyst_names: if item['analyst_name'] not in unique_analyst_names:
unique_analyst_names.add(item['analyst_name']) unique_analyst_names.add(item['analyst_name'])
numOfAnalyst += 1 numOfAnalyst += 1
#print("Number of unique analyst names:", numOfAnalyst)
# Update stats dictionary with new keys including recommendationList
stats = {'numOfAnalyst': numOfAnalyst, 'consensusRating': consensus_rating, 'priceTarget': round(median_pt_current, 2)} stats = {
'numOfAnalyst': numOfAnalyst,
'consensusRating': consensus_rating,
'medianPriceTarget': round(median_pt_current, 2),
'avgPriceTarget': round(avg_pt_current, 2),
'lowPriceTarget': round(low_pt_current, 2),
'highPriceTarget': round(high_pt_current, 2),
'recommendationList': recommendation_list
}
categorical_ratings = {'Buy': buy_total, 'Sell': sell_total, 'Hold': hold_total} categorical_ratings = {'Buy': buy_total, 'Sell': sell_total, 'Hold': hold_total}
res = {**stats, **categorical_ratings} res = {**stats, **categorical_ratings}
return res return res
def run(chunk,analyst_list, con):
def run(chunk,analyst_list):
end_date = date.today()
start_date = datetime(2015,1,1) start_date = datetime(2015,1,1)
end_date_str = end_date.strftime('%Y-%m-%d') end_date_str = end_date.strftime('%Y-%m-%d')
start_date_str = start_date.strftime('%Y-%m-%d') start_date_str = start_date.strftime('%Y-%m-%d')
@ -178,6 +227,18 @@ def run(chunk,analyst_list):
item['action_comapny'] = 'Initiates' item['action_comapny'] = 'Initiates'
summary = get_summary(ticker_filtered_data) summary = get_summary(ticker_filtered_data)
try:
#Add historical price for the last 12 month
query = query_template.format(ticker=ticker)
df_12m = pd.read_sql_query(query, con, params=(start_date_12m, end_date)).round(2)
df_12m['date'] = pd.to_datetime(df_12m['date'])
df_12m_last_per_month = df_12m.groupby(df_12m['date'].dt.to_period('M')).tail(1)
past_price_list = [{"date": row['date'].strftime('%Y-%m-%d'), "close": row['close']} for _, row in df_12m_last_per_month.iterrows()]
summary["pastPriceList"] = past_price_list
except:
summary["pastPriceList"] = []
#get ratings of each analyst #get ratings of each analyst
with open(f"json/analyst/summary/{ticker}.json", 'w') as file: with open(f"json/analyst/summary/{ticker}.json", 'w') as file:
@ -231,7 +292,6 @@ try:
stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
stock_symbols =[row[0] for row in stock_cursor.fetchall()] stock_symbols =[row[0] for row in stock_cursor.fetchall()]
con.close()
#Save all analyst data in raw form for the next step #Save all analyst data in raw form for the next step
with open(f"json/analyst/all-analyst-data.json", 'r') as file: with open(f"json/analyst/all-analyst-data.json", 'r') as file:
@ -242,7 +302,10 @@ try:
chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)] chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)]
#chunks = [['CMG']] #chunks = [['CMG']]
for chunk in chunks: for chunk in chunks:
run(chunk, analyst_stats_list) run(chunk, analyst_stats_list, con)
except Exception as e: except Exception as e:
print(e) print(e)
finally:
con.close()

View File

@ -31,7 +31,7 @@ async def get_data(symbol):
'capitalExpenditure','freeCashFlow','freeCashFlowPerShare','grossProfitMargin','operatingProfitMargin','pretaxProfitMargin', 'capitalExpenditure','freeCashFlow','freeCashFlowPerShare','grossProfitMargin','operatingProfitMargin','pretaxProfitMargin',
'netProfitMargin','ebitdaMargin','ebitMargin','freeCashFlowMargin','failToDeliver','relativeFTD', 'netProfitMargin','ebitdaMargin','ebitMargin','freeCashFlowMargin','failToDeliver','relativeFTD',
'annualDividend','dividendYield','payoutRatio','dividendGrowth','earningsYield','freeCashFlowYield','altmanZScore','piotroskiScore', 'annualDividend','dividendYield','payoutRatio','dividendGrowth','earningsYield','freeCashFlowYield','altmanZScore','piotroskiScore',
'lastStockSplit','splitType','splitRatio','analystRating','analystCounter','priceTarget','upside' 'lastStockSplit','splitType','splitRatio','analystRating','analystCounter','medianPriceTarget','upside'
] ]
if symbol in stock_screener_data_dict: if symbol in stock_screener_data_dict:

View File

@ -649,7 +649,7 @@ async def get_stock_screener(con):
res = orjson.loads(file.read()) res = orjson.loads(file.read())
item['analystRating'] = res['consensusRating'] item['analystRating'] = res['consensusRating']
item['analystCounter'] = res['numOfAnalyst'] item['analystCounter'] = res['numOfAnalyst']
item['priceTarget'] = res['priceTarget'] item['priceTarget'] = res['medianPriceTarget']
item['upside'] = round((item['priceTarget']/item['price']-1)*100, 1) if item['price'] else None item['upside'] = round((item['priceTarget']/item['price']-1)*100, 1) if item['price'] else None
except Exception as e: except Exception as e:
item['analystRating'] = None item['analystRating'] = None