From 7574c13647de65b1410d6690c1682f4a202d83a8 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Sun, 15 Dec 2024 19:30:19 +0100
Subject: [PATCH] add press release cron job

---
 app/cron_company_news.py   | 10 ++--
 app/cron_press_releases.py | 97 ++++++++++++++++++++++++++++++++++++++
 app/main.py                | 33 ++++++++++++-
 app/primary_cron_job.py    | 11 ++++-
 app/test.py                | 11 ++---
 5 files changed, 148 insertions(+), 14 deletions(-)
 create mode 100644 app/cron_press_releases.py

diff --git a/app/cron_company_news.py b/app/cron_company_news.py
index fa2da38..3caf0b0 100644
--- a/app/cron_company_news.py
+++ b/app/cron_company_news.py
@@ -30,7 +30,7 @@ async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='t
     Filter out items with specified domains in their URL and remove duplicates based on a specified key.
     """
     if excluded_domains is None:
-        excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com', 'youtube.com']
+        excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
     seen_keys = set()
     filtered_data = []
     for item in data:
@@ -55,7 +55,7 @@ async def get_data(session, chunk, rate_limiter):
     """
     await rate_limiter.acquire()
     company_tickers = ','.join(chunk)
-    url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&page=0&limit=50&apikey={api_key}'
+    url = f'https://financialmodelingprep.com/stable/news/stock?symbols={company_tickers}&limit=100&apikey={api_key}'
 
     async with session.get(url) as response:
         if response.status == 200:
@@ -97,12 +97,12 @@ async def main():
     etf_symbols = get_symbols('etf.db', 'etfs')
     crypto_symbols = get_symbols('crypto.db', 'cryptos')
     total_symbols = stock_symbols + etf_symbols + crypto_symbols
-    
+    #total_symbols = ['AAPL']
     # Dynamically adjust chunk size
-    chunk_size = 10 # Adjust based on your needs
+    chunk_size = 1 # Adjust based on your needs
     chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
 
-    rate_limiter = RateLimiter(rate_limit=200, sleep_time=60)
+    rate_limiter = RateLimiter(rate_limit=300, sleep_time=60)
 
     async with aiohttp.ClientSession() as session:
         tasks = [process_chunk(session, chunk, rate_limiter) for chunk in chunks]
