From 8f9f15db162a8d720c6fd427af6f78f1d57b8519 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Thu, 23 Jan 2025 20:05:06 +0100
Subject: [PATCH] update all options cron jobs

---
 app/cron_implied_volatility.py        | 199 +++++++++++++-------
 app/cron_options_historical_volume.py | 147 +++++++++------
 app/cron_options_hottest_contracts.py |  25 ++--
 app/cron_options_oi.py                |   2 -
 app/cron_options_single_contract.py   |  87 ++++++++---
 app/cron_options_stats.py             |  26 ++--
 app/primary_cron_job.py               |  12 +-
 7 files changed, 277 insertions(+), 221 deletions(-)

diff --git a/app/cron_implied_volatility.py b/app/cron_implied_volatility.py
index 1d24f17..dea6905 100644
--- a/app/cron_implied_volatility.py
+++ b/app/cron_implied_volatility.py
@@ -1,20 +1,16 @@
 import requests
 import orjson
+import ujson
 import re
 from datetime import datetime
-from dotenv import load_dotenv
 import os
 import sqlite3
 import time
 from tqdm import tqdm
+import pandas as pd
+import numpy as np
 
-load_dotenv()
-api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
-querystring = {"timeframe":"5Y"}
-headers = {"Accept": "application/json, text/plain", "Authorization": api_key}
-
-# Connect to the databases
 con = sqlite3.connect('stocks.db')
 etf_con = sqlite3.connect('etf.db')
 cursor = con.cursor()
@@ -47,112 +43,121 @@ def get_tickers_from_directory(directory: str):
         return []
 
 
-def save_json(data, symbol, directory_path):
-    os.makedirs(directory_path, exist_ok=True)  # Ensure the directory exists
-    with open(f"{directory_path}/{symbol}.json", 'wb') as file:  # Use binary mode for orjson
-        file.write(orjson.dumps(data))
+def convert_to_serializable(obj):
+    if isinstance(obj, np.float64):
+        return float(obj)
+    elif isinstance(obj, (np.int64, np.int32)):
+        return int(obj)
+    elif isinstance(obj, (list, np.ndarray)):
+        return [convert_to_serializable(item) for item in obj]
+    elif isinstance(obj, dict):
+        return {key: convert_to_serializable(value) for key, value in obj.items()}
+    else:
+        return obj
 
-
-def safe_round(value, decimals=2):
-    try:
-        return round(float(value), decimals)
-    except (ValueError, TypeError):
-        return value
-
-
-def add_data(data, historical_data):
-    res_list = []
-    for item in data:
-        date = item['date']
-        for item2 in historical_data:
-            try:
-                if date == item2['date']:
-                    item['changesPercentage'] = item2['changesPercentage']
-                    item['putCallRatio'] = item2['putCallRatio']
-                    item['total_open_interest'] = item2['total_open_interest']
-                    item['changesPercentageOI'] = item2.get('changesPercentageOI',None)
-            except Exception as e:
-                print(e)
-
-        if 'changesPercentage' in item:
-            res_list.append(item)
-
-    return res_list
-
-
-def prepare_data(data, symbol, directory_path, sort_by = "date"):
-    res_list = []
-    for item in data:
-        try:
-            new_item = {
-                key: safe_round(value) if isinstance(value, (int, float, str)) else value
-                for key, value in item.items()
-            }
-
-            res_list.append(new_item)
-        except:
-            pass
-
-    if res_list:
-        data = sorted(res_list, key=lambda x: x[sort_by], reverse=True)
-        with open(f"json/options-historical-data/companies/{symbol}.json", "r") as file:
-            historical_data = orjson.loads(file.read())
-
-        res_list = add_data(data,historical_data)
-        save_json(res_list, symbol, directory_path)
-
-
-def get_iv_data():
-    print("Starting to download iv data...")
+def save_json(data, symbol):
     directory_path = "json/implied-volatility"
