diff --git a/app/cron_financial_statements.py b/app/cron_financial_statements.py
new file mode 100644
index 0000000..52d654d
--- /dev/null
+++ b/app/cron_financial_statements.py
@@ -0,0 +1,73 @@
+import os
+import ujson
+import random
+import asyncio
+import aiohttp
+import sqlite3
+from tqdm import tqdm
+from dotenv import load_dotenv
+
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+# Configurations
+include_current_quarter = False
+max_concurrent_requests = 100 # Limit concurrent requests
+
+async def fetch_data(session, url, symbol, attempt=0):
+    try:
+        async with session.get(url) as response:
+            if response.status == 200:
+                data = await response.json()
+                return data
+            else:
+                print(f"Error fetching data for {symbol}: HTTP {response.status}")
+                return None
+    except Exception as e:
+        print(f"Exception during fetching data for {symbol}: {e}")
+        return None
+
+async def save_json(symbol, period, data_type, data):
+    os.makedirs(f"json/financial-statements/{data_type}/{period}/", exist_ok=True)
+    with open(f"json/financial-statements/{data_type}/{period}/{symbol}.json", 'w') as file:
+        ujson.dump(data, file)
+
+async def get_financial_statements(session, symbol, semaphore, request_counter):
+    base_url = "https://financialmodelingprep.com/api/v3"
+    periods = ['quarter', 'annual']
+    data_types = ['income-statement', 'balance-sheet-statement', 'cash-flow-statement', 'ratios']
+
+    async with semaphore:
+        for period in periods:
+            for data_type in data_types:
+                url = f"{base_url}/{data_type}/{symbol}?period={period}&apikey={api_key}"
+                data = await fetch_data(session, url, symbol)
+                if data:
+                    await save_json(symbol, period, data_type, data)
+
+                request_counter[0] += 1 # Count every request (each symbol makes 8)
+                if request_counter[0] >= 1000:
+                    await asyncio.sleep(60) # Pause for 60 seconds before issuing more requests
+                    request_counter[0] = 0 # Reset the request counter after the pause
+
+async def run():
+    con = sqlite3.connect('stocks.db')
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
+    symbols = [row[0] for row in cursor.fetchall()]
+    con.close()
+
+    semaphore = asyncio.Semaphore(max_concurrent_requests)
+    request_counter = [0] # Using a list to keep a mutable counter across async tasks
+
+    async with aiohttp.ClientSession() as session:
+        tasks = []
+        for symbol in tqdm(symbols):
+            task = asyncio.create_task(get_financial_statements(session, symbol, semaphore, request_counter))
+            tasks.append(task)
+
+        await asyncio.gather(*tasks)
+
+if __name__ == "__main__":
+    asyncio.run(run())
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index 3b782a4..c2c07d7 100755
--- a/app/main.py
+++ b/app/main.py
@@ -710,33 +710,39 @@ async def stock_income(data: TickerData, api_key: str = Security(get_api_key)):
     cache_key = f"stock-income-{ticker}"
     cached_result = redis_client.get(cache_key)
+
     if cached_result:
-        return orjson.loads(cached_result)
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
 
