From f58997f8b77ae664627efaf785e0b5359da60d5d Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Sun, 12 Jan 2025 22:47:01 +0100 Subject: [PATCH] bugfixing --- app/cron_earnings.py | 80 +------------- app/cron_earnings_price_action.py | 172 ++++++++++++++++++++++++++++++ app/primary_cron_job.py | 6 +- 3 files changed, 176 insertions(+), 82 deletions(-) create mode 100644 app/cron_earnings_price_action.py diff --git a/app/cron_earnings.py b/app/cron_earnings.py index 4dabb61..fd92cb0 100644 --- a/app/cron_earnings.py +++ b/app/cron_earnings.py @@ -21,12 +21,6 @@ today = datetime.now(ny_tz).replace(hour=0, minute=0, second=0, microsecond=0) min_date = ny_tz.localize(datetime.strptime("2015-01-01", "%Y-%m-%d")) N_days_ago = today - timedelta(days=10) -query_template = """ - SELECT date, open, high, low, close - FROM "{ticker}" - WHERE date >= ? -""" - def check_existing_file(ticker, folder_name): file_path = f"json/earnings/{folder_name}/{ticker}.json" @@ -58,78 +52,6 @@ async def save_json(data, symbol, dir_path): async with aiofiles.open(file_path, 'w') as file: await file.write(ujson.dumps(data)) -async def get_past_data(data, ticker, con): - # Filter data based on date constraints - filtered_data = [] - for item in data: - try: - item_date = ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d")) - if min_date <= item_date <= today: - filtered_data.append( - { - 'revenue': float(item['revenue']), - 'revenueEst': float(item['revenue_est']), - 'revenueSurprisePercent': round(float(item['revenue_surprise_percent'])*100, 2), - 'eps': round(float(item['eps']), 2), - 'epsEst': round(float(item['eps_est']), 2), - 'epsSurprisePercent': round(float(item['eps_surprise_percent'])*100, 2), - 'year': item['period_year'], - 'quarter': item['period'], - 'date': item['date'] - } - ) - except: - pass - - # Sort the filtered data by date - if len(filtered_data) > 0: - filtered_data.sort(key=lambda x: x['date'], reverse=True) - - try: - # Load the price history data - with open(f"json/historical-price/max/{ticker}.json") as file: - price_history = orjson.loads(file.read()) - - # Convert price_history dates to datetime objects for easy comparison - price_history_dict = { - datetime.strptime(item['time'], "%Y-%m-%d"): item for item in price_history - } - - # Calculate volatility for each earnings release - for entry in filtered_data: - earnings_date = datetime.strptime(entry['date'], "%Y-%m-%d") - volatility_prices = [] - - # Collect prices from (X-2) to (X+1) - for i in range(-2, 2): - current_date = earnings_date + timedelta(days=i) - if current_date in price_history_dict: - volatility_prices.append(price_history_dict[current_date]) - - # Calculate volatility if we have at least one price entry - if volatility_prices: - high_prices = [day['high'] for day in volatility_prices] - low_prices = [day['low'] for day in volatility_prices] - close_prices = [day['close'] for day in volatility_prices] - - max_high = max(high_prices) - min_low = min(low_prices) - avg_close = sum(close_prices) / len(close_prices) - - # Volatility percentage calculation - volatility = round(((max_high - min_low) / avg_close) * 100, 2) - else: - volatility = None # No data available for volatility calculation - - # Add the volatility to the entry - entry['volatility'] = volatility - - # Save the updated filtered_data - await save_json(filtered_data, ticker, 'json/earnings/past') - - except: - pass - async def get_data(session, ticker, con): querystring = {"token": api_key, "parameters[tickers]": ticker} @@ -137,7 +59,7 @@ async def get_data(session, ticker, con): async with session.get(url, params=querystring, headers=headers) as response: data = ujson.loads(await response.text())['earnings'] - await get_past_data(data, ticker, con) + #await get_past_data(data, ticker, con) # Filter for future earnings future_dates = [item for item in data if ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d")) >= today] diff --git a/app/cron_earnings_price_action.py b/app/cron_earnings_price_action.py new file mode 100644 index 0000000..8c52329 --- /dev/null +++ b/app/cron_earnings_price_action.py @@ -0,0 +1,172 @@ +import aiohttp +import aiofiles +import ujson +import orjson +import sqlite3 +import asyncio +import pandas as pd +import time +import os +from dotenv import load_dotenv +from datetime import datetime, timedelta +from tqdm import tqdm +import pytz + +headers = {"accept": "application/json"} +url = "https://api.benzinga.com/api/v2.1/calendar/earnings" +load_dotenv() +api_key = os.getenv('BENZINGA_API_KEY') + +ny_tz = pytz.timezone('America/New_York') +today = datetime.now(ny_tz).replace(hour=0, minute=0, second=0, microsecond=0) +min_date = ny_tz.localize(datetime.strptime("2020-01-01", "%Y-%m-%d")) +N_days_ago = today - timedelta(days=10) + + + +async def save_json(data, symbol, dir_path): + file_path = os.path.join(dir_path, f"{symbol}.json") + async with aiofiles.open(file_path, 'w') as file: + await file.write(ujson.dumps(data)) + + +from datetime import datetime, timedelta +import pytz + +ny_tz = pytz.timezone("America/New_York") + +async def calculate_price_reactions(filtered_data, price_history): + # Ensure price_history is sorted by date + price_history.sort(key=lambda x: datetime.strptime(x['time'], "%Y-%m-%d")) + + # Convert price history to a dictionary for quick lookup + price_dict = {entry['time']: entry for entry in price_history} + + results = [] + + for earnings in filtered_data: + report_date = earnings['date'] + report_datetime = ny_tz.localize(datetime.strptime(report_date, "%Y-%m-%d")) + + # Initialize a dictionary for price reactions + price_reactions = {'date': report_date, 'quarter': earnings['quarter'], 'year': earnings['year']} + + for offset in [0,1,2]: # Days around earnings + # Calculate initial target date with offset + target_date = report_datetime - timedelta(days=offset) + + # Adjust target_date to the latest weekday if it falls on a weekend + if target_date.weekday() == 5: # Saturday + target_date -= timedelta(days=1) # Move to Friday + elif target_date.weekday() == 6: # Sunday + target_date -= timedelta(days=2) # Move to Friday + + target_date_str = target_date.strftime("%Y-%m-%d") + while target_date_str not in price_dict: # Ensure target_date exists in price_dict + target_date -= timedelta(days=1) + target_date_str = target_date.strftime("%Y-%m-%d") + + price_data = price_dict[target_date_str] + + # Find the previous day's price data + previous_date = target_date - timedelta(days=1) + if previous_date.weekday() == 5: # Saturday + previous_date -= timedelta(days=1) # Move to Friday + elif previous_date.weekday() == 6: # Sunday + previous_date -= timedelta(days=2) # Move to Friday + + previous_date_str = previous_date.strftime("%Y-%m-%d") + while previous_date_str not in price_dict: # Ensure previous_date exists in price_dict + previous_date -= timedelta(days=1) + previous_date_str = previous_date.strftime("%Y-%m-%d") + + previous_price_data = price_dict[previous_date_str] + + # Calculate close price and percentage change + price_reactions[f"{offset+1}_days_close"] = price_data['close'] + price_reactions[f"{offset+1}_days_change_percent"] = round( + (price_data['close'] / previous_price_data['close'] - 1) * 100, 2 + ) + + print(target_date_str, previous_date_str) + results.append(price_reactions) + + return results + + + +async def get_past_data(data, ticker, con): + # Filter data based on date constraints + filtered_data = [] + for item in data: + try: + item_date = ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d")) + if min_date <= item_date <= today: + filtered_data.append( + { + 'revenue': float(item['revenue']), + 'revenueEst': float(item['revenue_est']), + 'revenueSurprisePercent': round(float(item['revenue_surprise_percent'])*100, 2), + 'eps': round(float(item['eps']), 2), + 'epsEst': round(float(item['eps_est']), 2), + 'epsSurprisePercent': round(float(item['eps_surprise_percent'])*100, 2), + 'year': item['period_year'], + 'quarter': item['period'], + 'date': item['date'] + } + ) + except: + pass + + # Sort the filtered data by date + if len(filtered_data) > 0: + filtered_data.sort(key=lambda x: x['date'], reverse=True) + + try: + # Load the price history data + with open(f"json/historical-price/max/{ticker}.json") as file: + price_history = orjson.loads(file.read()) + + results = await calculate_price_reactions(filtered_data, price_history) + print(filtered_data[0]) + print(results[1]) + # Save the updated filtered_data + #await save_json(filtered_data, ticker, 'json/earnings/past') + + except: + pass + + +async def get_data(session, ticker, con): + querystring = {"token": api_key, "parameters[tickers]": ticker} + try: + async with session.get(url, params=querystring, headers=headers) as response: + data = ujson.loads(await response.text())['earnings'] + + await get_past_data(data, ticker, con) + + except Exception as e: + print(e) + #pass + +async def run(stock_symbols, con): + async with aiohttp.ClientSession() as session: + tasks = [get_data(session, symbol, con) for symbol in stock_symbols] + for f in tqdm(asyncio.as_completed(tasks), total=len(stock_symbols)): + await f + +try: + + con = sqlite3.connect('stocks.db') + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") + stock_symbols = [row[0] for row in cursor.fetchall()] + stock_symbols = ['AMD'] + + asyncio.run(run(stock_symbols, con)) + +except Exception as e: + print(e) +finally: + con.close() \ No newline at end of file diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index de7c00e..0e2346d 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -252,11 +252,11 @@ def run_ownership_stats(): run_command(["python3", "cron_ownership_stats.py"]) -def run_options_gex(): +def run_options_historical_flow(): week = datetime.today().weekday() if week <= 5: - run_command(["python3", "cron_options_gex.py"]) run_command(["python3", "cron_options_historical_flow.py"]) + def run_hedge_fund(): @@ -349,7 +349,7 @@ def run_threaded(job_func): schedule.every().day.at("00:00").do(run_threaded, run_options_jobs).tag('options_job') schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) -#schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_gex_job') +schedule.every().day.at("05:00").do(run_threaded, run_options_historical_flow).tag('options_historical_flow_job') schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job')