From 0d58a83a51a1afff68d7882ae2f668cb235e4d8f Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Thu, 23 Jan 2025 14:45:44 +0100 Subject: [PATCH] update option oi --- app/cron_options_oi.py | 292 ++++++++++++++++++++++++----------------- 1 file changed, 173 insertions(+), 119 deletions(-) diff --git a/app/cron_options_oi.py b/app/cron_options_oi.py index cbf5818..8a7000f 100644 --- a/app/cron_options_oi.py +++ b/app/cron_options_oi.py @@ -1,147 +1,201 @@ -import requests -import orjson -import re -from datetime import datetime -from dotenv import load_dotenv -import os -import sqlite3 +from __future__ import print_function +import asyncio import time +import intrinio_sdk as intrinio +from intrinio_sdk.rest import ApiException +from datetime import datetime, timedelta +import orjson from tqdm import tqdm +import os +from collections import defaultdict +from dotenv import load_dotenv +from concurrent.futures import ThreadPoolExecutor + load_dotenv() +api_key = os.getenv('INTRINIO_API_KEY') -api_key = os.getenv('UNUSUAL_WHALES_API_KEY') -headers = {"Accept": "application/json, text/plain", "Authorization": api_key} +# Configure Intrinio SDK +intrinio.ApiClient().set_api_key(api_key) +intrinio.ApiClient().allow_retries(True) -# Connect to the databases -con = sqlite3.connect('stocks.db') -etf_con = sqlite3.connect('etf.db') -cursor = con.cursor() -cursor.execute("PRAGMA journal_mode = wal") -#cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap > 1E9") -cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") -stocks_symbols = [row[0] for row in cursor.fetchall()] +# Configuration +MAX_CONCURRENT_REQUESTS = 50 +BATCH_SIZE = 1500 +include_related_symbols = False -etf_cursor = etf_con.cursor() -etf_cursor.execute("PRAGMA journal_mode = wal") -#etf_cursor.execute("SELECT DISTINCT symbol FROM etfs WHERE marketCap > 1E9") -etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") -etf_symbols = [row[0] for row in etf_cursor.fetchall()] +def save_json(data, symbol, category="strike"): + directory_path = f"json/oi/{category}/" + os.makedirs(directory_path, exist_ok=True) + with open(f"{directory_path}/{symbol}.json", 'wb') as file: + file.write(orjson.dumps(data)) -con.close() -etf_con.close() - - -def get_tickers_from_directory(directory: str): +def get_tickers_from_directory(): + directory = "json/options-historical-data/companies" try: - # Ensure the directory exists if not os.path.exists(directory): raise FileNotFoundError(f"The directory '{directory}' does not exist.") - - # Get all tickers from filenames return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] - except Exception as e: print(f"An error occurred: {e}") return [] - - - -def save_json(data, symbol, directory_path): - os.makedirs(directory_path, exist_ok=True) # Ensure the directory exists - with open(f"{directory_path}/{symbol}.json", 'wb') as file: # Use binary mode for orjson - file.write(orjson.dumps(data)) - - -def safe_round(value, decimals=2): +def get_contracts_from_directory(symbol): + directory = f"json/all-options-contracts/{symbol}/" try: - return round(float(value), decimals) - except (ValueError, TypeError): - return value + if not os.path.exists(directory): + raise FileNotFoundError(f"The directory '{directory}' does not exist.") + return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] + except: + return [] - -def prepare_data(data, symbol, directory_path, sort_by = "date"): - data = [{k: v for k, v in item.items() if "charm" not in k and "vanna" not in k} for item in data] - res_list = [] - for item in data: +async def get_single_contract_data(symbol, expiration, semaphore): + async with semaphore: try: - new_item = { - key: safe_round(value) if isinstance(value, (int, float, str)) else value - for key, value in item.items() + # Use ThreadPoolExecutor to run synchronous API calls + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as pool: + response = await loop.run_in_executor( + pool, + lambda: intrinio.OptionsApi().get_options_chain_eod(symbol, expiration, include_related_symbols=include_related_symbols) + ) + + # Process the options chain data + contract_data = [] + for item in response.chain: + try: + option_price_data = item.prices + dict_data = option_price_data.__dict__ + + contract_data.append({ + 'strike': item.option.strike, + 'expiration': item.option.expiration, + 'type': item.option.type, + 'open_interest': dict_data.get('_open_interest', 0), + 'contract_code': item.option.code + }) + except Exception as e: + print(f"Error processing contract item: {e}") + + return { + 'expiration': expiration, + 'contracts': contract_data } - - res_list.append(new_item) - except: - pass - - if res_list: - res_list = sorted(res_list, key=lambda x: x[sort_by], reverse=True) - save_json(res_list, symbol, directory_path) - - -def get_strike_data(): - print("Starting to download strike data...") - directory_path = "json/oi/strike" - total_symbols = get_tickers_from_directory(directory_path) - if len(total_symbols) < 100: - total_symbols = stocks_symbols+etf_symbols - - counter = 0 - #Test mode - #total_symbols = ['GME','SPY'] - for symbol in tqdm(total_symbols): - try: - url = f"https://api.unusualwhales.com/api/stock/{symbol}/oi-per-strike" - - response = requests.get(url, headers=headers) - if response.status_code == 200: - data = response.json()['data'] - prepare_data(data, symbol, directory_path, sort_by = 'strike') - - counter +=1 - - # If 50 chunks have been processed, sleep for 60 seconds - if counter == 260: - print("Sleeping...") - time.sleep(60) - counter = 0 - except Exception as e: - print(f"Error for {symbol}:{e}") + print(f"Error processing expiration {expiration}: {e}") + return None -def get_expiry_data(): - print("Starting to download expiry data...") - directory_path = "json/oi/expiry" - total_symbols = get_tickers_from_directory(directory_path) - if len(total_symbols) < 100: - total_symbols = stocks_symbols+etf_symbols +async def process_batch(symbol, batch, semaphore, pbar): + tasks = [get_single_contract_data(symbol, contract, semaphore) for contract in batch] + results = [] + + for task in asyncio.as_completed(tasks): + result = await task + if result: + results.append(result) + pbar.update(1) + return results - counter = 0 +async def process_contracts(symbol, contract_list): + results = [] + semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) + + total_contracts = len(contract_list) + total_batches = (total_contracts + BATCH_SIZE - 1) // BATCH_SIZE + + with tqdm(total=total_contracts, desc=f"Processing {symbol} contracts") as pbar: + for batch_num in range(total_batches): + start_idx = batch_num * BATCH_SIZE + batch = contract_list[start_idx:start_idx + BATCH_SIZE] + + print(f"\nProcessing batch {batch_num + 1}/{total_batches} ({len(batch)} contracts)") + batch_start_time = time.time() + + batch_results = await process_batch(symbol, batch, semaphore, pbar) + results.extend(batch_results) + + batch_time = time.time() - batch_start_time + + if batch_num < total_batches - 1: + print(f"Sleeping for 30 seconds before next batch...") + await asyncio.sleep(30) + return results - for symbol in tqdm(total_symbols): +def aggregate_open_interest(symbol, results): + strike_data = defaultdict(lambda: {'call_open_interest': 0, 'put_open_interest': 0}) + expiration_data = defaultdict(lambda: {'call_open_interest': 0, 'put_open_interest': 0}) + + for result in results: + if not result or 'contracts' not in result: + continue + + for contract in result['contracts']: + try: + strike = contract['strike'] + option_type = contract['type'] + open_interest = contract['open_interest'] + expiration = contract['expiration'] + + if option_type == 'call': + strike_data[strike]['call_open_interest'] += open_interest + expiration_data[expiration]['call_open_interest'] += open_interest + elif option_type == 'put': + strike_data[strike]['put_open_interest'] += open_interest + expiration_data[expiration]['put_open_interest'] += open_interest + except Exception as e: + print(f"Error processing contract: {e}") + + # Convert to sortable list format + strike_data = sorted(strike_data.items(), key=lambda x: x[0], reverse=True) + strike_data = [ + { + "call_oi": data[1]['call_open_interest'], + "put_oi": data[1]['put_open_interest'], + "strike": data[0], + } + for data in strike_data + ] + + expiration_data = sorted(expiration_data.items(), key=lambda x: x[0]) + expiration_data = [ + { + "call_oi": data[1]['call_open_interest'], + "put_oi": data[1]['put_open_interest'], + "expiry": data[0], + } + for data in expiration_data + ] + + # Save aggregated data + if strike_data: + save_json(strike_data, symbol, 'strike') + if expiration_data: + save_json(expiration_data, symbol, 'expiry') + + +async def main(): + # Get list of symbols + total_symbols = get_tickers_from_directory() + print(f"Number of tickers: {len(total_symbols)}") + + total_symbols = ['AA'] + + for symbol in total_symbols: try: - url = f"https://api.unusualwhales.com/api/stock/{symbol}/oi-per-expiry" - - response = requests.get(url, headers=headers) - if response.status_code == 200: - data = response.json()['data'] - prepare_data(data, symbol, directory_path) - - counter +=1 - - # If 50 chunks have been processed, sleep for 60 seconds - if counter == 260: - print("Sleeping...") - time.sleep(60) - counter = 0 + # Get list of contracts for the symbol + contract_list = get_contracts_from_directory(symbol) + if not contract_list: + print(f"No contracts found for {symbol}") + continue + + # Process contracts + results = await process_contracts(symbol, contract_list) + # Aggregate and save open interest data + aggregate_open_interest(symbol, results) + except Exception as e: - print(f"Error for {symbol}:{e}") - - -if __name__ == '__main__': - get_strike_data() - #time.sleep(60) - get_expiry_data() + print(f"Error processing {symbol}: {e}") +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file