-    query_template = """
-    SELECT
-        income, income_growth
-    FROM
-        stocks
-    WHERE
-        symbol = ?
- """ try: - with db_connection(STOCK_DB) as cursor: - cursor.execute(query_template, (ticker,)) - result = cursor.fetchone() - if result: - income_statement = orjson.loads(result[0]) - income_statement_growth = orjson.loads(result[1]) - res = clean_financial_data(income_statement, income_statement_growth) - else: - res = [] + with open(f"json/financial-statements/income-statement/quarter/{ticker}.json", 'rb') as file: + quarter_res = orjson.loads(file.read()) except: - res = [] + quarter_res = [] - redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour - return res + try: + with open(f"json/financial-statements/income-statement/annual/{ticker}.json", 'rb') as file: + annual_res = orjson.loads(file.read()) + except: + annual_res = [] + + res = {'quarter': quarter_res, 'annual': annual_res} + + res = orjson.dumps(res) + compressed_data = gzip.compress(res) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) @app.post("/stock-balance-sheet") async def stock_balance_sheet(data: TickerData, api_key: str = Security(get_api_key)): @@ -745,33 +751,39 @@ async def stock_balance_sheet(data: TickerData, api_key: str = Security(get_api_ cache_key = f"stock-balance-sheet-{ticker}" cached_result = redis_client.get(cache_key) + if cached_result: - return orjson.loads(cached_result) + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) - query_template = """ - SELECT - balance, balance_growth - FROM - stocks - WHERE - symbol = ? - """ try: - with db_connection(STOCK_DB) as cursor: - cursor.execute(query_template, (ticker,)) - result = cursor.fetchone() - if result: - balance_statement = orjson.loads(result[0]) - balance_statement_growth = orjson.loads(result[1]) - res = clean_financial_data(balance_statement, balance_statement_growth) - else: - res = [] + with open(f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json", 'rb') as file: + quarter_res = orjson.loads(file.read()) except: - res = [] + quarter_res = [] - redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 3600*3600) - return res + try: + with open(f"json/financial-statements/balance-sheet-statement/annual/{ticker}.json", 'rb') as file: + annual_res = orjson.loads(file.read()) + except: + annual_res = [] + + res = {'quarter': quarter_res, 'annual': annual_res} + + res = orjson.dumps(res) + compressed_data = gzip.compress(res) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) @app.post("/stock-ratios") async def stock_ratios(data: TickerData, api_key: str = Security(get_api_key)): @@ -780,27 +792,39 @@ async def stock_ratios(data: TickerData, api_key: str = Security(get_api_key)): cache_key = f"stock-ratios-{ticker}" cached_result = redis_client.get(cache_key) - if cached_result: - return orjson.loads(cached_result) - query_template = """ - SELECT - ratios - FROM - stocks - WHERE - symbol = ? 
- """ + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) try: - df = pd.read_sql_query(query_template,con, params=(ticker,)) - res = orjson.loads(df['ratios'].iloc[0]) + with open(f"json/financial-statements/ratios/quarter/{ticker}.json", 'rb') as file: + quarter_res = orjson.loads(file.read()) except: - res = [] + quarter_res = [] - redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour - return res + try: + with open(f"json/financial-statements/ratios/annual/{ticker}.json", 'rb') as file: + annual_res = orjson.loads(file.read()) + except: + annual_res = [] + + res = {'quarter': quarter_res, 'annual': annual_res} + + res = orjson.dumps(res) + compressed_data = gzip.compress(res) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) @app.post("/stock-cash-flow") @@ -810,29 +834,40 @@ async def stock_cash_flow(data: TickerData, api_key: str = Security(get_api_key) cache_key = f"stock-cash-flow-{ticker}" cached_result = redis_client.get(cache_key) - if cached_result: - return orjson.loads(cached_result) - query_template = """ - SELECT - cashflow, cashflow_growth - FROM - stocks - WHERE - symbol = ? - """ + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) try: - df = pd.read_sql_query(query_template,con, params=(ticker,)) - cash_flow_statement = orjson.loads(df['cashflow'].iloc[0]) - cash_flow_statement_growth = orjson.loads(df['cashflow_growth'].iloc[0]) - res = clean_financial_data(cash_flow_statement,cash_flow_statement_growth) + with open(f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json", 'rb') as file: + quarter_res = orjson.loads(file.read()) except: - res = [] + quarter_res = [] + + try: + with open(f"json/financial-statements/cash-flow-statement/annual/{ticker}.json", 'rb') as file: + annual_res = orjson.loads(file.read()) + except: + annual_res = [] + + res = {'quarter': quarter_res, 'annual': annual_res} + + res = orjson.dumps(res) + compressed_data = gzip.compress(res) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) - redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour - return res diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index a1486a9..1c0f6ef 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -464,6 +464,14 @@ def run_tracker(): command = base_command + [source, f"root@{useast_ip_address}:{dest}"] run_command(command) +def run_financial_statements(): + run_command(["python3", "cron_financial_statements.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/financial-statements", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + run_command(command) # Create functions to run each schedule in a separate thread def run_threaded(job_func): @@ -481,6 +489,8 @@ schedule.every().day.at("06:30").do(run_threaded, 
run_pocketbase).tag('pocketbas schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job') schedule.every().day.at("07:30").do(run_threaded, run_government_contract).tag('government_contract_job') +schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job') + schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag('insider_trading_job') schedule.every().day.at("09:00").do(run_threaded, run_congress_trading).tag('congress_job')