From 5a2ac87c4eb97f0b02c13b3f72cc34ef0ed1f9f5 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Tue, 21 Jan 2025 15:54:27 +0100 Subject: [PATCH] bugfixing --- app/cron_market_movers.py | 7 +- app/cron_options_historical_volume.py | 173 ++++++++++---------------- app/cron_options_single_contract.py | 54 ++++---- 3 files changed, 96 insertions(+), 138 deletions(-) diff --git a/app/cron_market_movers.py b/app/cron_market_movers.py index 75c9a6d..5d4ff9c 100755 --- a/app/cron_market_movers.py +++ b/app/cron_market_movers.py @@ -90,12 +90,7 @@ async def get_gainer_loser_active_stocks(symbols): # Ensure the stock meets criteria if market_cap >= market_cap_threshold: - with open(f"json/one-day-price/{symbol}.json", 'rb') as file: - one_day_price = orjson.loads(file.read()) - # Filter out entries with None 'close' - filtered_prices = [p for p in one_day_price if p['close'] is not None] - - if price and changes_percentage and len(filtered_prices) > 100: + if price and changes_percentage: res_list.append({ "symbol": symbol, "name": name, diff --git a/app/cron_options_historical_volume.py b/app/cron_options_historical_volume.py index d75914d..5448bc3 100644 --- a/app/cron_options_historical_volume.py +++ b/app/cron_options_historical_volume.py @@ -8,86 +8,7 @@ import sqlite3 import pandas as pd import time from tqdm import tqdm - -load_dotenv() - -api_key = os.getenv('UNUSUAL_WHALES_API_KEY') - -# Connect to the databases -con = sqlite3.connect('stocks.db') -etf_con = sqlite3.connect('etf.db') -cursor = con.cursor() -cursor.execute("PRAGMA journal_mode = wal") -#cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap > 1E9") -cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") -stocks_symbols = [row[0] for row in cursor.fetchall()] - -etf_cursor = etf_con.cursor() -etf_cursor.execute("PRAGMA journal_mode = wal") -#etf_cursor.execute("SELECT DISTINCT symbol FROM etfs WHERE marketCap > 1E9") -etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") -etf_symbols = [row[0] for row in etf_cursor.fetchall()] - - - -total_symbols = stocks_symbols + etf_symbols - -#today = datetime.today() -#N_days_ago = today - timedelta(days=90) - -query_template = """ - SELECT date, close, change_percent - FROM "{ticker}" - WHERE date BETWEEN ? AND ? -""" - -def get_tickers_from_directory(directory: str): - try: - # Ensure the directory exists - if not os.path.exists(directory): - raise FileNotFoundError(f"The directory '{directory}' does not exist.") - - # Get all tickers from filenames - return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] - - except Exception as e: - print(f"An error occurred: {e}") - return [] - -directory_path = "json/options-historical-data/companies" -total_symbols = get_tickers_from_directory(directory_path) - -if len(total_symbols) < 100: - total_symbols = stocks_symbols+etf_symbols - -print(len(total_symbols)) - - -def save_json(data, symbol): - os.makedirs(directory_path, exist_ok=True) # Ensure the directory exists - with open(f"{directory_path}/{symbol}.json", 'wb') as file: # Use binary mode for orjson - file.write(orjson.dumps(data)) - - -def safe_round(value, decimals=2): - try: - return round(float(value), decimals) - except (ValueError, TypeError): - return value - - -def calculate_neutral_premium(data_item): - """Calculate the neutral premium for a data item.""" - call_premium = float(data_item['call_premium']) - put_premium = float(data_item['put_premium']) - bearish_premium = float(data_item['bearish_premium']) - bullish_premium = float(data_item['bullish_premium']) - - total_premiums = bearish_premium + bullish_premium - observed_premiums = call_premium + put_premium - neutral_premium = observed_premiums - total_premiums - - return safe_round(neutral_premium) +from collections import defaultdict def prepare_data(data, symbol): @@ -158,37 +79,71 @@ def prepare_data(data, symbol): save_json(res_list, symbol) - -querystring = {"limit":"300"} -headers = { - "Accept": "application/json, text/plain", - "Authorization": api_key -} - -total_symbols = ['NVDA'] - -counter = 0 -for symbol in tqdm(total_symbols): +def get_contracts_from_directory(directory: str): try: + # Ensure the directory exists + if not os.path.exists(directory): + raise FileNotFoundError(f"The directory '{directory}' does not exist.") - url = f"https://api.unusualwhales.com/api/stock/{symbol}/options-volume" - - response = requests.get(url, headers=headers, params=querystring) - - if response.status_code == 200: - data = response.json()['data'] - prepare_data(data, symbol) - - counter +=1 - # If 50 chunks have been processed, sleep for 60 seconds - if counter == 260: - print("Sleeping...") - time.sleep(60) # Sleep for 60 seconds - counter = 0 - + # Get all tickers from filenames + return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] + except Exception as e: - print(f"Error for {symbol}:{e}") + print(f"An error occurred: {e}") + return [] -con.close() -etf_con.close() \ No newline at end of file + +def aggregate_data_by_date(): + total_symbols = ['AA'] + data_by_date = defaultdict(lambda: {"volume": 0, "open_interest": 0}) + contracts_processed = 0 + + for symbol in tqdm(total_symbols, desc="Processing symbols"): + try: + contract_list = get_contracts_from_directory(f"json/all-options-contracts/{symbol}") + + for item in tqdm(contract_list, desc=f"Processing {symbol} contracts", leave=False): + try: + with open(f"json/all-options-contracts/{symbol}/{item}.json", "r") as file: + data = orjson.loads(file.read()) + + # Process historical data + for entry in data.get('history', []): + date = entry.get('date') + volume = entry.get('volume') + open_interest = entry.get('open_interest') + + if date: + # Aggregate volume + if volume is not None: + data_by_date[date]["volume"] += int(volume) + + # Aggregate open interest + if open_interest is not None: + data_by_date[date]["open_interest"] += int(open_interest) + + contracts_processed += 1 + + except Exception as e: + print(f"Error processing contract {item} for {symbol}: {e}") + continue + + except Exception as e: + print(f"Error processing symbol {symbol}: {e}") + continue + + # Sort results by date + sorted_results = {date: metrics for date, metrics in sorted(data_by_date.items())} + + return sorted_results, contracts_processed + +if __name__ == '__main__': + # Run the aggregation + results, total_processed = aggregate_data_by_date() + + print("\nData by date:") + for date, metrics in results.items(): + print(f"{date}: Volume = {metrics['volume']:,}, Open Interest = {metrics['open_interest']:,}") + + print(f"\nTotal contracts processed: {total_processed}") \ No newline at end of file diff --git a/app/cron_options_single_contract.py b/app/cron_options_single_contract.py index ff5483b..4eb9974 100644 --- a/app/cron_options_single_contract.py +++ b/app/cron_options_single_contract.py @@ -3,7 +3,7 @@ import asyncio import time import intrinio_sdk as intrinio from intrinio_sdk.rest import ApiException -from datetime import datetime +from datetime import datetime, timedelta import ast import orjson from tqdm import tqdm @@ -44,7 +44,8 @@ intrinio.ApiClient().set_api_key(api_key) #intrinio.ApiClient().allow_retries(True) after = datetime.today().strftime('%Y-%m-%d') -before = '2045-12-31' +before = '2100-12-31' +N_year_ago = datetime.now() - timedelta(days=365) include_related_symbols = False page_size = 5000 MAX_CONCURRENT_REQUESTS = 20 # Adjust based on API rate limits @@ -74,7 +75,6 @@ async def get_options_chain(symbol, expiration, semaphore): include_related_symbols=include_related_symbols ) ) - contracts = set() for item in response.chain: try: @@ -100,6 +100,8 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore): ) # Extract and process the response data + key_data = {k: v for k, v in response._option.__dict__.items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))} + history = [] if response and hasattr(response, '_prices'): for price in response._prices: @@ -108,9 +110,10 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore): if isinstance(v, (str, int, float, bool, list, dict, type(None))) }) + #clean the data history = [ - {key.lstrip('_'): value for key, value in record.items() if key not in ('_open_ask', '_ask_low','_close_bid_size','_close_ask_size','_close_ask','_close_bid','_close_size','_exercise_style','discriminator','_open_bid','_bid_low','_bid_high','_ask_high')} + {key.lstrip('_'): value for key, value in record.items() if key not in ('_close_time','_open_ask', '_ask_low','_close_bid_size','_close_ask_size','_close_ask','_close_bid','_close_size','_exercise_style','discriminator','_open_bid','_bid_low','_bid_high','_ask_high')} for record in history ] @@ -123,30 +126,32 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore): avg_open_interest = int(total_open_interest / count) if count > 0 else 0 if avg_volume > 10 and avg_open_interest > 10: - print(history) - ''' res_list = [] for item in history: - - new_item = { - key: safe_round(value) - for key, value in item.items() - if key != 'in_out_flow' - } + try: + new_item = { + key: safe_round(value) + for key, value in item.items() + } + res_list.append(new_item) + except: + pass res_list = sorted(res_list, key=lambda x: x['date']) for i in range(1, len(res_list)): try: - current_open_interest = res_list[i]['total_open_interest'] - previous_open_interest = res_list[i-1]['total_open_interest'] + current_open_interest = res_list[i]['open_interest'] + previous_open_interest = res_list[i-1]['open_interest'] or 0 changes_percentage_oi = round((current_open_interest/previous_open_interest -1)*100,2) + res_list[i]['changeOI'] = current_open_interest - previous_open_interest res_list[i]['changesPercentageOI'] = changes_percentage_oi except: + res_list[i]['changeOI'] = None res_list[i]['changesPercentageOI'] = None - - ''' - await save_json(history, symbol, contract_id) + + data = {'expiration': key_data['_expiration'], 'strike': key_data['_strike'], 'optionType': key_data['_type'], 'history': history} + await save_json(data, symbol, contract_id) except Exception as e: print(f"Error fetching data for {contract_id}: {e}") @@ -235,13 +240,16 @@ async def main(): total_symbols = ['AA'] #get_total_symbols() for symbol in tqdm(total_symbols): - print(f"==========Start Process for {symbol}==========") - expiration_list = get_all_expirations(symbol) - print(f"Found {len(expiration_list)} expiration dates") - contract_list = await get_data(symbol,expiration_list) - print(f"Unique contracts: {len(contract_list)}") + try: + print(f"==========Start Process for {symbol}==========") + expiration_list = get_all_expirations(symbol) + print(f"Found {len(expiration_list)} expiration dates") + contract_list = await get_data(symbol,expiration_list) + print(f"Unique contracts: {len(contract_list)}") - results = await process_contracts(symbol, contract_list) + results = await process_contracts(symbol, contract_list) + except: + pass if __name__ == "__main__":