diff --git a/app/cron_historical_adj_price.py b/app/cron_historical_adj_price.py new file mode 100644 index 0000000..fdd8e26 --- /dev/null +++ b/app/cron_historical_adj_price.py @@ -0,0 +1,73 @@ +from datetime import datetime, timedelta +import ujson +import time +import sqlite3 +import asyncio +import aiohttp +import random +from tqdm import tqdm +from dotenv import load_dotenv +import os + +load_dotenv() +api_key = os.getenv('FMP_API_KEY') + + +today = datetime.today().strftime('%Y-%m-%d') + + +async def save_json(symbol, data): + path = f"json/historical-price/adj" + os.makedirs(path, exist_ok=True) # Create directories if they don't exist + with open(f"{path}/{symbol}.json", 'w') as file: + ujson.dump(data, file) + +def get_symbols(db_name, table_name): + """ + Fetch symbols from the SQLite database + """ + with sqlite3.connect(db_name) as con: + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'") + return [row[0] for row in cursor.fetchall()] + +async def get_data(session, symbol): + res_list = [] + start_date = '2000-01-01' + + url = f"https://financialmodelingprep.com/stable/historical-price-eod/dividend-adjusted?symbol={symbol}&from={start_date}&to={today}&apikey={api_key}" + try: + async with session.get(url) as response: + if response.status == 200: + res_list = await response.json() + except Exception as e: + print(f"Error fetching data for {symbol}: {e}") + + if len(res_list) > 0: + await save_json(symbol, res_list) + +async def run(): + stock_symbols = get_symbols('stocks.db', 'stocks') + etf_symbols = get_symbols('etf.db', 'etfs') + index_symbols = ['^SPX','^VIX'] + total_symbols = stock_symbols + etf_symbols + index_symbols + async with aiohttp.ClientSession() as session: + tasks = [] + for i, symbol in enumerate(tqdm(total_symbols), 1): + try: + tasks.append(get_data(session, symbol)) + if i % 500 == 0: + await asyncio.gather(*tasks) + tasks = [] + print(f'sleeping mode: {i}') + await asyncio.sleep(60) # Pause for 60 seconds + except: + pass + + if tasks: + await asyncio.gather(*tasks) + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(run()) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 4e8f8d6..4df4ba0 100755 --- a/app/main.py +++ b/app/main.py @@ -491,6 +491,61 @@ async def rating_stock(data: TickerData, api_key: str = Security(get_api_key)): redis_client.expire(cache_key, 3600*24) # Set cache expiration time to 1 day return res +@app.post("/historical-adj-price") +async def get_stock(data: TickerData, api_key: str = Security(get_api_key)): + ticker = data.ticker.upper() + + cache_key = f"historical-adj-price-{ticker}" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + try: + with open(f"json/historical-price/max/{ticker}.json", 'rb') as file: + res = orjson.loads(file.read()) + except Exception as e: + # if file reading fails, initialize to an empty list + res = [] + + try: + with open(f"json/historical-price/adj/{ticker}.json", 'rb') as file: + adj_res = orjson.loads(file.read()) + except Exception as e: + # if file reading fails, initialize to an empty list + adj_res = [] + + # Create a dictionary mapping date (or time) to the corresponding adj price entry. + # Assuming "date" in adj_res corresponds to "time" in res. + adj_by_date = { entry["date"]: entry for entry in adj_res if "date" in entry } + + # Loop over the historical price records and add the adjusted prices if the date matches. + for record in res: + date_key = record.get("time") + if date_key in adj_by_date: + adj_entry = adj_by_date[date_key] + # add adjusted data to record; adjust field names as necessary. + record["adjOpen"] = adj_entry.get("adjOpen") + record["adjHigh"] = adj_entry.get("adjHigh") + record["adjLow"] = adj_entry.get("adjLow") + record["adjClose"] = adj_entry.get("adjClose") + + # Serialize and cache the result. + res_json = orjson.dumps(res) + compressed_data = gzip.compress(res_json) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 60*60*6) # cache for 24 hours + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + @app.post("/historical-price") async def get_stock(data: HistoricalPrice, api_key: str = Security(get_api_key)): ticker = data.ticker.upper() diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index e69be8b..920d9a5 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -200,6 +200,7 @@ def run_historical_price(): week = datetime.today().weekday() if week <= 5: run_command(["python3", "cron_historical_price.py"]) + run_command(["python3","cron_historical_adj_price.py"]) def run_one_day_price(): now = datetime.now(ny_tz)