diff --git a/app/cron_press_releases.py b/app/cron_press_releases.py
new file mode 100644
index 0000000..cf98080
--- /dev/null
+++ b/app/cron_press_releases.py
@@ -0,0 +1,97 @@
+import ujson
+import asyncio
+import aiohttp
+import sqlite3
+from tqdm import tqdm
+from dotenv import load_dotenv
+import os
+import time
+
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+class RateLimiter:
+    def __init__(self, rate_limit=200, sleep_time=60):
+        self.rate_limit = rate_limit
+        self.sleep_time = sleep_time
+        self.request_count = 0
+        self.lock = asyncio.Lock()
+
+    async def acquire(self):
+        async with self.lock:
+            self.request_count += 1
+            if self.request_count >= self.rate_limit:
+                print(f"Processed {self.rate_limit} requests. Sleeping for {self.sleep_time} seconds...")
+                await asyncio.sleep(self.sleep_time)
+                self.request_count = 0
+
+
+async def save_json(symbol, data):
+    """
+    Save data as JSON in a batch to reduce disk I/O
+    """
+    async with asyncio.Lock(): # Ensure thread-safe writes
+        with open(f"json/market-news/press-releases/{symbol}.json", 'w') as file:
+            ujson.dump(data, file)
+
+async def get_data(session, chunk, rate_limiter):
+    """
+    Fetch data for a chunk of tickers using a single session
+    """
+    await rate_limiter.acquire()
+    company_tickers = ','.join(chunk)
+    url = f'https://financialmodelingprep.com/stable/news/press-releases?symbols={company_tickers}&limit=50&apikey={api_key}'
+
+    async with session.get(url) as response:
+        if response.status == 200:
+            return await response.json()
+        return []
+
+def get_symbols(db_name, table_name):
+    """
+    Fetch symbols from the SQLite database
+    """
+    with sqlite3.connect(db_name) as con:
+        cursor = con.cursor()
+        cursor.execute("PRAGMA journal_mode = wal")
+        cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
+        return [row[0] for row in cursor.fetchall()]
+
+async def process_chunk(session, chunk, rate_limiter):
+    """
+    Process a chunk of symbols
+    """
+    data = await get_data(session, chunk, rate_limiter)
+    tasks = []
+    for symbol in chunk:
+        try:
+            filtered_data = [item for item in data if item['symbol'] == symbol]
+            if filtered_data:
+                tasks.append(save_json(symbol, filtered_data))
+        except Exception as e:
+            print(e)
+    if tasks:
+        await asyncio.gather(*tasks)
+
+async def main():
+    """
+    Main function to coordinate fetching and processing
+    """
+    total_symbols = get_symbols('stocks.db', 'stocks')
+    #total_symbols = ['AAPL']
+    # Dynamically adjust chunk size
+    chunk_size = 1 # Adjust based on your needs
+    chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
+
+    rate_limiter = RateLimiter(rate_limit=300, sleep_time=60)
+
+    async with aiohttp.ClientSession() as session:
+        tasks = [process_chunk(session, chunk, rate_limiter) for chunk in chunks]
+        for task in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
+            await task
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except Exception as e:
+        print(f"An error occurred: {e}")
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index f4e7e2b..f770b6c 100755
--- a/app/main.py
+++ b/app/main.py
@@ -754,7 +754,38 @@ async def stock_news(data: TickerData, api_key: str = Security(get_api_key)):
     data = orjson.dumps(res)
     compressed_data = gzip.compress(data)
     redis_client.set(cache_key, compressed_data)
-    redis_client.expire(cache_key, 60*5)
+    redis_client.expire(cache_key, 60*30)
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
+
+@app.post("/stock-press-release")
+async def stock_press_release(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker.upper()
+    cache_key = f"press-releases-{ticker}"
+
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+
+
+    try:
+        with open(f"json/market-news/press-releases/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = []
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 60*60)
 
     return StreamingResponse(
         io.BytesIO(compressed_data),
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index f5d3aa7..9c848e4 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -120,11 +120,16 @@ def run_cron_market_news():
     if week <= 4:
         run_command(["python3", "cron_market_news.py"])
 
-def run_cron_company_news():
+def run_company_news():
     week = datetime.today().weekday()
     if week <= 4:
         run_command(["python3", "cron_company_news.py"])
 
+def run_press_releases():
+    week = datetime.today().weekday()
+    if week <= 4:
+        run_command(["python3", "cron_press_releases.py"])
+
 def run_cron_heatmap():
     run_command(["python3", "cron_heatmap.py"])
 
@@ -360,7 +365,9 @@ schedule.every(3).hours.do(run_threaded, run_options_net_flow).tag('options_net_
 #schedule.every(4).hours.do(run_threaded, run_share_statistics).tag('share_statistics_job')
 schedule.every(1).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
 
-schedule.every(1).hours.do(run_threaded, run_cron_company_news).tag('company_news_job')
+schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job')
+schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job')
+
 
 
 schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')
diff --git a/app/test.py b/app/test.py
index b564145..86a805c 100644
--- a/app/test.py
+++ b/app/test.py
@@ -1,6 +1,5 @@
-with open("json/stock-screener/data.json", 'rb') as file:
-    try:
-        data = file.read()
-        print(data[14807230:14807250]) # Print the problematic section
-    except Exception as e:
-        print(f"Error reading file: {e}")
\ No newline at end of file
+import requests
+
+url = "https://api.stocktwits.com/api/2/streams/symbol/AAPL.json?filter=top"
+response = requests.get(url)
+print(response)
\ No newline at end of file