diff --git a/app/cron_options_stats.py b/app/cron_options_stats.py index 81deed0..0d97ee7 100644 --- a/app/cron_options_stats.py +++ b/app/cron_options_stats.py @@ -1,5 +1,6 @@ from __future__ import print_function import asyncio +import aiohttp import time import intrinio_sdk as intrinio from intrinio_sdk.rest import ApiException @@ -104,50 +105,29 @@ def safe_round(value): return value -def get_all_expirations(symbol): - response = intrinio.OptionsApi().get_options_expirations_eod( - symbol, - after=after, - before=before, - include_related_symbols=include_related_symbols - ) - data = (response.__dict__).get('_expirations') - return data - -async def get_options_chain(symbol, expiration, semaphore): - async with semaphore: - try: - # Run the synchronous API call in a thread pool since intrinio doesn't support async - loop = asyncio.get_event_loop() - with ThreadPoolExecutor() as pool: - response = await loop.run_in_executor( - pool, - lambda: intrinio.OptionsApi().get_options_chain_eod( - symbol, - expiration, - include_related_symbols=include_related_symbols - ) - ) - contracts = set() - for item in response.chain: - try: - contracts.add(item.option.code) - except Exception as e: - print(f"Error processing contract in {expiration}: {e}") - return contracts - - except: - return set() - - - async def get_price_batch_realtime(symbol, contract_list): - body = { - "contracts": contract_list + # API Configuration + api_url = "https://api-v2.intrinio.com/options/prices/realtime/batch" + headers = { + "Authorization": f"Bearer {api_key}" # Replace with your actual API key } - response = intrinio.OptionsApi().get_options_prices_batch_realtime(body, source=source, show_stats=show_stats, stock_price_source=stock_price_source, model=model, show_extended_price=show_extended_price) - data = response.__dict__ - data = data['_contracts'] + params = { + "source": source, + "show_stats": show_stats, + "stock_price_source": stock_price_source, + "model": model, + "show_extended_price": show_extended_price + } + body = { + "contracts": contract_list + } + + # Make API request + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, params=params, json=body) as response: + response_data = await response.json() + + contracts_data = response_data.get('contracts', []) res_dict = { 'total_premium': 0, 'call_premium': 0, 'put_premium': 0, @@ -158,61 +138,56 @@ async def get_price_batch_realtime(symbol, contract_list): 'time': None } - for item in data: + for item in contracts_data: try: - price_data = (item.__dict__)['_price'].__dict__ - stats_data = (item.__dict__)['_stats'].__dict__ - option_type = ((item.__dict__)['_option'].__dict__)['_type'] + price_data = item.get('price', {}) + stats_data = item.get('stats', {}) + option_data = item.get('option', {}) - volume = int(price_data['_volume']) if price_data['_volume'] != None else 0 - - total_open_interest = int(price_data['_open_interest']) if price_data['_open_interest'] != None else 0 - last_price = price_data['_last'] if price_data['_last'] != None else 0 + option_type = option_data.get('type', '').lower() + volume = int(price_data.get('volume', 0)) if price_data.get('volume') is not None else 0 + open_interest = int(price_data.get('open_interest', 0)) if price_data.get('open_interest') is not None else 0 + last_price = price_data.get('last', 0) or 0 premium = int(volume * last_price * 100) - implied_volatility = stats_data['_implied_volatility'] - gamma = stats_data['_gamma'] if stats_data['_gamma'] != None else 0 - delta = stats_data['_delta'] if stats_data['_delta'] != None else 0 + + implied_volatility = stats_data.get('implied_volatility') + gamma = stats_data.get('gamma', 0) or 0 + delta = stats_data.get('delta', 0) or 0 - res_dict['gex'] += gamma * total_open_interest * 100 - res_dict['dex'] += delta * total_open_interest * 100 + # Update metrics + res_dict['gex'] += gamma * open_interest * 100 + res_dict['dex'] += delta * open_interest * 100 res_dict['total_premium'] += premium res_dict['volume'] += volume - res_dict['total_open_interest'] += total_open_interest + res_dict['total_open_interest'] += open_interest if option_type == 'call': res_dict['call_premium'] += premium res_dict['call_volume'] += volume - res_dict['call_open_interest'] += total_open_interest + res_dict['call_open_interest'] += open_interest else: res_dict['put_premium'] += premium res_dict['put_volume'] += volume - res_dict['put_open_interest'] += total_open_interest + res_dict['put_open_interest'] += open_interest - res_dict['iv_list'].append(implied_volatility) - res_dict['time'] = price_data['_ask_timestamp'].strftime("%Y-%m-%d") - except: - pass + if implied_volatility is not None: + res_dict['iv_list'].append(implied_volatility) + + # Handle timestamp + if 'ask_timestamp' in price_data and price_data['ask_timestamp']: + timestamp_str = price_data['ask_timestamp'] + try: + dt = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + res_dict['time'] = dt.strftime("%Y-%m-%d") + except: + res_dict['time'] = timestamp_str[:10] # Fallback to string slicing + except Exception as e: + print(f"Error processing contract: {e}") + continue return res_dict -async def prepare_dataset(symbol): - expiration_list = get_all_expirations(symbol) - - semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) - - # Create tasks for all expirations - tasks = [get_options_chain(symbol, expiration, semaphore) for expiration in expiration_list] - # Show progress bar for completed tasks - contract_sets = set() - for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing expirations"): - contracts = await task - contract_sets.update(contracts) - - # Convert final set to list - contract_list = list(contract_sets) - - async def main(): total_symbols = get_tickers_from_directory() if len(total_symbols) < 3000: