add heatmap cron job

This commit is contained in:
MuslemRahimi 2025-01-27 13:41:50 +01:00
parent 37917fd039
commit fe5a2b4d6e
5 changed files with 236 additions and 395 deletions

View File

@ -1,74 +1,157 @@
from mixpanel_utils import MixpanelUtils
import ujson
import asyncio
import aiohttp
from datetime import datetime, timedelta
from collections import Counter, OrderedDict
import pandas as pd
import plotly.express as px
import requests
import orjson
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv('FMP_API_KEY')
def get_spy_heatmap():
# Load stock screener data
with open(f"json/stock-screener/data.json", 'rb') as file:
stock_screener_data = orjson.loads(file.read())
stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}
with open(f"json/etf/holding/SPY.json","rb") as file:
data = orjson.loads(file.read())['holdings']
for item in data:
try:
item['marketCap'] = stock_screener_data_dict[item['symbol']]['marketCap']
item['sector'] = stock_screener_data_dict[item['symbol']]['sector']
item['industry'] = stock_screener_data_dict[item['symbol']]['industry']
item['change1W'] = stock_screener_data_dict[item['symbol']]['change1W']
item['change1M'] = stock_screener_data_dict[item['symbol']]['change1M']
item['change3M'] = stock_screener_data_dict[item['symbol']]['change3M']
item['change6M'] = stock_screener_data_dict[item['symbol']]['change6M']
item['change1Y'] = stock_screener_data_dict[item['symbol']]['change1Y']
item['change3Y'] = stock_screener_data_dict[item['symbol']]['change3Y']
except:
pass
# Create DataFrame
df = pd.DataFrame(data)
# Convert relevant columns to numeric types
df["marketCap"] = pd.to_numeric(df["marketCap"])
# Drop rows where the marketCap == 0
df = df[df["marketCap"] > 0]
return df
def create_treemap(time_period):
save_html = True
df = get_spy_heatmap()
if (time_period == '1D'):
change_percent = 'changesPercentage'
range_color = (-3,3)
elif (time_period == '1W'):
change_percent = 'change1W'
range_color = (-5,5)
elif (time_period == '1M'):
change_percent = 'change1M'
range_color = (-20,20)
elif (time_period == '3M'):
change_percent = 'change3M'
range_color = (-30,30)
elif (time_period == '6M'):
change_percent = 'change6M'
range_color = (-50,50)
elif (time_period == '1Y'):
change_percent = 'change6M'
range_color = (-100,100)
elif (time_period == '3Y'):
change_percent = 'change3Y'
range_color = (-100,100)
color_scale = [
(0, "#ff2c1c"), # Bright red at -5%
(0.5, "#484454"), # Grey around 0%
(1, "#30dc5c"), # Bright green at 5%
]
# Generate the treemap with fixed dimensions
fig = px.treemap(
df,
path=[px.Constant("S&P 500 - Stocknear.com"), "sector", "industry", "symbol"],
values="marketCap",
color=change_percent,
hover_data=[change_percent, "symbol", "marketCap"],
color_continuous_scale=color_scale,
range_color=range_color,
color_continuous_midpoint=0,
width=1200, # Fixed width
height=800 # Fixed height
)
# Update layout with fixed dimensions and other settings
fig.update_layout(
margin=dict(t=20, l=0, r=0, b=10),
font=dict(size=13), # Default font size for sectors/industries
coloraxis_colorbar=None,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)",
autosize=True, # Disable autosize
width=1200, # Fixed width
height=1200 # Fixed height
)
templates = {
"root": "<span style='font-size:24px; color: white !important;'><b>%{label}</b></span>",
"sector": "<span style='font-size:22px; color: white !important;'><b>%{label}</b></span>",
"industry": "<span style='font-size:20px; color: white !important;'><b>%{label}</b></span>",
"symbol": "<span style='font-size:20px; color: white !important;'><b>%{customdata[1]}</b></span><br>" +
"<span style='font-size:18px; color: white !important;'>%{customdata[0]:.2f}%</span>"
}
# Update text templates based on the level
fig.data[0].texttemplate = templates["symbol"] # Default template for symbols
# Set the text position, border, and ensure the custom font sizes are applied
fig.update_traces(
textposition="middle center",
marker=dict(line=dict(color="black", width=1)),
hoverinfo='skip',
hovertemplate=None,
)
# Disable the color bar
fig.update(layout_coloraxis_showscale=False)
if save_html:
fixed_html = f"""
<!DOCTYPE html>
<html>
<head>
<style>
.container {{
background-color: #09090B;
}}
.plot-container {{
width: 1200px;
height: 1200px;
}}
</style>
</head>
<body class="container">
<div class="plot-container">
{fig.to_html(
include_plotlyjs='cdn',
full_html=False,
config=dict(
displayModeBar=False,
responsive=False
)
)}
</div>
</body>
</html>
"""
with open(f"json/heatmap/{time_period}.html", "w") as f:
f.write(fixed_html)
async def get_quote_of_stocks(ticker_list):
ticker_str = ','.join(ticker_list)
async with aiohttp.ClientSession() as session:
url = f"https://financialmodelingprep.com/api/v3/quote/{ticker_str}?apikey={api_key}"
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
return []
async def run():
index_list = ['sp500', 'nasdaq', 'dowjones']
for index in index_list:
url = f"https://financialmodelingprep.com/api/v3/{index}_constituent?apikey={api_key}"
res_list = []
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
for item in data:
res_list.append({'symbol': item['symbol'], 'sector': item['sector']})
ticker_list = [item['symbol'] for item in res_list]
latest_quote = await get_quote_of_stocks(ticker_list)
for quote in latest_quote:
symbol = quote['symbol']
for item in res_list:
if item['symbol'] == symbol:
item['changesPercentage'] = round(quote['changesPercentage'],2)
item['marketCap'] = quote['marketCap']
# Create a dictionary to store sectors and their corresponding symbols and percentages
sector_dict = {}
for item in res_list:
sector = item['sector']
symbol = item['symbol']
percentage = item['changesPercentage']
marketCap = item['marketCap']
if sector not in sector_dict:
sector_dict[sector] = {'name': sector, 'value': 0, 'children': []}
sector_dict[sector]['value'] += marketCap
sector_dict[sector]['children'].append({'name': symbol, 'value': marketCap, 'changesPercentage': percentage})
# Convert the dictionary to a list
result_list = list(sector_dict.values())
# Optionally, if you want to add the 'value' for each sector
for sector in result_list:
sector['value'] = round(sector['value'], 2)
#print(result_list)
with open(f"json/heatmaps/{index}.json", 'w') as file:
ujson.dump(result_list, file)
asyncio.run(run())
if __name__ == "__main__":
for time in ['1D',"1W","1M","3M","6M","1Y","3Y"]:
create_treemap(time_period = time)

