From fe5a2b4d6ec84ad7b17fa9d44b247564c60b115d Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Mon, 27 Jan 2025 13:41:50 +0100
Subject: [PATCH] add heatmap cron job

---
 app/cron_heatmap.py     | 225 +++++++++++++++++++++++++++-------------
 app/cron_market_flow.py | 211 +++++++++++++------------------------
 app/main.py             |  39 ++-----
 app/primary_cron_job.py |   6 +-
 app/test.py             | 150 ---------------------------
 5 files changed, 236 insertions(+), 395 deletions(-)

diff --git a/app/cron_heatmap.py b/app/cron_heatmap.py
index 682b7f8..8db3ae0 100755
--- a/app/cron_heatmap.py
+++ b/app/cron_heatmap.py
@@ -1,74 +1,157 @@
-from mixpanel_utils import MixpanelUtils
-import ujson
-import asyncio
-import aiohttp
-from datetime import datetime, timedelta
-from collections import Counter, OrderedDict
+import pandas as pd
+import plotly.express as px
+import requests
+import orjson
 
-from dotenv import load_dotenv
-import os
-load_dotenv()
-api_key = os.getenv('FMP_API_KEY')
+
+def get_spy_heatmap():
+    # Load stock screener data
+    with open("json/stock-screener/data.json", 'rb') as file:
+        stock_screener_data = orjson.loads(file.read())
+    stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}
+
+    with open("json/etf/holding/SPY.json", "rb") as file:
+        data = orjson.loads(file.read())['holdings']
+
+    # Enrich each holding with screener fields; skip symbols missing from the screener
+    for item in data:
+        try:
+            item['marketCap'] = stock_screener_data_dict[item['symbol']]['marketCap']
+            item['sector'] = stock_screener_data_dict[item['symbol']]['sector']
+            item['industry'] = stock_screener_data_dict[item['symbol']]['industry']
+            item['change1W'] = stock_screener_data_dict[item['symbol']]['change1W']
+            item['change1M'] = stock_screener_data_dict[item['symbol']]['change1M']
+            item['change3M'] = stock_screener_data_dict[item['symbol']]['change3M']
+            item['change6M'] = stock_screener_data_dict[item['symbol']]['change6M']
+            item['change1Y'] = stock_screener_data_dict[item['symbol']]['change1Y']
+            item['change3Y'] = stock_screener_data_dict[item['symbol']]['change3Y']
+        except KeyError:
+            pass
+
+    # Create DataFrame and convert relevant columns to numeric types
+    df = pd.DataFrame(data)
+    df["marketCap"] = pd.to_numeric(df["marketCap"])
+
+    # Drop rows where marketCap == 0
+    df = df[df["marketCap"] > 0]
+    return df
+
+
+def create_treemap(time_period):
+    save_html = True
+
+    df = get_spy_heatmap()
+
+    if time_period == '1D':
+        change_percent = 'changesPercentage'
+        range_color = (-3, 3)
+    elif time_period == '1W':
+        change_percent = 'change1W'
+        range_color = (-5, 5)
+    elif time_period == '1M':
+        change_percent = 'change1M'
+        range_color = (-20, 20)
+    elif time_period == '3M':
+        change_percent = 'change3M'
+        range_color = (-30, 30)
+    elif time_period == '6M':
+        change_percent = 'change6M'
+        range_color = (-50, 50)
+    elif time_period == '1Y':
+        change_percent = 'change1Y'
+        range_color = (-100, 100)
+    elif time_period == '3Y':
+        change_percent = 'change3Y'
+        range_color = (-100, 100)
+    else:
+        raise ValueError(f"Unsupported time_period: {time_period}")
+
+    color_scale = [
+        (0, "#ff2c1c"),    # Bright red at the lower bound
+        (0.5, "#484454"),  # Grey around 0%
+        (1, "#30dc5c"),    # Bright green at the upper bound
+    ]
+
+    # Generate the treemap with fixed dimensions
+    fig = px.treemap(
+        df,
+        path=[px.Constant("S&P 500 - Stocknear.com"), "sector", "industry", "symbol"],
+        values="marketCap",
+        color=change_percent,
+        hover_data=[change_percent, "symbol", "marketCap"],
+        color_continuous_scale=color_scale,
+        range_color=range_color,
+        color_continuous_midpoint=0,
+        width=1200,  # Fixed width
+        height=800   # Fixed height
+    )
+
+    # Update layout with fixed dimensions and other settings
+    fig.update_layout(
+        margin=dict(t=20, l=0, r=0, b=10),
+        font=dict(size=13),  # Default font size for sectors/industries
+        coloraxis_colorbar=None,
+        paper_bgcolor="rgba(0,0,0,0)",
+        plot_bgcolor="rgba(0,0,0,0)",
+        autosize=False,  # Disable autosize so the fixed dimensions are respected
+        width=1200,   # Fixed width
+        height=1200   # Fixed height
+    )
+
+    templates = {
+        "root": "%{label}",
+        "sector": "%{label}",
+        "industry": "%{label}",
+        "symbol": "%{customdata[1]}<br>" +
" + + "%{customdata[0]:.2f}%" + } + + # Update text templates based on the level + fig.data[0].texttemplate = templates["symbol"] # Default template for symbols + + + # Set the text position, border, and ensure the custom font sizes are applied + fig.update_traces( + textposition="middle center", + marker=dict(line=dict(color="black", width=1)), + hoverinfo='skip', + hovertemplate=None, + ) + + # Disable the color bar + fig.update(layout_coloraxis_showscale=False) + + + if save_html: + fixed_html = f""" + + + + + + +
+            {fig.to_html(
+                include_plotlyjs='cdn',
+                full_html=False,
+                config=dict(
+                    displayModeBar=False,
+                    responsive=False
+                )
+            )}
+            </div>
+        </body>
+        </html>
+        """
+        with open(f"json/heatmap/{time_period}.html", "w") as f:
+            f.write(fixed_html)
 
-
-async def get_quote_of_stocks(ticker_list):
-    ticker_str = ','.join(ticker_list)
-    async with aiohttp.ClientSession() as session:
-        url = f"https://financialmodelingprep.com/api/v3/quote/{ticker_str}?apikey={api_key}"
-        async with session.get(url) as response:
-            if response.status == 200:
-                return await response.json()
-            else:
-                return []
-
-async def run():
-    index_list = ['sp500', 'nasdaq', 'dowjones']
-    for index in index_list:
-        url = f"https://financialmodelingprep.com/api/v3/{index}_constituent?apikey={api_key}"
-        res_list = []
-
-        async with aiohttp.ClientSession() as session:
-            async with session.get(url) as response:
-                data = await response.json()
-
-        for item in data:
-            res_list.append({'symbol': item['symbol'], 'sector': item['sector']})
-
-        ticker_list = [item['symbol'] for item in res_list]
-        latest_quote = await get_quote_of_stocks(ticker_list)
-        for quote in latest_quote:
-            symbol = quote['symbol']
-            for item in res_list:
-                if item['symbol'] == symbol:
-                    item['changesPercentage'] = round(quote['changesPercentage'],2)
-                    item['marketCap'] = quote['marketCap']
-
-        # Create a dictionary to store sectors and their corresponding symbols and percentages
-        sector_dict = {}
-
-        for item in res_list:
-            sector = item['sector']
-            symbol = item['symbol']
-            percentage = item['changesPercentage']
-            marketCap = item['marketCap']
-
-            if sector not in sector_dict:
-                sector_dict[sector] = {'name': sector, 'value': 0, 'children': []}
-
-            sector_dict[sector]['value'] += marketCap
-            sector_dict[sector]['children'].append({'name': symbol, 'value': marketCap, 'changesPercentage': percentage})
-
-        # Convert the dictionary to a list
-        result_list = list(sector_dict.values())
-
-        # Optionally, if you want to add the 'value' for each sector
-        for sector in result_list:
-            sector['value'] = round(sector['value'], 2)
-        #print(result_list)
-
-        with open(f"json/heatmaps/{index}.json", 'w') as file:
-            ujson.dump(result_list, file)
-
-
-asyncio.run(run())
\ No newline at end of file
+if __name__ == "__main__":
+    for period in ["1D", "1W", "1M", "3M", "6M", "1Y", "3Y"]:
+        create_treemap(time_period=period)
\ No newline at end of file
diff --git a/app/cron_market_flow.py b/app/cron_market_flow.py
index a161c6e..2da5fb0 100644
--- a/app/cron_market_flow.py
+++ b/app/cron_market_flow.py
@@ -10,11 +10,27 @@ import aiohttp
 import pytz
 import requests  # Add missing import
 from collections import defaultdict
+import intrinio_sdk as intrinio
+from intrinio_sdk.rest import ApiException
+from GetStartEndDate import GetStartEndDate
+from tqdm import tqdm
+
+import re
+
 load_dotenv()
 fmp_api_key = os.getenv('FMP_API_KEY')
+api_key = os.getenv('INTRINIO_API_KEY')
+
+intrinio.ApiClient().set_api_key(api_key)
+intrinio.ApiClient().allow_retries(True)
+
+
 ny_tz = pytz.timezone('America/New_York')
 
+today, _ = GetStartEndDate().run()
+today = today.strftime("%Y-%m-%d")
+
 
 def save_json(data):
     directory = "json/market-flow"
@@ -34,137 +50,48 @@ def add_close_to_data(price_list, data):
                 break  # Match found, no need to continue searching
     return data
 
-def convert_timestamps(data_list):
-    ny_tz = pytz.timezone('America/New_York')
+def parse_contract_data(option_symbol):
+    # Define regex pattern to match the symbol structure
+    match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
+    if not match:
+        raise ValueError(f"Invalid option_symbol format: {option_symbol}")
 
-    for item in data_list:
-        try:
-            # Get the timestamp and split on '.'
-            timestamp = item['timestamp']
-            base_time = timestamp.split('.')[0]
-
-            # Handle microseconds if present
-            if '.' in timestamp:
-                microseconds = timestamp.split('.')[1].replace('Z', '')
-                microseconds = microseconds.ljust(6, '0')  # Pad with zeros if needed
-                base_time = f"{base_time}.{microseconds}"
-
-            # Replace 'Z' with '+00:00' (for UTC)
-            base_time = base_time.replace('Z', '+00:00')
-
-            # Parse the timestamp
-            dt = datetime.fromisoformat(base_time)
-
-            # Ensure the datetime is timezone-aware (assumed to be UTC initially)
-            if dt.tzinfo is None:
-                dt = pytz.utc.localize(dt)
-
-            # Convert the time to New York timezone (automatically handles DST)
-            ny_time = dt.astimezone(ny_tz)
-
-            # Optionally, format to include date and time
-            item['timestamp'] = ny_time.strftime('%Y-%m-%d %H:%M:%S')
-
-        except ValueError as e:
-            raise ValueError(f"Invalid timestamp format: {item['timestamp']} - Error: {str(e)}")
-        except Exception as e:
-            raise Exception(f"Error processing timestamp: {item['timestamp']} - Error: {str(e)}")
+    ticker, expiration, option_type, strike_price = match.groups()
 
-    return data_list
-
-
-def safe_round(value):
-    """Attempt to convert a value to float and round it. Return the original value if not possible."""
-    try:
-        return round(float(value), 2)
-    except (ValueError, TypeError):
-        return value
-
-def calculate_neutral_premium(data_item):
-    """Calculate the neutral premium for a data item."""
-    call_premium = float(data_item['call_premium'])
-    put_premium = float(data_item['put_premium'])
-    bearish_premium = float(data_item['bearish_premium'])
-    bullish_premium = float(data_item['bullish_premium'])
+    return option_type
 
-    total_premiums = bearish_premium + bullish_premium
-    observed_premiums = call_premium + put_premium
-    neutral_premium = observed_premiums - total_premiums
+
+async def get_intrinio_data(ticker):
+    url = f"https://api-v2.intrinio.com/options/unusual_activity/{ticker}/intraday?page_size=1000&api_key={api_key}"
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as response:
+            data = await response.json()
 
-    return safe_round(neutral_premium)
-
-def generate_time_intervals(start_time, end_time):
-    """Generate 1-minute intervals from start_time to end_time."""
-    intervals = []
-    current_time = start_time
-    while current_time <= end_time:
-        intervals.append(current_time.strftime('%Y-%m-%d %H:%M:%S'))
-        current_time += timedelta(minutes=1)
-    return intervals
-
-def get_sector_data():
-    try:
-        url = "https://api.unusualwhales.com/api/market/sector-etfs"
-        response = requests.get(url, headers=headers)
-        data = response.json().get('data', [])
+    data = data.get('trades', [])
+    if data:
         res_list = []
-        processed_data = []
-
-        for item in data:
-            symbol = item['ticker']
+        for item in data:
+            try:
+                iso_timestamp = item['timestamp'].replace('Z', '+00:00')
+                # Parse timestamp and convert to New York time
+                timestamp = datetime.fromisoformat(iso_timestamp).astimezone(ny_tz)
+                formatted_time = timestamp.strftime('%Y-%m-%d %H:%M:%S')
+                put_call = parse_contract_data(item['contract'].replace("___", "").replace("__", "").replace("_", ''))
+                if put_call == 'C':
+                    put_call = 'calls'
+                else:
+                    put_call = 'puts'
 
-            bearish_premium = float(item['bearish_premium'])
-            bullish_premium = float(item['bullish_premium'])
-            neutral_premium = calculate_neutral_premium(item)
-
-            # Step 1: Replace 'full_name' with 'name' if needed
-            new_item = {
-                'name' if key == 'full_name' else key: safe_round(value)
-                for key, value in item.items()
-                if key != 'in_out_flow'
-            }
-
-            # Step 2: Replace 'name' values
-            if str(new_item.get('name')) == 'Consumer Staples':
-                new_item['name'] = 'Consumer Defensive'
-            elif str(new_item.get('name')) == 'Consumer Discretionary':
-                new_item['name'] = 'Consumer Cyclical'
-            elif str(new_item.get('name')) == 'Health Care':
-                new_item['name'] = 'Healthcare'
-            elif str(new_item.get('name')) == 'Financials':
-                new_item['name'] = 'Financial Services'
-            elif str(new_item.get('name')) == 'Materials':
-                new_item['name'] = 'Basic Materials'
+                res_list.append({'timestamp': formatted_time, 'put_call': put_call, 'cost_basis': item['total_value'], 'volume': item['total_size'], 'sentiment': item['sentiment']})
+            except Exception:
+                pass
 
-            new_item['premium_ratio'] = [
-                safe_round(bearish_premium),
-                neutral_premium,
-                safe_round(bullish_premium)
-            ]
-
-            with open(f"json/quote/{symbol}.json") as file:
-                quote_data = orjson.loads(file.read())
-                new_item['price'] = round(quote_data.get('price', 0), 2)
-                new_item['changesPercentage'] = round(quote_data.get('changesPercentage', 0), 2)
-
-            #get prem tick data:
-            '''
-            if symbol != 'SPY':
-                prem_tick_history = get_etf_tide(symbol)
-                #if symbol == 'XLB':
-                #    print(prem_tick_history[10])
-
-                new_item['premTickHistory'] = prem_tick_history
-            '''
-
-            processed_data.append(new_item)
-
-        return processed_data
-    except Exception as e:
-        print(e)
+        res_list.sort(key=lambda x: x['timestamp'])
+        return res_list
+    else:
         return []
 
+
 async def get_stock_chart_data(ticker):
     start_date_1d, end_date_1d = GetStartEndDate().run()
     start_date = start_date_1d.strftime("%Y-%m-%d")
@@ -184,31 +111,39 @@ async def get_stock_chart_data(ticker):
 
 def get_market_tide(interval_5m=False):
-    ticker_list = ['SPY']
+    with open("json/stocks-list/sp500_constituent.json", "r") as file:
+        ticker_list = orjson.loads(file.read())
+        ticker_list = [item['symbol'] for item in ticker_list][:10]
+
     res_list = []
-    for ticker in ticker_list:
+    # Track changes per interval
+    delta_data = defaultdict(lambda: {
+        'cumulative_net_call_premium': 0,
+        'cumulative_net_put_premium': 0,
+        'call_ask_vol': 0,
+        'call_bid_vol': 0,
+        'put_ask_vol': 0,
+        'put_bid_vol': 0
+    })
+
+    for ticker in tqdm(['SPY']):
+        '''
         with open("json/options-flow/feed/data.json", "r") as file:
             data = orjson.loads(file.read())
+        '''
+        data = asyncio.run(get_intrinio_data(ticker))
 
-        # Filter and sort data
-        ticker_options = [item for item in data if item['ticker'] == ticker]
-        ticker_options.sort(key=lambda x: x['time'])
+        ticker_options = [item for item in data if item['timestamp'].startswith(today)]
+        ticker_options.sort(key=lambda x: x['timestamp'])
 
-        # Track changes per interval
-        delta_data = defaultdict(lambda: {
-            'cumulative_net_call_premium': 0,
-            'cumulative_net_put_premium': 0,
-            'call_ask_vol': 0,
-            'call_bid_vol': 0,
-            'put_ask_vol': 0,
-            'put_bid_vol': 0
-        })
 
         for item in ticker_options:
             try:
                 # Parse and standardize timestamp
-                dt = datetime.strptime(f"{item['date']} {item['time']}", "%Y-%m-%d %H:%M:%S")
+                dt = datetime.strptime(item['timestamp'], "%Y-%m-%d %H:%M:%S")
 
                 # Truncate to start of minute (for 1m summaries)
                 dt = dt.replace(second=0, microsecond=0)
@@ -282,9 +217,9 @@ def get_market_tide(interval_5m=False):
 
         res_list.sort(key=lambda x: x['timestamp'])
 
-        price_list = asyncio.run(get_stock_chart_data(ticker))
+        price_list = asyncio.run(get_stock_chart_data('SPY'))
         if len(price_list) == 0:
-            with open(f"json/one-day-price/{ticker}.json") as file:
+            with open("json/one-day-price/SPY.json") as file:
                 price_list = orjson.loads(file.read())
 
         data = add_close_to_data(price_list, res_list)
diff --git a/app/main.py b/app/main.py
index 7e2a6ef..8b374dd 100755
--- a/app/main.py
+++ b/app/main.py
@@ -2568,36 +2568,12 @@ async def get_ipo_calendar(data:IPOData, api_key: str = Security(get_api_key)):
         headers={"Content-Encoding": "gzip"}
     )
 
-@app.get("/trending")
-async def get_trending(api_key: str = Security(get_api_key)):
-    cache_key = f"get-trending"
-    cached_result = redis_client.get(cache_key)
-    if cached_result:
-        return StreamingResponse(
-            io.BytesIO(cached_result),
-            media_type="application/json",
-            headers={"Content-Encoding": "gzip"})
-    try:
-        with open(f"json/trending/data.json", 'rb') as file:
-            res = orjson.loads(file.read())
-    except:
-        res = []
-
-    data = orjson.dumps(res)
-    compressed_data = gzip.compress(data)
-    redis_client.set(cache_key, compressed_data)
-    redis_client.expire(cache_key, 60*15)  # Set cache expiration time to 1 day
-
-    return StreamingResponse(
-        io.BytesIO(compressed_data),
-        media_type="application/json",
-        headers={"Content-Encoding": "gzip"}
-    )
-
-@app.get("/heatmap")
-async def get_heatmap(api_key: str = Security(get_api_key)):
-    cache_key = "heatmap"
+@app.post("/heatmap")
+async def get_heatmap(data: GeneralData, api_key: str = Security(get_api_key)):
+    time_period = data.params
+    cache_key = f"heatmap-{time_period}"
 
     cached_result = redis_client.get(cache_key)
     if cached_result:
@@ -2608,7 +2584,7 @@ async def get_heatmap(api_key: str = Security(get_api_key)):
         )
 
     try:
-        with open("json/heatmap/data.html", 'r', encoding='utf-8') as file:
+        with open(f"json/heatmap/{time_period}.html", 'r', encoding='utf-8') as file:
             html_content = file.read()
     except FileNotFoundError:
         raise HTTPException(status_code=404, detail="Heatmap file not found")
@@ -2627,10 +2603,11 @@ async def get_heatmap(api_key: str = Security(get_api_key)):
         media_type="text/html",
         headers={
             "Content-Encoding": "gzip",
-            "Cache-Control": "public, max-age=300"  # 5 minutes cache
         }
     )
 
+
+
 @app.post("/pre-post-quote")
 async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_key)):
     ticker = data.ticker.upper()
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index f0aa2fa..2b487d2 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -187,10 +187,6 @@ def run_press_releases():
     if week <= 4:
         run_command(["python3", "cron_press_releases.py"])
 
-def run_cron_heatmap():
-    run_command(["python3", "cron_heatmap.py"])
-
-
 def run_cron_options_flow():
     week = datetime.today().weekday()
     current_time = datetime.now().time()
@@ -300,6 +296,7 @@ def run_list():
     week = datetime.today().weekday()
     if week <= 5:
         run_command(["python3", "cron_list.py"])
+        run_command(["python3", "cron_heatmap.py"])
 
 
 def run_financial_statements():
@@ -406,7 +403,6 @@ schedule.every(30).minutes.do(run_threaded, run_cron_market_news).tag('market_ne
 schedule.every(30).minutes.do(run_threaded, run_cron_industry).tag('industry_job')
 schedule.every(8).minutes.do(run_threaded, run_one_day_price).tag('one_day_price_job')
 
-#schedule.every(15).minutes.do(run_threaded, run_cron_heatmap).tag('heatmap_job')
 schedule.every(20).minutes.do(run_threaded, run_tracker).tag('tracker_job')
 
 
diff --git a/app/test.py b/app/test.py
index b1e7cc6..e69de29 100644
--- a/app/test.py
+++ b/app/test.py
@@ -1,150 +0,0 @@
-from __future__ import print_function
-import asyncio
-import time
-import intrinio_sdk as intrinio
-from intrinio_sdk.rest import ApiException
-from datetime import datetime, timedelta
-import ast
-import orjson
-from tqdm import tqdm
-import aiohttp
-from concurrent.futures import ThreadPoolExecutor
-import sqlite3
-import re
-
-from dotenv import load_dotenv
-import os
-
-load_dotenv()
-
-api_key = os.getenv('INTRINIO_API_KEY')
-
-intrinio.ApiClient().set_api_key(api_key)
-# intrinio.ApiClient().allow_retries(True)
-
-today = datetime.today()
-start_date = (today - timedelta(150)).strftime("%Y-%m-%d")
-end_date = (today + timedelta(30)).strftime("%Y-%m-%d")
-
-next_page = ''
-page_size = 1000
-activity_type = ''
-sentiment = ''
-minimum_total_value = 0
-maximum_total_value = 2E10
-
-# Database connection and symbol retrieval
-def get_total_symbols():
-    with sqlite3.connect('stocks.db') as con:
-        cursor = con.cursor()
-        cursor.execute("PRAGMA journal_mode = wal")
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
-        stocks_symbols = [row[0] for row in cursor.fetchall()]
-
-    with sqlite3.connect('etf.db') as etf_con:
-        etf_cursor = etf_con.cursor()
-        etf_cursor.execute("PRAGMA journal_mode = wal")
-        etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
-        etf_symbols = [row[0] for row in etf_cursor.fetchall()]
-
-    return stocks_symbols + etf_symbols
-
-
-def get_tickers_from_directory():
-    directory = "json/options-historical-data/companies"
-    try:
-        # Ensure the directory exists
-        if not os.path.exists(directory):
-            raise FileNotFoundError(f"The directory '{directory}' does not exist.")
-
-        # Get all tickers from filenames
-        return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
-
-    except Exception as e:
-        print(f"An error occurred: {e}")
-        return []
-
-
-async def save_json(data, symbol):
-    directory = "json/unusual-activity"
-    os.makedirs(directory, exist_ok=True)
-    with open(f"{directory}/{symbol}.json", 'wb') as file:
-        file.write(orjson.dumps(data))
-
-
-def parse_option_symbol(option_symbol):
-    # Define regex pattern to match the symbol structure
-    match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
-    if not match:
-        raise ValueError(f"Invalid option_symbol format: {option_symbol}")
-
-    ticker, expiration, option_type, strike_price = match.groups()
-
-    # Convert expiration to datetime
-    date_expiration = datetime.strptime(expiration, "%y%m%d").date()
-
-    # Convert strike price to float
-    strike_price = int(strike_price) / 1000
-
-    return date_expiration, option_type, strike_price
-
-
-async def get_data(symbol):
-    response = intrinio.OptionsApi().get_unusual_activity_intraday(
-        symbol,
-        next_page=next_page,
-        page_size=page_size,
-        activity_type=activity_type,
-        sentiment=sentiment,
-        start_date=start_date,
-        end_date=end_date,
-        minimum_total_value=minimum_total_value,
-        maximum_total_value=maximum_total_value
-    )
-    data = (response.__dict__['_trades'])
-    res_list = []
-    if len(data) > 0:
-        for item in data:
-            try:
-                trade_data = item.__dict__
-                trade_data = {key.lstrip('_'): value for key, value in trade_data.items()}
-                option_symbol = trade_data['contract'].replace("___", "").replace("__", "").replace("_", "")
-                date_expiration, option_type, strike_price = parse_option_symbol(option_symbol)
-
-                res_list.append({
-                    'date': trade_data['timestamp'].strftime("%Y-%m-%d"),
-                    'askprice': trade_data['ask_at_execution'],
-                    'bidPrice': trade_data['bid_at_execution'],
-                    'premium': trade_data['total_value'],
-                    'sentiment': trade_data['sentiment'].capitalize(),
-                    'avgPrice': trade_data['average_price'],
-                    'price': trade_data['underlying_price_at_execution'],
-                    'unusualType': trade_data['type'].capitalize(),
-                    'size': trade_data['total_size'],
-                    'optionSymbol': option_symbol,
-                    'strike': strike_price,
-                    'expiry': date_expiration.strftime("%Y-%m-%d"),
-                    'optionType': option_type.replace("P", "Put").replace("C", "Call")
-                })
-            except Exception as e:
-                print(e)
-
-    res_list = sorted(res_list, key=lambda x: x['date'], reverse=True)
-    res_list = [item for item in res_list if item['date'] == '2025-01-24']
-    print(len(res_list))
-
-
-async def main():
-    total_symbols = get_tickers_from_directory()
-    if len(total_symbols) < 3000:
-        total_symbols = get_total_symbols()
-
-    for symbol in tqdm(['XLU']):
-        try:
-            data = await get_data(symbol)
-        except Exception as e:
-            print(f"Error processing {symbol}: {e}")
-
-
-if __name__ == "__main__":
-    asyncio.run(main())
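
Usage note: a minimal client sketch for the new POST /heatmap endpoint above.
The host, port, and API-key header name are assumptions (the route only shows
`Security(get_api_key)`); the `params` field mirrors the `GeneralData` payload
the endpoint reads, and the response body is the gzip-encoded HTML written by
cron_heatmap.py.

    import requests

    # Hypothetical deployment details; adjust host and header name as needed.
    BASE_URL = "http://localhost:8000"
    HEADERS = {"X-API-KEY": "your-api-key"}

    def fetch_heatmap(time_period: str) -> str:
        # time_period is one of the periods rendered by cron_heatmap.py:
        # '1D', '1W', '1M', '3M', '6M', '1Y', '3Y'
        resp = requests.post(
            f"{BASE_URL}/heatmap",
            json={"params": time_period},
            headers=HEADERS,
        )
        resp.raise_for_status()
        # requests transparently decompresses the gzip body, so .text is
        # already the rendered treemap HTML.
        return resp.text

    print(fetch_heatmap("1D")[:200])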
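Parsing note: `parse_contract_data` in cron_market_flow.py returns only the
C/P flag, but its regex follows the full OCC option-symbol layout (root
ticker, YYMMDD expiry, C/P, strike in thousandths) that the deleted
`parse_option_symbol` in test.py decoded. A self-contained sketch of the full
decode, in case expiry and strike are needed again later:

    import re
    from datetime import datetime

    def decode_occ_symbol(option_symbol: str):
        # Root ticker, 6-digit expiry (YYMMDD), C/P flag, strike in thousandths.
        match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
        if not match:
            raise ValueError(f"Invalid option_symbol format: {option_symbol}")
        ticker, expiration, option_type, strike = match.groups()
        expiry = datetime.strptime(expiration, "%y%m%d").date()
        strike_price = int(strike) / 1000  # e.g. "00150000" -> 150.0
        return ticker, expiry, option_type, strike_price

    # Example: an AAPL call expiring 2025-01-17 with a 150.00 strike
    print(decode_occ_symbol("AAPL250117C00150000"))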