-    total_symbols = get_tickers_from_directory(directory_path)
-    if len(total_symbols) < 100:
-        total_symbols = stocks_symbols+etf_symbols
+    os.makedirs(directory_path, exist_ok=True)  # Ensure the directory exists
+
+    # Convert numpy types to JSON-serializable types
+    serializable_data = convert_to_serializable(data)
+
+    with open(f"{directory_path}/{symbol}.json", 'wb') as file:  # Use binary mode for orjson
+        file.write(orjson.dumps(serializable_data))
 
-    counter = 0
-    for symbol in tqdm(total_symbols):
+
+def compute_realized_volatility(data, window_size=20):
+    """
+    Compute the realized volatility of stock prices over a rolling window.
+    Realized volatility is the annualized standard deviation of log returns of stock prices.
+    """
+    # Sort data by date (oldest first)
+    data = sorted(data, key=lambda x: x['date'])
+
+    # Extract stock prices and dates
+    prices = [item.get('price') for item in data]  # Use .get() to handle missing keys
+    dates = [item['date'] for item in data]
+
+    # Compute log returns of stock prices, skipping None values
+    log_returns = []
+    for i in range(1, len(prices)):
+        if prices[i] is not None and prices[i - 1] is not None and prices[i - 1] != 0:
+            log_returns.append(np.log(prices[i] / prices[i - 1]))
+        else:
+            log_returns.append(None)  # Append None if price is missing or invalid
+
+    # Compute realized volatility using a rolling window
+    realized_volatility = []
+    for i in range(len(log_returns)):
+        if i < window_size - 1:
+            # Not enough data for the window, append None
+            realized_volatility.append(None)
+        else:
+            # Collect valid log returns in the window
+            window_returns = []
+            for j in range(i - window_size + 1, i + 1):
+                if log_returns[j] is not None:
+                    window_returns.append(log_returns[j])
+
+            if len(window_returns) >= window_size:
+                # Compute standard deviation of log returns over the window
+                rv_daily = np.sqrt(np.sum(np.square(window_returns)) / window_size)
+                # Annualize the realized volatility
+                rv_annualized = rv_daily * np.sqrt(252)
+                realized_volatility.append(rv_annualized)
+            else:
+                # Not enough valid data in the window, append None
+                realized_volatility.append(None)
+
+    # Shift realized volatility FORWARD by window_size days to align with IV from window_size days ago
+    realized_volatility = realized_volatility[window_size - 1:] + [None] * (window_size - 1)
+
+    # Create the resulting list
+    rv_list = []
+    for i in range(len(data)):
         try:
-            url = f"https://api.unusualwhales.com/api/stock/{symbol}/volatility/realized"
-
-            response = requests.get(url, headers=headers, params=querystring)
-            if response.status_code == 200:
-                data = response.json()['data']
-                prepare_data(data, symbol, directory_path)
-
-            counter +=1
-
-            # If 50 chunks have been processed, sleep for 60 seconds
-            if counter == 260:
-                print("Sleeping...")
-                time.sleep(60)
-                counter = 0
-
+            rv_list.append({
+                "date": data[i]["date"],
+                "price": data[i].get("price"),  # Use .get() to handle missing keys
+                "changesPercentage": data[i].get("changesPercentage", None),  # Default to None if missing
+                "putCallRatio": data[i].get("putCallRatio", None),  # Default to None if missing
+                "total_open_interest": data[i].get("total_open_interest", None),  # Default to None if missing
+                "changesPercentageOI": data[i].get("changesPercentageOI", None),  # Default to None if missing
+                "iv": data[i].get("iv", None),  # Default to None if missing
+                # Guard the index: realized_volatility holds one fewer entry than data
+                "rv": round(realized_volatility[i], 4) if i < len(realized_volatility) and realized_volatility[i] is not None else None
+            })
         except Exception as e:
