diff --git a/app/create_stock_db.py b/app/create_stock_db.py
index 58c59f5..4a27b27 100755
--- a/app/create_stock_db.py
+++ b/app/create_stock_db.py
@@ -101,7 +101,6 @@ class StockDatabase:
             f"https://financialmodelingprep.com/api/v3/profile/{symbol}?apikey={api_key}",
             f"https://financialmodelingprep.com/api/v3/quote/{symbol}?apikey={api_key}",
             f"https://financialmodelingprep.com/stable/dividends?symbol={symbol}&apikey={api_key}",
-            f"https://financialmodelingprep.com/stable/employee-count?symbol={symbol}&apikey={api_key}",
             f"https://financialmodelingprep.com/api/v3/historical-price-full/stock_split/{symbol}?apikey={api_key}",
             f"https://financialmodelingprep.com/api/v4/stock_peers?symbol={symbol}&apikey={api_key}",
             f"https://financialmodelingprep.com/stable/institutional-ownership/extract-analytics/holder?symbol={symbol}&year={year}&quarter={quarter}&apikey={api_key}",
@@ -156,9 +155,6 @@ class StockDatabase:
             elif "dividends" in url:
                 # Handle list response, save as JSON object
                 fundamental_data['stock_dividend'] = ujson.dumps(parsed_data)
-            elif "employee-count" in url:
-                # Handle list response, save as JSON object
-                fundamental_data['history_employee_count'] = ujson.dumps(parsed_data)
             elif "stock_split" in url:
                 # Handle list response, save as JSON object
                 fundamental_data['stock_split'] = ujson.dumps(parsed_data['historical'])
diff --git a/app/cron_enterprise_values.py b/app/cron_enterprise_values.py
index d75c05f..d593484 100755
--- a/app/cron_enterprise_values.py
+++ b/app/cron_enterprise_values.py
@@ -13,7 +13,11 @@ api_key = os.getenv('FMP_API_KEY')
 
 
 async def save_json_data(symbol, data):
-    async with aiofiles.open(f"json/enterprise-values/{symbol}.json", 'w') as file:
+    folder_path = "json/enterprise-values"
+    os.makedirs(folder_path, exist_ok=True)  # Ensure the folder exists
+
+    file_path = f"{folder_path}/{symbol}.json"
+    async with aiofiles.open(file_path, 'w') as file:
         await file.write(ujson.dumps(data))
 
 async def get_data(symbols, session):
@@ -25,7 +29,8 @@ async def get_data(symbols, session):
 
     if len(responses) > 0:
         for symbol, response in zip(symbols, responses):
-            await save_json_data(symbol, response)
+            if response:
+                await save_json_data(symbol, response)
 
 async def replace_date_with_fiscal_year(data):
     res_list = []
@@ -43,7 +48,7 @@ async def get_endpoints(symbol, session):
     data = []
     try:
         # Form API request URLs
-        url= f"https://financialmodelingprep.com/api/v3/enterprise-values/{symbol}/?period=annual&apikey={api_key}"
+        url = f"https://financialmodelingprep.com/stable/enterprise-values?symbol={symbol}&apikey={api_key}"
 
         async with session.get(url) as response:
             data = []
