delete cron jobs and update primary cron

MuslemRahimi 2025-01-05 00:07:17 +01:00
parent 402af9715f
commit 68b5e22085
10 changed files with 165 additions and 738 deletions

View File

@@ -1,54 +0,0 @@
import aiohttp
import ujson
import sqlite3
import asyncio
import os
from dotenv import load_dotenv
from datetime import datetime
import re

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')

async def get_endpoint(session, symbol):
    url = "https://api.benzinga.com/api/v1/bulls_bears_say"
    querystring = {"token": api_key, "symbols": symbol}
    formatted_data = {}
    try:
        async with session.get(url, params=querystring) as response:
            res = ujson.loads(await response.text())
            try:
                # Each iteration overwrites formatted_data, so only the latest entry is kept
                for item in res['bulls_say_bears_say']:
                    date = datetime.fromtimestamp(item['updated'])
                    date = date.strftime("%Y-%m-%d %H:%M:%S")
                    bull_case = item['bull_case']
                    bear_case = item['bear_case']
                    formatted_data = {'date': date, 'bullSays': bull_case, 'bearSays': bear_case}
            except:
                pass
    except:
        pass
    if formatted_data:
        with open(f"json/bull_bear_say/{symbol}.json", 'w') as file:
            ujson.dump(formatted_data, file)

async def run():
    con = sqlite3.connect('stocks.db')
    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
    stocks_symbols = [row[0] for row in cursor.fetchall()]
    #stocks_symbols = ['NVDA']
    con.close()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(get_endpoint(session, symbol) for symbol in stocks_symbols))

try:
    asyncio.run(run())
except Exception as e:
    print(e)
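Each processed symbol ends up as a single JSON object with the shape written above; a minimal consumer sketch ('NVDA' is illustrative, matching the commented-out test list in run()):

import ujson

# Read back the latest bulls-say/bears-say entry for one symbol
with open("json/bull_bear_say/NVDA.json") as file:
    entry = ujson.load(file)
print(entry["date"], entry["bullSays"], entry["bearSays"])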

View File

@@ -1,109 +0,0 @@
import ujson
import asyncio
import aiohttp
import sqlite3
from datetime import datetime, timedelta
from tqdm import tqdm
import pandas as pd
import time
from dotenv import load_dotenv
import os

keys_to_keep = {'date', 'stockpx', 'iv60', 'iv90', '252dclshv', '60dorhv'}

load_dotenv()
api_key = os.getenv('NASDAQ_API_KEY')

# Build a comma-separated list of the past 12 months of dates
today = datetime.today()
dates = [today - timedelta(days=i) for i in range(365)]
date_str = ','.join(date.strftime('%Y-%m-%d') for date in dates)
# Cutoff used by filter_past_six_months; not defined in the original, assumed to be 180 days
six_months_ago = today - timedelta(days=180)

async def save_json(symbol, data):
    with open(f"json/implied-volatility/companies/{symbol}.json", 'w') as file:
        ujson.dump(data, file)

# Keep only entries from the past six months, sorted by date
def filter_past_six_months(data):
    filtered_data = []
    for entry in data:
        entry_date = datetime.strptime(entry['date'], '%Y-%m-%d')
        if entry_date >= six_months_ago:
            filtered_data.append(entry)
    sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))
    return sorted_data

async def get_data(ticker):
    #ticker_str = ','.join(ticker_list)
    async with aiohttp.ClientSession() as session:
        url = f"https://data.nasdaq.com/api/v3/datatables/ORATS/OPT?date={date_str}&ticker={ticker}&api_key={api_key}"
        async with session.get(url) as response:
            if response.status == 200:
                res = await response.json()
                data = res['datatable']['data']
                columns = res['datatable']['columns']
                return data, columns
            else:
                return [], []

async def run():
    con = sqlite3.connect('stocks.db')
    etf_con = sqlite3.connect('etf.db')
    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketcap >= 10E6 AND symbol NOT LIKE '%.%'")
    stocks_symbols = [row[0] for row in cursor.fetchall()]
    etf_cursor = etf_con.cursor()
    etf_cursor.execute("PRAGMA journal_mode = wal")
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
    total_symbols = stocks_symbols + etf_symbols
    chunk_size = len(total_symbols) // 70  # divide the list into N chunks (computed but unused below)
    chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
    for ticker in tqdm(total_symbols):
        data, columns = await get_data(ticker)
        filtered_data = []
        for element in tqdm(data):
            # Assuming the number of columns matches the length of each element in `data`
            filtered_data.append({columns[i]["name"]: element[i] for i in range(len(columns))})
        filtered_data = [{k: v for k, v in item.items() if k in keys_to_keep} for item in filtered_data]
        try:
            sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))
            if len(sorted_data) > 0:
                await save_json(ticker, sorted_data)
        except Exception as e:
            print(e)
    '''
    for symbol in chunk:
        try:
            filtered_data = [item for item in transformed_data if symbol == item['ticker']]
            sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))
            if len(sorted_data) > 0:
                await save_json(symbol, sorted_data)
        except Exception as e:
            print(e)
    '''
    con.close()
    etf_con.close()

try:
    asyncio.run(run())
except Exception as e:
    print(e)
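The per-row reshaping in run() zips positional datatable rows with their column names before filtering down to keys_to_keep; a toy illustration of the same two steps (values are made up):

columns = [{"name": "date"}, {"name": "ticker"}, {"name": "iv60"}]
row = ["2025-01-03", "NVDA", 0.42]
keys_to_keep = {'date', 'stockpx', 'iv60', 'iv90', '252dclshv', '60dorhv'}  # as defined above

# Step 1: map positional values onto column names
record = {columns[i]["name"]: row[i] for i in range(len(columns))}
# Step 2: keep only the whitelisted fields
record = {k: v for k, v in record.items() if k in keys_to_keep}
# {'date': '2025-01-03', 'iv60': 0.42}  ('ticker' is not in keys_to_keep)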

View File

@@ -1,251 +0,0 @@
import sqlite3
from datetime import datetime, timedelta, date
import ujson
import asyncio
import os
from dotenv import load_dotenv
from benzinga import financial_data
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import math
from scipy.stats import norm
from scipy.optimize import brentq

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key)

risk_free_rate = 0.05

def black_scholes_price(S, K, T, r, sigma, option_type="CALL"):
    if T <= 0:
        raise ValueError("Time to maturity (T) must be greater than 0.")
    if sigma <= 0:
        raise ValueError("Volatility (sigma) must be greater than 0.")
    d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T))
    d2 = d1 - sigma * math.sqrt(T)
    if option_type == "CALL":
        return S * norm.cdf(d1) - K * math.exp(-r * T) * norm.cdf(d2)
    elif option_type == "PUT":
        return K * math.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    else:
        raise ValueError("Invalid option_type. Use 'CALL' or 'PUT'.")

# Implied volatility function: invert Black-Scholes for sigma given a market price
def implied_volatility(S, K, T, r, market_price, option_type="CALL"):
    def objective_function(sigma):
        return black_scholes_price(S, K, T, r, sigma, option_type) - market_price
    # Use brentq to solve for the implied volatility
    try:
        return brentq(objective_function, 1e-6, 3)  # bounds for volatility
    except ValueError:
        return None  # Return None if there's no solution within the bounds

def calculate_dte(date_expiration):
    expiration_date = datetime.strptime(date_expiration, "%Y-%m-%d")
    return (expiration_date - datetime.today()).days

def calculate_avg_dte(data):
    active_options = [entry for entry in data if calculate_dte(entry['date_expiration']) >= 0]
    if active_options:
        total_dte = sum(entry['dte'] for entry in active_options)
        return int(total_dte / len(active_options))
    else:
        return 0

def calculate_put_call_volumes(data):
    put_volume = sum(int(entry['volume']) for entry in data if entry['put_call'] == 'PUT')
    call_volume = sum(int(entry['volume']) for entry in data if entry['put_call'] == 'CALL')
    return put_volume, call_volume

def options_bubble_data(chunk):
    try:
        company_tickers = ','.join(chunk)
        end_date = date.today()
        start_date = end_date - timedelta(365)  # look back one year
        end_date_str = end_date.strftime('%Y-%m-%d')
        start_date_str = start_date.strftime('%Y-%m-%d')
        res_list = []
        page = 0
        while True:
            try:
                data = fin.options_activity(company_tickers=company_tickers, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str)
                data = ujson.loads(fin.output(data))['option_activity']
                res_list += data
                page += 1
            except:
                break
        res_filtered = [{key: value for key, value in item.items() if key in ['ticker', 'underlying_price', 'strike_price', 'price', 'date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list]

        #================ Start computing historical iv60 =====================#
        # Convert to DataFrame for easier manipulation
        df = pd.DataFrame(res_filtered)
        # Ensure correct types for dates and numerical fields
        df['date'] = pd.to_datetime(df['date'])
        df['date_expiration'] = pd.to_datetime(df['date_expiration'])
        df['underlying_price'] = pd.to_numeric(df['underlying_price'], errors='coerce')
        df['strike_price'] = pd.to_numeric(df['strike_price'], errors='coerce')
        df['price'] = pd.to_numeric(df['price'], errors='coerce')
        df['volume'] = pd.to_numeric(df['volume'], errors='coerce')
        df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce')
        df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days
        # Keep non-expired options out to 1000 days (the df_30d name is historical)
        df_30d = df[(df['days_to_expiration'] >= 0) & (df['days_to_expiration'] <= 1000)]
        # Calculate implied volatility for each option in range
        iv_data = []
        for _, option in df_30d.iterrows():
            S = option['underlying_price']
            K = option['strike_price']
            T = option['days_to_expiration'] / 365
            market_price = option['price']
            option_type = "CALL" if option['put_call'] == "CALL" else "PUT"
            # Check for missing values
            if pd.notna(S) and pd.notna(K) and pd.notna(T) and pd.notna(market_price):
                # Calculate IV
                iv = implied_volatility(S, K, T, risk_free_rate, market_price, option_type)
                if iv is not None:
                    iv_data.append({
                        "date": option['date'],
                        "IV": iv,
                        "volume": option['volume']
                    })
        # Create a DataFrame with the calculated IV data
        iv_df = pd.DataFrame(iv_data)

        # Daily IV60: volume-weighted average of the day's implied volatilities
        def calculate_daily_iv60(group):
            weighted_iv = (group["IV"] * group["volume"]).sum() / group["volume"].sum()
            return weighted_iv

        # Group by date and compute daily IV60
        iv60_history = iv_df.groupby("date").apply(calculate_daily_iv60)
        # Fill NaN values using forward fill to carry the last valid IV60 forward
        iv60_history = iv60_history.ffill()
        iv60_history = iv60_history.to_dict()
        iv60_dict = {k.strftime('%Y-%m-%d'): v for k, v in iv60_history.items()}
        #print(iv60_dict)
        #====================================================================#

        for option_type in ['CALL', 'PUT']:
            for item in res_filtered:
                try:
                    if item['put_call'].upper() == option_type:
                        item['dte'] = calculate_dte(item['date_expiration'])
                        if item['ticker'] in ['BRK.A', 'BRK.B']:
                            item['ticker'] = f"BRK-{item['ticker'][-1]}"
                except:
                    pass

        # Save raw data for each ticker for the options page stacked bar chart
        # Note: result_list accumulates across tickers; harmless here since main() uses chunks of one
        result_list = []
        for ticker in chunk:
            try:
                ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == ticker]
                if len(ticker_filtered_data) != 0:
                    # Sum up calls and puts for each day for the plot
                    summed_data = {}
                    for entry in ticker_filtered_data:
                        volume = int(entry['volume'])
                        open_interest = int(entry['open_interest'])
                        put_call = entry['put_call']
                        date_str = entry['date']
                        if date_str not in summed_data:
                            summed_data[date_str] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}, 'iv60': None}
                        summed_data[date_str][put_call]['volume'] += volume
                        summed_data[date_str][put_call]['open_interest'] += open_interest
                        if date_str in iv60_dict:
                            summed_data[date_str]['iv60'] = round(iv60_dict[date_str] * 100, 1)
                    result_list.extend([{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT'], 'iv60': summed_data[date]['iv60']} for date in summed_data])
                    # Reverse the list so the newest dates come first
                    result_list = result_list[::-1]
                    with open(f"json/options-flow/company/{ticker}.json", 'w') as file:
                        ujson.dump(result_list, file)
            except Exception as e:
                print(f"Error found: {e}")
                pass

        # Save bubble data for each ticker for the overview page
        '''
        for ticker in chunk:
            bubble_data = {}
            for time_period, days in {'oneDay': 1, 'oneWeek': 7, 'oneMonth': 30, 'threeMonth': 90, 'sixMonth': 180, 'oneYear': 252}.items():
                start_date = end_date - timedelta(days=days)  # end_date is today
                filtered_data = [item for item in res_filtered if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date]
                ticker_filtered_data = [entry for entry in filtered_data if entry['ticker'] == ticker]
                put_volume, call_volume = calculate_put_call_volumes(ticker_filtered_data)
                avg_dte = calculate_avg_dte(ticker_filtered_data)
                bubble_data[time_period] = {'putVolume': put_volume, 'callVolume': call_volume, 'avgDTE': avg_dte}
            if all(all(value == 0 for value in data.values()) for data in bubble_data.values()):
                bubble_data = {}
                # don't save the json
            else:
                with open(f"json/options-bubble/{ticker}.json", 'w') as file:
                    ujson.dump(bubble_data, file)
        '''
    except ValueError as ve:
        print(ve)
    except Exception as e:
        print(f"Error found in the process: {e}")

async def main():
    try:
        stock_con = sqlite3.connect('stocks.db')
        stock_cursor = stock_con.cursor()
        stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stock_symbols = [row[0] for row in stock_cursor.fetchall()]
        etf_con = sqlite3.connect('etf.db')
        etf_cursor = etf_con.cursor()
        etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
        etf_symbols = [row[0] for row in etf_cursor.fetchall()]
        stock_con.close()
        etf_con.close()
        total_symbols = stock_symbols + etf_symbols
        total_symbols = [item.replace("BRK-B", "BRK.B") for item in total_symbols]
        chunk_size = 1  # one symbol per chunk
        chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
        #chunks = [['U']]
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=4) as executor:
            tasks = [loop.run_in_executor(executor, options_bubble_data, chunk) for chunk in chunks]
            for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                await f
    except Exception as e:
        print(e)

if __name__ == "__main__":
    asyncio.run(main())
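The Black-Scholes helpers above are self-contained, so they can be sanity-checked with a round trip: price an option at a known sigma, then recover it with implied_volatility (all numbers below are illustrative):

# Round-trip check: the recovered sigma should match the input up to solver tolerance
S, K, T, r = 100.0, 105.0, 60 / 365, risk_free_rate
true_sigma = 0.35
price = black_scholes_price(S, K, T, r, true_sigma, option_type="CALL")
recovered = implied_volatility(S, K, T, r, price, option_type="CALL")
print(round(price, 4), round(recovered, 4))  # recovered should be ~0.35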

View File

@@ -159,11 +159,12 @@ for symbol in tqdm(total_symbols):
        if response.status_code == 200:
            data = response.json()['data']
            prepare_data(data, symbol)
-            counter +=1
+        counter +=1
        # If 50 chunks have been processed, sleep for 60 seconds
-        if counter == 100:
+        if counter == 260:
            print("Sleeping...")
-            time.sleep(30) # Sleep for 60 seconds
+            time.sleep(60) # Sleep for 60 seconds
            counter = 0
    except Exception as e:

View File

@@ -7,6 +7,10 @@ import os
import sqlite3
import time
from tqdm import tqdm
+from concurrent.futures import ThreadPoolExecutor
+from functools import partial
+import asyncio
+import aiohttp

load_dotenv()
@@ -127,76 +131,19 @@ def get_hottest_contracts():
            if response.status_code == 200:
                data = response.json()['data']
                prepare_data(data, symbol)
-                counter +=1
+            counter +=1
            # If 50 chunks have been processed, sleep for 60 seconds
-            if counter == 100:
+            if counter == 260:
                print("Sleeping...")
-                time.sleep(30) # Sleep for 60 seconds
+                time.sleep(60)
                counter = 0
        except Exception as e:
            print(f"Error for {symbol}:{e}")

-def get_single_contract_historical_data(contract_id):
-    keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'}
-    url = f"https://api.unusualwhales.com/api/option-contract/{contract_id}/historic"
-    response = requests.get(url, headers=headers)
-    data = response.json()['chains']
-    data = sorted(data, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'))
-    res_list = []
-    for i, item in enumerate(data):
-        new_item = {
-            key: safe_round(value) if isinstance(value, (int, float, str)) else value
-            for key, value in item.items()
-        }
-        # Compute open interest change and percent if not the first item
-        if i > 0:
-            previous_open_interest = safe_round(data[i-1].get('open_interest', 0))
-            open_interest = safe_round(item.get('open_interest', 0))
-            if previous_open_interest > 0:
-                new_item['open_interest_change'] = safe_round(open_interest - previous_open_interest)
-                new_item['open_interest_change_percent'] = safe_round((open_interest / previous_open_interest - 1) * 100)
-            else:
-                new_item['open_interest_change'] = 0
-                new_item['open_interest_change_percent'] = 0
-        res_list.append(new_item)
-    if res_list:
-        res_list = [{key: value for key, value in item.items() if key not in keys_to_remove} for item in res_list]
-        res_list = sorted(res_list, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'), reverse=True)
-        save_json(res_list, contract_id, "json/hottest-contracts/contracts")

if __name__ == '__main__':
    get_hottest_contracts()
-    '''
-    total_symbols = get_tickers_from_directory(directory_path)
-    contract_id_set = set()  # Use a set to ensure uniqueness
-    for symbol in total_symbols:
-        try:
-            with open(f"json/hottest-contracts/companies/{symbol}.json", "r") as file:
-                data = orjson.loads(file.read())
-            for item in data:
-                try:
-                    contract_id_set.add(item['option_symbol'])  # Add to the set
-                except KeyError:
-                    pass  # Handle missing 'option_symbol' keys gracefully
-        except FileNotFoundError:
-            pass  # Handle missing files gracefully
-    # Convert the set to a list if needed
-    contract_id_list = list(contract_id_set)
-    print(len(contract_id_list))
-    print(contract_id_list[0])
-    get_single_contract_historical_data('GME250117C00125000')
-    '''

View File

@@ -1,145 +0,0 @@
import sqlite3
from datetime import datetime, timedelta, date
import ujson
import os
import numpy as np
from dotenv import load_dotenv
from benzinga import financial_data
from collections import defaultdict
from tqdm import tqdm

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key)

def save_json(symbol, data):
    with open(f"json/options-net-flow/companies/{symbol}.json", 'w') as file:
        ujson.dump(data, file)

def calculate_moving_average(data, window_size):
    data = np.array(data, dtype=float)
    cumsum = np.cumsum(data)
    moving_avg = (cumsum[window_size - 1:] - np.concatenate(([0], cumsum[:-window_size]))) / window_size
    return moving_avg.tolist()

def calculate_net_flow(data):
    date_data = defaultdict(lambda: {'price': [], 'netCall': 0, 'netPut': 0})
    for item in data:
        date_str = item['date']
        time_str = item['time']
        datetime_str = f"{date_str} {time_str}"
        # Parse the combined date and time into a datetime object
        date_time = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
        try:
            premium = float(item['cost_basis'])
            date_data[date_time]['price'].append(round(float(item['underlying_price']), 2))
            # Sign convention: ask-side call premium is bullish inflow,
            # ask-side put premium is bearish outflow (and vice versa at the bid)
            if item['put_call'] == 'CALL':
                if item['execution_estimate'] == 'AT_ASK':
                    date_data[date_time]['netCall'] += premium
                elif item['execution_estimate'] == 'AT_BID':
                    date_data[date_time]['netCall'] -= premium
            elif item['put_call'] == 'PUT':
                if item['execution_estimate'] == 'AT_ASK':
                    date_data[date_time]['netPut'] -= premium
                elif item['execution_estimate'] == 'AT_BID':
                    date_data[date_time]['netPut'] += premium
        except:
            pass
    # Calculate average underlying price and format the results
    result = []
    for date_time, values in date_data.items():
        result.append({
            'date': date_time.strftime('%Y-%m-%d %H:%M:%S'),
            'price': sum(values['price']) / len(values['price']) if values['price'] else 0,
            'netCall': int(values['netCall']),
            'netPut': int(values['netPut']),
        })
    sorted_data = sorted(result, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d %H:%M:%S'))
    # Bucket into intervals (minute // 120 * 120 always yields 0, so buckets are hourly)
    interval_data = defaultdict(lambda: {'price': [], 'netCall': [], 'netPut': []})
    for item in sorted_data:
        date_time = datetime.strptime(item['date'], '%Y-%m-%d %H:%M:%S')
        interval_start = date_time.replace(minute=date_time.minute // 120 * 120, second=0)
        interval_data[interval_start]['price'].append(item['price'])
        interval_data[interval_start]['netCall'].append(item['netCall'])
        interval_data[interval_start]['netPut'].append(item['netPut'])
    # Sum the net flows within each interval
    averaged_data = []
    for interval_start, values in interval_data.items():
        if values['price']:
            averaged_data.append({
                'date': interval_start.strftime('%Y-%m-%d %H:%M:%S'),
                #'price': sum(values['price']) / len(values['price']),
                'netCall': sum(values['netCall']) if values['netCall'] else 0,
                'netPut': sum(values['netPut']) if values['netPut'] else 0,
            })
    # Sort the aggregated data by interval start time
    averaged_data.sort(key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d %H:%M:%S'))
    return averaged_data

def get_data(symbol):
    try:
        end_date = date.today()
        start_date = end_date - timedelta(10)
        end_date_str = end_date.strftime('%Y-%m-%d')
        start_date_str = start_date.strftime('%Y-%m-%d')
        res_list = []
        for page in range(0, 1000):
            try:
                data = fin.options_activity(company_tickers=symbol, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str)
                data = ujson.loads(fin.output(data))['option_activity']
                res_list += data
            except:
                break
        res_filtered = [{key: value for key, value in item.items() if key in ['ticker', 'time', 'date', 'execution_estimate', 'underlying_price', 'put_call', 'cost_basis']} for item in res_list]
        # Only compute net flow when there is enough activity
        ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == symbol]
        if len(ticker_filtered_data) > 100:
            net_flow_data = calculate_net_flow(ticker_filtered_data)
            if len(net_flow_data) > 0:
                save_json(symbol, net_flow_data)
    except ValueError as ve:
        print(ve)
    except Exception as e:
        print(e)

try:
    stock_con = sqlite3.connect('stocks.db')
    stock_cursor = stock_con.cursor()
    stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap > 500E6 AND symbol NOT LIKE '%.%'")
    stock_symbols = [row[0] for row in stock_cursor.fetchall()]
    etf_con = sqlite3.connect('etf.db')
    etf_cursor = etf_con.cursor()
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
    stock_con.close()
    etf_con.close()
    total_symbols = stock_symbols + etf_symbols
    for symbol in tqdm(total_symbols):
        get_data(symbol)
except Exception as e:
    print(e)
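The sign convention in calculate_net_flow (ask-side call premium counts as bullish inflow, ask-side put premium as bearish outflow) can be checked on synthetic trades; a minimal sketch with made-up values:

sample = [
    {'date': '2025-01-03', 'time': '09:30:00', 'underlying_price': '100.0',
     'put_call': 'CALL', 'execution_estimate': 'AT_ASK', 'cost_basis': '5000'},
    {'date': '2025-01-03', 'time': '09:31:00', 'underlying_price': '100.5',
     'put_call': 'PUT', 'execution_estimate': 'AT_ASK', 'cost_basis': '2000'},
]
# Both trades fall into the same hourly bucket, so expect a single row
# with netCall = 5000 and netPut = -2000.
print(calculate_net_flow(sample))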

View File

@@ -0,0 +1,118 @@
import requests
import orjson
import re
from datetime import datetime
from dotenv import load_dotenv
import os
import time
import asyncio
import aiohttp
from tqdm import tqdm

load_dotenv()
api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
headers = {"Accept": "application/json, text/plain", "Authorization": api_key}

keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'}

def save_json(data, filename, directory):
    os.makedirs(directory, exist_ok=True)
    filepath = os.path.join(directory, f"{filename}.json")
    with open(filepath, 'wb') as file:
        file.write(orjson.dumps(data))

def get_tickers_from_directory(directory: str):
    try:
        if not os.path.exists(directory):
            raise FileNotFoundError(f"The directory '{directory}' does not exist.")
        return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
    except Exception as e:
        print(f"An error occurred: {e}")
        return []

def safe_round(value, decimals=2):
    try:
        return round(float(value), decimals)
    except (ValueError, TypeError):
        return value

def get_single_contract_historical_data(contract_id):
    keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'}  # shadows the module-level set
    url = f"https://api.unusualwhales.com/api/option-contract/{contract_id}/historic"
    response = requests.get(url, headers=headers)
    data = response.json()['chains']
    data = sorted(data, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'))
    res_list = []
    for i, item in enumerate(data):
        new_item = {
            key: safe_round(value) if isinstance(value, (int, float, str)) else value
            for key, value in item.items()
        }
        # Compute open interest change and percent if not the first item
        if i > 0:
            previous_open_interest = safe_round(data[i-1].get('open_interest', 0))
            open_interest = safe_round(item.get('open_interest', 0))
            if previous_open_interest > 0:
                new_item['open_interest_change'] = safe_round(open_interest - previous_open_interest)
                new_item['open_interest_change_percent'] = safe_round((open_interest / previous_open_interest - 1) * 100)
            else:
                new_item['open_interest_change'] = 0
                new_item['open_interest_change_percent'] = 0
        res_list.append(new_item)
    if res_list:
        res_list = [{key: value for key, value in item.items() if key not in keys_to_remove} for item in res_list]
        res_list = sorted(res_list, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'), reverse=True)
        save_json(res_list, contract_id, "json/hottest-contracts/contracts")

if __name__ == '__main__':
    directory_path = "json/hottest-contracts/companies"
    total_symbols = get_tickers_from_directory(directory_path)
    contract_id_set = set()  # Use a set to ensure uniqueness
    for symbol in total_symbols:
        try:
            with open(f"json/hottest-contracts/companies/{symbol}.json", "r") as file:
                data = orjson.loads(file.read())
            volume_list = data.get('volume', [])
            open_interest_list = data.get('openInterest', [])
            if len(volume_list) > 0:
                for item in volume_list:
                    try:
                        contract_id_set.add(item['option_symbol'])  # Add to the set
                    except KeyError:
                        pass  # Handle missing 'option_symbol' keys gracefully
            if len(open_interest_list) > 0:
                for item in open_interest_list:
                    try:
                        contract_id_set.add(item['option_symbol'])  # Add to the set
                    except KeyError:
                        pass  # Handle missing 'option_symbol' keys gracefully
        except FileNotFoundError:
            pass  # Handle missing files gracefully
    # Convert the set to a list
    contract_id_list = list(contract_id_set)
    print("Number of contract chains:", len(contract_id_list))
    counter = 0
    for item in tqdm(contract_id_list):
        try:
            get_single_contract_historical_data(item)
        except:
            pass
        # Every 260 contracts, sleep for 60 seconds to stay under rate limits
        counter += 1
        if counter == 260:
            print("Sleeping...")
            time.sleep(60)
            counter = 0
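The open-interest deltas attached to each chain row reduce to a day-over-day comparison against the previous entry; the same arithmetic on toy numbers:

previous_open_interest, open_interest = 400, 500
change = open_interest - previous_open_interest  # 100
change_percent = (open_interest / previous_open_interest - 1) * 100  # 25.0
# When previous_open_interest is 0, both fields are set to 0 to avoid dividing by zero.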

View File

@@ -1,102 +0,0 @@
import time
from benzinga import financial_data
import ujson
import numpy as np
import sqlite3
import asyncio
from datetime import datetime, timedelta
import concurrent.futures
from GetStartEndDate import GetStartEndDate
from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key)

stock_con = sqlite3.connect('stocks.db')
stock_cursor = stock_con.cursor()
stock_cursor.execute("SELECT DISTINCT symbol FROM stocks")
stock_symbols = [row[0] for row in stock_cursor.fetchall()]

etf_con = sqlite3.connect('etf.db')
etf_cursor = etf_con.cursor()
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]

start_date_1d, end_date_1d = GetStartEndDate().run()
start_date = start_date_1d.strftime("%Y-%m-%d")
end_date = end_date_1d.strftime("%Y-%m-%d")
#print(start_date, end_date)

def process_page(page):
    try:
        data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000)
        data = ujson.loads(fin.output(data))['option_activity']
        filtered_data = [{key: value for key, value in item.items() if key not in ['description_extended', 'updated']} for item in data]
        time.sleep(1)
        page_list = []
        for item in filtered_data:
            if item['underlying_price'] != '':
                ticker = item['ticker']
                if ticker == 'BRK.A':
                    ticker = 'BRK-A'
                elif ticker == 'BRK.B':
                    ticker = 'BRK-B'
                put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts'
                asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '')
                item['assetType'] = asset_type
                item['put_call'] = put_call
                item['ticker'] = ticker
                item['price'] = round(float(item['price']), 2)
                item['strike_price'] = round(float(item['strike_price']), 2)
                item['cost_basis'] = round(float(item['cost_basis']), 2)
                item['underlying_price'] = round(float(item['underlying_price']), 2)
                item['type'] = item['option_activity_type'].capitalize()
                item['sentiment'] = item['sentiment'].capitalize()
                item['executionEstimate'] = item['execution_estimate'].replace('_', ' ').title()
                item['tradeCount'] = item['trade_count']
                # Keep only contracts that expire today, i.e. zero days to expiry
                if item['date_expiration'] == start_date:
                    page_list.append(item)
        return page_list
    except:
        return []

# fin, stock_symbols, and etf_symbols are defined above
res_list = []
# Adjust max_workers to control the degree of parallelism
max_workers = 6

# Fetch pages concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_page = {executor.submit(process_page, page): page for page in range(20)}
    for future in concurrent.futures.as_completed(future_to_page):
        page = future_to_page[future]
        try:
            page_list = future.result()
            res_list += page_list
        except Exception as e:
            print(f"Exception occurred: {e}")

# res_list now contains the aggregated results from all pages
#print(res_list)
#print(len(res_list))

# Custom key: sort by the trade's time field, newest first
def custom_key(item):
    return item['time']

res_list = sorted(res_list, key=custom_key, reverse=True)

with open("json/options-flow/zero-dte/data.json", 'w') as file:
    ujson.dump(res_list, file)

stock_con.close()
etf_con.close()

View File

@@ -2632,6 +2632,32 @@ async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_key)):
+@app.post("/options-contract-history")
+async def get_data(data: GeneralData, api_key: str = Security(get_api_key)):
+    contract_id = data.params
+    cache_key = f"options-contract-history-{contract_id}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+    try:
+        with open(f"json/hottest-contracts/contracts/{contract_id}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = []
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*60)  # 60-hour TTL
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
@app.post("/options-stats-ticker") @app.post("/options-stats-ticker")
async def get_options_stats_ticker(data:TickerData, api_key: str = Security(get_api_key)): async def get_options_stats_ticker(data:TickerData, api_key: str = Security(get_api_key)):
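The new endpoint follows a cache-aside pattern: serve gzip-compressed JSON straight from Redis when present, otherwise read the contract file, compress, cache, and return it. A minimal standalone sketch of the same flow (the helper name and default TTL are illustrative, and a local redis.Redis client is assumed):

import gzip
import orjson
import redis

redis_client = redis.Redis()  # assumption: default local Redis instance

def cached_gzip_json(cache_key, path, ttl=3600 * 60):
    # Cache hit: return the compressed payload as-is
    cached = redis_client.get(cache_key)
    if cached:
        return cached
    # Cache miss: read the JSON file, falling back to an empty list
    try:
        with open(path, 'rb') as file:
            payload = orjson.loads(file.read())
    except Exception:
        payload = []
    compressed = gzip.compress(orjson.dumps(payload))
    redis_client.set(cache_key, compressed, ex=ttl)  # store value and TTL in one call
    return compressed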

View File

@@ -85,12 +85,17 @@ def run_dark_pool_ticker():
    run_command(["python3", "cron_dark_pool_ticker.py"])

-def run_options_stats():
+def run_options_jobs():
    now = datetime.now(ny_tz)
    week = now.weekday()
    if week <= 5:
        run_command(["python3", "cron_options_stats.py"])
+        time.sleep(60)
        run_command(["python3", "cron_options_historical_volume.py"])
+        time.sleep(60)
+        run_command(["python3", "cron_options_hottest_contracts.py"])
+        time.sleep(60)
+        run_command(["python3", "cron_options_single_contract.py"])

def run_fda_calendar():
    now = datetime.now(ny_tz)

@@ -216,12 +221,6 @@ def run_executive():
    if week <= 4:
        run_command(["python3", "cron_executive.py"])

-def run_options_bubble_ticker():
-    week = datetime.today().weekday()
-    if week <= 4:
-        run_command(["python3", "cron_options_bubble.py"])

def run_analyst_rating():
    week = datetime.today().weekday()
    if week <= 4:

@@ -341,14 +340,11 @@ def run_threaded(job_func):

# Schedule the job to run
-schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job')
+schedule.every().day.at("01:00").do(run_threaded, run_options_jobs).tag('options_job')
schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job)
#schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_gex_job')
schedule.every().day.at("05:00").do(run_threaded, run_export_price).tag('export_price_job')
-schedule.every().day.at("03:30").do(run_threaded, run_options_stats).tag('options_stats_job')
schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job')
schedule.every().day.at("06:30").do(run_threaded, run_ai_score).tag('ai_score_job')