View File

@ -10,11 +10,27 @@ import aiohttp
import pytz
import requests # Add missing import
from collections import defaultdict
import intrinio_sdk as intrinio
from intrinio_sdk.rest import ApiException
from GetStartEndDate import GetStartEndDate
from tqdm import tqdm
import re
load_dotenv()
fmp_api_key = os.getenv('FMP_API_KEY')
api_key = os.getenv('INTRINIO_API_KEY')
intrinio.ApiClient().set_api_key(api_key)
intrinio.ApiClient().allow_retries(True)
ny_tz = pytz.timezone('America/New_York')
today,_ = GetStartEndDate().run()
today = today.strftime("%Y-%m-%d")
def save_json(data):
directory = "json/market-flow"
@ -34,137 +50,48 @@ def add_close_to_data(price_list, data):
break # Match found, no need to continue searching
return data
def convert_timestamps(data_list):
ny_tz = pytz.timezone('America/New_York')
def parse_contract_data(option_symbol):
# Define regex pattern to match the symbol structure
match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
if not match:
raise ValueError(f"Invalid option_symbol format: {option_symbol}")
for item in data_list:
try:
# Get the timestamp and split on '.'
timestamp = item['timestamp']
base_time = timestamp.split('.')[0]
# Handle microseconds if present
if '.' in timestamp:
microseconds = timestamp.split('.')[1].replace('Z', '')
microseconds = microseconds.ljust(6, '0') # Pad with zeros if needed
base_time = f"{base_time}.{microseconds}"
# Replace 'Z' with '+00:00' (for UTC)
base_time = base_time.replace('Z', '+00:00')
# Parse the timestamp
dt = datetime.fromisoformat(base_time)
# Ensure the datetime is timezone-aware (assumed to be UTC initially)
if dt.tzinfo is None:
dt = pytz.utc.localize(dt)
# Convert the time to New York timezone (automatically handles DST)
ny_time = dt.astimezone(ny_tz)
# Optionally, format to include date and time
item['timestamp'] = ny_time.strftime('%Y-%m-%d %H:%M:%S')
except ValueError as e:
raise ValueError(f"Invalid timestamp format: {item['timestamp']} - Error: {str(e)}")
except Exception as e:
raise Exception(f"Error processing timestamp: {item['timestamp']} - Error: {str(e)}")
ticker, expiration, option_type, strike_price = match.groups()
return data_list
def safe_round(value):
"""Attempt to convert a value to float and round it. Return the original value if not possible."""
try:
return round(float(value), 2)
except (ValueError, TypeError):
return value
def calculate_neutral_premium(data_item):
"""Calculate the neutral premium for a data item."""
call_premium = float(data_item['call_premium'])
put_premium = float(data_item['put_premium'])
bearish_premium = float(data_item['bearish_premium'])
bullish_premium = float(data_item['bullish_premium'])
return option_type
total_premiums = bearish_premium + bullish_premium
observed_premiums = call_premium + put_premium
neutral_premium = observed_premiums - total_premiums
async def get_intrinio_data(ticker):
url=f"https://api-v2.intrinio.com/options/unusual_activity/{ticker}/intraday?page_size=1000&api_key={api_key}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
return safe_round(neutral_premium)
def generate_time_intervals(start_time, end_time):
"""Generate 1-minute intervals from start_time to end_time."""
intervals = []
current_time = start_time
while current_time <= end_time:
intervals.append(current_time.strftime('%Y-%m-%d %H:%M:%S'))
current_time += timedelta(minutes=1)
return intervals
def get_sector_data():
try:
url = "https://api.unusualwhales.com/api/market/sector-etfs"
response = requests.get(url, headers=headers)
data = response.json().get('data', [])
data = data.get('trades',[])
if data:
res_list = []
processed_data = []
for item in data:
symbol = item['ticker']
try:
iso_timestamp = item['timestamp'].replace('Z', '+00:00')
# Parse timestamp and convert to New York time
timestamp = datetime.fromisoformat(iso_timestamp).astimezone(ny_tz)
formatted_time = timestamp.strftime('%Y-%m-%d %H:%M:%S')
put_call = parse_contract_data(item['contract'].replace("___","").replace("__","").replace("_",''))
if put_call == 'C':
put_call = 'calls'
else:
put_call = 'puts'
bearish_premium = float(item['bearish_premium'])
bullish_premium = float(item['bullish_premium'])
neutral_premium = calculate_neutral_premium(item)
# Step 1: Replace 'full_name' with 'name' if needed
new_item = {
'name' if key == 'full_name' else key: safe_round(value)
for key, value in item.items()
if key != 'in_out_flow'
}
# Step 2: Replace 'name' values
if str(new_item.get('name')) == 'Consumer Staples':
new_item['name'] = 'Consumer Defensive'
elif str(new_item.get('name')) == 'Consumer Discretionary':
new_item['name'] = 'Consumer Cyclical'
elif str(new_item.get('name')) == 'Health Care':
new_item['name'] = 'Healthcare'
elif str(new_item.get('name')) == 'Financials':
new_item['name'] = 'Financial Services'
elif str(new_item.get('name')) == 'Materials':
new_item['name'] = 'Basic Materials'
res_list.append({'timestamp': formatted_time, 'put_call': put_call, 'cost_basis': item['total_value'], 'volume': item['total_size'], 'sentiment': item['sentiment']})
except:
pass
new_item['premium_ratio'] = [
safe_round(bearish_premium),
neutral_premium,
safe_round(bullish_premium)
]
with open(f"json/quote/{symbol}.json") as file:
quote_data = orjson.loads(file.read())
new_item['price'] = round(quote_data.get('price', 0), 2)
new_item['changesPercentage'] = round(quote_data.get('changesPercentage', 0), 2)
#get prem tick data:
'''
if symbol != 'SPY':
prem_tick_history = get_etf_tide(symbol)
#if symbol == 'XLB':
# print(prem_tick_history[10])
new_item['premTickHistory'] = prem_tick_history
'''
processed_data.append(new_item)
return processed_data
except Exception as e:
print(e)
res_list.sort(key=lambda x: x['timestamp'])
return res_list
else:
return []
async def get_stock_chart_data(ticker):
start_date_1d, end_date_1d = GetStartEndDate().run()
start_date = start_date_1d.strftime("%Y-%m-%d")
@ -184,31 +111,39 @@ async def get_stock_chart_data(ticker):
def get_market_tide(interval_5m=False):
ticker_list = ['SPY']
with open(f"json/stocks-list/sp500_constituent.json","r") as file:
ticker_list = orjson.loads(file.read())
ticker_list = [item['symbol'] for item in ticker_list][:10]
res_list = []
for ticker in ticker_list:
# Track changes per interval
delta_data = defaultdict(lambda: {
'cumulative_net_call_premium': 0,
'cumulative_net_put_premium': 0,
'call_ask_vol': 0,
'call_bid_vol': 0,
'put_ask_vol': 0,
'put_bid_vol': 0
})
for ticker in tqdm(['SPY']):
'''
with open("json/options-flow/feed/data.json", "r") as file:
data = orjson.loads(file.read())
'''
data = asyncio.run(get_intrinio_data(ticker))
# Filter and sort data
ticker_options = [item for item in data if item['ticker'] == ticker]
ticker_options.sort(key=lambda x: x['time'])
ticker_options = [item for item in data if item['timestamp'].startswith(today)]
ticker_options.sort(key=lambda x: x['timestamp'])
# Track changes per interval
delta_data = defaultdict(lambda: {
'cumulative_net_call_premium': 0,
'cumulative_net_put_premium': 0,
'call_ask_vol': 0,
'call_bid_vol': 0,
'put_ask_vol': 0,
'put_bid_vol': 0
})
for item in ticker_options:
try:
# Parse and standardize timestamp
dt = datetime.strptime(f"{item['date']} {item['time']}", "%Y-%m-%d %H:%M:%S")
dt = datetime.strptime(f"{item['timestamp']}", "%Y-%m-%d %H:%M:%S")
# Truncate to start of minute (for 1m summaries)
dt = dt.replace(second=0, microsecond=0)
@ -282,9 +217,9 @@ def get_market_tide(interval_5m=False):
res_list.sort(key=lambda x: x['timestamp'])
price_list = asyncio.run(get_stock_chart_data(ticker))
price_list = asyncio.run(get_stock_chart_data('SPY'))
if len(price_list) == 0:
with open(f"json/one-day-price/{ticker}.json") as file:
with open(f"json/one-day-price/'SPY'.json") as file:
price_list = orjson.loads(file.read())
data = add_close_to_data(price_list, res_list)

View File

@ -2568,36 +2568,12 @@ async def get_ipo_calendar(data:IPOData, api_key: str = Security(get_api_key)):
headers={"Content-Encoding": "gzip"}
)
@app.get("/trending")
async def get_trending(api_key: str = Security(get_api_key)):
cache_key = f"get-trending"
cached_result = redis_client.get(cache_key)
if cached_result:
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"})
try:
with open(f"json/trending/data.json", 'rb') as file:
res = orjson.loads(file.read())
except:
res = []
data = orjson.dumps(res)
compressed_data = gzip.compress(data)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 60*15) # Set cache expiration time to 1 day
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.get("/heatmap")
async def get_heatmap(api_key: str = Security(get_api_key)):
cache_key = "heatmap"
@app.post("/heatmap")
async def get_heatmap(data: GeneralData, api_key: str = Security(get_api_key)):
print(data)
time_period = data.params
cache_key = f"heatmap-{time_period}"
cached_result = redis_client.get(cache_key)
if cached_result:
@ -2608,7 +2584,7 @@ async def get_heatmap(api_key: str = Security(get_api_key)):
)
try:
with open("json/heatmap/data.html", 'r', encoding='utf-8') as file:
with open(f"json/heatmap/{time_period}.html", 'r', encoding='utf-8') as file:
html_content = file.read()
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Heatmap file not found")
@ -2627,10 +2603,11 @@ async def get_heatmap(api_key: str = Security(get_api_key)):
media_type="text/html",
headers={
"Content-Encoding": "gzip",
"Cache-Control": "public, max-age=300" # 5 minutes cache
}
)
@app.post("/pre-post-quote")
async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_key)):
ticker = data.ticker.upper()

