def save_json(data, symbol, directory_path):
    """Serialize *data* with orjson and write it to <directory_path>/<symbol>.json."""
    os.makedirs(directory_path, exist_ok=True)
    target = f"{directory_path}/{symbol}.json"
    # orjson.dumps returns bytes, hence the binary write mode.
    with open(target, 'wb') as out:
        out.write(orjson.dumps(data))


def get_contracts_from_directory(directory: str):
    """Return contract identifiers (filenames minus '.json') found in *directory*.

    A missing directory or any listing error yields an empty list.
    """
    try:
        if not os.path.exists(directory):
            return []
        contracts = []
        for filename in os.listdir(directory):
            if filename.endswith(".json"):
                contracts.append(filename.replace(".json", ""))
        return contracts
    except Exception as error:
        print(error)
        return []


def get_expiration_date(option_symbol):
    """Parse an OCC-style option symbol and return its expiration as a date.

    Expected shape: TICKER (letters) + YYMMDD + C/P + strike digits.
    Raises ValueError when *option_symbol* does not match that shape.
    """
    parsed = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
    if parsed is None:
        raise ValueError(f"Invalid option_symbol format: {option_symbol}")
    expiry_str = parsed.group(2)
    return datetime.strptime(expiry_str, "%y%m%d").date()
def aggregate_data_by_strike(symbol):
    """Aggregate the latest GEX/DEX per strike for all unexpired contracts of *symbol*.

    For each contract only the most recent history entry is used; exposure is
    open_interest * greek * spot close on that entry's date.  Returns a flat
    list of per-strike dicts (call/put gex/dex), ordered by date then strike,
    or [] when the symbol has no unexpired contracts.
    """
    data_by_date = defaultdict(lambda: defaultdict(lambda: {
        "strike": 0,
        "call_gex": 0.0,
        "put_gex": 0.0,
        "call_dex": 0.0,
        "put_dex": 0.0,
    }))

    contract_dir = f"json/all-options-contracts/{symbol}"
    contract_list = get_contracts_from_directory(contract_dir)

    # Only consider contracts that haven't expired.
    contract_list = [item for item in contract_list if get_expiration_date(item) >= today]
    if not contract_list:
        return []

    # Load historical prices once and index close by date: O(1) lookups instead
    # of a linear scan of the whole price history for every contract.
    with open(f"json/historical-price/max/{symbol}.json", "r") as file:
        price_list = orjson.loads(file.read())
    close_by_date = {p.get("time"): p.get("close") for p in price_list}

    for item in contract_list:
        try:
            file_path = os.path.join(contract_dir, f"{item}.json")
            with open(file_path, "r") as file:
                data = orjson.loads(file.read())

            option_type = data.get("optionType")
            if option_type not in ("call", "put"):
                continue

            strike = float(data.get("strike", 0))

            history = data.get("history")
            if not history:
                continue
            latest_entry = history[-1]  # most recent data point
            date = latest_entry.get("date")

            open_interest = latest_entry.get("open_interest", 0) or 0
            gamma = latest_entry.get("gamma", 0) or 0
            delta = latest_entry.get("delta", 0) or 0

            # Spot close on the entry's date; 0 when no matching price exists.
            spot_price = close_by_date.get(date) or 0

            gex = open_interest * gamma * spot_price
            dex = open_interest * delta * spot_price

            # Aggregate per (date, strike) bucket.
            daily_data = data_by_date[date][strike]
            daily_data["strike"] = strike

            if option_type == "call":
                daily_data["call_gex"] += round(gex, 2)
                daily_data["call_dex"] += round(dex, 2)
            else:
                daily_data["put_gex"] += round(gex, 2)
                daily_data["put_dex"] += round(dex, 2)
        except Exception:
            # Best-effort: skip contracts with malformed or missing data.
            continue

    # Flatten: sorted by date, then by strike within each date.
    final_output = []
    for date, strikes in sorted(data_by_date.items()):
        for strike, strike_data in sorted(strikes.items(), key=lambda x: x[0]):
            final_output.append(strike_data)
    return final_output
