From 68b5e22085f76c2a03addc71b476508f63e30060 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Sun, 5 Jan 2025 00:07:17 +0100 Subject: [PATCH] delete cron jobs and update primary cron --- app/cron_bull_bear_say.py | 54 ------ app/cron_implied_volatility.py | 109 ----------- app/cron_options_bubble.py | 251 -------------------------- app/cron_options_historical_volume.py | 7 +- app/cron_options_hottest_contracts.py | 73 +------- app/cron_options_net_flow.py | 145 --------------- app/cron_options_single_contract.py | 118 ++++++++++++ app/cron_options_zero_dte.py | 102 ----------- app/main.py | 26 +++ app/primary_cron_job.py | 18 +- 10 files changed, 165 insertions(+), 738 deletions(-) delete mode 100755 app/cron_bull_bear_say.py delete mode 100644 app/cron_implied_volatility.py delete mode 100755 app/cron_options_bubble.py delete mode 100644 app/cron_options_net_flow.py create mode 100644 app/cron_options_single_contract.py delete mode 100755 app/cron_options_zero_dte.py diff --git a/app/cron_bull_bear_say.py b/app/cron_bull_bear_say.py deleted file mode 100755 index c1320fb..0000000 --- a/app/cron_bull_bear_say.py +++ /dev/null @@ -1,54 +0,0 @@ -import aiohttp -import ujson -import sqlite3 -import asyncio -import os -from dotenv import load_dotenv -from datetime import datetime -import re - -load_dotenv() -api_key = os.getenv('BENZINGA_API_KEY') - - - -async def get_endpoint(session, symbol): - url = "https://api.benzinga.com/api/v1/bulls_bears_say" - querystring = {"token": api_key, "symbols": symbol} - formatted_data = {} - try: - async with session.get(url, params=querystring) as response: - res = ujson.loads(await response.text()) - try: - for item in res['bulls_say_bears_say']: - date = datetime.fromtimestamp(item['updated']) - date = date.strftime("%Y-%m-%d %H:%M:%S") - bull_case = item['bull_case'] - bear_case = item['bear_case'] - formatted_data = {'date': date, 'bullSays': bull_case, 'bearSays': bear_case} - except: - pass - except: - pass - - if formatted_data: - with open(f"json/bull_bear_say/{symbol}.json", 'w') as file: - ujson.dump(formatted_data, file) - -async def run(): - con = sqlite3.connect('stocks.db') - - cursor = con.cursor() - cursor.execute("PRAGMA journal_mode = wal") - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") - stocks_symbols = [row[0] for row in cursor.fetchall()] - #stocks_symbols = ['NVDA'] - con.close() - - async with aiohttp.ClientSession() as session: - await asyncio.gather(*(get_endpoint(session, symbol) for symbol in stocks_symbols)) - -try: - asyncio.run(run()) -except Exception as e: - print(e) diff --git a/app/cron_implied_volatility.py b/app/cron_implied_volatility.py deleted file mode 100644 index c23cd3e..0000000 --- a/app/cron_implied_volatility.py +++ /dev/null @@ -1,109 +0,0 @@ -import ujson -import asyncio -import aiohttp -import sqlite3 -from datetime import datetime,timedelta -from tqdm import tqdm -import pandas as pd -import time - -from dotenv import load_dotenv -import os - -keys_to_keep = {'date','stockpx', 'iv60', 'iv90', '252dclshv','60dorhv'} - -load_dotenv() -api_key = os.getenv('NASDAQ_API_KEY') - - -# Get today's date -today = datetime.today() -# Calculate the date 12 months ago -dates = [today - timedelta(days=i) for i in range(365)] -date_str = ','.join(date.strftime('%Y-%m-%d') for date in dates) - -async def save_json(symbol, data): - with open(f"json/implied-volatility/companies/{symbol}.json", 'w') as file: - ujson.dump(data, file) - - -# Function to filter the list -def filter_past_six_months(data): - filtered_data = [] - for entry in data: - entry_date = datetime.strptime(entry['date'], '%Y-%m-%d') - if entry_date >= six_months_ago: - filtered_data.append(entry) - sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d')) - return sorted_data - -async def get_data(ticker): - #ticker_str = ','.join(ticker_list) - - async with aiohttp.ClientSession() as session: - url = url = f"https://data.nasdaq.com/api/v3/datatables/ORATS/OPT?date={date_str}&ticker={ticker}&api_key={api_key}" - async with session.get(url) as response: - if response.status == 200: - res = await response.json() - data = res['datatable']['data'] - columns = res['datatable']['columns'] - return data, columns - else: - return [], [] - - -async def run(): - con = sqlite3.connect('stocks.db') - etf_con = sqlite3.connect('etf.db') - - cursor = con.cursor() - cursor.execute("PRAGMA journal_mode = wal") - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketcap >=10E6 AND symbol NOT LIKE '%.%'") - stocks_symbols = [row[0] for row in cursor.fetchall()] - - etf_cursor = etf_con.cursor() - etf_cursor.execute("PRAGMA journal_mode = wal") - etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") - etf_symbols = [row[0] for row in etf_cursor.fetchall()] - - - - total_symbols = stocks_symbols+etf_symbols - - chunk_size = len(total_symbols) // 70 # Divide the list into N chunks - chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)] - - for ticker in tqdm(total_symbols): - data, columns = await get_data(ticker) - filtered_data = [] - for element in tqdm(data): - # Assuming the number of columns matches the length of each element in `data` - filtered_data.append({columns[i]["name"]: element[i] for i in range(len(columns))}) - - filtered_data = [{k: v for k, v in item.items() if k in keys_to_keep} for item in filtered_data] - - try: - sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d')) - if len(sorted_data) > 0: - await save_json(ticker, sorted_data) - except Exception as e: - print(e) - - ''' - for symbol in chunk: - try: - filtered_data = [item for item in transformed_data if symbol == item['ticker']] - sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d')) - if len(sorted_data) > 0: - await save_json(symbol, sorted_data) - except Exception as e: - print(e) - ''' - - con.close() - etf_con.close() - -try: - asyncio.run(run()) -except Exception as e: - print(e) \ No newline at end of file diff --git a/app/cron_options_bubble.py b/app/cron_options_bubble.py deleted file mode 100755 index cca290d..0000000 --- a/app/cron_options_bubble.py +++ /dev/null @@ -1,251 +0,0 @@ -import sqlite3 -from datetime import datetime, timedelta, date -import ujson -import asyncio -import os -from dotenv import load_dotenv -from benzinga import financial_data -from tqdm import tqdm -from concurrent.futures import ThreadPoolExecutor -import pandas as pd -import math -from scipy.stats import norm -from scipy.optimize import brentq - - -load_dotenv() -api_key = os.getenv('BENZINGA_API_KEY') - -fin = financial_data.Benzinga(api_key) - - -risk_free_rate = 0.05 - -def black_scholes_price(S, K, T, r, sigma, option_type="CALL"): - if T <= 0: - raise ValueError("Time to maturity (T) must be greater than 0.") - if sigma <= 0: - raise ValueError("Volatility (sigma) must be greater than 0.") - - d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T)) - d2 = d1 - sigma * math.sqrt(T) - - if option_type == "CALL": - return S * norm.cdf(d1) - K * math.exp(-r * T) * norm.cdf(d2) - elif option_type == "PUT": - return K * math.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1) - else: - raise ValueError("Invalid option_type. Use 'CALL' or 'PUT'.") - -# Implied volatility function -def implied_volatility(S, K, T, r, market_price, option_type="CALL"): - def objective_function(sigma): - return black_scholes_price(S, K, T, r, sigma, option_type) - market_price - - # Use brentq to solve for the implied volatility - try: - return brentq(objective_function, 1e-6, 3) # Bounds for volatility - except ValueError: - return None # Return None if there's no solution - - -def calculate_dte(date_expiration): - expiration_date = datetime.strptime(date_expiration, "%Y-%m-%d") - return (expiration_date - datetime.today()).days - -def calculate_avg_dte(data): - active_options = [entry for entry in data if calculate_dte(entry['date_expiration']) >= 0] - - if active_options: - total_dte = sum(entry['dte'] for entry in active_options) - return int(total_dte / len(active_options)) - else: - return 0 - -def calculate_put_call_volumes(data): - put_volume = sum(int(entry['volume']) for entry in data if entry['put_call'] == 'PUT') - call_volume = sum(int(entry['volume']) for entry in data if entry['put_call'] == 'CALL') - return put_volume, call_volume - -def options_bubble_data(chunk): - try: - company_tickers = ','.join(chunk) - end_date = date.today() - start_date = end_date - timedelta(365) #look 1 year ago - - end_date_str = end_date.strftime('%Y-%m-%d') - start_date_str = start_date.strftime('%Y-%m-%d') - - res_list = [] - page = 0 - while True: - try: - data = fin.options_activity(company_tickers=company_tickers, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str) - data = ujson.loads(fin.output(data))['option_activity'] - res_list += data - page +=1 - except: - break - - res_filtered = [{key: value for key, value in item.items() if key in ['ticker','underlying_price','strike_price','price','date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list] - - #================Start computing historical iv60=====================# - # Convert to DataFrame for easier manipulation - df = pd.DataFrame(res_filtered) - - # Ensure correct types for dates and numerical fields - df['date'] = pd.to_datetime(df['date']) - df['date_expiration'] = pd.to_datetime(df['date_expiration']) - df['underlying_price'] = pd.to_numeric(df['underlying_price'], errors='coerce') - df['strike_price'] = pd.to_numeric(df['strike_price'], errors='coerce') - df['price'] = pd.to_numeric(df['price'], errors='coerce') - df['volume'] = pd.to_numeric(df['volume'], errors='coerce') - df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce') - - df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days - df_30d = df[(df['days_to_expiration'] >= 0) & (df['days_to_expiration'] <= 1000)] - # Calculate implied volatility for options in the 30-day range - iv_data = [] - for _, option in df_30d.iterrows(): - S = option['underlying_price'] - K = option['strike_price'] - T = option['days_to_expiration'] / 365 - market_price = option['price'] - option_type = "CALL" if option['put_call'] == "CALL" else "PUT" - - # Check for missing values - if pd.notna(S) and pd.notna(K) and pd.notna(T) and pd.notna(market_price): - # Calculate IV - iv = implied_volatility(S, K, T, risk_free_rate, market_price, option_type) - if iv is not None: - iv_data.append({ - "date": option['date'], - "IV": iv, - "volume": option['volume'] - }) - - # Create a DataFrame with the calculated IV data - iv_df = pd.DataFrame(iv_data) - - # Calculate daily IV60 by averaging IVs (weighted by volume) - def calculate_daily_iv60(group): - weighted_iv = (group["IV"] * group["volume"]).sum() / group["volume"].sum() - return weighted_iv - - # Group by date and compute daily IV60 - iv60_history = iv_df.groupby("date").apply(calculate_daily_iv60) - - # Fill NaN values using forward fill to carry the last valid IV60 forward - iv60_history = iv60_history.ffill() - iv60_history = iv60_history.to_dict() - iv60_dict = {k.strftime('%Y-%m-%d'): v for k, v in iv60_history.items()} - #print(iv60_dict) - #====================================================================# - - for option_type in ['CALL', 'PUT']: - for item in res_filtered: - try: - if item['put_call'].upper() == option_type: - item['dte'] = calculate_dte(item['date_expiration']) - if item['ticker'] in ['BRK.A', 'BRK.B']: - item['ticker'] = f"BRK-{item['ticker'][-1]}" - except: - pass - - #Save raw data for each ticker for options page stack bar chart - result_list = [] - for ticker in chunk: - try: - ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == ticker] - if len(ticker_filtered_data) != 0: - # Sum up calls and puts for each day for the plot - summed_data = {} - for entry in ticker_filtered_data: - volume = int(entry['volume']) - open_interest = int(entry['open_interest']) - put_call = entry['put_call'] - date_str = entry['date'] - - if date_str not in summed_data: - summed_data[date_str] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}, 'iv60': None} - - summed_data[date_str][put_call]['volume'] += volume - summed_data[date_str][put_call]['open_interest'] += open_interest - - if date_str in iv60_dict: - summed_data[date_str]['iv60'] = round(iv60_dict[date_str]*100,1) - - result_list.extend([{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT'], 'iv60': summed_data[date]['iv60']} for date in summed_data]) - - # Reverse the list - result_list = result_list[::-1] - - with open(f"json/options-flow/company/{ticker}.json", 'w') as file: - ujson.dump(result_list, file) - except Exception as e: - print(f"Error found: {e}") - pass - - - - #Save bubble data for each ticker for overview page - ''' - for ticker in chunk: - bubble_data = {} - for time_period, days in {'oneDay': 1, 'oneWeek': 7, 'oneMonth': 30, 'threeMonth': 90, 'sixMonth': 180, 'oneYear': 252}.items(): - start_date = end_date - timedelta(days=days) #end_date is today - - filtered_data = [item for item in res_filtered if start_date <= datetime.strptime(item['date'], '%Y-%m-%d').date() <= end_date] - - ticker_filtered_data = [entry for entry in filtered_data if entry['ticker'] == ticker] - put_volume, call_volume = calculate_put_call_volumes(ticker_filtered_data) - avg_dte = calculate_avg_dte(ticker_filtered_data) - bubble_data[time_period] = {'putVolume': put_volume, 'callVolume': call_volume, 'avgDTE': avg_dte} - - if all(all(value == 0 for value in data.values()) for data in bubble_data.values()): - bubble_data = {} - #don't save the json - else: - with open(f"json/options-bubble/{ticker}.json", 'w') as file: - ujson.dump(bubble_data, file) - ''' - - except ValueError as ve: - print(ve) - except Exception as e: - print(f"Error found in the process: {e}") - - -async def main(): - try: - stock_con = sqlite3.connect('stocks.db') - stock_cursor = stock_con.cursor() - stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") - stock_symbols = [row[0] for row in stock_cursor.fetchall()] - - etf_con = sqlite3.connect('etf.db') - etf_cursor = etf_con.cursor() - etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") - etf_symbols = [row[0] for row in etf_cursor.fetchall()] - - stock_con.close() - etf_con.close() - - total_symbols = stock_symbols + etf_symbols - total_symbols = [item.replace("BRK-B", "BRK.B") for item in total_symbols] - - - chunk_size = 1 #len(total_symbols) // 2000 # Divide the list into N chunks - chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)] - #chunks = [['U']] - loop = asyncio.get_running_loop() - with ThreadPoolExecutor(max_workers=4) as executor: - tasks = [loop.run_in_executor(executor, options_bubble_data, chunk) for chunk in chunks] - for f in tqdm(asyncio.as_completed(tasks), total=len(tasks)): - await f - - except Exception as e: - print(e) - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/app/cron_options_historical_volume.py b/app/cron_options_historical_volume.py index 183f207..7720914 100644 --- a/app/cron_options_historical_volume.py +++ b/app/cron_options_historical_volume.py @@ -159,11 +159,12 @@ for symbol in tqdm(total_symbols): if response.status_code == 200: data = response.json()['data'] prepare_data(data, symbol) - counter +=1 + + counter +=1 # If 50 chunks have been processed, sleep for 60 seconds - if counter == 100: + if counter == 260: print("Sleeping...") - time.sleep(30) # Sleep for 60 seconds + time.sleep(60) # Sleep for 60 seconds counter = 0 except Exception as e: diff --git a/app/cron_options_hottest_contracts.py b/app/cron_options_hottest_contracts.py index 6f4fcf7..864fa78 100644 --- a/app/cron_options_hottest_contracts.py +++ b/app/cron_options_hottest_contracts.py @@ -7,6 +7,10 @@ import os import sqlite3 import time from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor +from functools import partial +import asyncio +import aiohttp load_dotenv() @@ -127,76 +131,19 @@ def get_hottest_contracts(): if response.status_code == 200: data = response.json()['data'] prepare_data(data, symbol) - counter +=1 + + counter +=1 + # If 50 chunks have been processed, sleep for 60 seconds - if counter == 100: + if counter == 260: print("Sleeping...") - time.sleep(30) # Sleep for 60 seconds + time.sleep(60) counter = 0 except Exception as e: print(f"Error for {symbol}:{e}") -def get_single_contract_historical_data(contract_id): - keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'} - - url = f"https://api.unusualwhales.com/api/option-contract/{contract_id}/historic" - response = requests.get(url, headers=headers) - data = response.json()['chains'] - data = sorted(data, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d')) - res_list = [] - for i, item in enumerate(data): - new_item = { - key: safe_round(value) if isinstance(value, (int, float, str)) else value - for key, value in item.items() - } - - # Compute open interest change and percent if not the first item - if i > 0: - previous_open_interest = safe_round(data[i-1].get('open_interest', 0)) - open_interest = safe_round(item.get('open_interest', 0)) - - if previous_open_interest > 0: - new_item['open_interest_change'] = safe_round(open_interest - previous_open_interest) - new_item['open_interest_change_percent'] = safe_round((open_interest / previous_open_interest - 1) * 100) - else: - new_item['open_interest_change'] = 0 - new_item['open_interest_change_percent'] = 0 - - res_list.append(new_item) - - if res_list: - res_list = [{key: value for key, value in item.items() if key not in keys_to_remove} for item in res_list] - res_list = sorted(res_list, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'), reverse=True) - - save_json(res_list, contract_id,"json/hottest-contracts/contracts") - if __name__ == '__main__': - get_hottest_contracts() - - ''' - total_symbols = get_tickers_from_directory(directory_path) - - contract_id_set = set() # Use a set to ensure uniqueness - for symbol in total_symbols: - try: - with open(f"json/hottest-contracts/companies/{symbol}.json", "r") as file: - data = orjson.loads(file.read()) - for item in data: - try: - contract_id_set.add(item['option_symbol']) # Add to the set - except KeyError: - pass # Handle missing 'option_symbol' keys gracefully - except FileNotFoundError: - pass # Handle missing files gracefully - - # Convert the set to a list if needed - contract_id_list = list(contract_id_set) - - print(len(contract_id_list)) - print(contract_id_list[0]) - - get_single_contract_historical_data('GME250117C00125000') - ''' \ No newline at end of file + get_hottest_contracts() \ No newline at end of file diff --git a/app/cron_options_net_flow.py b/app/cron_options_net_flow.py deleted file mode 100644 index bed8462..0000000 --- a/app/cron_options_net_flow.py +++ /dev/null @@ -1,145 +0,0 @@ -import sqlite3 -from datetime import datetime, timedelta, date -import ujson -import os -import numpy as np -from dotenv import load_dotenv -from benzinga import financial_data -from collections import defaultdict -from tqdm import tqdm - -load_dotenv() -api_key = os.getenv('BENZINGA_API_KEY') - -fin = financial_data.Benzinga(api_key) - - -def save_json(symbol, data): - with open(f"json/options-net-flow/companies/{symbol}.json", 'w') as file: - ujson.dump(data, file) - -def calculate_moving_average(data, window_size): - data = np.array(data, dtype=float) - cumsum = np.cumsum(data) - moving_avg = (cumsum[window_size - 1:] - np.concatenate(([0], cumsum[:-window_size]))) / window_size - return moving_avg.tolist() - - -def calculate_net_flow(data): - date_data = defaultdict(lambda: {'price': [], 'netCall': 0, 'netPut': 0}) - for item in data: - date_str = item['date'] - time_str = item['time'] - datetime_str = f"{date_str} {time_str}" - - # Parse the combined date and time into a datetime object - date_time = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S') - - try: - premium = float(item['cost_basis']) - date_data[date_time]['price'].append(round(float(item['underlying_price']), 2)) - if item['put_call'] == 'CALL': - if item['execution_estimate'] == 'AT_ASK': - date_data[date_time]['netCall'] += premium - elif item['execution_estimate'] == 'AT_BID': - date_data[date_time]['netCall'] -= premium - elif item['put_call'] == 'PUT': - if item['execution_estimate'] == 'AT_ASK': - date_data[date_time]['netPut'] -= premium - elif item['execution_estimate'] == 'AT_BID': - date_data[date_time]['netPut'] += premium - except: - pass - - # Calculate average underlying price and format the results - result = [] - for date_time, values in date_data.items(): - result.append({ - 'date': date_time.strftime('%Y-%m-%d %H:%M:%S'), - 'price': sum(values['price']) / len(values['price']) if values['price'] else 0, - 'netCall': int(values['netCall']), - 'netPut': int(values['netPut']), - }) - - sorted_data = sorted(result, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d %H:%M:%S')) - - # Compute 30-minute interval averages - interval_data = defaultdict(lambda: {'price': [], 'netCall': [], 'netPut': []}) - for item in sorted_data: - date_time = datetime.strptime(item['date'], '%Y-%m-%d %H:%M:%S') - interval_start = date_time.replace(minute=date_time.minute // 120 * 120, second=0) - - interval_data[interval_start]['price'].append(item['price']) - interval_data[interval_start]['netCall'].append(item['netCall']) - interval_data[interval_start]['netPut'].append(item['netPut']) - - # Calculate averages for each 30-minute interval - averaged_data = [] - for interval_start, values in interval_data.items(): - if values['price']: - averaged_data.append({ - 'date': interval_start.strftime('%Y-%m-%d %H:%M:%S'), - #'price': sum(values['price']) / len(values['price']) , - 'netCall': sum(values['netCall']) if values['netCall'] else 0, - 'netPut': sum(values['netPut']) if values['netPut'] else 0, - }) - - # Sort the averaged data by interval start time - averaged_data.sort(key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d %H:%M:%S')) - - return averaged_data - -def get_data(symbol): - try: - end_date = date.today() - start_date = end_date - timedelta(10) - - end_date_str = end_date.strftime('%Y-%m-%d') - start_date_str = start_date.strftime('%Y-%m-%d') - - res_list = [] - for page in range(0, 1000): - try: - data = fin.options_activity(company_tickers=symbol, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str) - data = ujson.loads(fin.output(data))['option_activity'] - res_list += data - except: - break - - res_filtered = [{key: value for key, value in item.items() if key in ['ticker','time','date','execution_estimate', 'underlying_price', 'put_call', 'cost_basis']} for item in res_list] - - #Save raw data for each ticker for options page stack bar chart - ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == symbol] - if len(ticker_filtered_data) > 100: - net_flow_data = calculate_net_flow(ticker_filtered_data) - if len(net_flow_data) > 0: - save_json(symbol, net_flow_data) - - - except ValueError as ve: - print(ve) - except Exception as e: - print(e) - -try: - stock_con = sqlite3.connect('stocks.db') - stock_cursor = stock_con.cursor() - stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >500E6 AND symbol NOT LIKE '%.%'") - stock_symbols = [row[0] for row in stock_cursor.fetchall()] - - etf_con = sqlite3.connect('etf.db') - etf_cursor = etf_con.cursor() - etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") - etf_symbols = [row[0] for row in etf_cursor.fetchall()] - - stock_con.close() - etf_con.close() - - total_symbols = stock_symbols + etf_symbols - - for symbol in tqdm(total_symbols): - get_data(symbol) - -except Exception as e: - print(e) - diff --git a/app/cron_options_single_contract.py b/app/cron_options_single_contract.py new file mode 100644 index 0000000..3096de2 --- /dev/null +++ b/app/cron_options_single_contract.py @@ -0,0 +1,118 @@ +import requests +import orjson +import re +from datetime import datetime +from dotenv import load_dotenv +import os +import time +import asyncio +import aiohttp +from tqdm import tqdm + +load_dotenv() + +api_key = os.getenv('UNUSUAL_WHALES_API_KEY') +headers = {"Accept": "application/json, text/plain", "Authorization": api_key} +keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'} + +def save_json(data, filename, directory): + os.makedirs(directory, exist_ok=True) + filepath = os.path.join(directory, f"{filename}.json") + with open(filepath, 'wb') as file: + file.write(orjson.dumps(data)) + +def get_tickers_from_directory(directory: str): + try: + if not os.path.exists(directory): + raise FileNotFoundError(f"The directory '{directory}' does not exist.") + return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] + except Exception as e: + print(f"An error occurred: {e}") + return [] + +def safe_round(value, decimals=2): + try: + return round(float(value), decimals) + except (ValueError, TypeError): + return value + + +def get_single_contract_historical_data(contract_id): + keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'} + + url = f"https://api.unusualwhales.com/api/option-contract/{contract_id}/historic" + response = requests.get(url, headers=headers) + data = response.json()['chains'] + data = sorted(data, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d')) + res_list = [] + for i, item in enumerate(data): + new_item = { + key: safe_round(value) if isinstance(value, (int, float, str)) else value + for key, value in item.items() + } + + # Compute open interest change and percent if not the first item + if i > 0: + previous_open_interest = safe_round(data[i-1].get('open_interest', 0)) + open_interest = safe_round(item.get('open_interest', 0)) + + if previous_open_interest > 0: + new_item['open_interest_change'] = safe_round(open_interest - previous_open_interest) + new_item['open_interest_change_percent'] = safe_round((open_interest / previous_open_interest - 1) * 100) + else: + new_item['open_interest_change'] = 0 + new_item['open_interest_change_percent'] = 0 + + res_list.append(new_item) + + if res_list: + res_list = [{key: value for key, value in item.items() if key not in keys_to_remove} for item in res_list] + res_list = sorted(res_list, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'), reverse=True) + + save_json(res_list, contract_id,"json/hottest-contracts/contracts") + +if __name__ == '__main__': + directory_path = "json/hottest-contracts/companies" + + total_symbols = get_tickers_from_directory(directory_path) + + contract_id_set = set() # Use a set to ensure uniqueness + for symbol in total_symbols: + try: + with open(f"json/hottest-contracts/companies/{symbol}.json", "r") as file: + data = orjson.loads(file.read()) + volume_list = data.get('volume',[]) + open_interest_list = data.get('openInterest',[]) + if len(volume_list) > 0: + for item in volume_list: + try: + contract_id_set.add(item['option_symbol']) # Add to the set + except KeyError: + pass # Handle missing 'option_symbol' keys gracefully + + if len(open_interest_list) > 0: + for item in open_interest_list: + try: + contract_id_set.add(item['option_symbol']) # Add to the set + except KeyError: + pass # Handle missing 'option_symbol' keys gracefully + except FileNotFoundError: + pass # Handle missing files gracefully + + # Convert the set to a list if needed + contract_id_list = list(contract_id_set) + + print("Number of contract chains:", len(contract_id_list)) + + counter = 0 + for item in tqdm(contract_id_list): + try: + get_single_contract_historical_data(item) + except: + pass + # If 50 chunks have been processed, sleep for 60 seconds + counter +=1 + if counter == 260: + print("Sleeping...") + time.sleep(60) + counter = 0 \ No newline at end of file diff --git a/app/cron_options_zero_dte.py b/app/cron_options_zero_dte.py deleted file mode 100755 index 44582dd..0000000 --- a/app/cron_options_zero_dte.py +++ /dev/null @@ -1,102 +0,0 @@ -import time -from benzinga import financial_data -import ujson -import numpy as np -import sqlite3 -import asyncio -from datetime import datetime, timedelta -import concurrent.futures -from GetStartEndDate import GetStartEndDate - -from dotenv import load_dotenv -import os -load_dotenv() -api_key = os.getenv('BENZINGA_API_KEY') - -fin = financial_data.Benzinga(api_key) - -stock_con = sqlite3.connect('stocks.db') -stock_cursor = stock_con.cursor() -stock_cursor.execute("SELECT DISTINCT symbol FROM stocks") -stock_symbols = [row[0] for row in stock_cursor.fetchall()] - -etf_con = sqlite3.connect('etf.db') -etf_cursor = etf_con.cursor() -etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") -etf_symbols = [row[0] for row in etf_cursor.fetchall()] - -start_date_1d, end_date_1d = GetStartEndDate().run() -start_date = start_date_1d.strftime("%Y-%m-%d") -end_date = end_date_1d.strftime("%Y-%m-%d") - -#print(start_date,end_date) - -def process_page(page): - try: - data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000) - data = ujson.loads(fin.output(data))['option_activity'] - filtered_data = [{key: value for key, value in item.items() if key not in ['description_extended','updated']} for item in data] - time.sleep(1) - page_list = [] - for item in filtered_data: - if item['underlying_price'] != '': - ticker = item['ticker'] - if ticker == 'BRK.A': - ticker = 'BRK-A' - elif ticker == 'BRK.B': - ticker = 'BRK-B' - - put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts' - - asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '') - - item['assetType'] = asset_type - item['put_call'] = put_call - item['ticker'] = ticker - item['price'] = round(float(item['price']), 2) - item['strike_price'] = round(float(item['strike_price']), 2) - item['cost_basis'] = round(float(item['cost_basis']), 2) - item['underlying_price'] = round(float(item['underlying_price']), 2) - item['type'] = item['option_activity_type'].capitalize() - item['sentiment'] = item['sentiment'].capitalize() - item['executionEstimate'] = item['execution_estimate'].replace('_', ' ').title() - item['tradeCount'] = item['trade_count'] - - if item['date_expiration'] == start_date: - page_list.append(item) - return page_list - except: - return [] - - -# Assuming fin, stock_symbols, and etf_symbols are defined elsewhere -res_list = [] - -# Adjust max_workers to control the degree of parallelism -max_workers = 6 - -# Fetch pages concurrently -with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_page = {executor.submit(process_page, page): page for page in range(20)} - for future in concurrent.futures.as_completed(future_to_page): - page = future_to_page[future] - try: - page_list = future.result() - res_list += page_list - except Exception as e: - print(f"Exception occurred: {e}") - -# res_list now contains the aggregated results from all pages -#print(res_list) -#print(len(res_list)) - -# Define a custom key function to extract the time and convert it to a sortable format -def custom_key(item): - return item['time'] - -res_list = sorted(res_list, key=custom_key, reverse =True) -with open(f"json/options-flow/zero-dte/data.json", 'w') as file: - ujson.dump(res_list, file) - -stock_con.close() -etf_con.close() diff --git a/app/main.py b/app/main.py index b096664..b631a3a 100755 --- a/app/main.py +++ b/app/main.py @@ -2632,6 +2632,32 @@ async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_ke +@app.post("/options-contract-history") +async def get_data(data:GeneralData, api_key: str = Security(get_api_key)): + contract_id = data.params + cache_key = f"options-contract-history-{contract_id}" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + + try: + with open(f"json/hottest-contracts/contracts/{contract_id}.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = [] + data = orjson.dumps(res) + compressed_data = gzip.compress(data) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600*60) + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + @app.post("/options-stats-ticker") async def get_options_stats_ticker(data:TickerData, api_key: str = Security(get_api_key)): diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index d07a7aa..a61cb62 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -85,12 +85,17 @@ def run_dark_pool_ticker(): run_command(["python3", "cron_dark_pool_ticker.py"]) -def run_options_stats(): +def run_options_jobs(): now = datetime.now(ny_tz) week = now.weekday() if week <= 5: run_command(["python3", "cron_options_stats.py"]) + time.sleep(60) run_command(["python3", "cron_options_historical_volume.py"]) + time.sleep(60) + run_command(["python3", "cron_options_hottest_contracts.py"]) + time.sleep(60) + run_command(["python3", "cron_options_single_contract.py"]) def run_fda_calendar(): now = datetime.now(ny_tz) @@ -216,12 +221,6 @@ def run_executive(): if week <= 4: run_command(["python3", "cron_executive.py"]) -def run_options_bubble_ticker(): - week = datetime.today().weekday() - if week <= 4: - run_command(["python3", "cron_options_bubble.py"]) - - def run_analyst_rating(): week = datetime.today().weekday() if week <= 4: @@ -341,14 +340,11 @@ def run_threaded(job_func): # Schedule the job to run - -schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job') +schedule.every().day.at("01:00").do(run_threaded, run_options_jobs).tag('options_job') schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) #schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_gex_job') schedule.every().day.at("05:00").do(run_threaded, run_export_price).tag('export_price_job') -schedule.every().day.at("03:30").do(run_threaded, run_options_stats).tag('options_stats_job') - schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job') schedule.every().day.at("06:30").do(run_threaded, run_ai_score).tag('ai_score_job')