-            print(f"Error for {symbol}:{e}")
-
+            # If any error occurs, append a dictionary with default values
+            rv_list.append({
+                "date": data[i]["date"],
+                "price": data[i].get("price", None),
+                "changesPercentage": data[i].get("changesPercentage", None),
+                "putCallRatio": data[i].get("putCallRatio", None),
+                "total_open_interest": data[i].get("total_open_interest", None),
+                "changesPercentageOI": data[i].get("changesPercentageOI", None),
+                "iv": data[i].get("iv", None),
+                "rv": None
+            })
+
+    # Sort the final list by date in descending order
+    rv_list = sorted(rv_list, key=lambda x: x['date'], reverse=True)
+
+    return rv_list
 
 
 if __name__ == '__main__':
-    get_iv_data()
-
-    '''
     directory_path = "json/implied-volatility"
     total_symbols = get_tickers_from_directory(directory_path)
     if len(total_symbols) < 100:
-        total_symbols = stocks_symbols+etf_symbols
+        total_symbols = stocks_symbols + etf_symbols  # Assuming these are defined elsewhere
 
     for symbol in tqdm(total_symbols):
         try:
             with open(f"json/options-historical-data/companies/{symbol}.json", "r") as file:
-                historical_data = orjson.loads(file.read())
-
-            with open(f"json/implied-volatility/{symbol}.json", "r") as file:
                 data = orjson.loads(file.read())
 
-            res_list = add_data(data,historical_data)
+            rv_list = compute_realized_volatility(data)
 
-            save_json(res_list, symbol, directory_path)
+            if rv_list:
+                save_json(rv_list, symbol)
         except:
             pass
-    '''
\ No newline at end of file

diff --git a/app/cron_options_historical_volume.py b/app/cron_options_historical_volume.py
index 0a98c9a..7cdc0aa 100644
--- a/app/cron_options_historical_volume.py
+++ b/app/cron_options_historical_volume.py
@@ -145,24 +145,17 @@ def get_contracts_from_directory(directory: str):
     try:
         # Ensure the directory exists
         if not os.path.exists(directory):
-            raise FileNotFoundError(f"The directory '{directory}' does not exist.")
+            return []
 
         # Get all tickers from filenames
         return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
 
     except Exception as e:
-        print(f"An error occurred: {e}")
+        print(e)
         return []
 
-
-def get_contracts_from_directory(directory):
-    """Retrieve a list of contract files from a directory."""
-    return [f.split('.')[0] for f in os.listdir(directory) if f.endswith('.json')]
-
-
-
-def aggregate_data_by_date(total_symbols):
+def aggregate_data_by_date(symbol):
     data_by_date = defaultdict(lambda: {
         "date": "",
         "call_volume": 0,
@@ -177,71 +170,68 @@
         "iv_count": 0,  # Count of entries for IV
     })
 
