From 9b9710e23a6860f7af9795c3f0c21c2042a62e9d Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Sat, 24 Aug 2024 22:24:07 +0200 Subject: [PATCH] add cron job sector performance --- app/cron_sector.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++ app/main.py | 37 +++++++++++++++++++-- 2 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 app/cron_sector.py diff --git a/app/cron_sector.py b/app/cron_sector.py new file mode 100644 index 0000000..e84441e --- /dev/null +++ b/app/cron_sector.py @@ -0,0 +1,83 @@ +from datetime import datetime, timedelta +import ujson +import asyncio +import aiohttp +import os +from dotenv import load_dotenv +from collections import defaultdict # Import defaultdict + +# Load environment variables +load_dotenv() +api_key = os.getenv('FMP_API_KEY') + + +def get_sector_path(sector): + sector_paths = { + 'basicMaterialsChangesPercentage': "basic-materials", + 'communicationServicesChangesPercentage': "communication-services", + 'consumerCyclicalChangesPercentage': "consumer-cyclical", + 'consumerDefensiveChangesPercentage': "consumer-defensive", + 'financialServicesChangesPercentage': "financial", + 'industrialsChangesPercentage': "industrials", + 'energyChangesPercentage': "energy", + 'utilitiesChangesPercentage': "utilities", + 'realEstateChangesPercentage': "real-estate", + 'technologyChangesPercentage': "technology", + 'healthcareChangesPercentage': 'healthcare', + } + return sector_paths.get(sector, None) + +# Function to save JSON data +async def save_json(data, name): + os.makedirs('json/sector', exist_ok=True) + with open(f'json/sector/{name}.json', 'w') as file: + ujson.dump(data, file) + +# Function to fetch data from the API +async def get_data(session, start_date, end_date): + url = f"https://financialmodelingprep.com/api/v3/historical-sectors-performance?from={start_date}&to={end_date}&apikey={api_key}" + async with session.get(url) as response: + data = await response.json() + return data + +# Main function to manage the date iteration and API calls +async def run(): + sector_data = defaultdict(list) + start_date = datetime.now() - timedelta(days=180) + today = datetime.now() + + async with aiohttp.ClientSession() as session: + while start_date <= today: + # Calculate the next end_date, ensuring it doesn't go beyond today + end_date = min(start_date + timedelta(days=30), today) + + start_str = start_date.strftime('%Y-%m-%d') + end_str = end_date.strftime('%Y-%m-%d') + + data = await get_data(session, start_str, end_str) + if data: + for item in data: + date = item['date'] + for sector_key, sector_value in item.items(): + if sector_key == 'date': + continue + sector_name = get_sector_path(sector_key) + if sector_name: + sector_data[sector_name].append({ + 'date': date, + 'changesPercentage': round(sector_value,3) + }) + + # Update start_date for the next loop iteration + start_date = end_date + timedelta(days=1) + + # Save each sector's data as a separate JSON file + for sector, records in sector_data.items(): + records = sorted(records, key=lambda x: x['date']) + await save_json(records, sector) + + return sector_data + +# Run the asyncio event loop +loop = asyncio.get_event_loop() +sector_results = loop.run_until_complete(run()) diff --git a/app/main.py b/app/main.py index ba2f9fc..f45eab5 100755 --- a/app/main.py +++ b/app/main.py @@ -1812,6 +1812,38 @@ async def get_delisted_companies(api_key: str = Security(get_api_key)): return res +@app.post("/historical-sector-price") +async def historical_sector_price(data:FilterStockList, api_key: str = Security(get_api_key)): + data = data.dict() + print(data) + sector = data['filterList'] + cache_key = f"history-price-sector-{sector}" + cached_result = redis_client.get(cache_key) + + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + + try: + with open(f"json/sector/{sector}.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = [] + + data = orjson.dumps(res) + compressed_data = gzip.compress(data) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 60*60) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + @app.post("/filter-stock-list") async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_api_key)): data = data.dict() @@ -1828,8 +1860,7 @@ async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_ap base_query = """ SELECT symbol, name, price, changesPercentage, marketCap, revenue, netIncome FROM stocks - WHERE symbol != ? - AND (price IS NOT NULL OR changesPercentage IS NOT NULL) + WHERE (price IS NOT NULL OR changesPercentage IS NOT NULL) AND {} """ @@ -1867,7 +1898,7 @@ async def filter_stock_list(data:FilterStockList, api_key: str = Security(get_ap # Execute the query with the relevant country if filter_list in conditions: full_query = base_query.format(conditions[filter_list]) - cursor.execute(full_query, ('%5EGSPC',)) + cursor.execute(full_query) # Fetch the results raw_data = cursor.fetchall()