From bebb34c3e96ec7fd2822ac3951dbc9fe663f90bc Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Sun, 26 Jan 2025 00:18:48 +0100
Subject: [PATCH] bugfixing

---
 app/cron_options_single_contract.py | 105 +++++++++++-----------------
 app/primary_cron_job.py             |   5 +-
 2 files changed, 45 insertions(+), 65 deletions(-)

diff --git a/app/cron_options_single_contract.py b/app/cron_options_single_contract.py
index de1801b..2565a32 100644
--- a/app/cron_options_single_contract.py
+++ b/app/cron_options_single_contract.py
@@ -61,14 +61,14 @@ def calculate_net_premium(ask_price, bid_price, ask_size, bid_size):
     return ask_premium - bid_premium
 
 intrinio.ApiClient().set_api_key(api_key)
-#intrinio.ApiClient().allow_retries(True)
+intrinio.ApiClient().allow_retries(True)
 
 after = datetime.today().strftime('%Y-%m-%d')
 before = '2100-12-31'
 N_year_ago = datetime.now() - timedelta(days=365)
 include_related_symbols = False
 page_size = 5000
-MAX_CONCURRENT_REQUESTS = 50  # Adjust based on API rate limits
+MAX_CONCURRENT_REQUESTS = 100  # Adjust based on API rate limits
 BATCH_SIZE = 1500
 
 def get_all_expirations(symbol):
@@ -103,51 +103,48 @@ async def get_options_chain(symbol, expiration, semaphore):
             print(f"Error processing contract in {expiration}: {e}")
         return contracts
-    except Exception as e:
-        print(f"Error fetching chain for {expiration}: {e}")
+    except:
         return set()
 
 async def get_single_contract_eod_data(symbol, contract_id, semaphore):
+    url = f"https://api-v2.intrinio.com/options/prices/{contract_id}/eod?api_key={api_key}"
+
     async with semaphore:
         try:
-            loop = asyncio.get_event_loop()
-            with ThreadPoolExecutor() as pool:
-                # Fetch data using ThreadPoolExecutor
-                response = await loop.run_in_executor(
-                    pool,
-                    lambda: intrinio.OptionsApi().get_options_prices_eod(identifier=contract_id)
-                )
-
+            async with aiohttp.ClientSession() as session:
+                async with session.get(url) as response:
+                    if response.status != 200:
+                        print(f"Failed to fetch data for {contract_id}: {response.status}")
+                        return None
+
+                    response_data = await response.json()
+
+            # Extract and process the response data
-            key_data = {k: v for k, v in response._option.__dict__.items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))}
+            key_data = {k: v for k, v in response_data.get("option", {}).items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))}
             history = []
-
-
-
-            if response and hasattr(response, '_prices'):
-                for price in response._prices:
+            if "prices" in response_data:
+                for price in response_data["prices"]:
                     history.append({
-                        k: v for k, v in price.__dict__.items()
-                        if isinstance(v, (str, int, float, bool, list, dict, type(None)))
+                        k: v for k, v in price.items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))
                     })
-
-            #clean the data
+            # Clean the data
             history = [
-                {key.lstrip('_'): value for key, value in record.items() if key not in ('_close_time','_open_ask', '_ask_low','_close_size','_exercise_style','discriminator','_open_bid','_bid_low','_bid_high','_ask_high')}
+                {key.lstrip('_'): value for key, value in record.items() if key not in ('close_time', 'open_ask', 'ask_low', 'close_size', 'exercise_style', 'discriminator', 'open_bid', 'bid_low', 'bid_high', 'ask_high')}
                 for record in history
             ]
-
-            #ignore small volume and oi contracts to filter trash contracts...
-            total_volume = sum(item['volume'] or 0 for item in history)
-            total_open_interest = sum(item['open_interest'] or 0 for item in history)
+            # Ignore small volume and open interest contracts
+            total_volume = sum(item.get('volume', 0) or 0 for item in history)
+            total_open_interest = sum(item.get('open_interest', 0) or 0 for item in history)
             count = len(history)
             avg_volume = int(total_volume / count) if count > 0 else 0
             avg_open_interest = int(total_open_interest / count) if count > 0 else 0
 
+            #filter out the trash
             if avg_volume > 10 and avg_open_interest > 10:
                 res_list = []
@@ -162,36 +159,30 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore):
                         pass
 
                 res_list = sorted(res_list, key=lambda x: x['date'])
+
                 for i in range(1, len(res_list)):
                     try:
                         current_open_interest = res_list[i]['open_interest']
                         previous_open_interest = res_list[i-1]['open_interest'] or 0
-                        changes_percentage_oi = round((current_open_interest/previous_open_interest -1)*100,2)
+                        changes_percentage_oi = round((current_open_interest / previous_open_interest - 1) * 100, 2)
                         res_list[i]['changeOI'] = current_open_interest - previous_open_interest
                         res_list[i]['changesPercentageOI'] = changes_percentage_oi
                     except:
                         res_list[i]['changeOI'] = None
                         res_list[i]['changesPercentageOI'] = None
 
-                for i in range(1,len(res_list)):
+                for i in range(1, len(res_list)):
                     try:
                         volume = res_list[i]['volume']
                         avg_fill = res_list[i]['mark']
                         res_list[i]['gex'] = res_list[i]['gamma'] * res_list[i]['open_interest'] * 100
                         res_list[i]['dex'] = res_list[i]['delta'] * res_list[i]['open_interest'] * 100
-
-                        res_list[i]['total_premium'] = int(avg_fill*volume*100)
-                        # Calculate the net premiums for call and put options
-                        #res_list[i]['net_premium'] = calculate_net_premium(res_list[i]['close_ask'], res_list[i]['close_bid'], res_list[i]['close_ask_size'], res_list[i]['close_bid_size'])
-
+                        res_list[i]['total_premium'] = int(avg_fill * volume * 100)
                     except:
                         res_list[i]['total_premium'] = 0
-                        #res_list[i]['net_premium'] = 0
 
+                data = {'expiration': key_data.get('expiration'), 'strike': key_data.get('strike'), 'optionType': key_data.get('type'), 'history': res_list}
-
-                data = {'expiration': key_data['_expiration'], 'strike': key_data['_strike'], 'optionType': key_data['_type'], 'history': res_list}
-
                 if data:
                     await save_json(data, symbol, contract_id)
@@ -242,19 +233,16 @@ async def process_contracts(symbol, contract_list):
     with tqdm(total=total_contracts, desc="Processing contracts") as pbar:
         for batch_num in range(total_batches):
-            start_idx = batch_num * BATCH_SIZE
-            batch = contract_list[start_idx:start_idx + BATCH_SIZE]
-
-            # Process the batch concurrently
-            batch_results = await process_batch(symbol, batch, semaphore, pbar)
-            results.extend(batch_results)
-
-            '''
-            # Sleep between batches if not the last batch
-            if batch_num < total_batches - 1:
-                print(f"Sleeping for 60 seconds before next batch...")
-                await asyncio.sleep(60)
-            '''
+            try:
+                start_idx = batch_num * BATCH_SIZE
+                batch = contract_list[start_idx:start_idx + BATCH_SIZE]
+
+                # Process the batch concurrently
+                batch_results = await process_batch(symbol, batch, semaphore, pbar)
+                results.extend(batch_results)
+            except:
+                pass
+
     return results
@@ -343,18 +331,9 @@ async def main():
     total_symbols = get_tickers_from_directory(directory_path)
     if len(total_symbols) < 2000:
         total_symbols = get_total_symbols()
+
+    for symbol in tqdm(total_symbols):
+        await process_symbol(symbol)
 
-    # Split the symbols into chunks of 2
-    for i in tqdm(range(0, len(total_symbols), 2)):
-        symbols_chunk = total_symbols[i:i+2]
-
-        # Run the symbols in the chunk concurrently
-        await asyncio.gather(*[process_symbol(symbol) for symbol in symbols_chunk])
-
-# Example usage
 if __name__ == "__main__":
     asyncio.run(main())
-
-
-if __name__ == "__main__":
-    asyncio.run(main())
\ No newline at end of file
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index f8ee1a2..f0aa2fa 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -79,6 +79,7 @@ def run_options_stats():
     hour = now.hour
     if week <= 4 and 9 <= hour <= 16:
         run_command(["python3", "cron_options_stats.py"])
+        run_command(["python3", "cron_unusual_activity.py"])
 
 def run_dark_pool_level():
     now = datetime.now(ny_tz)
@@ -101,8 +102,8 @@ def run_options_jobs():
         run_command(["python3", "cron_options_single_contract.py"])
         run_command(["python3", "cron_options_historical_volume.py"])
         run_command(["python3", "cron_options_hottest_contracts.py"])
-        run_command(["python3", "cron_options_oi.py"])
         run_command(["python3", "cron_implied_volatility.py"])
+        run_command(["python3", "cron_options_oi.py"])
         '''
         run_command(["python3", "cron_options_gex_dex.py"])
         '''
@@ -421,7 +422,7 @@ schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job')
 
 schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job')
 
-schedule.every(20).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')
+schedule.every(30).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')
 
 schedule.every(5).minutes.do(run_threaded, run_market_flow).tag('market_flow_job')
 schedule.every(5).minutes.do(run_threaded, run_list).tag('stock_list_job')
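
Note on the EOD fetch rewrite: the patch drops the ThreadPoolExecutor wrapper
around the intrinio SDK and fetches each contract's EOD prices directly with
aiohttp, gated by an asyncio.Semaphore, while raising the cap to
MAX_CONCURRENT_REQUESTS = 100. Below is a minimal runnable sketch of that
concurrency pattern, not code from the repository: the helper names fetch_eod
and fetch_all and the placeholder URL are invented for illustration. One
deliberate difference: the sketch shares a single ClientSession across all
requests (the usual aiohttp recommendation), whereas the patch opens a session
per contract; both work.

import asyncio
import aiohttp

MAX_CONCURRENT_REQUESTS = 100  # same cap the patch sets

async def fetch_eod(session, url, semaphore):
    # The semaphore keeps at most MAX_CONCURRENT_REQUESTS requests
    # in flight at once, mirroring get_single_contract_eod_data.
    async with semaphore:
        async with session.get(url) as response:
            if response.status != 200:
                # The patch prints the status and returns None on failure.
                return None
            return await response.json()

async def fetch_all(urls):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    # One shared session reuses TCP connections across all requests.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_eod(session, u, semaphore) for u in urls))

if __name__ == "__main__":
    # Placeholder URL purely for demonstration.
    print(asyncio.run(fetch_all(["https://httpbin.org/json"])))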
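
Note on the open-interest deltas: for each sorted history the patch sets
changeOI to the raw day-over-day difference and changesPercentageOI to
round((current / previous - 1) * 100, 2), relying on a bare except to null
both fields when the previous value is zero or missing. Here is a standalone
sketch of that calculation with the failure case made explicit; the function
name add_oi_changes is hypothetical:

def add_oi_changes(history):
    # history is assumed to be sorted by date, like res_list in the patch.
    for i in range(1, len(history)):
        current = history[i].get('open_interest')
        previous = history[i - 1].get('open_interest') or 0
        if current is None or previous == 0:
            # The patch reaches this case through its bare except clause:
            # a zero or missing previous value yields nulls.
            history[i]['changeOI'] = None
            history[i]['changesPercentageOI'] = None
        else:
            history[i]['changeOI'] = current - previous
            history[i]['changesPercentageOI'] = round((current / previous - 1) * 100, 2)
    return history

# Example: open interest going 100 -> 125 gives changeOI = 25 and
# changesPercentageOI = round((125 / 100 - 1) * 100, 2) = 25.0.
print(add_oi_changes([
    {'date': '2025-01-24', 'open_interest': 100},
    {'date': '2025-01-25', 'open_interest': 125},
]))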