update cron job

This commit is contained in:
MuslemRahimi 2025-01-27 16:13:05 +01:00
parent cba3cec00b
commit 271f150ac3

View File

@ -1,5 +1,6 @@
from __future__ import print_function from __future__ import print_function
import asyncio import asyncio
import aiohttp
import time import time
import intrinio_sdk as intrinio import intrinio_sdk as intrinio
from intrinio_sdk.rest import ApiException from intrinio_sdk.rest import ApiException
@ -104,50 +105,29 @@ def safe_round(value):
return value return value
def get_all_expirations(symbol):
response = intrinio.OptionsApi().get_options_expirations_eod(
symbol,
after=after,
before=before,
include_related_symbols=include_related_symbols
)
data = (response.__dict__).get('_expirations')
return data
async def get_options_chain(symbol, expiration, semaphore):
async with semaphore:
try:
# Run the synchronous API call in a thread pool since intrinio doesn't support async
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
response = await loop.run_in_executor(
pool,
lambda: intrinio.OptionsApi().get_options_chain_eod(
symbol,
expiration,
include_related_symbols=include_related_symbols
)
)
contracts = set()
for item in response.chain:
try:
contracts.add(item.option.code)
except Exception as e:
print(f"Error processing contract in {expiration}: {e}")
return contracts
except:
return set()
async def get_price_batch_realtime(symbol, contract_list): async def get_price_batch_realtime(symbol, contract_list):
# API Configuration
api_url = "https://api-v2.intrinio.com/options/prices/realtime/batch"
headers = {
"Authorization": f"Bearer {api_key}" # Replace with your actual API key
}
params = {
"source": source,
"show_stats": show_stats,
"stock_price_source": stock_price_source,
"model": model,
"show_extended_price": show_extended_price
}
body = { body = {
"contracts": contract_list "contracts": contract_list
} }
response = intrinio.OptionsApi().get_options_prices_batch_realtime(body, source=source, show_stats=show_stats, stock_price_source=stock_price_source, model=model, show_extended_price=show_extended_price)
data = response.__dict__ # Make API request
data = data['_contracts'] async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, params=params, json=body) as response:
response_data = await response.json()
contracts_data = response_data.get('contracts', [])
res_dict = { res_dict = {
'total_premium': 0, 'call_premium': 0, 'put_premium': 0, 'total_premium': 0, 'call_premium': 0, 'put_premium': 0,
@ -158,61 +138,56 @@ async def get_price_batch_realtime(symbol, contract_list):
'time': None 'time': None
} }
for item in data: for item in contracts_data:
try: try:
price_data = (item.__dict__)['_price'].__dict__ price_data = item.get('price', {})
stats_data = (item.__dict__)['_stats'].__dict__ stats_data = item.get('stats', {})
option_type = ((item.__dict__)['_option'].__dict__)['_type'] option_data = item.get('option', {})
volume = int(price_data['_volume']) if price_data['_volume'] != None else 0 option_type = option_data.get('type', '').lower()
volume = int(price_data.get('volume', 0)) if price_data.get('volume') is not None else 0
total_open_interest = int(price_data['_open_interest']) if price_data['_open_interest'] != None else 0 open_interest = int(price_data.get('open_interest', 0)) if price_data.get('open_interest') is not None else 0
last_price = price_data['_last'] if price_data['_last'] != None else 0 last_price = price_data.get('last', 0) or 0
premium = int(volume * last_price * 100) premium = int(volume * last_price * 100)
implied_volatility = stats_data['_implied_volatility']
gamma = stats_data['_gamma'] if stats_data['_gamma'] != None else 0
delta = stats_data['_delta'] if stats_data['_delta'] != None else 0
res_dict['gex'] += gamma * total_open_interest * 100 implied_volatility = stats_data.get('implied_volatility')
res_dict['dex'] += delta * total_open_interest * 100 gamma = stats_data.get('gamma', 0) or 0
delta = stats_data.get('delta', 0) or 0
# Update metrics
res_dict['gex'] += gamma * open_interest * 100
res_dict['dex'] += delta * open_interest * 100
res_dict['total_premium'] += premium res_dict['total_premium'] += premium
res_dict['volume'] += volume res_dict['volume'] += volume
res_dict['total_open_interest'] += total_open_interest res_dict['total_open_interest'] += open_interest
if option_type == 'call': if option_type == 'call':
res_dict['call_premium'] += premium res_dict['call_premium'] += premium
res_dict['call_volume'] += volume res_dict['call_volume'] += volume
res_dict['call_open_interest'] += total_open_interest res_dict['call_open_interest'] += open_interest
else: else:
res_dict['put_premium'] += premium res_dict['put_premium'] += premium
res_dict['put_volume'] += volume res_dict['put_volume'] += volume
res_dict['put_open_interest'] += total_open_interest res_dict['put_open_interest'] += open_interest
if implied_volatility is not None:
res_dict['iv_list'].append(implied_volatility) res_dict['iv_list'].append(implied_volatility)
res_dict['time'] = price_data['_ask_timestamp'].strftime("%Y-%m-%d")
# Handle timestamp
if 'ask_timestamp' in price_data and price_data['ask_timestamp']:
timestamp_str = price_data['ask_timestamp']
try:
dt = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
res_dict['time'] = dt.strftime("%Y-%m-%d")
except: except:
pass res_dict['time'] = timestamp_str[:10] # Fallback to string slicing
except Exception as e:
print(f"Error processing contract: {e}")
continue
return res_dict return res_dict
async def prepare_dataset(symbol):
expiration_list = get_all_expirations(symbol)
semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
# Create tasks for all expirations
tasks = [get_options_chain(symbol, expiration, semaphore) for expiration in expiration_list]
# Show progress bar for completed tasks
contract_sets = set()
for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing expirations"):
contracts = await task
contract_sets.update(contracts)
# Convert final set to list
contract_list = list(contract_sets)
async def main(): async def main():
total_symbols = get_tickers_from_directory() total_symbols = get_tickers_from_directory()
if len(total_symbols) < 3000: if len(total_symbols) < 3000: