From 0efdd96ba853e6d25085cedb7bdf48463523b285 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Tue, 21 Jan 2025 13:30:26 +0100 Subject: [PATCH] update single contract job --- app/cron_options_single_contract.py | 328 +++++++++++++++++++--------- 1 file changed, 225 insertions(+), 103 deletions(-) diff --git a/app/cron_options_single_contract.py b/app/cron_options_single_contract.py index 191eb35..ff5483b 100644 --- a/app/cron_options_single_contract.py +++ b/app/cron_options_single_contract.py @@ -1,126 +1,248 @@ -import requests -import orjson -import re +from __future__ import print_function +import asyncio +import time +import intrinio_sdk as intrinio +from intrinio_sdk.rest import ApiException from datetime import datetime +import ast +import orjson +from tqdm import tqdm from dotenv import load_dotenv import os -import time -import asyncio import aiohttp -from tqdm import tqdm - -today = datetime.today() +from concurrent.futures import ThreadPoolExecutor +import sqlite3 load_dotenv() +api_key = os.getenv('INTRINIO_API_KEY') -api_key = os.getenv('UNUSUAL_WHALES_API_KEY') -headers = {"Accept": "application/json, text/plain", "Authorization": api_key} -keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'} +directory_path = "json/all-options-contracts" -def save_json(data, filename, directory): - os.makedirs(directory, exist_ok=True) - filepath = os.path.join(directory, f"{filename}.json") - with open(filepath, 'wb') as file: +async def save_json(data, symbol, contract_id): + directory_path = f"json/all-options-contracts/{symbol}" + os.makedirs(directory_path, exist_ok=True) # Ensure the directory exists + with open(f"{directory_path}/{contract_id}.json", 'wb') as file: # Use binary mode for orjson file.write(orjson.dumps(data)) -def get_tickers_from_directory(directory: str): +def safe_round(value): try: - if not os.path.exists(directory): - raise FileNotFoundError(f"The directory '{directory}' does not exist.") - return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")] - except Exception as e: - print(f"An error occurred: {e}") - return [] - -def safe_round(value, decimals=2): - try: - return round(float(value), decimals) + return round(float(value), 2) except (ValueError, TypeError): return value - -def get_single_contract_historical_data(contract_id): - keys_to_remove = {'high_price', 'low_price', 'iv_low', 'iv_high', 'last_tape_time'} - - url = f"https://api.unusualwhales.com/api/option-contract/{contract_id}/historic" - response = requests.get(url, headers=headers) - data = response.json()['chains'] - data = sorted(data, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d')) - res_list = [] - for i, item in enumerate(data): - new_item = { - key: safe_round(value) if isinstance(value, (int, float, str)) else value - for key, value in item.items() - } +class OptionsResponse: + @property + def chain(self): + return self._chain - # Compute open interest change and percent if not the first item - if i > 0: - previous_open_interest = safe_round(data[i-1].get('open_interest', 0)) - open_interest = safe_round(item.get('open_interest', 0)) +class ChainItem: + @property + def prices(self): + return self._prices - if previous_open_interest > 0: - new_item['open_interest_change'] = safe_round(open_interest - previous_open_interest) - new_item['open_interest_change_percent'] = safe_round((open_interest / previous_open_interest - 1) * 100) - else: - new_item['open_interest_change'] = 0 - new_item['open_interest_change_percent'] = 0 +intrinio.ApiClient().set_api_key(api_key) +#intrinio.ApiClient().allow_retries(True) - res_list.append(new_item) +after = datetime.today().strftime('%Y-%m-%d') +before = '2045-12-31' +include_related_symbols = False +page_size = 5000 +MAX_CONCURRENT_REQUESTS = 20 # Adjust based on API rate limits +BATCH_SIZE = 2000 - if res_list: - res_list = [{key: value for key, value in item.items() if key not in keys_to_remove} for item in res_list] - res_list = sorted(res_list, key=lambda x: datetime.strptime(x.get('date', ''), '%Y-%m-%d'), reverse=True) +def get_all_expirations(symbol): + response = intrinio.OptionsApi().get_options_expirations_eod( + symbol, + after=after, + before=before, + include_related_symbols=include_related_symbols + ) + data = (response.__dict__).get('_expirations') + return data - save_json(res_list, contract_id,"json/hottest-contracts/contracts") - -if __name__ == '__main__': - directory_path = "json/hottest-contracts/companies" - - total_symbols = get_tickers_from_directory(directory_path) - contract_id_set = set() # Use a set to ensure uniqueness - for symbol in total_symbols: +async def get_options_chain(symbol, expiration, semaphore): + async with semaphore: try: - with open(f"json/hottest-contracts/companies/{symbol}.json", "r") as file: - data = orjson.loads(file.read()) - volume_list = data.get('volume',[]) - open_interest_list = data.get('openInterest',[]) - - if len(volume_list) > 0: - for item in volume_list: - try: - date_expiration = item.get('date_expiration',None) - if date_expiration != None and datetime.strptime(date_expiration, "%Y-%m-%d") >=today: - contract_id_set.add(item['option_symbol']) # Add to the set - except Exception as e: - print(e) + # Run the synchronous API call in a thread pool since intrinio doesn't support async + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as pool: + response = await loop.run_in_executor( + pool, + lambda: intrinio.OptionsApi().get_options_chain_eod( + symbol, + expiration, + include_related_symbols=include_related_symbols + ) + ) + + contracts = set() + for item in response.chain: + try: + contracts.add(item.option.code) + except Exception as e: + print(f"Error processing contract in {expiration}: {e}") + return contracts + + except Exception as e: + print(f"Error fetching chain for {expiration}: {e}") + return set() - if len(open_interest_list) > 0: - for item in open_interest_list: - try: - date_expiration = item.get('date_expiration',None) - if date_expiration != None and datetime.strptime(date_expiration, "%Y-%m-%d") >=today: - contract_id_set.add(item['option_symbol']) # Add to the set - except: - pass - except KeyboardInterrupt: - print("\nProcess interrupted by user.") - except: - pass # Handle missing files gracefully - # Convert the set to a list if needed - contract_id_list = list(contract_id_set) - - print("Number of contract chains:", len(contract_id_list)) - - counter = 0 - for item in tqdm(contract_id_list): +async def get_single_contract_eod_data(symbol, contract_id, semaphore): + async with semaphore: try: - get_single_contract_historical_data(item) - except: - pass - # If 50 chunks have been processed, sleep for 60 seconds - counter +=1 - if counter == 260: - print("Sleeping...") - time.sleep(60) - counter = 0 \ No newline at end of file + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as pool: + # Fetch data using ThreadPoolExecutor + response = await loop.run_in_executor( + pool, + lambda: intrinio.OptionsApi().get_options_prices_eod(identifier=contract_id) + ) + + # Extract and process the response data + history = [] + if response and hasattr(response, '_prices'): + for price in response._prices: + history.append({ + k: v for k, v in price.__dict__.items() + if isinstance(v, (str, int, float, bool, list, dict, type(None))) + }) + + #clean the data + history = [ + {key.lstrip('_'): value for key, value in record.items() if key not in ('_open_ask', '_ask_low','_close_bid_size','_close_ask_size','_close_ask','_close_bid','_close_size','_exercise_style','discriminator','_open_bid','_bid_low','_bid_high','_ask_high')} + for record in history + ] + + + #ignore small volume and oi contracts to filter trash contracts... oh hi mark + total_volume = sum(item['volume'] or 0 for item in history) + total_open_interest = sum(item['open_interest'] or 0 for item in history) + count = len(history) + avg_volume = int(total_volume / count) if count > 0 else 0 + avg_open_interest = int(total_open_interest / count) if count > 0 else 0 + + if avg_volume > 10 and avg_open_interest > 10: + print(history) + ''' + res_list = [] + + for item in history: + + new_item = { + key: safe_round(value) + for key, value in item.items() + if key != 'in_out_flow' + } + + res_list = sorted(res_list, key=lambda x: x['date']) + for i in range(1, len(res_list)): + try: + current_open_interest = res_list[i]['total_open_interest'] + previous_open_interest = res_list[i-1]['total_open_interest'] + changes_percentage_oi = round((current_open_interest/previous_open_interest -1)*100,2) + res_list[i]['changesPercentageOI'] = changes_percentage_oi + except: + res_list[i]['changesPercentageOI'] = None + + ''' + await save_json(history, symbol, contract_id) + + except Exception as e: + print(f"Error fetching data for {contract_id}: {e}") + return None + + + + +async def get_data(symbol, expiration_list): + # Use a semaphore to limit concurrent requests + semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) + + # Create tasks for all expirations + tasks = [get_options_chain(symbol, expiration, semaphore) for expiration in expiration_list] + + # Show progress bar for completed tasks + contract_sets = set() + for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing expirations"): + contracts = await task + contract_sets.update(contracts) + + # Convert final set to list + contract_list = list(contract_sets) + return contract_list + + +async def process_batch(symbol, batch, semaphore, pbar): + tasks = [get_single_contract_eod_data(symbol, contract, semaphore) for contract in batch] + results = [] + + for task in asyncio.as_completed(tasks): + result = await task + if result: + results.append(result) + pbar.update(1) + + return results + +async def process_contracts(symbol, contract_list): + results = [] + semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) + + # Calculate total batches for better progress tracking + total_contracts = len(contract_list) + total_batches = (total_contracts + BATCH_SIZE - 1) // BATCH_SIZE + + with tqdm(total=total_contracts, desc="Processing contracts") as pbar: + for batch_num in range(total_batches): + start_idx = batch_num * BATCH_SIZE + batch = contract_list[start_idx:start_idx + BATCH_SIZE] + + print(f"\nProcessing batch {batch_num + 1}/{total_batches} ({len(batch)} contracts)") + batch_start_time = time.time() + + # Process the batch concurrently + batch_results = await process_batch(symbol, batch, semaphore, pbar) + results.extend(batch_results) + + batch_time = time.time() - batch_start_time + print(f"Batch completed in {batch_time:.2f} seconds") + + # Sleep between batches if not the last batch + if batch_num < total_batches - 1: + print(f"Sleeping for 60 seconds before next batch...") + await asyncio.sleep(60) + + return results + +def get_total_symbols(): + with sqlite3.connect('stocks.db') as con: + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") + stocks_symbols = [row[0] for row in cursor.fetchall()] + + with sqlite3.connect('etf.db') as etf_con: + etf_cursor = etf_con.cursor() + etf_cursor.execute("PRAGMA journal_mode = wal") + etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") + etf_symbols = [row[0] for row in etf_cursor.fetchall()] + + return stocks_symbols + etf_symbols + +async def main(): + + total_symbols = ['AA'] #get_total_symbols() + + for symbol in tqdm(total_symbols): + print(f"==========Start Process for {symbol}==========") + expiration_list = get_all_expirations(symbol) + print(f"Found {len(expiration_list)} expiration dates") + contract_list = await get_data(symbol,expiration_list) + print(f"Unique contracts: {len(contract_list)}") + + results = await process_contracts(symbol, contract_list) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file