add financial statements cron job

MuslemRahimi 2024-08-14 01:27:12 +02:00
parent 6fd82a0f4a
commit cf324c849d
3 changed files with 196 additions and 78 deletions

View File

@@ -0,0 +1,73 @@
+import os
+import ujson
+import random
+import asyncio
+import aiohttp
+import sqlite3
+from tqdm import tqdm
+from dotenv import load_dotenv
+
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+# Configurations
+include_current_quarter = False
+max_concurrent_requests = 100  # Limit concurrent requests
+
+async def fetch_data(session, url, symbol, attempt=0):
+    try:
+        async with session.get(url) as response:
+            if response.status == 200:
+                data = await response.json()
+                return data
+            else:
+                print(f"Error fetching data for {symbol}: HTTP {response.status}")
+                return None
+    except Exception as e:
+        print(f"Exception during fetching data for {symbol}: {e}")
+        return None
+
+async def save_json(symbol, period, data_type, data):
+    os.makedirs(f"json/financial-statements/{data_type}/{period}/", exist_ok=True)
+    with open(f"json/financial-statements/{data_type}/{period}/{symbol}.json", 'w') as file:
+        ujson.dump(data, file)
+
+async def get_financial_statements(session, symbol, semaphore, request_counter):
+    base_url = "https://financialmodelingprep.com/api/v3"
+    periods = ['quarter', 'annual']
+    data_types = ['income-statement', 'balance-sheet-statement', 'cash-flow-statement', 'ratios']
+    async with semaphore:
+        for period in periods:
+            for data_type in data_types:
+                url = f"{base_url}/{data_type}/{symbol}?period={period}&apikey={api_key}"
+                data = await fetch_data(session, url, symbol)
+                if data:
+                    await save_json(symbol, period, data_type, data)
+                request_counter[0] += 1  # Increment the request counter
+                if request_counter[0] >= 1000:
+                    await asyncio.sleep(60)  # Pause for 60 seconds
+                    request_counter[0] = 0  # Reset the request counter after the pause
+
+async def run():
+    con = sqlite3.connect('stocks.db')
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
+    symbols = [row[0] for row in cursor.fetchall()]
+    con.close()
+
+    semaphore = asyncio.Semaphore(max_concurrent_requests)
+    request_counter = [0]  # Using a list to keep a mutable counter across async tasks
+
+    async with aiohttp.ClientSession() as session:
+        tasks = []
+        for symbol in tqdm(symbols):
+            task = asyncio.create_task(get_financial_statements(session, symbol, semaphore, request_counter))
+            tasks.append(task)
+        await asyncio.gather(*tasks)
+
+if __name__ == "__main__":
+    asyncio.run(run())
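Each symbol fans out to two periods times four statement types, i.e. eight requests, so the 1,000-request pause above kicks in roughly every 125 symbols. For reference, a minimal sketch of reading one of the files the script writes (the AAPL symbol is only an example; the path mirrors the save_json layout above):

import ujson

# save_json writes to json/financial-statements/{data_type}/{period}/{symbol}.json
with open("json/financial-statements/income-statement/quarter/AAPL.json") as file:
    statements = ujson.load(file)

# FMP statement endpoints return a list of per-period dicts (assumed newest first)
print(len(statements), statements[0]["date"] if statements else None)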

View File

@@ -710,33 +710,39 @@ async def stock_income(data: TickerData, api_key: str = Security(get_api_key)):
cache_key = f"stock-income-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return orjson.loads(cached_result)
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
query_template = """
SELECT
income, income_growth
FROM
stocks
WHERE
symbol = ?
"""
try:
with db_connection(STOCK_DB) as cursor:
cursor.execute(query_template, (ticker,))
result = cursor.fetchone()
if result:
income_statement = orjson.loads(result[0])
income_statement_growth = orjson.loads(result[1])
res = clean_financial_data(income_statement, income_statement_growth)
else:
res = []
with open(f"json/financial-statements/income-statement/quarter/{ticker}.json", 'rb') as file:
quarter_res = orjson.loads(file.read())
except:
res = []
quarter_res = []
redis_client.set(cache_key, orjson.dumps(res))
redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour
return res
try:
with open(f"json/financial-statements/income-statement/annual/{ticker}.json", 'rb') as file:
annual_res = orjson.loads(file.read())
except:
annual_res = []
res = {'quarter': quarter_res, 'annual': annual_res}
res = orjson.dumps(res)
compressed_data = gzip.compress(res)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.post("/stock-balance-sheet")
async def stock_balance_sheet(data: TickerData, api_key: str = Security(get_api_key)):
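The read-two-JSON-files, gzip, cache, stream sequence added here repeats in the three endpoints below; a hypothetical helper capturing the pattern could look like this (gzip_json_response is not part of this commit, and the ex= argument is redis-py's combined set-plus-expire shortcut):

import gzip
import io

import orjson
from fastapi.responses import StreamingResponse

def gzip_json_response(payload, cache_key, redis_client, ttl=3600 * 24):
    # Serialize and compress once; cache the compressed bytes so the
    # cached branch can stream them back without re-encoding
    compressed = gzip.compress(orjson.dumps(payload))
    redis_client.set(cache_key, compressed, ex=ttl)  # set value and TTL in one call
    return StreamingResponse(
        io.BytesIO(compressed),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )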
@@ -745,33 +751,39 @@ async def stock_balance_sheet(data: TickerData, api_key: str = Security(get_api_key)):
cache_key = f"stock-balance-sheet-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return orjson.loads(cached_result)
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
query_template = """
SELECT
balance, balance_growth
FROM
stocks
WHERE
symbol = ?
"""
try:
with db_connection(STOCK_DB) as cursor:
cursor.execute(query_template, (ticker,))
result = cursor.fetchone()
if result:
balance_statement = orjson.loads(result[0])
balance_statement_growth = orjson.loads(result[1])
res = clean_financial_data(balance_statement, balance_statement_growth)
else:
res = []
with open(f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json", 'rb') as file:
quarter_res = orjson.loads(file.read())
except:
res = []
quarter_res = []
redis_client.set(cache_key, orjson.dumps(res))
redis_client.expire(cache_key, 3600*3600)
return res
try:
with open(f"json/financial-statements/balance-sheet-statement/annual/{ticker}.json", 'rb') as file:
annual_res = orjson.loads(file.read())
except:
annual_res = []
res = {'quarter': quarter_res, 'annual': annual_res}
res = orjson.dumps(res)
compressed_data = gzip.compress(res)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.post("/stock-ratios")
async def stock_ratios(data: TickerData, api_key: str = Security(get_api_key)):
@@ -780,27 +792,39 @@ async def stock_ratios(data: TickerData, api_key: str = Security(get_api_key)):
cache_key = f"stock-ratios-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return orjson.loads(cached_result)
query_template = """
SELECT
ratios
FROM
stocks
WHERE
symbol = ?
"""
if cached_result:
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
try:
df = pd.read_sql_query(query_template,con, params=(ticker,))
res = orjson.loads(df['ratios'].iloc[0])
with open(f"json/financial-statements/ratios/quarter/{ticker}.json", 'rb') as file:
quarter_res = orjson.loads(file.read())
except:
res = []
quarter_res = []
redis_client.set(cache_key, orjson.dumps(res))
redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour
return res
try:
with open(f"json/financial-statements/ratios/annual/{ticker}.json", 'rb') as file:
annual_res = orjson.loads(file.read())
except:
annual_res = []
res = {'quarter': quarter_res, 'annual': annual_res}
res = orjson.dumps(res)
compressed_data = gzip.compress(res)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.post("/stock-cash-flow")
@@ -810,29 +834,40 @@ async def stock_cash_flow(data: TickerData, api_key: str = Security(get_api_key)):
cache_key = f"stock-cash-flow-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return orjson.loads(cached_result)
query_template = """
SELECT
cashflow, cashflow_growth
FROM
stocks
WHERE
symbol = ?
"""
if cached_result:
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
try:
df = pd.read_sql_query(query_template,con, params=(ticker,))
cash_flow_statement = orjson.loads(df['cashflow'].iloc[0])
cash_flow_statement_growth = orjson.loads(df['cashflow_growth'].iloc[0])
res = clean_financial_data(cash_flow_statement,cash_flow_statement_growth)
with open(f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json", 'rb') as file:
quarter_res = orjson.loads(file.read())
except:
res = []
quarter_res = []
try:
with open(f"json/financial-statements/cash-flow-statement/annual/{ticker}.json", 'rb') as file:
annual_res = orjson.loads(file.read())
except:
annual_res = []
res = {'quarter': quarter_res, 'annual': annual_res}
res = orjson.dumps(res)
compressed_data = gzip.compress(res)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
redis_client.set(cache_key, orjson.dumps(res))
redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 hour
return res
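Because these responses carry Content-Encoding: gzip, most HTTP clients decompress them transparently and callers still see plain JSON. A sketch of a consumer (the URL, port, and auth header are assumptions about how the get_api_key dependency is wired, not details from this diff):

import requests

resp = requests.post(
    "http://localhost:8000/stock-income",
    json={"ticker": "AAPL"},
    headers={"X-API-KEY": "secret"},  # assumed header name for get_api_key
)
data = resp.json()  # requests handles the gzip decoding
print(sorted(data))  # ['annual', 'quarter']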

View File

@@ -464,6 +464,14 @@ def run_tracker():
         command = base_command + [source, f"root@{useast_ip_address}:{dest}"]
         run_command(command)
 
+def run_financial_statements():
+    run_command(["python3", "cron_financial_statements.py"])
+    command = [
+        "sudo", "rsync", "-avz", "-e", "ssh",
+        "/root/backend/app/json/financial-statements",
+        f"root@{useast_ip_address}:/root/backend/app/json"
+    ]
+    run_command(command)
+
 # Create functions to run each schedule in a separate thread
 def run_threaded(job_func):
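The body of run_threaded is cut off by the hunk boundary; the conventional pattern from the schedule library's documentation, which the comment above describes, is a sketch along these lines (not necessarily this repo's exact body):

import threading

def run_threaded(job_func):
    # Run each job on its own thread so a long job (like the
    # financial-statements download) doesn't block the other schedules
    job_thread = threading.Thread(target=job_func)
    job_thread.start()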
@@ -481,6 +489,8 @@ schedule.every().day.at("06:30").do(run_threaded, run_pocketbase).tag('pocketbase_job')
schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job')
schedule.every().day.at("07:30").do(run_threaded, run_government_contract).tag('government_contract_job')
schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job')
schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag('insider_trading_job')
schedule.every().day.at("09:00").do(run_threaded, run_congress_trading).tag('congress_job')