bugfixing

MuslemRahimi 2025-03-05 20:50:01 +01:00
parent f436590a1f
commit 0bdf102af2

@@ -3,16 +3,81 @@ import asyncio
 import aiohttp
 import aiofiles
 import sqlite3
-from datetime import datetime, timedelta, time
+from datetime import datetime, timedelta
 import pytz
 import pandas as pd
-from dotenv import load_dotenv
 import os
+from dotenv import load_dotenv
 
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
+
+# Helper to ensure directories exist and write JSON files asynchronously
+async def write_json(path, data):
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    async with aiofiles.open(path, 'w') as file:
+        await file.write(ujson.dumps(data))
+
+async def get_historical_data(ticker, query_con, session):
+    try:
+        # Form API request URLs
+        url_1w = (f"https://financialmodelingprep.com/stable/historical-chart/5min?"
+                  f"symbol={ticker}&from={start_date_1w}&to={end_date}&apikey={api_key}")
+        url_1m = (f"https://financialmodelingprep.com/stable/historical-chart/1hour?"
+                  f"symbol={ticker}&from={start_date_1m}&to={end_date}&apikey={api_key}")
+
+        # Fetch both endpoints concurrently; failures come back as values
+        responses = await asyncio.gather(
+            session.get(url_1w),
+            session.get(url_1m),
+            return_exceptions=True
+        )
+
+        data = []
+        for resp in responses:
+            if isinstance(resp, Exception):
+                print(f"Error fetching data for {ticker}: {resp}")
+                continue
+            async with resp:
+                if resp.status != 200:
+                    print(f"Non-200 response for {ticker}: {resp.status}")
+                    continue
+                json_data = await resp.json()
+                # Reverse rows so that oldest data comes first and reset the index
+                df = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True)
+                df = df.round(2).rename(columns={"date": "time"})
+                data.append(df.to_json(orient="records"))
+
+        # Database queries for additional periods
+        query_template = """
+            SELECT date, open, high, low, close, volume
+            FROM "{ticker}"
+            WHERE date BETWEEN ? AND ?
+        """
+        query = query_template.format(ticker=ticker)
+        df_6m = pd.read_sql_query(query, query_con, params=(start_date_6m, end_date))
+        df_6m = df_6m.round(2).rename(columns={"date": "time"})
+        df_1y = pd.read_sql_query(query, query_con, params=(start_date_1y, end_date))
+        df_1y = df_1y.round(2).rename(columns={"date": "time"})
+        df_5y = pd.read_sql_query(query, query_con, params=(start_date_5y, end_date))
+        df_5y = df_5y.round(2).rename(columns={"date": "time"})
+        df_max = pd.read_sql_query(query, query_con, params=(start_date_max, end_date))
+        df_max = df_max.round(2).rename(columns={"date": "time"})
+
+        # Prepare file-writing tasks; fall back to [] when a fetch failed
+        tasks = [
+            write_json(f"json/historical-price/one-week/{ticker}.json", ujson.loads(data[0]) if len(data) > 0 else []),
+            write_json(f"json/historical-price/one-month/{ticker}.json", ujson.loads(data[1]) if len(data) > 1 else []),
+            write_json(f"json/historical-price/six-months/{ticker}.json", ujson.loads(df_6m.to_json(orient="records"))),
+            write_json(f"json/historical-price/one-year/{ticker}.json", ujson.loads(df_1y.to_json(orient="records"))),
+            write_json(f"json/historical-price/five-years/{ticker}.json", ujson.loads(df_5y.to_json(orient="records"))),
+            write_json(f"json/historical-price/max/{ticker}.json", ujson.loads(df_max.to_json(orient="records")))
+        ]
+        await asyncio.gather(*tasks)
+    except Exception as e:
+        print(f"Failed to fetch data for {ticker}: {e}")
 
 async def fetch_and_save_symbols_data(symbols, etf_symbols, index_symbols, session):
     tasks = []
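
Because data[0] and data[1] above are positional, a failed one-week request would shift the one-month frame into the one-week slot (the length guards only cover missing entries, not mislabeled ones). A minimal sketch of pairing each period label with its response up front; illustrative only, not part of this commit:

    # Keep period labels and responses paired so a failed request
    # leaves an empty payload instead of shifting later frames.
    payloads = {"one-week": [], "one-month": []}
    for label, resp in zip(payloads, responses):
        if isinstance(resp, Exception):
            continue
        async with resp:
            if resp.status != 200:
                continue
            df = pd.DataFrame(await resp.json()).iloc[::-1].reset_index(drop=True)
            df = df.round(2).rename(columns={"date": "time"})
            payloads[label] = ujson.loads(df.to_json(orient="records"))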
@@ -26,74 +91,12 @@ async def fetch_and_save_symbols_data(symbols, etf_symbols, index_symbols, sessi
         task = asyncio.create_task(get_historical_data(symbol, query_con, session))
         tasks.append(task)
 
     # Wait for all tasks in this chunk to complete
     await asyncio.gather(*tasks)
-async def get_historical_data(ticker, query_con, session):
-    try:
-        # Form API request URLs
-        url_1w = f"https://financialmodelingprep.com/stable/historical-chart/5min?symbol={ticker}&from={start_date_1w}&to={end_date}&apikey={api_key}"
-        url_1m = f"https://financialmodelingprep.com/stable/historical-chart/1hour?symbol={ticker}&from={start_date_1m}&to={end_date}&apikey={api_key}"
-        async with session.get(url_1w) as response_1w, session.get(url_1m) as response_1m:
-            data = []
-            for response in [response_1w, response_1m]:
-                json_data = await response.json()
-                df = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True)
-                '''
-                try:
-                    df = df.drop(['volume'], axis=1)
-                except:
-                    pass
-                '''
-                df = df.round(2).rename(columns={"date": "time"})
-                data.append(df.to_json(orient="records"))
-
-            # Database read for 6M, 1Y, MAX data
-            query_template = """
-            SELECT date, open,high,low,close,volume
-            FROM "{ticker}"
-            WHERE date BETWEEN ? AND ?
-            """
-            query = query_template.format(ticker=ticker)
-            df_6m = pd.read_sql_query(query, query_con, params=(start_date_6m, end_date)).round(2).rename(columns={"date": "time"})
-            df_1y = pd.read_sql_query(query, query_con, params=(start_date_1y, end_date)).round(2).rename(columns={"date": "time"})
-            df_5y = pd.read_sql_query(query, query_con, params=(start_date_5y, end_date)).round(2).rename(columns={"date": "time"})
-            df_max = pd.read_sql_query(query, query_con, params=(start_date_max, end_date)).round(2).rename(columns={"date": "time"})
-
-            async with aiofiles.open(f"json/historical-price/one-week/{ticker}.json", 'w') as file:
-                res = ujson.loads(data[0]) if data else []
-                await file.write(ujson.dumps(res))
-            async with aiofiles.open(f"json/historical-price/one-month/{ticker}.json", 'w') as file:
-                res = ujson.loads(data[1]) if len(data) > 1 else []
-                await file.write(ujson.dumps(res))
-            async with aiofiles.open(f"json/historical-price/six-months/{ticker}.json", 'w') as file:
-                res = ujson.loads(df_6m.to_json(orient="records"))
-                await file.write(ujson.dumps(res))
-            async with aiofiles.open(f"json/historical-price/one-year/{ticker}.json", 'w') as file:
-                res = ujson.loads(df_1y.to_json(orient="records"))
-                await file.write(ujson.dumps(res))
-            async with aiofiles.open(f"json/historical-price/five-years/{ticker}.json", 'w') as file:
-                res = ujson.loads(df_5y.to_json(orient="records"))
-                await file.write(ujson.dumps(res))
-            async with aiofiles.open(f"json/historical-price/max/{ticker}.json", 'w') as file:
-                res = ujson.loads(df_max.to_json(orient="records"))
-                await file.write(ujson.dumps(res))
-    except Exception as e:
-        print(f"Failed to fetch data for {ticker}: {e}")
 
 async def run():
     total_symbols = []
     chunk_size = 100
     try:
         # Prepare symbols list
         cursor = con.cursor()
         cursor.execute("PRAGMA journal_mode = wal")
         cursor.execute("SELECT DISTINCT symbol FROM stocks")
@@ -104,42 +107,47 @@ async def run():
         etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
         etf_symbols = [row[0] for row in etf_cursor.fetchall()]
 
-        index_symbols =["^SPX","^VIX"]
-        total_symbols = stock_symbols + etf_symbols +index_symbols
+        index_symbols = ["^SPX", "^VIX"]
+        total_symbols = stock_symbols + etf_symbols + index_symbols
     except Exception as e:
         print(f"Failed to fetch symbols: {e}")
         return
 
     # Process symbols in chunks to avoid overwhelming the API
     chunk_size = 100
     try:
-        connector = aiohttp.TCPConnector(limit=300) # Adjust the limit as needed
+        connector = aiohttp.TCPConnector(limit=100)
         async with aiohttp.ClientSession(connector=connector) as session:
             for i in range(0, len(total_symbols), chunk_size):
                 symbols_chunk = total_symbols[i:i + chunk_size]
                 await fetch_and_save_symbols_data(symbols_chunk, etf_symbols, index_symbols, session)
-                print('sleeping for 30 sec')
-                await asyncio.sleep(60)
+                print('Chunk processed; sleeping for 30 seconds...')
+                await asyncio.sleep(30)
     except Exception as e:
         print(f"Failed to run fetch and save data: {e}")
 
-try:
-    con = sqlite3.connect('stocks.db')
-    etf_con = sqlite3.connect('etf.db')
-    index_con = sqlite3.connect('index.db')
+if __name__ == "__main__":
+    try:
+        # Open SQLite connections
+        con = sqlite3.connect('stocks.db')
+        etf_con = sqlite3.connect('etf.db')
+        index_con = sqlite3.connect('index.db')
 
-    berlin_tz = pytz.timezone('Europe/Berlin')
-    end_date = datetime.now(berlin_tz)
-    start_date_1w = (end_date - timedelta(days=5)).strftime("%Y-%m-%d")
-    start_date_1m = (end_date - timedelta(days=30)).strftime("%Y-%m-%d")
-    start_date_6m = (end_date - timedelta(days=180)).strftime("%Y-%m-%d")
-    start_date_1y = (end_date - timedelta(days=365)).strftime("%Y-%m-%d")
-    start_date_5y = (end_date - timedelta(days=365*5)).strftime("%Y-%m-%d")
-    start_date_max = datetime(1970, 1, 1).strftime("%Y-%m-%d")
-    end_date = end_date.strftime("%Y-%m-%d")
+        # Prepare date variables
+        berlin_tz = pytz.timezone('Europe/Berlin')
+        now = datetime.now(berlin_tz)
+        end_date = now.strftime("%Y-%m-%d")
+        start_date_1w = (now - timedelta(days=5)).strftime("%Y-%m-%d")
+        start_date_1m = (now - timedelta(days=30)).strftime("%Y-%m-%d")
+        start_date_6m = (now - timedelta(days=180)).strftime("%Y-%m-%d")
+        start_date_1y = (now - timedelta(days=365)).strftime("%Y-%m-%d")
+        start_date_5y = (now - timedelta(days=365*5)).strftime("%Y-%m-%d")
+        start_date_max = datetime(1970, 1, 1).strftime("%Y-%m-%d")
 
-    asyncio.run(run())
-    con.close()
-    etf_con.close()
-    index_con.close()
-except Exception as e:
-    print(e)
+        asyncio.run(run())
+        con.close()
+        etf_con.close()
+        index_con.close()
+    except Exception as e:
+        print(e)
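
For reference, the script reads its key from a local .env file via load_dotenv(); the expected entry (placeholder value):

    FMP_API_KEY=your_fmp_api_key_here

Each output file is a flat JSON array of OHLCV records, so a quick sanity check after a run could be (AAPL is an example ticker):

    import pandas as pd
    df = pd.read_json("json/historical-price/one-week/AAPL.json")
    print(df.head())  # expected columns: time, open, high, low, close, volume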