-    for symbol in tqdm(total_symbols):
-        try:
-            contract_dir = f"json/all-options-contracts/{symbol}"
-            if not os.path.exists(contract_dir):
-                print(f"Directory does not exist: {contract_dir}")
-                continue
-
-            contract_list = get_contracts_from_directory(contract_dir)
-
-            for item in tqdm(contract_list, desc=f"Processing {symbol} contracts", leave=False):
-                try:
-                    file_path = os.path.join(contract_dir, f"{item}.json")
-                    with open(file_path, "r") as file:
-                        data = orjson.loads(file.read())
-
-                    option_type = data.get('optionType', None)
-                    if option_type not in ['call', 'put']:
-                        continue
-
-                    for entry in data.get('history', []):
-                        date = entry.get('date')
-                        volume = entry.get('volume', 0) or 0
-                        open_interest = entry.get('open_interest', 0) or 0
-                        total_premium = entry.get('total_premium', 0) or 0
-                        implied_volatility = entry.get('implied_volatility', 0) or 0
-
-                        if date:
-                            daily_data = data_by_date[date]
-                            daily_data["date"] = date
-
-                            if option_type == 'call':
-                                daily_data["call_volume"] += int(volume)
-                                daily_data["call_open_interest"] += int(open_interest)
-                                daily_data["call_premium"] += int(total_premium)
-                            elif option_type == 'put':
-                                daily_data["put_volume"] += int(volume)
-                                daily_data["put_open_interest"] += int(open_interest)
daily_data["put_premium"] += int(total_premium) - daily_data["iv"] += round(implied_volatility, 2) - daily_data["iv_count"] += 1 - - try: - daily_data["putCallRatio"] = round(daily_data["put_volume"] / daily_data["call_volume"], 2) - except ZeroDivisionError: - daily_data["putCallRatio"] = None - - except Exception as e: - print(f"Error processing contract {item} for {symbol}: {e}") + contract_dir = f"json/all-options-contracts/{symbol}" + contract_list = get_contracts_from_directory(contract_dir) + + if len(contract_list) > 0: + + for item in contract_list: + try: + file_path = os.path.join(contract_dir, f"{item}.json") + with open(file_path, "r") as file: + data = orjson.loads(file.read()) + + option_type = data.get('optionType', None) + if option_type not in ['call', 'put']: continue - except Exception as e: - print(f"Error processing symbol {symbol}: {e}") - continue - - # Convert to list of dictionaries and sort by date - data = list(data_by_date.values()) - for daily_data in data: - # Compute the average IV if there are valid entries - if daily_data["iv_count"] > 0: - daily_data["iv"] = round(daily_data["iv"] / daily_data["iv_count"], 2) - else: - daily_data["iv"] = None # Or set it to 0 if you prefer - - data = sorted(data, key=lambda x: x['date'], reverse=True) - data = calculate_iv_rank_for_all(data) - data = prepare_data(data, symbol) + + for entry in data.get('history', []): + date = entry.get('date') + volume = entry.get('volume', 0) or 0 + open_interest = entry.get('open_interest', 0) or 0 + total_premium = entry.get('total_premium', 0) or 0 + implied_volatility = entry.get('implied_volatility', 0) or 0 + + if date: + daily_data = data_by_date[date] + daily_data["date"] = date + + if option_type == 'call': + daily_data["call_volume"] += int(volume) + daily_data["call_open_interest"] += int(open_interest) + daily_data["call_premium"] += int(total_premium) + elif option_type == 'put': + daily_data["put_volume"] += int(volume) + daily_data["put_open_interest"] += int(open_interest) + daily_data["put_premium"] += int(total_premium) + daily_data["iv"] += round(implied_volatility, 2) + daily_data["iv_count"] += 1 + + try: + daily_data["putCallRatio"] = round(daily_data["put_volume"] / daily_data["call_volume"], 2) + except ZeroDivisionError: + daily_data["putCallRatio"] = None + + except: + pass + + # Convert to list of dictionaries and sort by date + data = list(data_by_date.values()) + for daily_data in data: + try: + if daily_data["iv_count"] > 0: + daily_data["iv"] = round(daily_data["iv"] / daily_data["iv_count"], 2) + else: + daily_data["iv"] = None # Or set it to 0 if you prefer + except: + daily_data["iv"] = None + + data = sorted(data, key=lambda x: x['date'], reverse=True) + data = calculate_iv_rank_for_all(data) + + return data + else: + return [] @@ -263,9 +253,12 @@ etf_symbols = [row[0] for row in etf_cursor.fetchall()] total_symbols = stocks_symbols + etf_symbols - -total_symbols = ['AA'] -data = aggregate_data_by_date(total_symbols) +for symbol in tqdm(total_symbols): + try: + data = aggregate_data_by_date(symbol) + data = prepare_data(data, symbol) + except: + pass con.close() etf_con.close() \ No newline at end of file diff --git a/app/cron_options_hottest_contracts.py b/app/cron_options_hottest_contracts.py index b4bdf37..8273f9f 100644 --- a/app/cron_options_hottest_contracts.py +++ b/app/cron_options_hottest_contracts.py @@ -123,8 +123,7 @@ def process_contract(item, symbol): 'low': latest_entry['low'], 'high': latest_entry['high'] } - except Exception as e: - 
-        print(e)
+    except:
         return None
 
 def prepare_data(highest_volume_list, highest_oi_list, symbol):
