import aiohttp
import aiofiles
import ujson
import sqlite3
import pandas as pd
import asyncio
import pytz
import time
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from tqdm import tqdm

# strptime format for the ``created`` field of each news item returned by the
# news endpoint, e.g. "Wed, 17 Apr 2024 14:20:15 -0400".
date_format = "%a, %d %b %Y %H:%M:%S %z"

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
headers = {"accept": "application/json"}

# Closing prices of one symbol between two dates (inclusive). Every symbol has
# its own table, so the table name has to be formatted in (escaped via
# _quote_identifier); the dates are bound as parameters. ORDER BY makes the
# first/last row the earliest/latest day regardless of physical row order.
# NOTE(review): assumes the ``date`` column stores 'YYYY-MM-DD' strings so
# that BETWEEN includes end_date itself - confirm against the schema.
query_template = """
    SELECT close
    FROM "{symbol}"
    WHERE date BETWEEN ? AND ?
    ORDER BY date
"""

# NYSE full-day market closures for 2024 ('YYYY-MM-DD'). Dates missing from
# this list are treated as trading days by correct_weekday, so extend it for
# every year the script has to cover.
holidays = [
    "2024-01-01",  # New Year's Day
    "2024-01-15",  # Martin Luther King Jr. Day
    "2024-02-19",  # Washington's Birthday
    "2024-03-29",  # Good Friday
    "2024-05-27",  # Memorial Day
    "2024-06-19",  # Juneteenth
    "2024-07-04",  # Independence Day
    "2024-09-02",  # Labor Day
    "2024-11-28",  # Thanksgiving Day
    "2024-12-25",  # Christmas Day
]


def is_holiday(date):
    """Return True if *date* (a date/datetime) is listed in ``holidays``."""
    return date.strftime("%Y-%m-%d") in holidays


def correct_weekday(selected_date):
    """Return the most recent trading day strictly before *selected_date*.

    Saturdays, Sundays and the dates in ``holidays`` are skipped. Only whole
    days are subtracted, so the time of day and tzinfo of *selected_date* are
    preserved.
    """
    previous_day = selected_date - timedelta(days=1)
    # Keep stepping back until we reach a weekday that is not a holiday; a
    # single pass is not enough when a holiday borders a weekend.
    while previous_day.weekday() >= 5 or is_holiday(previous_day):
        previous_day -= timedelta(days=1)
    return previous_day


# Maximum number of symbols fetched concurrently, and the number of symbols
# processed per batch in run().
REQUEST_LIMIT = 500
# Seconds to sleep between batches (presumably to respect the API rate limit).
PAUSE_TIME = 10


def _quote_identifier(name):
    """Escape embedded double quotes so *name* is safe inside "..." in SQL."""
    return name.replace('"', '""')


def _change_percent(con, symbol, start_date, end_date):
    """Return the % change of the close from *start_date* to *end_date*.

    Returns ``'-'`` when fewer than two closes exist in the window (e.g. the
    end day's close is not stored yet), when the first close is not positive,
    or when the query fails (e.g. there is no table for *symbol*).
    """
    query = query_template.format(symbol=_quote_identifier(symbol))
    try:
        df = pd.read_sql_query(query, con, params=(start_date, end_date))
        closes = df['close'].dropna()
        if len(closes) < 2 or closes.iloc[0] <= 0:
            return '-'
        change = (closes.iloc[-1] / closes.iloc[0] - 1) * 100
        return round(float(change), 2)
    except Exception:
        # Best effort: a missing table or bad data must not drop the headline.
        return '-'


async def get_endpoint(session, symbol, con, semaphore):
    """Fetch up to 20 WIIM headlines for *symbol* and save them as JSON.

    Each headline is stored with its publication time, title, URL and the
    percentage change of the close from the previous trading day to the
    publication day (``'-'`` when unavailable). The result is written to
    ``json/wiim/company/{symbol}.json`` only when at least one headline was
    processed. Errors are printed and swallowed so that one failing symbol
    does not abort the batch it belongs to.
    """
    async with semaphore:
        url = "https://api.benzinga.com/api/v2/news"
        querystring = {"token": api_key, "tickers": symbol, "channels": "WIIM",
                       "pageSize": "20", "displayOutput": "full"}
        try:
            async with session.get(url, params=querystring, headers=headers) as response:
                if response.status != 200:
                    print(f"Error fetching data for {symbol}: HTTP {response.status}")
                    return
                res = ujson.loads(await response.text())
        except Exception as e:
            print(f"Error fetching data for {symbol}: {e}")
            return

        if not isinstance(res, list):
            # Anything other than a list of articles (presumably an error
            # object) cannot be processed item by item.
            print(f"Error fetching data for {symbol}: unexpected response {res!r}")
            return

        res_list = []
        for item in res:
            try:
                # Keeps the UTC offset given by the API; not converted to UTC.
                created = datetime.strptime(item['created'], date_format)
                start_date = correct_weekday(created).strftime("%Y-%m-%d")
                end_date = created.strftime("%Y-%m-%d")
                res_list.append({
                    'date': created.strftime("%Y-%m-%d %H:%M:%S"),
                    'text': item['title'],
                    'changesPercentage': _change_percent(con, symbol, start_date, end_date),
                    'url': item['url'],
                })
            except Exception as e:
                print(f"Error processing item for {symbol}: {e}")

        if res_list:
            print("Done", symbol)
            try:
                # aiofiles keeps the write from blocking the event loop.
                async with aiofiles.open(f"json/wiim/company/{symbol}.json", 'w') as file:
                    await file.write(ujson.dumps(res_list))
            except OSError as e:
                print(f"Error writing data for {symbol}: {e}")


