add cron job sector performance

This commit is contained in:
MuslemRahimi 2024-08-24 22:24:07 +02:00
parent 961c52092f
commit 9b9710e23a
2 changed files with 117 additions and 3 deletions

83
app/cron_sector.py Normal file
View File

@ -0,0 +1,83 @@
from datetime import datetime, timedelta
import ujson
import asyncio
import aiohttp
import os
from dotenv import load_dotenv
from collections import defaultdict # Import defaultdict
# Load environment variables
load_dotenv()
api_key = os.getenv('FMP_API_KEY')
def get_sector_path(sector):
sector_paths = {
'basicMaterialsChangesPercentage': "basic-materials",
'communicationServicesChangesPercentage': "communication-services",
'consumerCyclicalChangesPercentage': "consumer-cyclical",
'consumerDefensiveChangesPercentage': "consumer-defensive",
'financialServicesChangesPercentage': "financial",
'industrialsChangesPercentage': "industrials",
'energyChangesPercentage': "energy",
'utilitiesChangesPercentage': "utilities",
'realEstateChangesPercentage': "real-estate",
'technologyChangesPercentage': "technology",
'healthcareChangesPercentage': 'healthcare',
}
return sector_paths.get(sector, None)
# Function to save JSON data
async def save_json(data, name):
os.makedirs('json/sector', exist_ok=True)
with open(f'json/sector/{name}.json', 'w') as file:
ujson.dump(data, file)
# Function to fetch data from the API
async def get_data(session, start_date, end_date):
url = f"https://financialmodelingprep.com/api/v3/historical-sectors-performance?from={start_date}&to={end_date}&apikey={api_key}"
async with session.get(url) as response:
data = await response.json()
return data
# Main function to manage the date iteration and API calls
async def run():
sector_data = defaultdict(list)
start_date = datetime.now() - timedelta(days=180)
today = datetime.now()
async with aiohttp.ClientSession() as session:
while start_date <= today:
# Calculate the next end_date, ensuring it doesn't go beyond today
end_date = min(start_date + timedelta(days=30), today)
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
data = await get_data(session, start_str, end_str)
if data:
for item in data:
date = item['date']
for sector_key, sector_value in item.items():
if sector_key == 'date':
continue
sector_name = get_sector_path(sector_key)
if sector_name:
sector_data[sector_name].append({
'date': date,
'changesPercentage': round(sector_value,3)
})
# Update start_date for the next loop iteration
start_date = end_date + timedelta(days=1)
# Save each sector's data as a separate JSON file
for sector, records in sector_data.items():
records = sorted(records, key=lambda x: x['date'])
await save_json(records, sector)
return sector_data
# Run the asyncio event loop
loop = asyncio.get_event_loop()
sector_results = loop.run_until_complete(run())

View File

@ -1812,6 +1812,38 @@ async def get_delisted_companies(api_key: str = Security(get_api_key)):
return res return res
@app.post("/historical-sector-price")
async def historical_sector_price(data:FilterStockList, api_key: str = Security(get_api_key)):
data = data.dict()
print(data)
sector = data['filterList']
cache_key = f"history-price-sector-{sector}"
cached_result = redis_client.get(cache_key)
if cached_result:
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"})
try:
with open(f"json/sector/{sector}.json", 'rb') as file:
res = orjson.loads(file.read())
except:
res = []
data = orjson.dumps(res)
compressed_data = gzip.compress(data)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 60*60) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.post("/filter-stock-list") @app.post("/filter-stock-list")
async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_api_key)): async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_api_key)):
data = data.dict() data = data.dict()
@ -1828,8 +1860,7 @@ async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_ap
base_query = """ base_query = """
SELECT symbol, name, price, changesPercentage, marketCap, revenue, netIncome SELECT symbol, name, price, changesPercentage, marketCap, revenue, netIncome
FROM stocks FROM stocks
WHERE symbol != ? WHERE (price IS NOT NULL OR changesPercentage IS NOT NULL)
AND (price IS NOT NULL OR changesPercentage IS NOT NULL)
AND {} AND {}
""" """
@ -1867,7 +1898,7 @@ async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_ap
# Execute the query with the relevant country # Execute the query with the relevant country
if filter_list in conditions: if filter_list in conditions:
full_query = base_query.format(conditions[filter_list]) full_query = base_query.format(conditions[filter_list])
cursor.execute(full_query, ('%5EGSPC',)) cursor.execute(full_query)
# Fetch the results # Fetch the results
raw_data = cursor.fetchall() raw_data = cursor.fetchall()