From b49928fe747e9813f21296c46eb88892de000d3b Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 11 Sep 2024 14:31:41 +0200 Subject: [PATCH] add earnings cron job --- app/cron_earnings.py | 114 ++++++++++++++++++++++++++++++++++++++ app/cron_next_earnings.py | 70 ----------------------- app/main.py | 22 +++++++- app/primary_cron_job.py | 10 ++++ 4 files changed, 144 insertions(+), 72 deletions(-) create mode 100644 app/cron_earnings.py delete mode 100644 app/cron_next_earnings.py diff --git a/app/cron_earnings.py b/app/cron_earnings.py new file mode 100644 index 0000000..bbd4d93 --- /dev/null +++ b/app/cron_earnings.py @@ -0,0 +1,114 @@ +import aiohttp +import aiofiles +import ujson +import sqlite3 +import asyncio +import os +from dotenv import load_dotenv +from datetime import datetime, timedelta +from tqdm import tqdm + +headers = {"accept": "application/json"} +url = "https://api.benzinga.com/api/v2.1/calendar/earnings" +load_dotenv() +api_key = os.getenv('BENZINGA_API_KEY_EXTRA') + +today = datetime.today() +N_days_ago = today - timedelta(days=10) + +# Function to delete all files in a directory +def delete_files_in_directory(directory): + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + try: + if os.path.isfile(file_path): + os.remove(file_path) + except Exception as e: + print(f"Failed to delete {file_path}. Reason: {e}") + +async def save_json(data, symbol, dir_path): + file_path = os.path.join(dir_path, f"{symbol}.json") + async with aiofiles.open(file_path, 'w') as file: + await file.write(ujson.dumps(data)) + +async def get_data(session, ticker): + querystring = {"token": api_key, "parameters[tickers]": ticker} + try: + async with session.get(url, params=querystring, headers=headers) as response: + if response.status == 200: + data = ujson.loads(await response.text())['earnings'] + + # Filter for future earnings + future_dates = [item for item in data if datetime.strptime(item["date"], "%Y-%m-%d") > today] + if future_dates: + nearest_future = min(future_dates, key=lambda x: datetime.strptime(x["date"], "%Y-%m-%d")) + try: + symbol = nearest_future['ticker'] + time = nearest_future['time'] + date = nearest_future['date'] + eps_prior = float(nearest_future['eps_prior']) if nearest_future['eps_prior'] else 0 + eps_est = float(nearest_future['eps_est']) if nearest_future['eps_est'] else 0 + revenue_est = float(nearest_future['revenue_est']) if nearest_future['revenue_est'] else 0 + revenue_prior = float(nearest_future['revenue_prior']) if nearest_future['revenue_prior'] else 0 + if revenue_est and revenue_prior and eps_prior and eps_est: + res_list = { + 'date': date, + 'time': time, + 'epsPrior': eps_prior, + 'epsEst': eps_est, + 'revenuePrior': revenue_prior, + 'revenueEst': revenue_est + } + await save_json(res_list, symbol, 'json/earnings/next') + except KeyError: + pass + + # Filter for past earnings within the last 20 days + recent_dates = [item for item in data if N_days_ago <= datetime.strptime(item["date"], "%Y-%m-%d") <= today] + if recent_dates: + nearest_recent = min(recent_dates, key=lambda x: datetime.strptime(x["date"], "%Y-%m-%d")) + try: + date = nearest_recent['date'] + eps_prior = float(nearest_recent['eps_prior']) if nearest_recent['eps_prior'] != '' else 0 + eps_surprise = float(nearest_recent['eps_surprise']) if nearest_recent['eps_surprise'] != '' else 0 + eps = float(nearest_recent['eps']) if nearest_recent['eps'] != '' else 0 + revenue_prior = float(nearest_recent['revenue_prior']) if nearest_recent['revenue_prior'] != '' else 0 + revenue_surprise = float(nearest_recent['revenue_surprise']) if nearest_recent['revenue_surprise'] != '' else 0 + revenue = float(nearest_recent['revenue']) if nearest_recent['revenue'] != '' else 0 + if revenue != 0 and revenue_prior != 0 and eps_prior != 0 and eps != 0 and revenue_surprise != 0 and eps_surprise != 0: + res_list = { + 'epsPrior':eps_prior, + 'epsSurprise': eps_surprise, + 'eps': eps, + 'revenuePrior': revenue_prior, + 'revenueSurprise': revenue_surprise, + 'revenue': revenue, + 'date': date, + } + await save_json(res_list, symbol, 'json/earnings/surprise') + except: + pass + except Exception as e: + pass + +async def run(stock_symbols): + async with aiohttp.ClientSession() as session: + tasks = [get_data(session, symbol) for symbol in stock_symbols] + for f in tqdm(asyncio.as_completed(tasks), total=len(stock_symbols)): + await f + +try: + # Delete old files in the directories + delete_files_in_directory("json/earnings/next") + delete_files_in_directory("json/earnings/surprise") + + con = sqlite3.connect('stocks.db') + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") + stock_symbols = [row[0] for row in cursor.fetchall()] + con.close() + asyncio.run(run(stock_symbols)) + +except Exception as e: + print(e) diff --git a/app/cron_next_earnings.py b/app/cron_next_earnings.py deleted file mode 100644 index bb213de..0000000 --- a/app/cron_next_earnings.py +++ /dev/null @@ -1,70 +0,0 @@ -import aiohttp -import aiofiles -import ujson -import sqlite3 -import asyncio -import os -from dotenv import load_dotenv -from datetime import datetime -from tqdm import tqdm - -headers = {"accept": "application/json"} -url = "https://api.benzinga.com/api/v2.1/calendar/earnings" -load_dotenv() -benzinga_api_key_extra = os.getenv('BENZINGA_API_KEY_EXTRA') - -today = datetime.today() - -async def save_json(data, symbol): - async with aiofiles.open(f"json/next-earnings/companies/{symbol}.json", 'w') as file: - await file.write(ujson.dumps(data)) - -async def get_data(session, ticker): - querystring = {"token": benzinga_api_key_extra, "parameters[tickers]": ticker} - try: - async with session.get(url, params=querystring, headers=headers) as response: - if response.status == 200: - data = ujson.loads(await response.text())['earnings'] - future_dates = [item for item in data if datetime.strptime(item["date"], "%Y-%m-%d") > today] - if future_dates: - data = min(future_dates, key=lambda x: datetime.strptime(x["date"], "%Y-%m-%d")) - try: - symbol = data['ticker'] - time = data['time'] - date = data['date'] - eps_prior = float(data['eps_prior']) if data['eps_prior'] else 0 - eps_est = float(data['eps_est']) if data['eps_est'] else 0 - revenue_est = float(data['revenue_est']) if data['revenue_est'] else 0 - revenue_prior = float(data['revenue_prior']) if data['revenue_prior'] else 0 - if revenue_est and revenue_prior and eps_prior and eps_est: - res_list = { - 'date': date, - 'time': time, - 'epsPrior': eps_prior, - 'epsEst': eps_est, - 'revenuePrior': revenue_prior, - 'revenueEst': revenue_est - } - await save_json(res_list, symbol) - except KeyError: - pass - except Exception as e: - pass - -async def run(stock_symbols): - async with aiohttp.ClientSession() as session: - tasks = [get_data(session, symbol) for symbol in stock_symbols] - for f in tqdm(asyncio.as_completed(tasks), total=len(stock_symbols)): - await f - -try: - con = sqlite3.connect('stocks.db') - cursor = con.cursor() - cursor.execute("PRAGMA journal_mode = wal") - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") - stock_symbols = [row[0] for row in cursor.fetchall()] - con.close() - asyncio.run(run(stock_symbols)) - -except Exception as e: - print(e) diff --git a/app/main.py b/app/main.py index 7304e4e..512986f 100755 --- a/app/main.py +++ b/app/main.py @@ -3587,14 +3587,14 @@ async def get_economic_indicator(api_key: str = Security(get_api_key)): ) @app.post("/next-earnings") -async def get_economic_indicator(data:TickerData, api_key: str = Security(get_api_key)): +async def get_next_earnings(data:TickerData, api_key: str = Security(get_api_key)): ticker = data.ticker cache_key = f"next-earnings-{ticker}" cached_result = redis_client.get(cache_key) if cached_result: return orjson.loads(cached_result) try: - with open(f"json/next-earnings/companies/{ticker}.json", 'rb') as file: + with open(f"json/earnings/next/{ticker}.json", 'rb') as file: res = orjson.loads(file.read()) except: res = {} @@ -3604,6 +3604,24 @@ async def get_economic_indicator(data:TickerData, api_key: str = Security(get_ap return res +@app.post("/earnings-surprise") +async def get_surprise_earnings(data:TickerData, api_key: str = Security(get_api_key)): + ticker = data.ticker + cache_key = f"earnings-surprise{ticker}" + cached_result = redis_client.get(cache_key) + if cached_result: + return orjson.loads(cached_result) + try: + with open(f"json/earnings/surprise/{ticker}.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = {} + + redis_client.set(cache_key, orjson.dumps(res)) + redis_client.expire(cache_key,15*60) + + return res + @app.get("/newsletter") async def get_newsletter(): try: diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 4b312ff..2e37c59 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -556,6 +556,15 @@ def run_dividends(): ] run_command(command) +def run_earnings(): + run_command(["python3", "cron_earnings.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/earnings", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + run_command(command) + def run_economy_indicator(): run_command(["python3", "cron_economic_indicator.py"]) command = [ @@ -625,6 +634,7 @@ schedule.every(10).minutes.do(run_threaded, run_tracker).tag('tracker_job') schedule.every(1).minutes.do(run_threaded, run_cron_quote).tag('quote_job') schedule.every(1).minutes.do(run_threaded, run_cron_price_alert).tag('price_alert_job') schedule.every(15).minutes.do(run_threaded, run_market_moods).tag('market_moods_job') +schedule.every(30).minutes.do(run_threaded, run_earnings).tag('earnings_job') #schedule.every(10).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') schedule.every(2).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')