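# Pull the Benzinga earnings calendar for every symbol in a local SQLite
# database and write per-ticker JSON snapshots: past reports (with a
# volatility estimate), the next upcoming report, and the latest surprise.
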
import aiohttp
import aiofiles
import ujson
import orjson
import sqlite3
import asyncio
import pandas as pd
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from tqdm import tqdm
import pytz

headers = {"accept": "application/json"}
url = "https://api.benzinga.com/api/v2.1/calendar/earnings"

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')

# All date arithmetic is done in US market time (America/New_York).
ny_tz = pytz.timezone('America/New_York')
today = datetime.now(ny_tz).replace(hour=0, minute=0, second=0, microsecond=0)
min_date = ny_tz.localize(datetime.strptime("2015-01-01", "%Y-%m-%d"))
N_days_ago = today - timedelta(days=10)

query_template = """
    SELECT date, open, high, low, close
    FROM "{ticker}"
    WHERE date >= ?
"""


def check_existing_file(ticker, folder_name):
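    """
    Delete a stale json/earnings/<folder_name>/<ticker>.json snapshot:
    'surprise' files whose date has fallen out of the N_days_ago window
    and 'next' files whose earnings date has already passed.
    """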
    file_path = f"json/earnings/{folder_name}/{ticker}.json"
    still_new = False
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r') as file:
                existing_data = ujson.load(file)
            date_obj = datetime.strptime(existing_data['date'], "%Y-%m-%d")
            if date_obj.tzinfo is None:
                date_obj = date_obj.replace(tzinfo=pytz.UTC)

            # A file is still fresh if its earnings date (plus one day of
            # slack) has not yet left the relevant window.
            if folder_name == 'surprise':
                if date_obj + timedelta(1) >= N_days_ago:
                    still_new = True
            elif folder_name == 'next':
                if date_obj + timedelta(1) >= today:
                    still_new = True

            if not still_new:
                os.remove(file_path)
                print(f"Deleted file for {ticker}.")
        except Exception as e:
            print(f"Error processing existing file for {ticker}: {e}")


async def save_json(data, symbol, dir_path):
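    """Write `data` as JSON to <dir_path>/<symbol>.json without blocking the event loop."""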
    file_path = os.path.join(dir_path, f"{symbol}.json")
    async with aiofiles.open(file_path, 'w') as file:
        await file.write(ujson.dumps(data))


async def get_past_data(data, ticker, con):
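    """
    Filter the raw Benzinga records to the [min_date, today] window and
    attach a realized-volatility estimate to each report, using the cached
    price history in json/historical-price/max/<ticker>.json.

    Volatility is (max high - min low) / mean close, in percent, over the
    sessions from two days before to one day after the report. Example:
    high 110, low 100, mean close 105 -> (10 / 105) * 100 = 9.52%.
    """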
    # Keep only reports between min_date and today with complete numeric fields.
    filtered_data = []
    for item in data:
        try:
            item_date = ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d"))
            if min_date <= item_date <= today:
                filtered_data.append(
                    {
                        'revenue': float(item['revenue']),
                        'revenueEst': float(item['revenue_est']),
                        'revenueSurprisePercent': round(float(item['revenue_surprise_percent']) * 100, 2),
                        'eps': round(float(item['eps']), 2),
                        'epsEst': round(float(item['eps_est']), 2),
                        'epsSurprisePercent': round(float(item['eps_surprise_percent']) * 100, 2),
                        'year': item['period_year'],
                        'quarter': item['period'],
                        'date': item['date']
                    }
                )
        except Exception:
            # Skip records with missing or non-numeric fields.
            pass

    # Sort newest first.
    if filtered_data:
        filtered_data.sort(key=lambda x: x['date'], reverse=True)

    try:
        # Load the cached price history for this ticker.
        with open(f"json/historical-price/max/{ticker}.json") as file:
            price_history = orjson.loads(file.read())

        # Index price bars by date for O(1) lookups.
        price_history_dict = {
            datetime.strptime(item['time'], "%Y-%m-%d"): item for item in price_history
        }

        # Attach a volatility estimate to each earnings release.
        for entry in filtered_data:
            earnings_date = datetime.strptime(entry['date'], "%Y-%m-%d")
            volatility_prices = []

            # Collect bars from two days before to one day after the report.
            for i in range(-2, 2):
                current_date = earnings_date + timedelta(days=i)
                if current_date in price_history_dict:
                    volatility_prices.append(price_history_dict[current_date])

            # Compute volatility if at least one bar was found.
            if volatility_prices:
                high_prices = [day['high'] for day in volatility_prices]
                low_prices = [day['low'] for day in volatility_prices]
                close_prices = [day['close'] for day in volatility_prices]

                max_high = max(high_prices)
                min_low = min(low_prices)
                avg_close = sum(close_prices) / len(close_prices)

                # Range of the window relative to its average close, in percent.
                volatility = round(((max_high - min_low) / avg_close) * 100, 2)
            else:
                volatility = None  # No price data around this report.

            entry['volatility'] = volatility

        await save_json(filtered_data, ticker, 'json/earnings/past')
    except Exception:
        # Missing or unreadable price history; nothing is saved for this ticker.
        pass


async def get_data(session, ticker, con):
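    """
    Fetch the full earnings calendar for one ticker and fan it out:
    historical reports to json/earnings/past (via get_past_data), the
    nearest upcoming report to json/earnings/next, and the most recent
    report inside the N_days_ago window to json/earnings/surprise.
    When a bucket has no fresh data, check_existing_file prunes any
    stale snapshot left on disk.
    """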
    querystring = {"token": api_key, "parameters[tickers]": ticker}
    try:
        async with session.get(url, params=querystring, headers=headers) as response:
            data = ujson.loads(await response.text())['earnings']

        await get_past_data(data, ticker, con)

        # Upcoming earnings: keep only the nearest future report.
        future_dates = [item for item in data if ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d")) >= today]
        if future_dates:
            nearest_future = min(future_dates, key=lambda x: datetime.strptime(x["date"], "%Y-%m-%d"))
            try:
                symbol = nearest_future['ticker']
                time = nearest_future['time']
                date = nearest_future['date']
                eps_prior = float(nearest_future['eps_prior']) if nearest_future['eps_prior'] else None
                eps_est = float(nearest_future['eps_est']) if nearest_future['eps_est'] else None
                revenue_est = float(nearest_future['revenue_est']) if nearest_future['revenue_est'] else None
                revenue_prior = float(nearest_future['revenue_prior']) if nearest_future['revenue_prior'] else None
                # Only save when every estimate is present.
                if revenue_est is not None and revenue_prior is not None and eps_prior is not None and eps_est is not None:
                    res_list = {
                        'date': date,
                        'time': time,
                        'epsPrior': eps_prior,
                        'epsEst': eps_est,
                        'revenuePrior': revenue_prior,
                        'revenueEst': revenue_est
                    }
                    await save_json(res_list, symbol, 'json/earnings/next')
            except Exception as e:
                print(e)
        else:
            check_existing_file(ticker, "next")

        # Latest surprise: past earnings inside the N_days_ago window (10 days).
        recent_dates = [item for item in data if N_days_ago <= ny_tz.localize(datetime.strptime(item["date"], "%Y-%m-%d")) <= today]
        if recent_dates:
            # Take the most recent report in the window.
            nearest_recent = max(recent_dates, key=lambda x: datetime.strptime(x["date"], "%Y-%m-%d"))
            try:
                date = nearest_recent['date']
                eps_prior = float(nearest_recent['eps_prior']) if nearest_recent['eps_prior'] != '' else None
                eps_surprise = float(nearest_recent['eps_surprise']) if nearest_recent['eps_surprise'] != '' else None
                eps = float(nearest_recent['eps']) if nearest_recent['eps'] != '' else None
                revenue_prior = float(nearest_recent['revenue_prior']) if nearest_recent['revenue_prior'] != '' else None
                revenue_surprise = float(nearest_recent['revenue_surprise']) if nearest_recent['revenue_surprise'] != '' else None
                revenue = float(nearest_recent['revenue']) if nearest_recent['revenue'] != '' else None
                if revenue is not None and revenue_prior is not None and eps_prior is not None and eps is not None and revenue_surprise is not None and eps_surprise is not None:
                    res_list = {
                        'epsPrior': eps_prior,
                        'epsSurprise': eps_surprise,
                        'eps': eps,
                        'revenuePrior': revenue_prior,
                        'revenueSurprise': revenue_surprise,
                        'revenue': revenue,
                        'date': date,
                    }
                    # Use `ticker` here: `symbol` is only bound in the
                    # future-earnings branch above and would raise a
                    # NameError when there are no upcoming reports.
                    await save_json(res_list, ticker, 'json/earnings/surprise')
            except Exception as e:
                print(e)
        else:
            check_existing_file(ticker, "surprise")
    except Exception as e:
        print(e)


async def run(stock_symbols, con):
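    """Fetch earnings data for every symbol concurrently, with a progress bar."""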
    async with aiohttp.ClientSession() as session:
        tasks = [get_data(session, symbol, con) for symbol in stock_symbols]
        for f in tqdm(asyncio.as_completed(tasks), total=len(stock_symbols)):
            await f


con = None
try:
    con = sqlite3.connect('stocks.db')
    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    # Skip symbols containing '.' or '-' (typically share classes or units).
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
    stock_symbols = [row[0] for row in cursor.fetchall()]
    #stock_symbols = ['TSLA']

    asyncio.run(run(stock_symbols, con))
except Exception as e:
    print(e)
finally:
    # `con` may still be None if sqlite3.connect() itself failed.
    if con is not None:
        con.close()
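
# Usage sketch (assumptions: stocks.db exists with a `stocks` table, the
# json/earnings/* and json/historical-price/max output directories already
# exist, and BENZINGA_API_KEY is set in .env; the filename is hypothetical):
#   python cron_earnings.py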