diff --git a/app/cron_historical_employees.py b/app/cron_historical_employees.py
new file mode 100644
index 0000000..ffa9341
--- /dev/null
+++ b/app/cron_historical_employees.py
@@ -0,0 +1,93 @@
+import ujson
+import asyncio
+import aiohttp
+import aiofiles
+import sqlite3
+import pandas as pd
+
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+
+async def save_json_data(symbol, data):
+    folder_path = "json/historical-employees"
+    os.makedirs(folder_path, exist_ok=True)  # Ensure the folder exists
+
+    file_path = f"{folder_path}/{symbol}.json"
+    async with aiofiles.open(file_path, 'w') as file:
+        await file.write(ujson.dumps(data))
+
+async def get_data(symbols, session):
+    tasks = []
+    for symbol in symbols:
+        task = asyncio.create_task(get_endpoints(symbol, session))
+        tasks.append(task)
+    responses = await asyncio.gather(*tasks)
+
+    if len(responses) > 0:
+        for symbol, response in zip(symbols, responses):
+            if response:
+                await save_json_data(symbol, response)
+
+async def get_endpoints(symbol, session):
+    data = []
+    try:
+        url = f"https://financialmodelingprep.com/stable/employee-count?symbol={symbol}&apikey={api_key}"
+
+        async with session.get(url) as response:
+            data = []
+            data = await response.json()
+
+            res = []
+
+            for item in data:
+                try:
+                    res.append({'date': item['periodOfReport'], 'employeeCount': item['employeeCount']})
+                except (KeyError, TypeError):
+                    pass  # skip records missing periodOfReport or employeeCount
+
+            return res
+
+
+    except Exception as e:
+        print(f"Failed to fetch data for {symbol}: {e}")
+
+    return data
+
+
+
+async def run():
+    try:
+        con = sqlite3.connect('stocks.db')
+        cursor = con.cursor()
+        cursor.execute("PRAGMA journal_mode = wal")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks")
+        stock_symbols = [row[0] for row in cursor.fetchall()]
+        con.close()
+
+        total_symbols = stock_symbols
+        chunk_size = 1000
+
+    except Exception as e:
+        print(f"Failed to fetch symbols: {e}")
+        return
+
+    try:
+        connector = aiohttp.TCPConnector(limit=100)  # Adjust the limit as needed
+        async with aiohttp.ClientSession(connector=connector) as session:
+            for i in range(0, len(total_symbols), chunk_size):
+                symbols_chunk = total_symbols[i:i + chunk_size]
+                await get_data(symbols_chunk, session)
+                print('sleeping for 60 sec')
+                await asyncio.sleep(60)  # Wait for 60 seconds between chunks
+    except Exception as e:
+        print(f"Failed to run fetch and save data: {e}")
+
+try:
+    asyncio.run(run())
+except Exception as e:
+    print(e)
+
diff --git a/app/main.py b/app/main.py
index 3af3f81..205278d 100755
--- a/app/main.py
+++ b/app/main.py
@@ -881,35 +881,38 @@ async def stock_dividend(data: TickerData, api_key: str = Security(get_api_key)):
     redis_client.expire(cache_key, 60)
     return res
 
-@app.post("/history-employees")
-async def history_employees(data: TickerData, api_key: str = Security(get_api_key)):
-    data = data.dict()
-    ticker = data['ticker'].upper()
+@app.post("/historical-employees")
+async def historical_employees(data: TickerData, api_key: str = Security(get_api_key)):
 
-    cache_key = f"history-employees-{ticker}"
+    ticker = data.ticker.upper()
+
+    cache_key = f"historical-employees-{ticker}"
     cached_result = redis_client.get(cache_key)
     if cached_result:
-        return orjson.loads(cached_result)
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
 
-    query_template = """
-    SELECT
-        history_employee_count
-    FROM
-        stocks
-    WHERE
-        symbol = ?
- """ - - df = pd.read_sql_query(query_template,con, params=(ticker,)) try: - history_employee_count = orjson.loads(df['history_employee_count'].iloc[0]) - res = sorted([entry for entry in history_employee_count if entry["employeeCount"] != 0], key=lambda x: x["filingDate"]) + with open(f"json/historical-employees/{ticker}.json", 'rb') as file: + res = orjson.loads(file.read()) except: res = [] - redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour - return res + res = orjson.dumps(res) + compressed_data = gzip.compress(res) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 60 * 15) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + @app.post("/stock-income") async def stock_income(data: TickerData, api_key: str = Security(get_api_key)): diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index f6c183e..6ea4c81 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -102,6 +102,11 @@ def run_options_jobs(): run_command(["python3", "cron_options_gex_dex.py"]) run_command(["python3", "cron_options_contract_lookup.py"]) +def run_historical_employees(): + now = datetime.now(ny_tz) + week = now.weekday() + if week <= 5: + run_command(["python3", "cron_historical_employees.py"]) def run_cron_insider_trading(): week = datetime.today().weekday() @@ -355,6 +360,7 @@ def run_threaded(job_func): schedule.every().day.at("01:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("22:30").do(run_threaded, run_options_jobs).tag('options_job') +schedule.every().day.at("22:30").do(run_threaded, run_historical_employees).tag('employees_job') schedule.every().day.at("05:00").do(run_threaded, run_options_historical_flow).tag('options_historical_flow_job')