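"""
Fetch "Why Is It Moving" (WIIM) news for every stock and ETF symbol in the
local databases from the Benzinga news API, attach the close-to-close price
change from the local price tables, and write the results to
json/wiim/company/<symbol>.json.

Assumes a .env file providing BENZINGA_API_KEY, plus stocks.db and etf.db
containing per-symbol price tables with "date" and "close" columns.
"""
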
import aiohttp
import ujson
import sqlite3
import pandas as pd
import asyncio
import pytz
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta

date_format = "%a, %d %b %Y %H:%M:%S %z"
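# For reference, a timestamp matching date_format looks like
# "Mon, 06 Jan 2025 09:30:00 -0400" (an illustrative value, not taken from a
# real API response).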

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')

headers = {"accept": "application/json"}

# Keep-window cutoff: news items older than this (currently 50 weeks) are dropped
N_weeks_ago = datetime.now(pytz.UTC) - timedelta(weeks=50)

query_template = """
SELECT
    close
FROM
    "{symbol}"
WHERE
    date BETWEEN ? AND ?
"""

# Days in 2025 when the US stock market is closed (2025-01-09 was the
# National Day of Mourning for President Carter)
holidays = ['2025-01-01', '2025-01-09', '2025-01-20', '2025-02-17', '2025-04-18', '2025-05-26', '2025-06-19', '2025-07-04', '2025-09-01', '2025-11-27', '2025-12-25']


def is_holiday(date):
    """Check if the given date is a holiday."""
    str_date = date.strftime("%Y-%m-%d")
    return str_date in holidays


def correct_weekday(selected_date):
    """Step back from selected_date to the previous trading day."""
    # Monday is 0 and Sunday is 6
    if selected_date.weekday() == 0:
        # Monday: go back to the previous Friday
        selected_date -= timedelta(3)
    elif selected_date.weekday() <= 5:
        # Tuesday through Saturday: go back one day
        selected_date -= timedelta(1)
    else:
        # Sunday: go back to the previous Friday
        selected_date -= timedelta(2)

    # Check if the selected date is a holiday and adjust if necessary
    while is_holiday(selected_date):
        selected_date -= timedelta(1)

    # Adjust again if the resulting date is a Saturday or Sunday
    if selected_date.weekday() >= 5:
        selected_date -= timedelta(selected_date.weekday() - 4)

    return selected_date
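
# A quick illustration using dates from the holiday list above:
#   Tue 2025-01-21 -> Mon 2025-01-20 (MLK Day, holiday) -> Sun 2025-01-19
#     -> weekend adjustment -> Fri 2025-01-17
#   Mon 2025-04-21 -> Fri 2025-04-18 (Good Friday, holiday) -> Thu 2025-04-17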


# Rate-limiting settings: at most REQUEST_LIMIT requests in flight at once,
# pausing PAUSE_TIME seconds between batches
REQUEST_LIMIT = 500
PAUSE_TIME = 5
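
# Back-of-envelope: a universe of ~6,000 symbols at REQUEST_LIMIT = 500 per
# batch means 12 batches and about a minute of pure pause time; total runtime
# is dominated by the slowest request in each batch. (Illustrative numbers;
# the real symbol count comes from the databases at runtime.)
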

def check_existing_file(symbol):
    """Prune entries older than the cutoff from a symbol's existing JSON file."""
    file_path = f"json/wiim/company/{symbol}.json"
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r') as file:
                existing_data = ujson.load(file)

            # Filter out elements older than the N_weeks_ago cutoff. Stored
            # dates are New York local time; treating them as UTC here is off
            # by a few hours at most, which is negligible against the cutoff.
            updated_data = []
            for item in existing_data:
                try:
                    # Parse the date and ensure timezone-awareness
                    date_obj = datetime.strptime(item['date'], "%Y-%m-%d %H:%M:%S")
                    if date_obj.tzinfo is None:
                        date_obj = date_obj.replace(tzinfo=pytz.UTC)

                    if date_obj >= N_weeks_ago:
                        updated_data.append(item)
                except Exception as e:
                    print(f"Error processing existing item: {e}")

            # Write back the filtered data, or delete the file if nothing is left
            if updated_data:
                with open(file_path, 'w') as file:
                    ujson.dump(updated_data, file)
                print(f"Updated existing file for {symbol}, removed old entries.")
            else:
                os.remove(file_path)
                print(f"Deleted file for {symbol} as all entries were older than the cutoff.")
        except Exception as e:
            print(f"Error processing existing file for {symbol}: {e}")


async def get_endpoint(session, symbol, con, semaphore):
    async with semaphore:
        url = "https://api.benzinga.com/api/v2/news"
        querystring = {
            "token": api_key,
            "tickers": symbol,
            "channels": "WIIM",
            "pageSize": "20",
            "sort": "created:desc",
        }

        try:
            async with session.get(url, params=querystring, headers=headers) as response:
                res_list = []
                res = ujson.loads(await response.text())

                # Define New York timezone
                ny_tz = pytz.timezone("America/New_York")

                for item in res:
                    try:
                        # Parse the date and ensure timezone-awareness
                        date_obj = datetime.strptime(item['created'], date_format)
                        if date_obj.tzinfo is None:
                            date_obj = date_obj.replace(tzinfo=pytz.UTC)

                        # Skip items older than the N_weeks_ago cutoff
                        if date_obj < N_weeks_ago:
                            continue

                        # Convert the date to New York timezone
                        date_obj_ny = date_obj.astimezone(ny_tz)

                        start_date_obj_utc = correct_weekday(date_obj)
                        start_date = start_date_obj_utc.strftime("%Y-%m-%d")
                        end_date = date_obj.strftime("%Y-%m-%d")
                        new_date_str = date_obj_ny.strftime("%Y-%m-%d %H:%M:%S")

                        query = query_template.format(symbol=symbol)

                        try:
                            df = pd.read_sql_query(query, con, params=(start_date, end_date))
                            # Need at least two closes (previous trading day
                            # and event day); iloc[1] on a single-row frame
                            # would raise IndexError.
                            if len(df) >= 2:
                                change_percent = round((df['close'].iloc[1] / df['close'].iloc[0] - 1) * 100, 2)
                            else:
                                change_percent = '-'
                        except Exception as e:
                            print(f"Error fetching stock data for {symbol}: {e}")
                            change_percent = '-'

                        res_list.append({
                            'date': new_date_str,
                            'text': item['title'],
                            'changesPercentage': change_percent
                        })
                    except Exception as e:
                        # Skip malformed items, but surface the reason rather
                        # than failing silently
                        print(f"Error processing news item for {symbol}: {e}")

                if res_list:
                    print(f"Done processing {symbol}")
                    with open(f"json/wiim/company/{symbol}.json", 'w') as file:
                        ujson.dump(res_list, file)
                else:
                    # No fresh items: prune or delete any stale file on disk
                    check_existing_file(symbol)

        except Exception as e:
            print(f"Error fetching data for {symbol}: {e}")


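# Each per-symbol file written above holds a list shaped roughly like
# (illustrative values):
#   [{"date": "2025-01-17 09:35:00", "text": "...", "changesPercentage": 1.23}]
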
async def run():
    con = sqlite3.connect('stocks.db')

    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
    stock_symbols = [row[0] for row in cursor.fetchall()]

    etf_con = sqlite3.connect('etf.db')

    etf_cursor = etf_con.cursor()
    etf_cursor.execute("PRAGMA journal_mode = wal")
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]

    #stock_symbols = ['DIS']
    #etf_symbols = []

    # Sets for O(1) membership checks when splitting batches below
    stock_set = set(stock_symbols)
    etf_set = set(etf_symbols)

    # Create a semaphore to limit concurrent requests and implement rate limiting
    semaphore = asyncio.Semaphore(REQUEST_LIMIT)

    async with aiohttp.ClientSession() as session:
        # Combine stock and ETF symbols
        all_symbols = stock_symbols + etf_symbols

        # Split symbols into batches
        for i in range(0, len(all_symbols), REQUEST_LIMIT):
            batch = all_symbols[i:i+REQUEST_LIMIT]

            # Determine which symbols are stocks or ETFs so each task gets
            # the matching database connection
            batch_stocks = [s for s in batch if s in stock_set]
            batch_etfs = [s for s in batch if s in etf_set]

            # Process this batch
            tasks = []
            if batch_stocks:
                tasks.extend(get_endpoint(session, symbol, con, semaphore) for symbol in batch_stocks)
            if batch_etfs:
                tasks.extend(get_endpoint(session, symbol, etf_con, semaphore) for symbol in batch_etfs)

            # Wait for this batch to complete
            await asyncio.gather(*tasks)

            # If not the last batch, pause to stay under the API rate limit
            if i + REQUEST_LIMIT < len(all_symbols):
                print(f"Processed {i+REQUEST_LIMIT} symbols. Pausing for {PAUSE_TIME} seconds...")
                await asyncio.sleep(PAUSE_TIME)

    con.close()
    etf_con.close()


if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as e:
        print(e)