diff --git a/app/cron_company_news.py b/app/cron_company_news.py
index 6fb73fe..ccec947 100644
--- a/app/cron_company_news.py
+++ b/app/cron_company_news.py
@@ -10,6 +10,20 @@ import time
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
+class RateLimiter:
+    def __init__(self, rate_limit=200, sleep_time=60):
+        self.rate_limit = rate_limit
+        self.sleep_time = sleep_time
+        self.request_count = 0
+        self.lock = asyncio.Lock()
+
+    async def acquire(self):
+        async with self.lock:
+            self.request_count += 1
+            if self.request_count >= self.rate_limit:
+                print(f"Processed {self.rate_limit} requests. Sleeping for {self.sleep_time} seconds...")
+                await asyncio.sleep(self.sleep_time)
+                self.request_count = 0
 
 async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='title'):
     """
@@ -17,20 +31,16 @@ async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='t
     """
     if excluded_domains is None:
         excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
-
     seen_keys = set()
     filtered_data = []
-
     for item in data:
         if not any(domain in item['url'] for domain in excluded_domains):
            key = item.get(deduplicate_key)
            if key and key not in seen_keys:
                filtered_data.append(item)
                seen_keys.add(key)
-
     return filtered_data
-
 
 async def save_json(symbol, data):
     """
     Save data as JSON in a batch to reduce disk I/O
@@ -39,11 +49,11 @@ async def save_json(symbol, data):
     with open(f"json/market-news/companies/{symbol}.json", 'w') as file:
         ujson.dump(data, file)
 
-
-async def get_data(session, chunk):
+async def get_data(session, chunk, rate_limiter):
     """
     Fetch data for a chunk of tickers using a single session
     """
+    await rate_limiter.acquire()
     company_tickers = ','.join(chunk)
     url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&page=0&limit=2000&apikey={api_key}'
 
@@ -52,7 +62,6 @@ async def get_data(session, chunk):
             return await response.json()
     return []
 
-
 def get_symbols(db_name, table_name):
     """
     Fetch symbols from the SQLite database
@@ -63,12 +72,11 @@ def get_symbols(db_name, table_name):
     cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
     return [row[0] for row in cursor.fetchall()]
 
-
-async def process_chunk(session, chunk):
+async def process_chunk(session, chunk, rate_limiter):
     """
     Process a chunk of symbols
     """
-    data = await get_data(session, chunk)
+    data = await get_data(session, chunk, rate_limiter)
     tasks = []
     for symbol in chunk:
         try:
@@ -81,7 +89,6 @@ async def process_chunk(session, chunk):
     if tasks:
         await asyncio.gather(*tasks)
 
-
 async def main():
     """
     Main function to coordinate fetching and processing
@@ -90,19 +97,20 @@ async def main():
     etf_symbols = get_symbols('etf.db', 'etfs')
     crypto_symbols = get_symbols('crypto.db', 'cryptos')
     total_symbols = stock_symbols + etf_symbols + crypto_symbols
-    
+
     # Dynamically adjust chunk size
     chunk_size = 10 # Adjust based on your needs
     chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
-    
+
+    rate_limiter = RateLimiter(rate_limit=200, sleep_time=60)
+
     async with aiohttp.ClientSession() as session:
-        tasks = [process_chunk(session, chunk) for chunk in chunks]
+        tasks = [process_chunk(session, chunk, rate_limiter) for chunk in chunks]
         for task in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
             await task
-
 
 if __name__ == "__main__":
     try:
         asyncio.run(main())
     except Exception as e:
-        print(f"An error occurred: {e}")
+        print(f"An error occurred: {e}")
\ No newline at end of file
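
A note on the rate-limiting pattern this patch introduces: acquire() increments a shared counter while holding an asyncio.Lock, and when the counter reaches rate_limit it sleeps without releasing the lock. Because every get_data() call awaits acquire() first, that sleep stalls all pending callers at once, which is what turns a per-call counter into a global pause across the concurrent chunk tasks. Below is a minimal, self-contained sketch of the same behavior; the worker/demo scaffolding and the small rate_limit=3 / sleep_time=1 values are illustrative only, not values from this patch.

    import asyncio

    # Same counting pattern as the RateLimiter in this patch: a shared lock
    # guards the counter, and the sleep happens while the lock is held, so
    # every task awaiting acquire() waits out the pause together.
    class RateLimiter:
        def __init__(self, rate_limit=200, sleep_time=60):
            self.rate_limit = rate_limit
            self.sleep_time = sleep_time
            self.request_count = 0
            self.lock = asyncio.Lock()

        async def acquire(self):
            async with self.lock:
                self.request_count += 1
                if self.request_count >= self.rate_limit:
                    await asyncio.sleep(self.sleep_time)
                    self.request_count = 0

    # Illustrative stand-in for get_data(): one acquire() per request.
    async def worker(limiter, i):
        await limiter.acquire()
        print(f"request {i} proceeding")

    async def demo():
        # Tiny limits so the pause is observable: sleep 1s after every 3 requests.
        limiter = RateLimiter(rate_limit=3, sleep_time=1)
        await asyncio.gather(*(worker(limiter, i) for i in range(7)))

    if __name__ == "__main__":
        asyncio.run(demo())

Running the sketch prints requests 0-2 immediately, pauses one second, prints 3-5, pauses again, then prints 6 - the same cadence the patch enforces at 200 requests per 60 seconds against the FMP endpoint.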