async def get_latest_wiim(session, stock_symbols, etf_symbols):
    """Fetch the 5 newest WIIM headlines and save them for the RSS feed.

    In each headline's ``stocks`` entries, a ``name`` key is renamed to
    ``ticker`` and ``assetType`` is set to ``'stock'`` or ``'etf'`` when the
    ticker appears in *stock_symbols* / *etf_symbols*. The result is written
    to ``json/wiim/rss-feed/data.json``; errors are printed, not raised.

    NOTE(review): not called from run() in this file - confirm whether a
    caller elsewhere relies on it.
    """
    url = "https://api.benzinga.com/api/v2/news"
    querystring = {"token": api_key, "channels": "WIIM", "pageSize": "5",
                   "displayOutput": "full"}
    # Sets give O(1) membership tests instead of scanning the symbol lists.
    stock_set = set(stock_symbols)
    etf_set = set(etf_symbols)
    try:
        async with session.get(url, params=querystring, headers=headers) as response:
            res = ujson.loads(await response.text())
        if not isinstance(res, list):
            print(f"Error fetching latest WIIM: unexpected response {res!r}")
            return

        res_list = []
        for item in res:
            stocks = item.get('stocks') or []
            for el in stocks:
                # Update the 'name' key to 'ticker'
                if 'name' in el:
                    el['ticker'] = el.pop('name')
                ticker = el.get('ticker')
                if ticker in stock_set:
                    el['assetType'] = 'stock'
                elif ticker in etf_set:
                    el['assetType'] = 'etf'
            res_list.append({'date': item['created'], 'text': item['title'], 'stocks': stocks})

        async with aiofiles.open("json/wiim/rss-feed/data.json", 'w') as file:
            await file.write(ujson.dumps(res_list))
    except Exception as e:
        print(f"Error fetching latest WIIM: {e}")


async def run():
    """Refresh the per-symbol WIIM files for every stock and ETF symbol.

    Stock symbols come from ``stocks.db`` (symbols containing '.' are
    skipped) and ETF symbols from ``etf.db``. Symbols are processed in
    batches of REQUEST_LIMIT with a PAUSE_TIME pause between batches. Both
    database connections are closed even if processing fails.
    """
    con = sqlite3.connect('stocks.db')
    etf_con = sqlite3.connect('etf.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stock_symbols = [row[0] for row in cursor.fetchall()]

        etf_cursor = etf_con.cursor()
        etf_cursor.execute("PRAGMA journal_mode = wal")
        etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
        etf_symbols = [row[0] for row in etf_cursor.fetchall()]

        # Pair each symbol with the database that holds its price table. A
        # symbol present in both databases is processed once (as a stock);
        # otherwise concurrent tasks would race to write the same file.
        stock_set = set(stock_symbols)
        jobs = [(symbol, con) for symbol in stock_symbols]
        jobs += [(symbol, etf_con) for symbol in etf_symbols if symbol not in stock_set]

        # Caps in-flight requests; each batch is at most REQUEST_LIMIT anyway.
        semaphore = asyncio.Semaphore(REQUEST_LIMIT)
        async with aiohttp.ClientSession() as session:
            for i in range(0, len(jobs), REQUEST_LIMIT):
                batch = jobs[i:i + REQUEST_LIMIT]
                await asyncio.gather(
                    *(get_endpoint(session, symbol, db, semaphore) for symbol, db in batch)
                )
                # Pause between batches, but not after the last one.
                if i + REQUEST_LIMIT < len(jobs):
                    print(f"Processed {i+REQUEST_LIMIT} symbols. Pausing for {PAUSE_TIME} seconds...")
                    await asyncio.sleep(PAUSE_TIME)
    finally:
        con.close()
        etf_con.close()


if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as e:
        print(e)