backend/app/cron_dashboard.py

import aiohttp
import aiofiles
import ujson
import sqlite3
import pandas as pd
import asyncio
import pytz
import os
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime, timedelta, date
headers = {"accept": "application/json"}


def check_market_hours():
    holidays = [
        "2024-01-01", "2024-01-15", "2024-02-19", "2024-03-29",
        "2024-05-27", "2024-06-19", "2024-07-04", "2024-09-02",
        "2024-11-28", "2024-12-25"
    ]

    # Get the current date and time in ET (Eastern Time)
    et_timezone = pytz.timezone('America/New_York')
    current_time = datetime.now(et_timezone)
    current_date_str = current_time.strftime('%Y-%m-%d')
    current_hour = current_time.hour
    current_minute = current_time.minute
    current_day = current_time.weekday()  # Monday is 0, Sunday is 6

    # Check if the current date is a holiday or weekend
    is_weekend = current_day >= 5  # Saturday (5) or Sunday (6)
    is_holiday = current_date_str in holidays

    # Determine the market status
    if is_weekend or is_holiday:
        return 0  # Closed
    elif current_hour < 9 or (current_hour == 9 and current_minute < 30):
        return 1  # Pre-market
    elif 9 <= current_hour < 16 or (current_hour == 16 and current_minute == 0):
        return 0  # Market hours
    elif 16 <= current_hour < 24:
        return 2  # After-market hours
    else:
        return 0  # Market is closed
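
# Illustrative only: how the status codes above are consumed further down in
# run(). The mapping (0 = closed/regular hours, 1 = pre-market, 2 = after-hours)
# mirrors the branches in run(), which read market movers from the folders
# named here.
#
#   status = check_market_hours()
#   folder = {0: "markethours", 1: "premarket", 2: "afterhours"}[status]
#   gainers_path = f"json/market-movers/{folder}/gainers.json"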

load_dotenv()
benzinga_api_key = os.getenv('BENZINGA_API_KEY')
fmp_api_key = os.getenv('FMP_API_KEY')

query_template = """
    SELECT
        marketCap
    FROM
        stocks
    WHERE
        symbol = ?
"""


async def save_json(data):
    with open("json/dashboard/data.json", 'w') as file:
        ujson.dump(data, file)


def parse_time(time_str):
    try:
        # Try parsing as a full datetime
        return datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        try:
            # Try parsing as a time only
            time_obj = datetime.strptime(time_str, '%H:%M:%S').time()
            # Combine with today's date
            return datetime.combine(date.today(), time_obj)
        except ValueError:
            # If all else fails, return a default datetime
            return datetime.min
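
# Examples of what parse_time accepts (illustrative, not executed):
#   parse_time('2024-12-02 16:30:00')  -> datetime(2024, 12, 2, 16, 30)
#   parse_time('16:30:00')             -> today's date at 16:30
#   parse_time('n/a')                  -> datetime.min (fallback)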


def remove_duplicates(elements):
    seen = set()
    unique_elements = []
    for element in elements:
        if element['symbol'] not in seen:
            seen.add(element['symbol'])
            unique_elements.append(element)
    return unique_elements


def weekday():
    today = datetime.today()
    if today.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        yesterday = today - timedelta(2)
    else:
        yesterday = today - timedelta(1)
    return yesterday.strftime('%Y-%m-%d')


today = datetime.today().strftime('%Y-%m-%d')
tomorrow = datetime.today() + timedelta(1)
yesterday = weekday()

if tomorrow.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
    tomorrow = tomorrow + timedelta(days=(7 - tomorrow.weekday()))
tomorrow = tomorrow.strftime('%Y-%m-%d')
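
# Worked example of the dates above (assuming the script runs on Friday
# 2024-12-06): today = '2024-12-06', yesterday = '2024-12-05', and
# tomorrow = '2024-12-07' (a Saturday) is rolled forward to the next Monday,
# '2024-12-09'.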


async def get_upcoming_earnings(session, end_date, filter_today=True):
    url = "https://api.benzinga.com/api/v2.1/calendar/earnings"
    importance_list = ["1", "2", "3", "4", "5"]
    res_list = []
    today = date.today().strftime('%Y-%m-%d')
    for importance in importance_list:
        querystring = {
            "token": benzinga_api_key,
            "parameters[importance]": importance,
            "parameters[date_from]": today,
            "parameters[date_to]": end_date,
            "parameters[date_sort]": "date"
        }
        try:
            async with session.get(url, params=querystring, headers=headers) as response:
                res = ujson.loads(await response.text())['earnings']
                # Apply the time filter if filter_today is True
                if filter_today:
                    res = [
                        e for e in res if
                        datetime.strptime(e['date'], "%Y-%m-%d").date() != date.today() or
                        datetime.strptime(e['time'], "%H:%M:%S").time() >= datetime.strptime("16:00:00", "%H:%M:%S").time()
                    ]
                for item in res:
                    try:
                        symbol = item['ticker']
                        name = item['name']
                        time = item['time']
                        is_today = item['date'] == today
                        eps_prior = float(item['eps_prior']) if item['eps_prior'] != '' else 0
                        eps_est = float(item['eps_est']) if item['eps_est'] != '' else 0
                        revenue_est = float(item['revenue_est']) if item['revenue_est'] != '' else 0
                        revenue_prior = float(item['revenue_prior']) if item['revenue_prior'] != '' else 0
                        if symbol in stock_symbols and revenue_est and revenue_prior and eps_prior and eps_est:
                            df = pd.read_sql_query(query_template, con, params=(symbol,))
                            market_cap = float(df['marketCap'].iloc[0]) if df['marketCap'].iloc[0] != '' else 0
                            res_list.append({
                                'symbol': symbol,
                                'name': name,
                                'time': time,
                                'isToday': is_today,
                                'marketCap': market_cap,
                                'epsPrior': eps_prior,
                                'epsEst': eps_est,
                                'revenuePrior': revenue_prior,
                                'revenueEst': revenue_est
                            })
                    except Exception as e:
                        print('Upcoming Earnings:', e)
                        pass
        except Exception as e:
            print(e)
            pass

    try:
        res_list = remove_duplicates(res_list)
        res_list.sort(key=lambda x: x['marketCap'], reverse=True)
        return res_list[:10]
    except Exception as e:
        print(e)
        return []


async def get_recent_earnings(session):
    url = "https://api.benzinga.com/api/v2.1/calendar/earnings"
    importance_list = ["1", "2", "3", "4", "5"]
    res_list = []
    for importance in importance_list:
        querystring = {
            "token": benzinga_api_key,
            "parameters[importance]": importance,
            "parameters[date_from]": yesterday,
            "parameters[date_to]": today,
            "parameters[date_sort]": "date"
        }
        try:
            async with session.get(url, params=querystring, headers=headers) as response:
                res = ujson.loads(await response.text())['earnings']
                for item in res:
                    try:
                        symbol = item['ticker']
                        name = item['name']
                        time = item['time']
                        eps_prior = float(item['eps_prior']) if item['eps_prior'] != '' else 0
                        eps_surprise = float(item['eps_surprise']) if item['eps_surprise'] != '' else 0
                        eps = float(item['eps']) if item['eps'] != '' else 0
                        revenue_prior = float(item['revenue_prior']) if item['revenue_prior'] != '' else 0
                        revenue_surprise = float(item['revenue_surprise']) if item['revenue_surprise'] != '' else 0
                        revenue = float(item['revenue']) if item['revenue'] != '' else 0
                        if symbol in stock_symbols and revenue != 0 and revenue_prior != 0 and eps_prior != 0 and eps != 0 and revenue_surprise != 0 and eps_surprise != 0:
                            df = pd.read_sql_query(query_template, con, params=(symbol,))
                            market_cap = float(df['marketCap'].iloc[0]) if df['marketCap'].iloc[0] != '' else 0
                            res_list.append({
                                'symbol': symbol,
                                'name': name,
                                'time': time,
                                'marketCap': market_cap,
                                'epsPrior': eps_prior,
                                'epsSurprise': eps_surprise,
                                'eps': eps,
                                'revenuePrior': revenue_prior,
                                'revenueSurprise': revenue_surprise,
                                'revenue': revenue
                            })
                    except Exception as e:
                        print('Recent Earnings:', e)
                        pass
        except Exception as e:
            pass

    res_list = remove_duplicates(res_list)
    res_list.sort(key=lambda x: x['marketCap'], reverse=True)
    #res_list.sort(key=lambda x: (-parse_time(x['time']).timestamp(), -x['marketCap']))
    res_list = [{k: v for k, v in d.items() if k != 'marketCap'} for d in res_list]
    return res_list[0:10]


'''
async def get_recent_dividends(session):
    url = "https://api.benzinga.com/api/v2.1/calendar/dividends"
    importance_list = ["1", "2", "3", "4", "5"]
    res_list = []
    for importance in importance_list:
        querystring = {
            "token": benzinga_api_key,
            "parameters[importance]": importance,
            "parameters[date_from]": yesterday,
            "parameters[date_to]": today
        }
        try:
            async with session.get(url, params=querystring, headers=headers) as response:
                res = ujson.loads(await response.text())['dividends']
                for item in res:
                    try:
                        symbol = item['ticker']
                        name = item['name']
                        dividend = float(item['dividend']) if item['dividend'] != '' else 0
                        dividend_prior = float(item['dividend_prior']) if item['dividend_prior'] != '' else 0
                        dividend_yield = round(float(item['dividend_yield'])*100, 2) if item['dividend_yield'] != '' else 0
                        ex_dividend_date = item['ex_dividend_date'] if item['ex_dividend_date'] != '' else 0
                        payable_date = item['payable_date'] if item['payable_date'] != '' else 0
                        record_date = item['record_date'] if item['record_date'] != '' else 0
                        if symbol in stock_symbols and dividend != 0 and payable_date != 0 and dividend_prior != 0 and ex_dividend_date != 0 and record_date != 0 and dividend_yield != 0:
                            df = pd.read_sql_query(query_template, con, params=(symbol,))
                            market_cap = float(df['marketCap'].iloc[0]) if df['marketCap'].iloc[0] != '' else 0
                            res_list.append({
                                'symbol': symbol,
                                'name': name,
                                'dividend': dividend,
                                'marketCap': market_cap,
                                'dividendPrior': dividend_prior,
                                'dividendYield': dividend_yield,
                                'exDividendDate': ex_dividend_date,
                                'payableDate': payable_date,
                                'recordDate': record_date,
                                'updated': item['updated']
                            })
                    except Exception as e:
                        print('Recent Dividends:', e)
                        pass
        except Exception as e:
            print(e)
            pass

    res_list = remove_duplicates(res_list)
    res_list.sort(key=lambda x: x['marketCap'], reverse=True)
    res_list = [{k: v for k, v in d.items() if k != 'marketCap'} for d in res_list]
    return res_list[0:5]
'''


async def get_analyst_report():
    try:
        # Connect to the database and retrieve symbols
        with sqlite3.connect('stocks.db') as con:
            cursor = con.cursor()
            cursor.execute("PRAGMA journal_mode = wal")
            cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%' AND marketCap > 50E9")
            symbols = {row[0] for row in cursor.fetchall()}  # Use a set for fast lookups

        # Define the directory path
        directory = Path("json/analyst/insight")

        # Track the latest data and symbol based on the "date" key in the JSON file
        latest_data = None
        latest_symbol = None
        latest_date = datetime.min  # Initialize to the earliest possible date

        # Loop through all .json files in the directory
        for file_path in directory.glob("*.json"):
            symbol = file_path.stem  # Get the filename without extension
            if symbol in symbols:
                # Read each JSON file asynchronously
                async with aiofiles.open(file_path, mode='r') as file:
                    data = ujson.loads(await file.read())

                # Parse the "date" field and compare it to the latest_date
                data_date = datetime.strptime(data.get('date', ''), '%b %d, %Y')
                if data_date > latest_date:
                    latest_date = data_date
                    latest_data = data
                    latest_symbol = symbol

        # If the latest report and symbol are found, add additional data from the summary file
        if latest_symbol and latest_data:
            summary_path = Path(f"json/analyst/summary/{latest_symbol}.json")
            if summary_path.is_file():  # Ensure the summary file exists
                async with aiofiles.open(summary_path, mode='r') as file:
                    summary_data = ujson.loads(await file.read())

                # Merge the summary data into the latest data dictionary
                latest_data.update({
                    'symbol': latest_symbol,
                    'numOfAnalyst': summary_data.get('numOfAnalyst'),
                    'consensusRating': summary_data.get('consensusRating'),
                    'medianPriceTarget': summary_data.get('medianPriceTarget'),
                    'avgPriceTarget': summary_data.get('avgPriceTarget'),
                    'lowPriceTarget': summary_data.get('lowPriceTarget'),
                    'highPriceTarget': summary_data.get('highPriceTarget')
                })

            # Load the current price from the quote file
            quote_path = Path(f"json/quote/{latest_symbol}.json")
            if quote_path.is_file():
                async with aiofiles.open(quote_path, mode='r') as file:
                    quote_data = ujson.loads(await file.read())

                price = quote_data.get('price')
                if price:
                    # Calculate the percentage change for each target relative to the current price
                    def calculate_percentage_change(target):
                        return round(((target - price) / price) * 100, 2) if target is not None else None

                    latest_data.update({
                        'medianPriceChange': calculate_percentage_change(latest_data.get('medianPriceTarget')),
                        'avgPriceChange': calculate_percentage_change(latest_data.get('avgPriceTarget')),
                        'lowPriceChange': calculate_percentage_change(latest_data.get('lowPriceTarget')),
                        'highPriceChange': calculate_percentage_change(latest_data.get('highPriceTarget')),
                    })

        #print(f"The latest report for symbol {latest_symbol}:", latest_data)
        # Return the latest data found
        return latest_data if latest_data else {}
    except Exception as e:
        print(f"An error occurred: {e}")
        return {}
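
# Illustrative shape of the dict returned by get_analyst_report() when a
# report is found. The price-change figures assume a current price of 240 read
# from json/quote/<symbol>.json; the ticker and all values below are made up
# for illustration, and the insight file may carry additional fields.
#
#   {
#       'date': 'Nov 29, 2024',
#       'symbol': 'AAPL',
#       'numOfAnalyst': 30,
#       'consensusRating': 'Buy',
#       'medianPriceTarget': 250, 'medianPriceChange': 4.17,
#       'avgPriceTarget': 245,    'avgPriceChange': 2.08,
#       'lowPriceTarget': 180,    'lowPriceChange': -25.0,
#       'highPriceTarget': 300,   'highPriceChange': 25.0,
#   }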


async def run():
    async with aiohttp.ClientSession() as session:
        recent_earnings = await get_recent_earnings(session)
        upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=False)
        upcoming_earnings = [
            item for item in upcoming_earnings
            if item['symbol'] not in [earning['symbol'] for earning in recent_earnings]
        ]
        if len(upcoming_earnings) < 5:
            upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=True)
        if len(upcoming_earnings) < 5:
            upcoming_earnings = await get_upcoming_earnings(session, tomorrow, filter_today=True)

        recent_analyst_report = await get_analyst_report()

        upcoming_earnings = [
            item for item in upcoming_earnings
            if item['symbol'] not in [earning['symbol'] for earning in recent_earnings]
        ]

        try:
            with open("json/options-flow/feed/data.json", 'r') as file:
                options_flow = ujson.load(file)

            # Filter the options flow to include only items whose ticker is in stock_symbols
            options_flow = [item for item in options_flow if item['ticker'] in stock_symbols]

            highest_volume = sorted(options_flow, key=lambda x: int(x['volume']), reverse=True)
            highest_volume = [
                {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']}
                for item in highest_volume[0:4]
            ]

            highest_premium = sorted(options_flow, key=lambda x: int(x['cost_basis']), reverse=True)
            highest_premium = [
                {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']}
                for item in highest_premium[0:4]
            ]

            highest_open_interest = sorted(options_flow, key=lambda x: int(x['open_interest']), reverse=True)
            highest_open_interest = [
                {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'open_interest', 'strike_price']}
                for item in highest_open_interest[0:4]
            ]

            options_flow = {
                'premium': highest_premium,
                'volume': highest_volume,
                'openInterest': highest_open_interest
            }
        except Exception as e:
            print(e)
            options_flow = {}

        market_status = check_market_hours()
        if market_status == 0:
            try:
                with open("json/market-movers/markethours/gainers.json", 'r') as file:
                    gainers = ujson.load(file)
                with open("json/market-movers/markethours/losers.json", 'r') as file:
                    losers = ujson.load(file)
                market_movers = {'gainers': gainers['1D'][:5], 'losers': losers['1D'][:5]}
            except:
                market_movers = {}
        elif market_status == 1:
            try:
                with open("json/market-movers/premarket/gainers.json", 'r') as file:
                    data = ujson.load(file)
                gainers = [
                    {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'],
                     'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']}
                    for item in data[:5]
                ]
                with open("json/market-movers/premarket/losers.json", 'r') as file:
                    data = ujson.load(file)
                losers = [
                    {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'],
                     'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']}
                    for item in data[:5]
                ]
                market_movers = {'gainers': gainers, 'losers': losers}
            except:
                market_movers = {}
        elif market_status == 2:
            try:
                with open("json/market-movers/afterhours/gainers.json", 'r') as file:
                    data = ujson.load(file)
                gainers = [
                    {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'],
                     'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']}
                    for item in data[:5]
                ]
                with open("json/market-movers/afterhours/losers.json", 'r') as file:
                    data = ujson.load(file)
                losers = [
                    {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'],
                     'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']}
                    for item in data[:5]
                ]
                market_movers = {'gainers': gainers, 'losers': losers}
            except:
                market_movers = {}

        data = {
            'marketMovers': market_movers,
            'marketStatus': market_status,
            'optionsFlow': options_flow,
            'recentEarnings': recent_earnings,
            'upcomingEarnings': upcoming_earnings,
            'analystReport': recent_analyst_report,
        }
        if len(data) > 0:
            await save_json(data)
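
# Illustrative top-level layout of json/dashboard/data.json as written by
# run() (values elided; the nested shapes come from the helpers above):
#
#   {
#       "marketMovers": {"gainers": [...], "losers": [...]},
#       "marketStatus": 0,
#       "optionsFlow": {"premium": [...], "volume": [...], "openInterest": [...]},
#       "recentEarnings": [...],
#       "upcomingEarnings": [...],
#       "analystReport": {...}
#   }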


try:
    con = sqlite3.connect('stocks.db')
    etf_con = sqlite3.connect('etf.db')

    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks")
    stock_symbols = [row[0] for row in cursor.fetchall()]

    etf_cursor = etf_con.cursor()
    etf_cursor.execute("PRAGMA journal_mode = wal")
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]

    total_symbols = stock_symbols + etf_symbols

    asyncio.run(run())

    con.close()
    etf_con.close()
except Exception as e:
    print(e)
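
# Scheduling note (illustrative, not part of the script): the file name
# suggests this is meant to be run periodically by a scheduler. A hypothetical
# crontab entry, assuming the repo lives at /path/to/backend/app, might look
# like:
#
#   */5 * * * * cd /path/to/backend/app && python3 cron_dashboard.py
#
# The path and interval here are assumptions for illustration only.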