diff --git a/app/cron_historical_price.py b/app/cron_historical_price.py
index 4e16a9a..fa56856 100755
--- a/app/cron_historical_price.py
+++ b/app/cron_historical_price.py
@@ -3,16 +3,81 @@ import asyncio
 import aiohttp
 import aiofiles
 import sqlite3
-from datetime import datetime, timedelta, time
+from datetime import datetime, timedelta
 import pytz
 import pandas as pd
-
-from dotenv import load_dotenv
 import os
+from dotenv import load_dotenv
 
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
+# Helper to ensure directories exist and write JSON files asynchronously
+async def write_json(path, data):
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    async with aiofiles.open(path, 'w') as file:
+        await file.write(ujson.dumps(data))
+
+async def get_historical_data(ticker, query_con, session):
+    try:
+        # Form API request URLs
+        url_1w = (f"https://financialmodelingprep.com/stable/historical-chart/5min?"
+                  f"symbol={ticker}&from={start_date_1w}&to={end_date}&apikey={api_key}")
+        url_1m = (f"https://financialmodelingprep.com/stable/historical-chart/1hour?"
+                  f"symbol={ticker}&from={start_date_1m}&to={end_date}&apikey={api_key}")
+
+        # Fetch both endpoints concurrently
+        responses = await asyncio.gather(
+            session.get(url_1w),
+            session.get(url_1m),
+            return_exceptions=True
+        )
+
+        data = []
+        for resp in responses:
+            if isinstance(resp, Exception):
+                print(f"Error fetching data for {ticker}: {resp}")
+                data.append("[]")  # empty placeholder keeps 1W/1M positions aligned
+                continue
+            async with resp:
+                if resp.status != 200:
+                    print(f"Non-200 response for {ticker}: {resp.status}")
+                    data.append("[]")  # empty placeholder keeps 1W/1M positions aligned
+                    continue
+                json_data = await resp.json()
+                # Reverse rows so the oldest data comes first, then round and rename
+                df = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True).round(2).rename(columns={"date": "time"})
+                data.append(df.to_json(orient="records"))
+
+        # Database queries for additional periods
+        query_template = """
+            SELECT date, open, high, low, close, volume
+            FROM "{ticker}"
+            WHERE date BETWEEN ? AND ?
+ """ + query = query_template.format(ticker=ticker) + df_6m = pd.read_sql_query(query, query_con, params=(start_date_6m, end_date)) + df_6m = df_6m.round(2).rename(columns={"date": "time"}) + df_1y = pd.read_sql_query(query, query_con, params=(start_date_1y, end_date)) + df_1y = df_1y.round(2).rename(columns={"date": "time"}) + df_5y = pd.read_sql_query(query, query_con, params=(start_date_5y, end_date)) + df_5y = df_5y.round(2).rename(columns={"date": "time"}) + df_max = pd.read_sql_query(query, query_con, params=(start_date_max, end_date)) + df_max = df_max.round(2).rename(columns={"date": "time"}) + + # Prepare file-writing tasks + tasks = [ + write_json(f"json/historical-price/one-week/{ticker}.json", ujson.loads(data[0])), + write_json(f"json/historical-price/one-month/{ticker}.json", ujson.loads(data[1])), + write_json(f"json/historical-price/six-months/{ticker}.json", ujson.loads(df_6m.to_json(orient="records"))), + write_json(f"json/historical-price/one-year/{ticker}.json", ujson.loads(df_1y.to_json(orient="records"))), + write_json(f"json/historical-price/five-years/{ticker}.json", ujson.loads(df_5y.to_json(orient="records"))), + write_json(f"json/historical-price/max/{ticker}.json", ujson.loads(df_max.to_json(orient="records"))) + ] + await asyncio.gather(*tasks) + + except Exception as e: + print(f"Failed to fetch data for {ticker}: {e}") async def fetch_and_save_symbols_data(symbols, etf_symbols, index_symbols, session): tasks = [] @@ -26,74 +91,12 @@ async def fetch_and_save_symbols_data(symbols, etf_symbols, index_symbols, sessi task = asyncio.create_task(get_historical_data(symbol, query_con, session)) tasks.append(task) - + # Wait for all tasks in this chunk to complete await asyncio.gather(*tasks) - - -async def get_historical_data(ticker, query_con, session): - try: - # Form API request URLs - - url_1w = f"https://financialmodelingprep.com/stable/historical-chart/5min?symbol={ticker}&from={start_date_1w}&to={end_date}&apikey={api_key}" - url_1m = f"https://financialmodelingprep.com/stable/historical-chart/1hour?symbol={ticker}&from={start_date_1m}&to={end_date}&apikey={api_key}" - - async with session.get(url_1w) as response_1w, session.get(url_1m) as response_1m: - data = [] - for response in [response_1w, response_1m]: - json_data = await response.json() - df = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True) - ''' - try: - df = df.drop(['volume'], axis=1) - except: - pass - ''' - df = df.round(2).rename(columns={"date": "time"}) - data.append(df.to_json(orient="records")) - - # Database read for 6M, 1Y, MAX data - query_template = """ - SELECT date, open,high,low,close,volume - FROM "{ticker}" - WHERE date BETWEEN ? AND ? 
-        """
-        query = query_template.format(ticker=ticker)
-        df_6m = pd.read_sql_query(query, query_con, params=(start_date_6m, end_date)).round(2).rename(columns={"date": "time"})
-        df_1y = pd.read_sql_query(query, query_con, params=(start_date_1y, end_date)).round(2).rename(columns={"date": "time"})
-        df_5y = pd.read_sql_query(query, query_con, params=(start_date_5y, end_date)).round(2).rename(columns={"date": "time"})
-        df_max = pd.read_sql_query(query, query_con, params=(start_date_max, end_date)).round(2).rename(columns={"date": "time"})
-
-        async with aiofiles.open(f"json/historical-price/one-week/{ticker}.json", 'w') as file:
-            res = ujson.loads(data[0]) if data else []
-            await file.write(ujson.dumps(res))
-
-        async with aiofiles.open(f"json/historical-price/one-month/{ticker}.json", 'w') as file:
-            res = ujson.loads(data[1]) if len(data) > 1 else []
-            await file.write(ujson.dumps(res))
-
-        async with aiofiles.open(f"json/historical-price/six-months/{ticker}.json", 'w') as file:
-            res = ujson.loads(df_6m.to_json(orient="records"))
-            await file.write(ujson.dumps(res))
-
-        async with aiofiles.open(f"json/historical-price/one-year/{ticker}.json", 'w') as file:
-            res = ujson.loads(df_1y.to_json(orient="records"))
-            await file.write(ujson.dumps(res))
-
-        async with aiofiles.open(f"json/historical-price/five-years/{ticker}.json", 'w') as file:
-            res = ujson.loads(df_5y.to_json(orient="records"))
-            await file.write(ujson.dumps(res))
-
-        async with aiofiles.open(f"json/historical-price/max/{ticker}.json", 'w') as file:
-            res = ujson.loads(df_max.to_json(orient="records"))
-            await file.write(ujson.dumps(res))
-
-    except Exception as e:
-        print(f"Failed to fetch data for {ticker}: {e}")
 
 async def run():
-    total_symbols = []
-    chunk_size = 100
     try:
+        # Prepare symbols list
         cursor = con.cursor()
         cursor.execute("PRAGMA journal_mode = wal")
         cursor.execute("SELECT DISTINCT symbol FROM stocks")
@@ -104,42 +107,47 @@ async def run():
         etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
         etf_symbols = [row[0] for row in etf_cursor.fetchall()]
 
-        index_symbols =["^SPX","^VIX"]
-        total_symbols = stock_symbols + etf_symbols +index_symbols
+        index_symbols = ["^SPX", "^VIX"]
+        total_symbols = stock_symbols + etf_symbols + index_symbols
     except Exception as e:
         print(f"Failed to fetch symbols: {e}")
         return
 
+    # Process symbols in chunks to avoid overwhelming the API
+    chunk_size = 100
     try:
-        connector = aiohttp.TCPConnector(limit=300) # Adjust the limit as needed
+        connector = aiohttp.TCPConnector(limit=100)
         async with aiohttp.ClientSession(connector=connector) as session:
             for i in range(0, len(total_symbols), chunk_size):
                 symbols_chunk = total_symbols[i:i + chunk_size]
                 await fetch_and_save_symbols_data(symbols_chunk, etf_symbols, index_symbols, session)
-                print('sleeping for 30 sec')
-                await asyncio.sleep(60)
+                print('Chunk processed; sleeping for 30 seconds...')
+                await asyncio.sleep(30)
     except Exception as e:
         print(f"Failed to run fetch and save data: {e}")
 
-try:
-    con = sqlite3.connect('stocks.db')
-    etf_con = sqlite3.connect('etf.db')
-    index_con = sqlite3.connect('index.db')
+if __name__ == "__main__":
+    try:
+        # Open SQLite connections
+        con = sqlite3.connect('stocks.db')
+        etf_con = sqlite3.connect('etf.db')
+        index_con = sqlite3.connect('index.db')
 
-    berlin_tz = pytz.timezone('Europe/Berlin')
-    end_date = datetime.now(berlin_tz)
-    start_date_1w = (end_date - timedelta(days=5)).strftime("%Y-%m-%d")
-    start_date_1m = (end_date - timedelta(days=30)).strftime("%Y-%m-%d")
-    start_date_6m = (end_date - timedelta(days=180)).strftime("%Y-%m-%d")
-    start_date_1y = (end_date - timedelta(days=365)).strftime("%Y-%m-%d")
-    start_date_5y = (end_date - timedelta(days=365*5)).strftime("%Y-%m-%d")
-    start_date_max = datetime(1970, 1, 1).strftime("%Y-%m-%d")
-    end_date = end_date.strftime("%Y-%m-%d")
+        # Prepare date variables
+        berlin_tz = pytz.timezone('Europe/Berlin')
+        now = datetime.now(berlin_tz)
+        end_date = now.strftime("%Y-%m-%d")
+        start_date_1w = (now - timedelta(days=5)).strftime("%Y-%m-%d")
+        start_date_1m = (now - timedelta(days=30)).strftime("%Y-%m-%d")
+        start_date_6m = (now - timedelta(days=180)).strftime("%Y-%m-%d")
+        start_date_1y = (now - timedelta(days=365)).strftime("%Y-%m-%d")
+        start_date_5y = (now - timedelta(days=365*5)).strftime("%Y-%m-%d")
+        start_date_max = datetime(1970, 1, 1).strftime("%Y-%m-%d")
 
-    asyncio.run(run())
-    con.close()
-    etf_con.close()
-    index_con.close()
-except Exception as e:
-    print(e)
+        asyncio.run(run())
+        con.close()
+        etf_con.close()
+        index_con.close()
+    except Exception as e:
+        print(e)
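A note on the new fetch loop in get_historical_data: asyncio.gather(..., return_exceptions=True) returns results in argument order, so the one-week and one-month file writes (data[0], data[1]) stay correct only if a failed request still contributes an empty payload at its position. A minimal, self-contained sketch of that invariant, where the fetch stub and URLs are illustrative assumptions rather than part of this patch:

import asyncio

async def fetch(url):
    # Hypothetical stub standing in for session.get(); the second URL fails.
    if "bad" in url:
        raise RuntimeError("simulated network error")
    return f"payload for {url}"

async def main():
    urls = ["https://example.com/good", "https://example.com/bad"]
    # return_exceptions=True keeps failures in place instead of raising
    responses = await asyncio.gather(*(fetch(u) for u in urls),
                                     return_exceptions=True)
    data = []
    for resp in responses:
        if isinstance(resp, Exception):
            data.append("[]")  # placeholder keeps positions aligned
        else:
            data.append(resp)
    # data[0] always belongs to the first URL, data[1] to the second
    print(data)

asyncio.run(main())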
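The write_json helper can likewise be exercised in isolation before the cron job runs it at scale. The sketch below re-creates the same os.makedirs/aiofiles/ujson pattern; the target path and the sample candle record are assumptions for illustration only:

import asyncio
import os
import aiofiles
import ujson

async def write_json(path, data):
    # Same pattern as the patch: create parent dirs, then dump via aiofiles.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    async with aiofiles.open(path, 'w') as file:
        await file.write(ujson.dumps(data))

async def main():
    # Hypothetical sample record mirroring one 5-minute FMP candle.
    sample = [{"time": "2024-01-02 09:30:00", "open": 187.15, "high": 187.20,
               "low": 186.90, "close": 187.00, "volume": 120345}]
    await write_json("json/historical-price/one-week/TEST.json", sample)

asyncio.run(main())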