@@ -142,7 +141,7 @@ def prepare_data(highest_volume_list, highest_oi_list, symbol):
 
     res_dict = {'volume': highest_volume, 'openInterest': highest_oi}
 
-    if res_dict:
+    if highest_volume and highest_oi:
         save_json(res_dict, symbol, "json/hottest-contracts/companies")
 
     return res_dict
@@ -228,21 +227,21 @@ def get_hottest_contracts(base_dir="json/all-options-contracts"):
                         if len(top_by_open_interest) > 10:
                             top_by_open_interest.pop()
 
-                except Exception as e:
-                    print(f"Error processing {contract_file}: {e}")
+                except:
+                    pass
 
-    # Process each symbol directory
-    total_symbols = ['AA']
+
     for symbol in tqdm(total_symbols):
         try:
             process_symbol(symbol)
-        except Exception as e:
-            print(f"Error processing symbol {symbol}: {e}")
-
-    top_by_volume_contracts = [contract_info for _, contract_info in top_by_volume]
-    top_by_open_interest_contracts = [contract_info for _, contract_info in top_by_open_interest]
+            top_by_volume_contracts = [contract_info for _, contract_info in top_by_volume]
+            top_by_open_interest_contracts = [contract_info for _, contract_info in top_by_open_interest]
 
-    prepare_data(top_by_volume_contracts, top_by_open_interest_contracts, symbol)
+            prepare_data(top_by_volume_contracts, top_by_open_interest_contracts, symbol)
+
+        except:
+            pass
+
 
 # Example usage
 if __name__ == "__main__":

diff --git a/app/cron_options_oi.py b/app/cron_options_oi.py
index b40f47b..d23568e 100644
--- a/app/cron_options_oi.py
+++ b/app/cron_options_oi.py
@@ -170,8 +170,6 @@ async def main():
     total_symbols = get_tickers_from_directory()
     print(f"Number of tickers: {len(total_symbols)}")
 
-    total_symbols = ['TSLA']
-
     for symbol in total_symbols:
         try:
             # Get list of contracts for the symbol

diff --git a/app/cron_options_single_contract.py b/app/cron_options_single_contract.py
index 1aec559..b54e6f1 100644
--- a/app/cron_options_single_contract.py
+++ b/app/cron_options_single_contract.py
@@ -17,6 +17,7 @@ load_dotenv()
 api_key = os.getenv('INTRINIO_API_KEY')
 
 directory_path = "json/all-options-contracts"
+current_date = datetime.now().date()
 
 async def save_json(data, symbol, contract_id):
     directory_path = f"json/all-options-contracts/{symbol}"
@@ -190,7 +191,9 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore):
 
             data = {'expiration': key_data['_expiration'], 'strike': key_data['_strike'],
                     'optionType': key_data['_type'], 'history': res_list}
-            await save_json(data, symbol, contract_id)
+
+            if data:
+                await save_json(data, symbol, contract_id)
 
     except Exception as e:
         print(f"Error fetching data for {contract_id}: {e}")
@@ -241,21 +244,17 @@ async def process_contracts(symbol, contract_list):
     for batch_num in range(total_batches):
         start_idx = batch_num * BATCH_SIZE
         batch = contract_list[start_idx:start_idx + BATCH_SIZE]
-
-        print(f"\nProcessing batch {batch_num + 1}/{total_batches} ({len(batch)} contracts)")
-        batch_start_time = time.time()
-
+
         # Process the batch concurrently
         batch_results = await process_batch(symbol, batch, semaphore, pbar)
         results.extend(batch_results)
 
-        batch_time = time.time() - batch_start_time
-        print(f"Batch completed in {batch_time:.2f} seconds")
-
+        '''
         # Sleep between batches if not the last batch
         if batch_num < total_batches - 1:
             print(f"Sleeping for 60 seconds before next batch...")
             await asyncio.sleep(60)
+        '''
 
     return results
 