def aggregate_data_by_expiration(symbol):
    """Aggregate the latest GEX/DEX per expiration date for unexpired contracts of *symbol*.

    For each contract only the most recent history entry is used; exposure is
    open_interest * greek * spot close on that entry's date.  Returns a flat
    list of per-expiry dicts (call/put gex/dex) keyed by 'expiry', ordered by
    date then expiration, or [] when the symbol has no unexpired contracts.
    """
    data_by_date = defaultdict(lambda: defaultdict(lambda: {
        # Fixed: template key was "expiration", which never matched the
        # "expiry" key written below, leaving a dead empty field in every row.
        "expiry": "",
        "call_gex": 0.0,
        "put_gex": 0.0,
        "call_dex": 0.0,
        "put_dex": 0.0,
    }))

    contract_dir = f"json/all-options-contracts/{symbol}"
    contract_list = get_contracts_from_directory(contract_dir)

    # Only consider contracts that haven't expired.
    contract_list = [item for item in contract_list if get_expiration_date(item) >= today]
    if not contract_list:
        return []

    # Load historical prices once and index close by date for O(1) lookups.
    with open(f"json/historical-price/max/{symbol}.json", "r") as file:
        price_list = orjson.loads(file.read())
    close_by_date = {p.get("time"): p.get("close") for p in price_list}

    for item in contract_list:
        try:
            file_path = os.path.join(contract_dir, f"{item}.json")
            with open(file_path, "r") as file:
                data = orjson.loads(file.read())

            option_type = data.get("optionType")
            if option_type not in ("call", "put"):
                continue

            expiration_date = data.get("expiration")
            if not expiration_date:
                continue  # Skip if expiration date is missing.

            history = data.get("history")
            if not history:
                continue
            latest_entry = history[-1]  # most recent data point
            date = latest_entry.get("date")

            open_interest = latest_entry.get("open_interest", 0) or 0
            gamma = latest_entry.get("gamma", 0) or 0
            delta = latest_entry.get("delta", 0) or 0

            # Spot close on the entry's date; 0 when no matching price exists.
            spot_price = close_by_date.get(date) or 0

            gex = open_interest * gamma * spot_price
            dex = open_interest * delta * spot_price

            # Aggregate per (date, expiration) bucket.
            daily_data = data_by_date[date][expiration_date]
            daily_data["expiry"] = expiration_date

            if option_type == "call":
                daily_data["call_gex"] += round(gex, 2)
                daily_data["call_dex"] += round(dex, 2)
            else:
                daily_data["put_gex"] += round(gex, 2)
                daily_data["put_dex"] += round(dex, 2)
        except Exception:
            # Best-effort: skip contracts with malformed or missing data.
            continue

    # Flatten: sorted by date, then by expiration within each date.
    final_output = []
    for date, expirations in sorted(data_by_date.items()):
        for expiration_date, expiry_data in sorted(expirations.items(), key=lambda x: x[0]):
            final_output.append(expiry_data)
    return final_output
def get_overview_data():
    """Build per-symbol GEX/DEX overview files from precomputed historical options data.

    Reads json/options-historical-data/companies/<symbol>.json, keeps the
    gex/dex columns, adds net exposure, and saves newest-first to
    json/gex-dex/overview/.  Symbols with missing/malformed files are skipped.
    """
    directory_path = "json/gex-dex/overview"
    total_symbols = stocks_symbols + etf_symbols
    # Test mode:
    # total_symbols = ['TSLA']
    for symbol in tqdm(total_symbols):
        try:
            with open(f"json/options-historical-data/companies/{symbol}.json", "r") as file:
                data = orjson.loads(file.read())

            wanted_keys = ('date', 'call_gex', 'call_dex', 'put_gex', 'put_dex')
            filtered_data = [{k: d[k] for k in wanted_keys} for d in data]

            for item in filtered_data:
                try:
                    item['netGex'] = item['call_gex'] + item['put_gex']
                    item['netDex'] = item['call_dex'] + item['put_dex']
                except (KeyError, TypeError):
                    # Leave net fields off rows with non-numeric values.
                    pass

            if filtered_data:
                filtered_data.sort(key=lambda x: x['date'], reverse=True)
                save_json(filtered_data, symbol, directory_path)
        except Exception:
            # Missing source file or bad JSON: skip this symbol.
            continue


def get_strike_data():
    """Compute per-strike GEX/DEX, keep the top decile, and save to json/gex-dex/strike/{gex,dex}.

    Only strikes whose combined call+put exposure reaches the 90th percentile
    are kept, sorted by strike descending.
    """
    directory_path = "json/gex-dex/strike/"
    total_symbols = stocks_symbols + etf_symbols
    # Test mode:
    # total_symbols = ['TSLA']
    for symbol in tqdm(total_symbols):
        try:
            data = aggregate_data_by_strike(symbol)
            if not data:
                # Guard: np.percentile raises on an empty sequence.
                continue
            for key_element in ('gex', 'dex'):
                val_sums = [item[f"call_{key_element}"] + item[f"put_{key_element}"] for item in data]
                threshold = np.percentile(val_sums, 90)
                filtered_data = [
                    item for item in data
                    if item[f"call_{key_element}"] + item[f"put_{key_element}"] >= threshold
                ]
                filtered_data.sort(key=lambda x: x['strike'], reverse=True)
                if filtered_data:
                    save_json(filtered_data, symbol, directory_path + key_element)
        except Exception as e:
            print(e)
