diff --git a/app/cron_wiim.py b/app/cron_wiim.py index 8c9fe70..1ec478c 100755 --- a/app/cron_wiim.py +++ b/app/cron_wiim.py @@ -60,61 +60,55 @@ def correct_weekday(selected_date): return selected_date -async def get_endpoint(session, symbol, con): - url = "https://api.benzinga.com/api/v2/news" - querystring = {"token": api_key,"tickers": symbol, "channels":"WIIM","pageSize":"20","displayOutput":"full"} - async with session.get(url, params=querystring, headers=headers) as response: - res_list = [] - res = ujson.loads(await response.text()) +# Create a semaphore to limit concurrent requests +REQUEST_LIMIT = 500 +PAUSE_TIME = 10 - for item in res: - try: - date_obj = datetime.strptime(item['created'], date_format) - date_obj_utc = date_obj.astimezone(pytz.utc) - - new_date_obj_utc = date_obj_utc - - start_date_obj_utc = correct_weekday(date_obj_utc) - - start_date = start_date_obj_utc.strftime("%Y-%m-%d") - end_date = new_date_obj_utc.strftime("%Y-%m-%d") - - new_date_str = new_date_obj_utc.strftime("%Y-%m-%d %H:%M:%S") - query = query_template.format(symbol=symbol) +async def get_endpoint(session, symbol, con, semaphore): + async with semaphore: + url = "https://api.benzinga.com/api/v2/news" + querystring = {"token": api_key, "tickers": symbol, "channels":"WIIM", "pageSize":"20", "displayOutput":"full"} - try: - df = pd.read_sql_query(query,con, params=(start_date, end_date)) - if not df.empty: - change_percent = round((df['close'].iloc[1]/df['close'].iloc[0] -1)*100,2) - else: - change_percent = '-' - except Exception as e: - change_percent = '-' - res_list.append({'date': new_date_str, 'text': item['title'], 'changesPercentage': change_percent, 'url': item['url']}) - except: - pass + try: + async with session.get(url, params=querystring, headers=headers) as response: + res_list = [] + res = ujson.loads(await response.text()) + for item in res: + try: + date_obj = datetime.strptime(item['created'], date_format) + new_date_obj_utc = date_obj + + start_date_obj_utc = correct_weekday(date_obj) + start_date = start_date_obj_utc.strftime("%Y-%m-%d") + end_date = new_date_obj_utc.strftime("%Y-%m-%d") - if len(res_list) > 0: - with open(f"json/wiim/company/{symbol}.json", 'w') as file: - ujson.dump(res_list, file) + new_date_str = new_date_obj_utc.strftime("%Y-%m-%d %H:%M:%S") + query = query_template.format(symbol=symbol) + try: + df = pd.read_sql_query(query, con, params=(start_date, end_date)) + if not df.empty: + change_percent = round((df['close'].iloc[1]/df['close'].iloc[0] -1)*100,2) + else: + change_percent = '-' + except Exception as e: + change_percent = '-' + res_list.append({'date': new_date_str, 'text': item['title'], 'changesPercentage': change_percent, 'url': item['url']}) + except Exception as e: + print(f"Error processing item for {symbol}: {e}") - ''' - current_date = datetime.now(pytz.utc) - date_difference = current_date - new_date_obj_utc - if date_difference.days < 2: - new_date_str = new_date_obj_utc.strftime("%b %d, %Y") - formatted_data = {'wiim': res[0]['title'], 'updated': new_date_str} - - with open(f"json/wiim/{symbol}.json", 'w') as file: - ujson.dump(formatted_data, file) - ''' - + if len(res_list) > 0: + print("Done", symbol) + with open(f"json/wiim/company/{symbol}.json", 'w') as file: + ujson.dump(res_list, file) + + except Exception as e: + print(f"Error fetching data for {symbol}: {e}") async def get_latest_wiim(session, stock_symbols, etf_symbols): url = "https://api.benzinga.com/api/v2/news" - querystring = {"token": api_key,"channels":"WIIM","pageSize":"5","displayOutput":"full"} + querystring = {"token": api_key, "channels":"WIIM", "pageSize":"5", "displayOutput":"full"} try: async with session.get(url, params=querystring, headers=headers) as response: @@ -134,7 +128,6 @@ async def get_latest_wiim(session, stock_symbols, etf_symbols): ujson.dump(res_list, file) except Exception as e: - #pass print(e) async def run(): @@ -144,6 +137,7 @@ async def run(): cursor.execute("PRAGMA journal_mode = wal") cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") stock_symbols = [row[0] for row in cursor.fetchall()] + #stock_symbols = ['GME'] etf_con = sqlite3.connect('etf.db') @@ -152,14 +146,40 @@ async def run(): etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") etf_symbols = [row[0] for row in etf_cursor.fetchall()] + # Create a semaphore to limit concurrent requests and implement rate limiting + semaphore = asyncio.Semaphore(REQUEST_LIMIT) + async with aiohttp.ClientSession() as session: - #await get_latest_wiim(session, stock_symbols, etf_symbols) - await asyncio.gather(*(get_endpoint(session, symbol, con) for symbol in tqdm(stock_symbols))) - await asyncio.gather(*(get_endpoint(session, symbol, etf_con) for symbol in etf_symbols)) + # Combine stock and ETF symbols + all_symbols = stock_symbols + etf_symbols + + # Split symbols into batches + for i in range(0, len(all_symbols), REQUEST_LIMIT): + batch = all_symbols[i:i+REQUEST_LIMIT] + + # Determine which symbols are stocks or ETFs + batch_stocks = [s for s in batch if s in stock_symbols] + batch_etfs = [s for s in batch if s in etf_symbols] + + # Process this batch + tasks = [] + if batch_stocks: + tasks.extend(get_endpoint(session, symbol, con, semaphore) for symbol in batch_stocks) + if batch_etfs: + tasks.extend(get_endpoint(session, symbol, etf_con, semaphore) for symbol in batch_etfs) + + # Wait for this batch to complete + await asyncio.gather(*tasks) + + # If not the last batch, pause + if i + REQUEST_LIMIT < len(all_symbols): + print(f"Processed {i+REQUEST_LIMIT} symbols. Pausing for {PAUSE_TIME} seconds...") + await asyncio.sleep(PAUSE_TIME) con.close() etf_con.close() + try: asyncio.run(run()) except Exception as e: - print(e) + print(e) \ No newline at end of file