bugfixing

MuslemRahimi 2025-01-26 00:18:48 +01:00
parent f937f3b3da
commit bebb34c3e9
2 changed files with 45 additions and 65 deletions

Changed file 1 of 2:

@@ -61,14 +61,14 @@ def calculate_net_premium(ask_price, bid_price, ask_size, bid_size):
     return ask_premium - bid_premium

 intrinio.ApiClient().set_api_key(api_key)
-#intrinio.ApiClient().allow_retries(True)
+intrinio.ApiClient().allow_retries(True)

 after = datetime.today().strftime('%Y-%m-%d')
 before = '2100-12-31'
 N_year_ago = datetime.now() - timedelta(days=365)
 include_related_symbols = False
 page_size = 5000
-MAX_CONCURRENT_REQUESTS = 50 # Adjust based on API rate limits
+MAX_CONCURRENT_REQUESTS = 100 # Adjust based on API rate limits
 BATCH_SIZE = 1500

 def get_all_expirations(symbol):
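Note on the two value changes above: allow_retries(True) lets the Intrinio client retry transient failures, and MAX_CONCURRENT_REQUESTS doubles the ceiling on in-flight requests. A minimal sketch of how such a cap is typically enforced with asyncio.Semaphore (the fetch body is a stand-in, not this repo's code):

    import asyncio

    MAX_CONCURRENT_REQUESTS = 100  # upper bound on simultaneous requests

    async def fetch_one(contract_id, semaphore):
        # At most MAX_CONCURRENT_REQUESTS coroutines run this block at once;
        # the rest queue on the semaphore until a slot frees up.
        async with semaphore:
            await asyncio.sleep(0.01)  # stand-in for the real HTTP call
            return contract_id

    async def main():
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
        ids = [f"CONTRACT_{i}" for i in range(500)]
        results = await asyncio.gather(*(fetch_one(i, semaphore) for i in ids))
        print(f"fetched {len(results)} contracts")

    asyncio.run(main())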
@@ -103,51 +103,48 @@ async def get_options_chain(symbol, expiration, semaphore):
                 print(f"Error processing contract in {expiration}: {e}")
         return contracts
-    except Exception as e:
-        print(f"Error fetching chain for {expiration}: {e}")
+    except:
         return set()

 async def get_single_contract_eod_data(symbol, contract_id, semaphore):
+    url = f"https://api-v2.intrinio.com/options/prices/{contract_id}/eod?api_key={api_key}"
     async with semaphore:
         try:
-            loop = asyncio.get_event_loop()
-            with ThreadPoolExecutor() as pool:
-                # Fetch data using ThreadPoolExecutor
-                response = await loop.run_in_executor(
-                    pool,
-                    lambda: intrinio.OptionsApi().get_options_prices_eod(identifier=contract_id)
-                )
-
-            # Extract and process the response data
-            key_data = {k: v for k, v in response._option.__dict__.items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))}
-            history = []
-            if response and hasattr(response, '_prices'):
-                for price in response._prices:
-                    history.append({
-                        k: v for k, v in price.__dict__.items()
-                        if isinstance(v, (str, int, float, bool, list, dict, type(None)))
-                    })
+            async with aiohttp.ClientSession() as session:
+                async with session.get(url) as response:
+                    if response.status != 200:
+                        print(f"Failed to fetch data for {contract_id}: {response.status}")
+                        return None
+                    response_data = await response.json()
+
+            # Extract and process the response data
+            key_data = {k: v for k, v in response_data.get("option", {}).items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))}
+            history = []
+            if "prices" in response_data:
+                for price in response_data["prices"]:
+                    history.append({
+                        k: v for k, v in price.items() if isinstance(v, (str, int, float, bool, list, dict, type(None)))
+                    })

-            #clean the data
+            # Clean the data
             history = [
-                {key.lstrip('_'): value for key, value in record.items() if key not in ('_close_time','_open_ask', '_ask_low','_close_size','_exercise_style','discriminator','_open_bid','_bid_low','_bid_high','_ask_high')}
+                {key.lstrip('_'): value for key, value in record.items() if key not in ('close_time', 'open_ask', 'ask_low', 'close_size', 'exercise_style', 'discriminator', 'open_bid', 'bid_low', 'bid_high', 'ask_high')}
                 for record in history
             ]

-            #ignore small volume and oi contracts to filter trash contracts... oh hi mark
-            total_volume = sum(item['volume'] or 0 for item in history)
-            total_open_interest = sum(item['open_interest'] or 0 for item in history)
+            # Ignore small volume and open interest contracts
+            total_volume = sum(item.get('volume', 0) or 0 for item in history)
+            total_open_interest = sum(item.get('open_interest', 0) or 0 for item in history)
             count = len(history)
             avg_volume = int(total_volume / count) if count > 0 else 0
             avg_open_interest = int(total_open_interest / count) if count > 0 else 0

-            #filter out the trash
             if avg_volume > 10 and avg_open_interest > 10:
                 res_list = []
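Note: the rewrite above drops the blocking Intrinio SDK call (run via run_in_executor on a thread pool) in favour of a direct aiohttp GET against the EOD endpoint, and replaces private-attribute access (response._option, response._prices) with key lookups on the JSON body, which tolerate missing keys. A minimal sketch of the new request path, assuming the endpoint returns JSON with "option" and "prices" keys as the diff implies:

    import asyncio
    import aiohttp

    async def fetch_eod(url):
        # One short-lived session per request mirrors the diff; a single shared
        # ClientSession reused across contracts would avoid repeated TCP/TLS setup.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status != 200:
                    print(f"Failed to fetch: {response.status}")
                    return None
                return await response.json()

    # Placeholder endpoint for demonstration only; the real call targets
    # https://api-v2.intrinio.com/options/prices/{contract_id}/eod
    data = asyncio.run(fetch_eod("https://httpbin.org/json"))
    print(data is not None)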
@@ -162,36 +159,30 @@ async def get_single_contract_eod_data(symbol, contract_id, semaphore):
                         pass

                 res_list = sorted(res_list, key=lambda x: x['date'])
                 for i in range(1, len(res_list)):
                     try:
                         current_open_interest = res_list[i]['open_interest']
                         previous_open_interest = res_list[i-1]['open_interest'] or 0
-                        changes_percentage_oi = round((current_open_interest/previous_open_interest -1)*100,2)
+                        changes_percentage_oi = round((current_open_interest / previous_open_interest - 1) * 100, 2)
                         res_list[i]['changeOI'] = current_open_interest - previous_open_interest
                         res_list[i]['changesPercentageOI'] = changes_percentage_oi
                     except:
                         res_list[i]['changeOI'] = None
                         res_list[i]['changesPercentageOI'] = None

-                for i in range(1,len(res_list)):
+                for i in range(1, len(res_list)):
                     try:
                         volume = res_list[i]['volume']
                         avg_fill = res_list[i]['mark']
                         res_list[i]['gex'] = res_list[i]['gamma'] * res_list[i]['open_interest'] * 100
                         res_list[i]['dex'] = res_list[i]['delta'] * res_list[i]['open_interest'] * 100
-                        res_list[i]['total_premium'] = int(avg_fill*volume*100)
-
-                        # Calculate the net premiums for call and put options
-                        #res_list[i]['net_premium'] = calculate_net_premium(res_list[i]['close_ask'], res_list[i]['close_bid'], res_list[i]['close_ask_size'], res_list[i]['close_bid_size'])
+                        res_list[i]['total_premium'] = int(avg_fill * volume * 100)
                     except:
                         res_list[i]['total_premium'] = 0
-                        #res_list[i]['net_premium'] = 0

-                data = {'expiration': key_data['_expiration'], 'strike': key_data['_strike'], 'optionType': key_data['_type'], 'history': res_list}
+                data = {'expiration': key_data.get('expiration'), 'strike': key_data.get('strike'), 'optionType': key_data.get('type'), 'history': res_list}
                 if data:
                     await save_json(data, symbol, contract_id)
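Note: the loops above annotate each date-sorted record with day-over-day open-interest deltas, then derive GEX/DEX as greek × open interest × 100 and total premium as mark × volume × 100. A self-contained sketch of the OI annotation, with the bare except narrowed to the errors it actually guards (missing keys, None values, division by zero):

    def add_oi_changes(res_list):
        # Walk date-sorted records and annotate each with the absolute and
        # percentage day-over-day change in open interest.
        for i in range(1, len(res_list)):
            try:
                current = res_list[i]['open_interest']
                previous = res_list[i - 1]['open_interest'] or 0
                pct = round((current / previous - 1) * 100, 2)
                res_list[i]['changeOI'] = current - previous
                res_list[i]['changesPercentageOI'] = pct
            except (KeyError, TypeError, ZeroDivisionError):
                res_list[i]['changeOI'] = None
                res_list[i]['changesPercentageOI'] = None

    # Example: OI going 100 -> 125 yields changeOI=25, changesPercentageOI=25.0
    rows = [{'open_interest': 100}, {'open_interest': 125}]
    add_oi_changes(rows)
    print(rows[1])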
@@ -242,19 +233,16 @@ async def process_contracts(symbol, contract_list):
     with tqdm(total=total_contracts, desc="Processing contracts") as pbar:
         for batch_num in range(total_batches):
-            start_idx = batch_num * BATCH_SIZE
-            batch = contract_list[start_idx:start_idx + BATCH_SIZE]
-
-            # Process the batch concurrently
-            batch_results = await process_batch(symbol, batch, semaphore, pbar)
-            results.extend(batch_results)
-
-            '''
-            # Sleep between batches if not the last batch
-            if batch_num < total_batches - 1:
-                print(f"Sleeping for 60 seconds before next batch...")
-                await asyncio.sleep(60)
-            '''
+            try:
+                start_idx = batch_num * BATCH_SIZE
+                batch = contract_list[start_idx:start_idx + BATCH_SIZE]
+
+                # Process the batch concurrently
+                batch_results = await process_batch(symbol, batch, semaphore, pbar)
+                results.extend(batch_results)
+            except:
+                pass

     return results
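Note: process_contracts walks the contract list in BATCH_SIZE slices, and the new try/except keeps a single failed batch from aborting the rest. total_batches is computed outside this hunk; the ceiling division below is an assumption consistent with how the slicing behaves:

    BATCH_SIZE = 1500

    def make_batches(contract_list):
        # Ceiling division so a trailing partial batch is not dropped.
        total_batches = (len(contract_list) + BATCH_SIZE - 1) // BATCH_SIZE
        return [contract_list[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
                for i in range(total_batches)]

    sizes = [len(b) for b in make_batches(list(range(3200)))]
    print(sizes)  # [1500, 1500, 200]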
@@ -343,18 +331,9 @@ async def main():
     total_symbols = get_tickers_from_directory(directory_path)
     if len(total_symbols) < 2000:
         total_symbols = get_total_symbols()

-    # Split the symbols into chunks of 2
-    for i in tqdm(range(0, len(total_symbols), 2)):
-        symbols_chunk = total_symbols[i:i+2]
-        # Run the symbols in the chunk concurrently
-        await asyncio.gather(*[process_symbol(symbol) for symbol in symbols_chunk])
+    for symbol in tqdm(total_symbols):
+        await process_symbol(symbol)

-# Example usage
 if __name__ == "__main__":
     asyncio.run(main())
-
-if __name__ == "__main__":
-    asyncio.run(main())
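Note: besides removing a duplicated __main__ guard, main() now iterates symbols one at a time instead of running pairs via asyncio.gather, so the only remaining parallelism is the per-request semaphore inside each symbol's processing. Both shapes, as a sketch with a stand-in worker:

    import asyncio

    async def process_symbol(symbol):  # stand-in worker
        await asyncio.sleep(0.01)

    async def sequential(symbols):
        # New behaviour: strictly one symbol at a time.
        for symbol in symbols:
            await process_symbol(symbol)

    async def paired(symbols):
        # Old behaviour: chunks of 2 processed concurrently.
        for i in range(0, len(symbols), 2):
            await asyncio.gather(*(process_symbol(s) for s in symbols[i:i + 2]))

    asyncio.run(sequential(["AAPL", "MSFT", "NVDA"]))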

Changed file 2 of 2:

@@ -79,6 +79,7 @@ def run_options_stats():
     hour = now.hour
     if week <= 4 and 9 <= hour <= 16:
         run_command(["python3", "cron_options_stats.py"])
+        run_command(["python3", "cron_unusual_activity.py"])

 def run_dark_pool_level():
     now = datetime.now(ny_tz)
@@ -101,8 +102,8 @@ def run_options_jobs():
         run_command(["python3", "cron_options_single_contract.py"])
         run_command(["python3", "cron_options_historical_volume.py"])
         run_command(["python3", "cron_options_hottest_contracts.py"])
-        run_command(["python3", "cron_options_oi.py"])
         run_command(["python3", "cron_implied_volatility.py"])
+        run_command(["python3", "cron_options_oi.py"])
         '''
         run_command(["python3", "cron_options_gex_dex.py"])
         '''
@@ -421,7 +422,7 @@ schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job
 schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job')
-schedule.every(20).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')
+schedule.every(30).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')
 schedule.every(5).minutes.do(run_threaded, run_market_flow).tag('market_flow_job')
 schedule.every(5).minutes.do(run_threaded, run_list).tag('stock_list_job')
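Note: the scheduler file wires jobs through the schedule library, and this hunk slows the options-stats cadence from 20 to 30 minutes. run_threaded appears only by name in this diff, so the thread-spawning wrapper below is an assumption about its shape:

    import threading
    import time
    import schedule

    def run_threaded(job):
        # Launch each job on its own thread so a long-running job
        # cannot stall the scheduler loop.
        threading.Thread(target=job, daemon=True).start()

    def run_options_stats():
        print("options stats tick")

    # Mirrors the updated 30-minute cadence from the diff.
    schedule.every(30).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')

    while True:
        schedule.run_pending()
        time.sleep(1)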