def get_expiry_data():
    """Compute per-expiration GEX/DEX, keep the top decile, and save to json/gex-dex/expiry/{gex,dex}.

    Only expirations whose combined call+put exposure reaches the 90th
    percentile are kept, sorted by expiry descending.
    """
    directory_path = "json/gex-dex/expiry/"
    total_symbols = stocks_symbols + etf_symbols
    # Test mode:
    # total_symbols = ['TSLA']
    for symbol in tqdm(total_symbols):
        try:
            data = aggregate_data_by_expiration(symbol)
            if not data:
                # Guard: np.percentile raises on an empty sequence.
                continue
            for key_element in ('gex', 'dex'):
                val_sums = [item[f"call_{key_element}"] + item[f"put_{key_element}"] for item in data]
                threshold = np.percentile(val_sums, 90)
                filtered_data = [
                    item for item in data
                    if item[f"call_{key_element}"] + item[f"put_{key_element}"] >= threshold
                ]
                filtered_data.sort(key=lambda x: x['expiry'], reverse=True)
                if filtered_data:
                    save_json(filtered_data, symbol, directory_path + key_element)
        except Exception as e:
            print(e)


if __name__ == '__main__':
    get_overview_data()
    get_strike_data()
    get_expiry_data()
open(f"json/historical-price/max/{symbol}.json","r") as file: + price_list = orjson.loads(file.read()) + if len(contract_list) > 0: - for item in contract_list: + for item in tqdm(contract_list): try: file_path = os.path.join(contract_dir, f"{item}.json") with open(file_path, "r") as file: @@ -73,9 +76,10 @@ def aggregate_data_by_date(symbol): for entry in data.get('history', []): date = entry.get('date') + # Skip entries older than one year - #if date < one_year_ago_str: - # continue + if date < one_year_ago_str: + continue volume = entry.get('volume', 0) or 0 open_interest = entry.get('open_interest', 0) or 0 @@ -83,7 +87,18 @@ def aggregate_data_by_date(symbol): implied_volatility = entry.get('implied_volatility', 0) or 0 gamma = entry.get('gamma',0) or 0 delta = entry.get('delta',0) or 0 - gex = 100 * open_interest * + + # Find the matching date in price_list + matching_price = next((p for p in price_list if p.get('time') == date), 0) + + if matching_price: + spot_price = matching_price['close'] + else: + spot_price = 0 # Or some default value + + gex = open_interest * gamma * spot_price + dex = open_interest * delta * spot_price + daily_data = data_by_date[date] daily_data["date"] = date @@ -92,10 +107,14 @@ def aggregate_data_by_date(symbol): daily_data["call_volume"] += int(volume) daily_data["call_open_interest"] += int(open_interest) daily_data["call_premium"] += int(total_premium) + daily_data["call_gex"] += round(gex,2) + daily_data["call_dex"] += round(dex,2) elif option_type == 'put': daily_data["put_volume"] += int(volume) daily_data["put_open_interest"] += int(open_interest) daily_data["put_premium"] += int(total_premium) + daily_data["put_gex"] += round(gex,2) + daily_data["put_dex"] += round(dex,2) # Aggregate IV for both calls and puts daily_data["iv"] += round(implied_volatility, 2) @@ -236,8 +255,8 @@ def prepare_data(data, symbol): new_item['price'] = None res_list.append(new_item) - except Exception as e: - print(e) + except: + pass res_list = 
sorted(res_list, key=lambda x: x['date']) diff --git a/app/main.py b/app/main.py index 4f8aef3..3cfbd9d 100755 --- a/app/main.py +++ b/app/main.py @@ -273,6 +273,11 @@ class ParamsData(BaseModel): params: str category: str +class GreekExposureData(BaseModel): + params: str + category: str + type: str + class MarketNews(BaseModel): newsType: str @@ -2677,11 +2682,12 @@ async def get_data(data:OptionContract, api_key: str = Security(get_api_key)): ) @app.post("/options-gex-dex") -async def get_data(data:ParamsData, api_key: str = Security(get_api_key)): +async def get_data(data:GreekExposureData, api_key: str = Security(get_api_key)): ticker = data.params.upper() category = data.category.lower() + type = data.type - cache_key = f"options-gex-dex-{ticker}-{category}" + cache_key = f"options-gex-dex-{ticker}-{category}-{type}" cached_result = redis_client.get(cache_key) if cached_result: return StreamingResponse( @@ -2689,17 +2695,17 @@ async def get_data(data:ParamsData, api_key: str = Security(get_api_key)): media_type="application/json", headers={"Content-Encoding": "gzip"}) + try: - with open(f"json/gex-dex/{category}/{ticker}.json", 'rb') as file: - data = orjson.loads(file.read()) - if category == 'strike': - key_element = 'gex' - val_sums = [item[f"call_{key_element}"] + item[f"put_{key_element}"] for item in data] - threshold = np.percentile(val_sums, 85) - data = [item for item in data if (item[f"call_{key_element}"] + item[f"put_{key_element}"]) >= threshold] - + if len(type) > 0: + with open(f"json/gex-dex/{category}/{type}/{ticker}.json", 'rb') as file: + data = orjson.loads(file.read()) + else: + with open(f"json/gex-dex/{category}/{ticker}.json", 'rb') as file: + data = orjson.loads(file.read()) except: data = [] + data = orjson.dumps(data) compressed_data = gzip.compress(data) redis_client.set(cache_key, compressed_data)