backend/app/cron_ownership_stats.py
2024-08-16 16:04:20 +02:00

88 lines
3.1 KiB
Python

from datetime import datetime, timedelta
import ujson
import time
import sqlite3
import time
import asyncio
import aiohttp
import random
from tqdm import tqdm
from dotenv import load_dotenv
import os
# Module configuration, evaluated once at import time.
# load_dotenv() (python-dotenv) runs first so that the key can be read from
# the process environment on the next line.
load_dotenv()

# Key appended to every request URL; None when FMP_API_KEY is not set.
api_key = os.environ.get('FMP_API_KEY')

# Interpolated verbatim into the `includeCurrentQuarter` query parameter.
# NOTE(review): a Python bool formats as "True" (capitalised) in an f-string;
# confirm the API accepts that casing.
include_current_quarter = True
async def get_data(session, symbol, max_retries=3, initial_delay=1):
    """Fetch the ownership summary for ``symbol`` and persist its first record.

    Requests the ``symbol-ownership`` endpoint and, when the response is a
    non-empty JSON array, passes its first element to ``save_json``.

    Retry policy (at most ``max_retries`` attempts in total):
      * HTTP 504, an unexpected ``Content-Type`` on a 200 response, or any
        exception raised while requesting/decoding/saving -> retried after
        an exponential backoff with jitter.
      * Any other non-200 status -> reported and abandoned immediately.
      * A 200 JSON response that is empty or not a list -> abandoned
        immediately; repeating the request would not change the payload.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response (``run`` passes an ``aiohttp.ClientSession``).
        symbol: Ticker placed in the query string and forwarded to
            ``save_json``.
        max_retries: Maximum number of request attempts.
        initial_delay: Base backoff in seconds; attempt ``k`` (0-based) waits
            ``initial_delay * 2**k`` plus up to one second of random jitter.

    Returns:
        None. Failures are reported with ``print`` and never raised.
    """
    url = (
        "https://financialmodelingprep.com/api/v4/institutional-ownership/symbol-ownership"
        f"?symbol={symbol}&includeCurrentQuarter={include_current_quarter}&apikey={api_key}"
    )

    for attempt in range(max_retries):
        try:
            async with session.get(url) as response:
                status = response.status
                if status == 200:
                    content_type = response.headers.get('Content-Type', '')
                    if 'application/json' in content_type:
                        data = await response.json()
                        if isinstance(data, list):
                            if data:
                                await save_json(symbol, data[0])
                        else:
                            # e.g. an error object: indexing it with [0] would
                            # raise and trigger retries that cannot succeed.
                            print(f"Unexpected JSON payload for {symbol}: {type(data).__name__}")
                        return
                    print(f"Unexpected content type for {symbol}: {content_type}")
                    retry_prefix = "Retrying"
                    exhausted_message = f"Max retries reached for {symbol}"
                elif status == 504:
                    retry_prefix = f"Gateway Timeout for {symbol}. Retrying"
                    exhausted_message = f"Max retries reached for {symbol} after Gateway Timeout"
                else:
                    print(f"Error fetching data for {symbol}: HTTP {status}")
                    return
        except Exception as e:
            print(f"Error processing {symbol}: {str(e)}")
            retry_prefix = "Retrying"
            exhausted_message = f"Max retries reached for {symbol}"

        # Only retryable failures reach this point (every other path returned).
        # The wait happens after the `async with` block has exited, so the
        # response is released instead of being held open during the backoff.
        if attempt < max_retries - 1:
            delay = initial_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"{retry_prefix} in {delay:.2f} seconds...")
            await asyncio.sleep(delay)
        else:
            print(exhausted_message)
async def save_json(symbol, data):
    """Write ``data`` as JSON to ``json/ownership-stats/<symbol>.json``.

    The file is opened in ``'w'`` mode, so an existing file for the same
    symbol is overwritten. The write itself is a plain synchronous call; the
    function is a coroutine only so that callers can ``await`` it.

    Args:
        symbol: Used as the file name (without extension).
        data: Object serialised with ``ujson.dump``.
    """
    target_path = f"json/ownership-stats/{symbol}.json"
    with open(target_path, 'w') as handle:
        ujson.dump(data, handle)
async def run(batch_size=400, pause_seconds=60):
    """Fetch ownership stats for every symbol in ``stocks.db``, in batches.

    Reads the distinct symbols that contain no ``.`` from the ``stocks``
    table, then calls ``get_data`` for each of them. Requests are issued
    concurrently in groups of ``batch_size``, with a pause between groups.

    Args:
        batch_size: Number of symbols fetched concurrently per batch.
        pause_seconds: Seconds to wait between two consecutive batches.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    con = sqlite3.connect('stocks.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        symbols = [row[0] for row in cursor.fetchall()]
    finally:
        # Close the connection even when one of the statements above raises.
        con.close()

    total = len(symbols)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for count, symbol in enumerate(tqdm(symbols), start=1):
            tasks.append(get_data(session, symbol))
            if count % batch_size == 0:
                await asyncio.gather(*tasks)
                tasks = []
                # Pause only when more symbols remain; sleeping after the
                # final batch would just delay the end of the job.
                if count < total:
                    print('sleeping mode: ', count)
                    await asyncio.sleep(pause_seconds)

        # Remainder that did not fill a complete batch.
        if tasks:
            await asyncio.gather(*tasks)
# Script entry point: executes the whole job when the module is loaded.
# asyncio.run() creates a fresh event loop, runs run() to completion and
# closes the loop afterwards. Calling asyncio.get_event_loop() with no
# running loop is deprecated and left the loop unclosed.
asyncio.run(run())