@@ -274,21 +273,71 @@ def get_total_symbols():
 
     return stocks_symbols + etf_symbols
 
-async def main():
+
+
+def get_expiration_date(contract_id):
+    # Extract the date part (YYMMDD) from the contract ID.
+    # The ticker root has a variable length, so locate the first digit
+    # instead of assuming a fixed two-character root like 'AA'.
+    first_digit = next(i for i, ch in enumerate(contract_id) if ch.isdigit())
+    date_str = contract_id[first_digit:first_digit + 6]
+    # Convert to datetime object
+    return datetime.strptime(date_str, "%y%m%d").date()
+
+def check_contract_expiry(symbol):
+    directory = f"{directory_path}/{symbol}/"
+    try:
+        # Ensure the directory exists
+        if not os.path.exists(directory):
+            raise FileNotFoundError(f"The directory '{directory}' does not exist.")
+
+        # Iterate through all JSON files in the directory
+        for file in os.listdir(directory):
+            try:
+                if file.endswith(".json"):
+                    contract_id = file.replace(".json", "")
+                    expiration_date = get_expiration_date(contract_id)
+
+                    # Check if the contract is expired
+                    if expiration_date < current_date:
+                        # Delete the expired contract JSON file
+                        os.remove(os.path.join(directory, file))
+                        print(f"Deleted expired contract: {contract_id}")
+            except:
+                pass
+
+        # Return the list of non-expired contracts
+        return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
 
+    except:
+        pass
 
-    total_symbols = ['AA'] #get_total_symbols()
-
-    for symbol in tqdm(total_symbols):
-        try:
-            print(f"==========Start Process for {symbol}==========")
-            expiration_list = get_all_expirations(symbol)
-            print(f"Found {len(expiration_list)} expiration dates")
-            contract_list = await get_data(symbol,expiration_list)
-            print(f"Unique contracts: {len(contract_list)}")
+async def process_symbol(symbol):
+    try:
+        print(f"==========Start Process for {symbol}==========")
+        expiration_list = get_all_expirations(symbol)
+        # Check existing contracts and delete expired ones
+        check_contract_expiry(symbol)
+        print(f"Found {len(expiration_list)} expiration dates")
+        contract_list = await get_data(symbol, expiration_list)
+        print(f"Unique contracts: {len(contract_list)}")
+
+        if len(contract_list) > 0:
             results = await process_contracts(symbol, contract_list)
-        except:
-            pass
+    except:
+        pass
+
+async def main():
+    total_symbols = get_total_symbols()
+
+    # Split the symbols into chunks of 4
+    for i in tqdm(range(0, len(total_symbols), 4)):
+        symbols_chunk = total_symbols[i:i+4]
+
+        # Run the symbols in the chunk concurrently
+        await asyncio.gather(*[process_symbol(symbol) for symbol in symbols_chunk])
 
 
 if __name__ == "__main__":

diff --git a/app/cron_options_stats.py b/app/cron_options_stats.py
index d1d1514..544c273 100644
--- a/app/cron_options_stats.py
+++ b/app/cron_options_stats.py
@@ -20,6 +20,8 @@ api_key = os.getenv('INTRINIO_API_KEY')
 intrinio.ApiClient().set_api_key(api_key)
 #intrinio.ApiClient().allow_retries(True)
 
+current_date = datetime.now().date()
+
 source = ''
 show_stats = ''
 stock_price_source = ''
@@ -36,6 +38,11 @@
 
 BATCH_SIZE = 1500
 
+def get_expiration_date(contract_id):
+    # Extract the date part (YYMMDD) from the contract ID.
+    # The ticker root has a variable length, so locate the first digit
+    # instead of assuming a fixed two-character root like 'AA'.
+    first_digit = next(i for i, ch in enumerate(contract_id) if ch.isdigit())
+    date_str = contract_id[first_digit:first_digit + 6]
+    # Convert to datetime object
+    return datetime.strptime(date_str, "%y%m%d").date()
 
 # Database connection and symbol retrieval
