From f0e0018ab9ff3d60232cf6856d98af8d30f71942 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 5 Feb 2025 19:15:57 +0100 Subject: [PATCH] add market flow --- app/cron_market_flow.py | 211 ++++++++++++++++---------------------- app/cron_options_stats.py | 2 +- 2 files changed, 88 insertions(+), 125 deletions(-) diff --git a/app/cron_market_flow.py b/app/cron_market_flow.py index 2da5fb0..53216c1 100644 --- a/app/cron_market_flow.py +++ b/app/cron_market_flow.py @@ -10,8 +10,6 @@ import aiohttp import pytz import requests # Add missing import from collections import defaultdict -import intrinio_sdk as intrinio -from intrinio_sdk.rest import ApiException from GetStartEndDate import GetStartEndDate from tqdm import tqdm @@ -20,10 +18,6 @@ import re load_dotenv() fmp_api_key = os.getenv('FMP_API_KEY') -api_key = os.getenv('INTRINIO_API_KEY') - -intrinio.ApiClient().set_api_key(api_key) -intrinio.ApiClient().allow_retries(True) ny_tz = pytz.timezone('America/New_York') @@ -38,11 +32,17 @@ def save_json(data): with open(f"{directory}/data.json", 'wb') as file: # Use binary mode for orjson file.write(orjson.dumps(data)) + +def safe_round(value): + try: + return round(float(value), 2) + except (ValueError, TypeError): + return value + # Function to convert and match timestamps def add_close_to_data(price_list, data): for entry in data: - formatted_time = entry['timestamp'] - + formatted_time = entry['time'] # Match with price_list for price in price_list: if price['date'] == formatted_time: @@ -50,46 +50,6 @@ def add_close_to_data(price_list, data): break # Match found, no need to continue searching return data -def parse_contract_data(option_symbol): - # Define regex pattern to match the symbol structure - match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol) - if not match: - raise ValueError(f"Invalid option_symbol format: {option_symbol}") - - ticker, expiration, option_type, strike_price = match.groups() - - return option_type - - -async def get_intrinio_data(ticker): - url=f"https://api-v2.intrinio.com/options/unusual_activity/{ticker}/intraday?page_size=1000&api_key={api_key}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - data = await response.json() - - data = data.get('trades',[]) - if data: - res_list = [] - for item in data: - try: - iso_timestamp = item['timestamp'].replace('Z', '+00:00') - # Parse timestamp and convert to New York time - timestamp = datetime.fromisoformat(iso_timestamp).astimezone(ny_tz) - formatted_time = timestamp.strftime('%Y-%m-%d %H:%M:%S') - put_call = parse_contract_data(item['contract'].replace("___","").replace("__","").replace("_",'')) - if put_call == 'C': - put_call = 'calls' - else: - put_call = 'puts' - - res_list.append({'timestamp': formatted_time, 'put_call': put_call, 'cost_basis': item['total_value'], 'volume': item['total_size'], 'sentiment': item['sentiment']}) - except: - pass - - res_list.sort(key=lambda x: x['timestamp']) - return res_list - else: - return [] async def get_stock_chart_data(ticker): @@ -110,15 +70,10 @@ async def get_stock_chart_data(ticker): -def get_market_tide(interval_5m=False): - with open(f"json/stocks-list/sp500_constituent.json","r") as file: - ticker_list = orjson.loads(file.read()) - ticker_list = [item['symbol'] for item in ticker_list][:10] - - +def get_market_tide(interval_5m=True): res_list = [] - # Track changes per interval + # Track changes per interval using a defaultdict. delta_data = defaultdict(lambda: { 'cumulative_net_call_premium': 0, 'cumulative_net_put_premium': 0, @@ -128,58 +83,58 @@ def get_market_tide(interval_5m=False): 'put_bid_vol': 0 }) + # Process for each ticker (in this case only 'SPY') for ticker in tqdm(['SPY']): - ''' + # Load the data from JSON. with open("json/options-flow/feed/data.json", "r") as file: data = orjson.loads(file.read()) - ''' - data = asyncio.run(get_intrinio_data(ticker)) + # Filter and sort data for the given ticker. + data = [item for item in data if item['ticker'] == ticker] + data.sort(key=lambda x: x['time']) - ticker_options = [item for item in data if item['timestamp'].startswith(today)] - ticker_options.sort(key=lambda x: x['timestamp']) - - - for item in ticker_options: + # Process each item in the data + for item in data: try: - # Parse and standardize timestamp - dt = datetime.strptime(f"{item['timestamp']}", "%Y-%m-%d %H:%M:%S") - - # Truncate to start of minute (for 1m summaries) + # Combine date and time from the item. + dt = datetime.strptime(f"{item['date']} {item['time']}", "%Y-%m-%d %H:%M:%S") + # Truncate to the start of the minute. dt = dt.replace(second=0, microsecond=0) - # Adjust for 5-minute intervals if needed + # Adjust for 5-minute intervals if requested. if interval_5m: - dt -= timedelta(minutes=dt.minute % 5) + # Round down minutes to the nearest 5-minute mark. + minute = dt.minute - (dt.minute % 5) + dt = dt.replace(minute=minute) rounded_ts = dt.strftime("%Y-%m-%d %H:%M:%S") - # Extract metrics + # Extract metrics. cost = float(item.get("cost_basis", 0)) - sentiment = item.get("sentiment", "").lower() - put_call = item.get("put_call", "").lower() - vol = int(item.get("volume", 1)) + sentiment = item.get("sentiment", "") + put_call = item.get("put_call", "") + vol = int(item.get("volume", 0)) - # Update premium metrics - if put_call == "calls": - if sentiment == "bullish": + # Update premium and volume metrics. + if put_call == "Calls": + if sentiment == "Bullish": delta_data[rounded_ts]['cumulative_net_call_premium'] += cost delta_data[rounded_ts]['call_ask_vol'] += vol - elif sentiment == "bearish": + elif sentiment == "Bearish": delta_data[rounded_ts]['cumulative_net_call_premium'] -= cost delta_data[rounded_ts]['call_bid_vol'] += vol - elif put_call == "puts": - if sentiment == "bullish": + elif put_call == "Puts": + if sentiment == "Bullish": delta_data[rounded_ts]['cumulative_net_put_premium'] -= cost delta_data[rounded_ts]['put_ask_vol'] += vol - elif sentiment == "bearish": + elif sentiment == "Bearish": delta_data[rounded_ts]['cumulative_net_put_premium'] += cost delta_data[rounded_ts]['put_bid_vol'] += vol except Exception as e: print(f"Error processing item: {e}") - # Calculate cumulative values over time + # Calculate cumulative values over time. sorted_ts = sorted(delta_data.keys()) cumulative = { 'net_call_premium': 0, @@ -191,7 +146,7 @@ def get_market_tide(interval_5m=False): } for ts in sorted_ts: - # Update cumulative values + # Update cumulative values. cumulative['net_call_premium'] += delta_data[ts]['cumulative_net_call_premium'] cumulative['net_put_premium'] += delta_data[ts]['cumulative_net_put_premium'] cumulative['call_ask'] += delta_data[ts]['call_ask_vol'] @@ -199,14 +154,13 @@ def get_market_tide(interval_5m=False): cumulative['put_ask'] += delta_data[ts]['put_ask_vol'] cumulative['put_bid'] += delta_data[ts]['put_bid_vol'] - # Calculate derived metrics + # Calculate derived metrics. call_volume = cumulative['call_ask'] + cumulative['call_bid'] put_volume = cumulative['put_ask'] + cumulative['put_bid'] - net_volume = (cumulative['call_ask'] - cumulative['call_bid']) - \ - (cumulative['put_ask'] - cumulative['put_bid']) + net_volume = (cumulative['call_ask'] - cumulative['call_bid']) - (cumulative['put_ask'] - cumulative['put_bid']) res_list.append({ - 'timestamp': ts, + 'time': ts, 'ticker': ticker, 'net_call_premium': cumulative['net_call_premium'], 'net_put_premium': cumulative['net_put_premium'], @@ -215,18 +169,32 @@ def get_market_tide(interval_5m=False): 'net_volume': net_volume }) - res_list.sort(key=lambda x: x['timestamp']) + # Sort the results list by time. + res_list.sort(key=lambda x: x['time']) + # Retrieve price list data (either via asyncio or from file as a fallback). price_list = asyncio.run(get_stock_chart_data('SPY')) if len(price_list) == 0: - with open(f"json/one-day-price/'SPY'.json") as file: + with open("json/one-day-price/SPY.json", "r") as file: price_list = orjson.loads(file.read()) + # Append closing prices to the data. data = add_close_to_data(price_list, res_list) + # Ensure that each minute until 16:10:00 is present in the data. + fields = ['net_call_premium', 'net_put_premium', 'call_volume', 'put_volume', 'net_volume', 'close'] + last_time = datetime.strptime(data[-1]['time'], "%Y-%m-%d %H:%M:%S") + end_time = datetime.strptime("2025-02-05 16:10:00", "%Y-%m-%d %H:%M:%S") - return res_list + while last_time < end_time: + last_time += timedelta(minutes=1) + data.append({ + 'time': last_time.strftime("%Y-%m-%d %H:%M:%S"), + 'ticker': ticker, + **{field: None for field in fields} + }) + return data def get_top_sector_tickers(): keep_elements = ['price', 'ticker', 'name', 'changesPercentage','netPremium','netCallPremium','netPutPremium','gexRatio','gexNetChange','ivRank'] @@ -293,55 +261,51 @@ def get_top_sector_tickers(): def get_top_spy_tickers(): - keep_elements = ['price', 'ticker', 'name', 'changesPercentage','netPremium','netCallPremium','netPutPremium','gexRatio','gexNetChange','ivRank'] + with open(f"json/stocks-list/sp500_constituent.json", "r") as file: + data = orjson.loads(file.read()) - headers = { - "Accept": "application/json, text/plain", - "Authorization": api_key - } - url = "https://api.unusualwhales.com/api/screener/stocks" - - querystring = {"is_s_p_500":"true"} - - - response = requests.get(url, headers=headers, params=querystring) - data = response.json().get('data', []) - - updated_data = [] - for item in data[:10]: + res_list = [] + for item in data: try: - new_item = {key: safe_round(value) for key, value in item.items()} - with open(f"json/quote/{item['ticker']}.json") as file: + symbol = item['symbol'] + with open(f"json/options-stats/companies/{symbol}.json","r") as file: + stats_data = orjson.loads(file.read()) + + new_item = {key: safe_round(value) for key, value in stats_data.items()} + + with open(f"json/quote/{symbol}.json") as file: quote_data = orjson.loads(file.read()) + new_item['symbol'] = symbol new_item['name'] = quote_data['name'] new_item['price'] = round(float(quote_data['price']), 2) new_item['changesPercentage'] = round(float(quote_data['changesPercentage']), 2) - new_item['ivRank'] = round(float(new_item['iv_rank']),2) - new_item['gexRatio'] = new_item['gex_ratio'] - new_item['gexNetChange'] = new_item['gex_net_change'] - new_item['netCallPremium'] = new_item['net_call_premium'] - new_item['netPutPremium'] = new_item['net_put_premium'] - - new_item['netPremium'] = abs(new_item['netCallPremium'] - new_item['netPutPremium']) - # Filter new_item to keep only specified elements - filtered_item = {key: new_item[key] for key in keep_elements if key in new_item} - updated_data.append(filtered_item) - except Exception as e: - print(f"Error processing ticker {item.get('ticker', 'unknown')}: {e}") + if new_item['net_premium']: + res_list.append(new_item) + except: + pass # Add rank to each item - for rank, item in enumerate(updated_data, 1): + res_list = sorted(res_list, key=lambda item: item['net_premium'], reverse=True) + + for rank, item in enumerate(res_list, 1): item['rank'] = rank - return updated_data + return res_list def main(): - + top_sector_tickers = {} + market_tide = get_market_tide() - data = {'marketTide': market_tide} + top_spy_tickers = get_top_spy_tickers() + top_sector_tickers['SPY'] = top_spy_tickers[:10] + + data = {'marketTide': market_tide, 'topSectorTickers': top_sector_tickers} + + if data: + save_json(data) ''' sector_data = get_sector_data() top_sector_tickers = get_top_sector_tickers() @@ -349,8 +313,7 @@ def main(): top_sector_tickers['SPY'] = top_spy_tickers data = {'sectorData': sector_data, 'topSectorTickers': top_sector_tickers, 'marketTide': market_tide} ''' - if len(data) > 0: - save_json(data) + if __name__ == '__main__': diff --git a/app/cron_options_stats.py b/app/cron_options_stats.py index 9d6e929..e5ef5e3 100644 --- a/app/cron_options_stats.py +++ b/app/cron_options_stats.py @@ -105,7 +105,7 @@ async def main(): #changeOI = total_open_interest - previous_open_interest put_call_ratio = round(put_volume/call_volume,2) if call_volume > 0 else 0 - net_premium = net_call_premium + net_put_premium + net_premium = net_call_premium - net_put_premium premium_ratio = [ safe_round(bearish_premium), safe_round(neutral_premium),