From f1ddcd2003a8e522bcf582afabb2cceaac118b99 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Tue, 24 Sep 2024 18:50:52 +0200
Subject: [PATCH] add fomc impact cron job

---
 app/cron_fomc_impact.py | 155 ++++++++++++++++++++++++++++++++++++++++
 app/main.py             |  32 ++++++++-
 2 files changed, 186 insertions(+), 1 deletion(-)
 create mode 100644 app/cron_fomc_impact.py

diff --git a/app/cron_fomc_impact.py b/app/cron_fomc_impact.py
new file mode 100644
index 0000000..9a2528a
--- /dev/null
+++ b/app/cron_fomc_impact.py
@@ -0,0 +1,155 @@
+from datetime import datetime, timedelta
+import ujson
+import asyncio
+import aiohttp
+import os
+from dotenv import load_dotenv
+import sqlite3
+import pandas as pd
+from tqdm import tqdm
+
+# Load environment variables
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+
+query_template = """
+    SELECT date, close
+    FROM "{ticker}"
+    WHERE date BETWEEN ? AND ?
+"""
+
+# Function to save JSON data
+async def save_json(symbol, data):
+    with open(f'json/fomc-impact/companies/{symbol}.json', 'w') as file:
+        ujson.dump(data, file)
+
+# Function to fetch data from the API
+async def get_data(session, url):
+    async with session.get(url) as response:
+        data = await response.json()
+        return data
+
+async def get_fomc_data():
+    fomc_data = []
+    start_date = datetime.now() - timedelta(days=365)
+    end_date = datetime.now()
+
+    async with aiohttp.ClientSession() as session:
+        current_date = start_date
+        while current_date < end_date:
+            next_date = min(current_date + timedelta(days=10), end_date)
+            start_str = current_date.strftime('%Y-%m-%d')
+            end_str = next_date.strftime('%Y-%m-%d')
+
+            url = f"https://financialmodelingprep.com/api/v3/economic_calendar?from={start_str}&to={end_str}&apikey={api_key}"
+            data = await get_data(session, url)
+            if data:
+                # Keep only "Fed Interest Rate Decision" events
+                fomc_events = [item for item in data if item.get('event') == "Fed Interest Rate Decision"]
+                fomc_data.extend(fomc_events)
+
+            # Move to the next 10-day period
+            current_date = next_date
+
+    filtered_data = [
+        {
+            'date': item['date'][0:10],
+            'changePercentage': item['changePercentage'],
+            'previous': item['previous'],
+            'actual': item['actual'],
+            'estimate': item['estimate']
+        }
+        for item in fomc_data
+    ]
+
+    filtered_data = sorted(filtered_data, key=lambda x: x['date'])
+
+    return filtered_data
+
+
+async def run():
+    fomc_dates = await get_fomc_data()  # Sorted list of FOMC rate-decision event dicts
+    start_date = datetime.now() - timedelta(days=365)
+    end_date = datetime.now()
+
+    # Extract the dates for filtering
+    fomc_dates_list = [datetime.strptime(fomc['date'], '%Y-%m-%d').date() for fomc in fomc_dates]
+
+    # Connect to SQLite databases
+    stock_con = sqlite3.connect('stocks.db')
+    etf_con = sqlite3.connect('etf.db')
+
+    stock_cursor = stock_con.cursor()
+    stock_cursor.execute("PRAGMA journal_mode = wal")
+    stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap >= 500E6")
+    stock_symbols = [row[0] for row in stock_cursor.fetchall()]
+
+    etf_cursor = etf_con.cursor()
+    etf_cursor.execute("PRAGMA journal_mode = wal")
+    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
+    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
+
+    total_symbols = stock_symbols + etf_symbols
+    for ticker in tqdm(total_symbols):
+        try:
+            query = query_template.format(ticker=ticker)
+            connection = stock_con if ticker in stock_symbols else etf_con
+            df_price = pd.read_sql_query(query, connection, params=(start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d')))
+
+            if len(df_price) > 150 and len(fomc_dates) > 0:
+                # Convert 'date' column in df_price to datetime.date for comparison
+                df_price['date'] = pd.to_datetime(df_price['date']).dt.date
+
+                # Filter out every fifth row, unless the date is in fomc_dates
+                filtered_df = df_price[
+                    (df_price.index % 5 != 0) | (df_price['date'].isin(fomc_dates_list))
+                ].copy()
+
+                filtered_df['date'] = filtered_df['date'].apply(lambda x: x.strftime('%Y-%m-%d'))
+
+                # Prepare the result with filtered data and original fomc_dates
+                fomc_data_unique = {}
+                for fomc in fomc_dates:
+                    date = fomc['date']
+                    if date not in fomc_data_unique:  # Check for duplicates
+                        fomc_data_unique[date] = {
+                            'date': date,
+                            'changePercentage': fomc['changePercentage'],
+                            'previous': fomc['previous'],
+                            'actual': fomc['actual'],
+                            'estimate': fomc['estimate']
+                        }
+
+                # Convert the unique FOMC data back to a list
+                res = {
+                    'fomcData': list(fomc_data_unique.values()),  # Ensure unique dates
+                    'history': filtered_df.to_dict('records')
+                }
+
+                # Compute percentage changes between consecutive FOMC dates
+                for i in range(len(res['fomcData']) - 1):
+                    current_fomc_date = res['fomcData'][i]['date']
+                    next_fomc_date = res['fomcData'][i + 1]['date']
+
+                    # Find closing prices for the current and next FOMC dates
+                    current_price_row = filtered_df[filtered_df['date'] == current_fomc_date]
+                    next_price_row = filtered_df[filtered_df['date'] == next_fomc_date]
+
+                    if not current_price_row.empty and not next_price_row.empty:
+                        current_price = current_price_row['close'].values[0]
+                        next_price = next_price_row['close'].values[0]
+
+                        # Calculate the percentage change
+                        percentage_change = ((next_price - current_price) / current_price) * 100
+                        res['fomcData'][i]['changePercentage'] = round(percentage_change, 2)  # Update with the price-based change
+
+                await save_json(ticker, res)
+        except Exception as e:
+            print(f"Error processing {ticker}: {e}")
+
+
+
+# Run the cron job (asyncio.run creates and cleans up the event loop)
+if __name__ == '__main__':
+    asyncio.run(run())
diff --git a/app/main.py b/app/main.py
index c3762cc..d9ea191 100755
--- a/app/main.py
+++ b/app/main.py
@@ -3899,6 +3899,36 @@ async def get_info_text(data:InfoText, api_key: str = Security(get_api_key)):
     return res
 
 
+@app.post("/fomc-impact")
+async def get_fomc_impact(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker
+
+    cache_key = f"fomc-impact-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+    try:
+        with open(f"json/fomc-impact/companies/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = {}
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*3600)
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
 @app.get("/newsletter")
 async def get_newsletter():
     try:
@@ -3906,4 +3936,4 @@ async def get_newsletter():
         res = orjson.loads(file.read())
     except:
         res = []
-    return res
\ No newline at end of file
+    return res
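
Note: a minimal sketch of how the per-symbol output written by cron_fomc_impact.py (the same payload the new /fomc-impact endpoint serves gzip-compressed) could be consumed. The symbol "SPY", the helper name load_fomc_impact, and the relative path are illustrative assumptions and not part of this patch:

import orjson

def load_fomc_impact(symbol: str) -> dict:
    # Read the per-symbol file produced by cron_fomc_impact.py
    with open(f"json/fomc-impact/companies/{symbol}.json", "rb") as file:
        return orjson.loads(file.read())

if __name__ == "__main__":
    res = load_fomc_impact("SPY")  # hypothetical symbol for illustration
    # 'fomcData' holds one entry per FOMC meeting date; 'changePercentage' is the
    # close-to-close move between consecutive meeting dates computed by the cron job.
    for event in res["fomcData"]:
        print(event["date"], event["actual"], event["changePercentage"])
    # 'history' is the downsampled daily close series kept for charting.
    print(len(res["history"]), "price points")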