@@ -72,12 +79,7 @@ def get_tickers_from_directory():
 def get_contracts_from_directory(symbol):
     directory = f"json/all-options-contracts/{symbol}/"
     try:
-        # Ensure the directory exists
-        if not os.path.exists(directory):
-            raise FileNotFoundError(f"The directory '{directory}' does not exist.")
-        # Get all tickers from filenames
         return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
-
     except:
         return []
 
@@ -127,8 +129,7 @@ async def get_options_chain(symbol, expiration, semaphore):
{expiration}: {e}") return contracts - except Exception as e: - print(f"Error fetching chain for {expiration}: {e}") + except: return set() @@ -148,6 +149,7 @@ async def get_price_batch_realtime(symbol,contract_list): time = None iv_list = [] + for item in data: try: price_data = (item.__dict__)['_price'].__dict__ @@ -189,7 +191,7 @@ async def get_price_batch_realtime(symbol,contract_list): res_dict['iv'] = round((sum(iv_list) / len(iv_list)*100),2) if iv_list else 0 res_dict['putCallRatio'] = round(res_dict['put_volume'] / res_dict['call_volume'],2) if res_dict['call_volume'] > 0 else 0 - with open("json/options-historical-data/companies/AA.json", "r") as file: + with open(f"json/options-historical-data/companies/{symbol}.json", "r") as file: past_data = orjson.loads(file.read()) index = next((i for i, item in enumerate(past_data) if item['date'] == time), 0) previous_open_interest = past_data[index]['total_open_interest'] @@ -197,13 +199,10 @@ async def get_price_batch_realtime(symbol,contract_list): res_dict['changesPercentageOI'] = round((res_dict['total_open_interest']/previous_open_interest-1)*100,2) res_dict['changeOI'] = res_dict['total_open_interest'] - previous_open_interest + if res_dict: save_json(res_dict, symbol) - - - - async def prepare_dataset(symbol): expiration_list = get_all_expirations(symbol) @@ -232,6 +231,9 @@ async def main(): try: contract_list = get_contracts_from_directory(symbol) if len(contract_list) > 0: + if len(contract_list) > 250: + contract_list = contract_list[:250] + #to-do: intrinio allows only 250 contracts per batch. Need to consider all batches. await get_price_batch_realtime(symbol, contract_list) except: pass diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 9a3f734..49df528 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -98,6 +98,15 @@ def run_options_jobs(): now = datetime.now(ny_tz) week = now.weekday() if week <= 5: + run_command(["python3", "cron_options_single_contract.py"]) + time.sleep(60) + run_command(["python3", "cron_options_historical_volume.py"]) + run_command(["python3", "cron_options_hottest_contracts.py"]) + run_command(["python3", "cron_options_oi.py"]) + run_command(["python3", "cron_implied_volatility.py"]) + run_command(["python3", "cron_options_stats.py"]) + + ''' run_command(["python3", "cron_options_gex_dex.py"]) time.sleep(60) run_command(["python3", "cron_options_oi.py"]) @@ -111,6 +120,7 @@ def run_options_jobs(): run_command(["python3", "cron_options_hottest_contracts.py"]) time.sleep(60) run_command(["python3", "cron_options_single_contract.py"]) + ''' def run_fda_calendar(): now = datetime.now(ny_tz) @@ -427,7 +437,7 @@ schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_ schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job') -#schedule.every(10).minutes.do(run_threaded, run_options_stats).tag('options_stats_job') +schedule.every(15).minutes.do(run_threaded, run_options_stats).tag('options_stats_job') schedule.every(5).minutes.do(run_threaded, run_market_flow).tag('market_flow_job') schedule.every(5).minutes.do(run_threaded, run_list).tag('stock_list_job')