View File

@ -187,10 +187,6 @@ def run_press_releases():
if week <= 4:
run_command(["python3", "cron_press_releases.py"])
def run_cron_heatmap():
run_command(["python3", "cron_heatmap.py"])
def run_cron_options_flow():
week = datetime.today().weekday()
current_time = datetime.now().time()
@ -300,6 +296,7 @@ def run_list():
week = datetime.today().weekday()
if week <= 5:
run_command(["python3", "cron_list.py"])
run_command(["python3", "cron_heatmap.py"])
def run_financial_statements():
@ -406,7 +403,6 @@ schedule.every(30).minutes.do(run_threaded, run_cron_market_news).tag('market_ne
schedule.every(30).minutes.do(run_threaded, run_cron_industry).tag('industry_job')
schedule.every(8).minutes.do(run_threaded, run_one_day_price).tag('one_day_price_job')
#schedule.every(15).minutes.do(run_threaded, run_cron_heatmap).tag('heatmap_job')
schedule.every(20).minutes.do(run_threaded, run_tracker).tag('tracker_job')

View File

@ -1,150 +0,0 @@
from __future__ import print_function
import asyncio
import time
import intrinio_sdk as intrinio
from intrinio_sdk.rest import ApiException
from datetime import datetime, timedelta
import ast
import orjson
from tqdm import tqdm
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import sqlite3
import re
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv('INTRINIO_API_KEY')
intrinio.ApiClient().set_api_key(api_key)
# intrinio.ApiClient().allow_retries(True)
today = datetime.today()
start_date = (today - timedelta(150)).strftime("%Y-%m-%d")
end_date = (today + timedelta(30)).strftime("%Y-%m-%d")
next_page = ''
page_size = 1000
activity_type = ''
sentiment = ''
minimum_total_value = 0
maximum_total_value = 2E10
# Database connection and symbol retrieval
def get_total_symbols():
with sqlite3.connect('stocks.db') as con:
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
stocks_symbols = [row[0] for row in cursor.fetchall()]
with sqlite3.connect('etf.db') as etf_con:
etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]
return stocks_symbols + etf_symbols
def get_tickers_from_directory():
directory = "json/options-historical-data/companies"
try:
# Ensure the directory exists
if not os.path.exists(directory):
raise FileNotFoundError(f"The directory '{directory}' does not exist.")
# Get all tickers from filenames
return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]
except Exception as e:
print(f"An error occurred: {e}")
return []
async def save_json(data, symbol):
directory = "json/unusual-activity"
os.makedirs(directory, exist_ok=True)
with open(f"{directory}/{symbol}.json", 'wb') as file:
file.write(orjson.dumps(data))
def parse_option_symbol(option_symbol):
# Define regex pattern to match the symbol structure
match = re.match(r"([A-Z]+)(\d{6})([CP])(\d+)", option_symbol)
if not match:
raise ValueError(f"Invalid option_symbol format: {option_symbol}")
ticker, expiration, option_type, strike_price = match.groups()
# Convert expiration to datetime
date_expiration = datetime.strptime(expiration, "%y%m%d").date()
# Convert strike price to float
strike_price = int(strike_price) / 1000
return date_expiration, option_type, strike_price
async def get_data(symbol):
response = intrinio.OptionsApi().get_unusual_activity_intraday(
symbol,
next_page=next_page,
page_size=page_size,
activity_type=activity_type,
sentiment=sentiment,
start_date=start_date,
end_date=end_date,
minimum_total_value=minimum_total_value,
maximum_total_value=maximum_total_value
)
data = (response.__dict__['_trades'])
res_list = []
if len(data) > 0:
for item in data:
try:
trade_data = item.__dict__
trade_data = {key.lstrip('_'): value for key, value in trade_data.items()}
option_symbol = trade_data['contract'].replace("___", "").replace("__", "").replace("_", "")
date_expiration, option_type, strike_price = parse_option_symbol(option_symbol)
res_list.append({
'date': trade_data['timestamp'].strftime("%Y-%m-%d"),
'askprice': trade_data['ask_at_execution'],
'bidPrice': trade_data['bid_at_execution'],
'premium': trade_data['total_value'],
'sentiment': trade_data['sentiment'].capitalize(),
'avgPrice': trade_data['average_price'],
'price': trade_data['underlying_price_at_execution'],
'unusualType': trade_data['type'].capitalize(),
'size': trade_data['total_size'],
'optionSymbol': option_symbol,
'strike': strike_price,
'expiry': date_expiration.strftime("%Y-%m-%d"),
'optionType': option_type.replace("P", "Put").replace("C", "Call")
})
except Exception as e:
print(e)
res_list = sorted(res_list, key=lambda x: x['date'], reverse=True)
res_list = [item for item in res_list if item['date'] == '2025-01-24']
print(len(res_list))
async def main():
total_symbols = get_tickers_from_directory()
if len(total_symbols) < 3000:
total_symbols = get_total_symbols()
for symbol in tqdm(['XLU']):
try:
data = await get_data(symbol)
except Exception as e:
print(f"Error processing {symbol}: {e}")
if __name__ == "__main__":
asyncio.run(main())