backend/app/support.py
2024-10-25 13:42:02 +02:00

266 lines
9.7 KiB
Python

import pandas as pd
import numpy as np
import ujson
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from scipy.stats import norm
from datetime import datetime, date, timedelta
from benzinga import financial_data
import os
from dotenv import load_dotenv
import seaborn as sns
import sqlite3
def calculate_volatility(prices_df, trading_days=252):
    """
    Annualized volatility of close-to-close returns.
    prices_df: DataFrame with a 'date' column (used only for ordering) and a
        'close' column (used for the returns). The frame is not modified.
    trading_days: Number of periods per year used to annualize the
        per-period standard deviation (252 by default).
    Returns the sample standard deviation of the percentage changes of
    'close', multiplied by sqrt(trading_days). The result is NaN when fewer
    than two returns are available.
    """
    # Work on the sorted close series directly instead of adding a scratch
    # 'return' column to a copy of the frame.
    closes = prices_df.sort_values(by='date')['close']
    returns = closes.pct_change().dropna()
    return returns.std() * np.sqrt(trading_days)
# Read the Benzinga API key from the environment (a .env file is honoured)
# and build the API client used by the download helpers below.
load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key)

# Open the two SQLite databases that hold stock and ETF price tables.
stock_con = sqlite3.connect('stocks.db')
etf_con = sqlite3.connect('etf.db')

# Stock universe: symbols without a dot and with a market cap of at least 500M.
stock_cursor = stock_con.cursor()
stock_cursor.execute("PRAGMA journal_mode = wal")
stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap >= 500E6")
stock_symbols = [symbol for (symbol,) in stock_cursor.fetchall()]

# ETF universe: every distinct symbol in the etfs table.
etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [symbol for (symbol,) in etf_cursor.fetchall()]

# Each ticker has its own table; the table name is substituted into the
# template while the date bounds are passed as bound parameters.
query_template = """
SELECT date, close,change_percent
FROM "{ticker}"
WHERE date BETWEEN ? AND ?
"""

ticker = 'NVDA'

# Options-activity window: yesterday through today, as YYYY-MM-DD strings.
end_date = date.today()
start_date = end_date - timedelta(days=1)
end_date_str = f"{end_date:%Y-%m-%d}"
start_date_str = f"{start_date:%Y-%m-%d}"

# Price history since 2024-01-01, read from whichever database lists the ticker.
query = query_template.format(ticker=ticker)
df_price = pd.read_sql_query(
    query,
    stock_con if ticker in stock_symbols else etf_con,
    params=('2024-01-01', end_date_str),
).round(2)
df_price = df_price.rename(columns={"change_percent": "changesPercentage"})

# Annualized volatility of the ticker; read later as the sigma input for the Greeks.
volatility = calculate_volatility(df_price)
print(start_date, end_date)
print('volatility', volatility)
def get_data(ticker):
    """
    Download all pages of options activity for `ticker`.
    Pages of up to 1000 records are requested from the Benzinga client for
    the module-level start_date_str..end_date_str window until a page comes
    back empty. The 'description_extended' and 'updated' fields are dropped
    from every record.
    Any exception is printed and ends the download; whatever was collected
    up to that point is returned.
    """
    excluded_keys = ('description_extended', 'updated')
    collected = []
    page = 0
    while True:
        try:
            response = fin.options_activity(
                date_from=start_date_str,
                date_to=end_date_str,
                company_tickers=ticker,
                page=page,
                pagesize=1000,
            )
            records = ujson.loads(fin.output(response))['option_activity']
            if not records:
                break  # An empty page means there is nothing left to fetch
            # Build the whole page first so a failure adds nothing from it.
            page_rows = [
                {field: value for field, value in record.items() if field not in excluded_keys}
                for record in records
            ]
            collected.extend(page_rows)
        except Exception as e:
            print(e)
            break
        page += 1
    return collected
ticker_data = get_data(ticker)

# Keep only contracts expiring within the next five days. The window bounds
# are computed once so every record is compared against the same instant,
# and each expiry string is parsed a single time.
# NOTE(review): expirations are parsed as midnight, so a contract expiring
# today compares earlier than now() and is dropped - confirm this is intended.
window_start = datetime.now()
window_end = window_start + timedelta(days=5)
ticker_data = [
    item for item in ticker_data
    if window_start <= datetime.strptime(item['date_expiration'], '%Y-%m-%d') <= window_end
]
print(len(ticker_data))
def calculate_option_greeks(S, K, T, r, sigma, option_type='CALL'):
    """
    Calculate option delta and gamma using the Black-Scholes formula
    S: Current stock price
    K: Strike price
    T: Time to expiration (in years)
    r: Risk-free rate
    sigma: Volatility
    option_type: 'CALL' for a call (case-insensitive); any other value is
        treated as a put
    Returns a (delta, gamma) tuple. (0, 0) is returned when T, sigma, S or K
    is not strictly positive, or when the inputs cannot be evaluated.
    """
    if T <= 0 or sigma <= 0 or S <= 0:
        return 0, 0
    try:
        # log(S / K) is undefined for a non-positive strike. Checked inside
        # the try so a non-numeric strike still yields (0, 0).
        if K <= 0:
            return 0, 0
        vol_sqrt_t = sigma * np.sqrt(T)
        d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / vol_sqrt_t
        # Gamma is the same for calls and puts.
        gamma = norm.pdf(d1) / (S * vol_sqrt_t)
        if str(option_type).upper() == 'CALL':
            delta = norm.cdf(d1)
        else:  # PUT
            delta = -norm.cdf(-d1)
        return delta, gamma
    except (TypeError, ValueError, ArithmeticError):
        # Best effort: inputs that cannot be evaluated contribute no exposure.
        return 0, 0
def process_options_data_by_expiry(df, risk_free_rate=0.0525, sigma=None, contract_multiplier=100):
    """
    Add time-to-expiry, Greeks and delta exposure columns to options data
    df: DataFrame of option activity with the columns 'strike_price',
        'volume', 'open_interest', 'underlying_price', 'price',
        'date_expiration', 'date' and 'put_call'. It is modified in place.
    risk_free_rate: Annual risk-free rate passed to the Greeks calculation.
    sigma: Volatility passed to the Greeks calculation; when None, the
        module-level `volatility` is used.
    contract_multiplier: Number of shares per contract used for exposure.
    Returns the same DataFrame with the columns 'T', 'delta', 'gamma',
    'delta_exposure' and 'option_type' added. An empty frame is returned
    unchanged.
    """
    # Nothing to convert or price; also avoids a KeyError on a frame that
    # has no columns at all.
    if df.empty:
        return df
    if sigma is None:
        sigma = volatility

    # Convert data types
    for column in ('strike_price', 'volume', 'open_interest', 'underlying_price', 'price'):
        df[column] = pd.to_numeric(df[column])
    for column in ('date_expiration', 'date'):
        df[column] = pd.to_datetime(df[column])

    # Calculate time to expiration in years
    df['T'] = (df['date_expiration'] - df['date']).dt.days / 365.0

    # Calculate Greeks for each option
    greeks = df.apply(
        lambda row: calculate_option_greeks(
            row['underlying_price'],
            row['strike_price'],
            row['T'],
            risk_free_rate,
            sigma,
            row['put_call'],
        ),
        axis=1,
    )
    df['delta'], df['gamma'] = zip(*greeks)

    # Dollar delta exposure: delta * traded volume * shares per contract * spot
    df['delta_exposure'] = (
        df['delta'] * df['volume'] * contract_multiplier * df['underlying_price']
    )

    # Lower-case label; anything that is not 'CALL' is labelled 'put'
    df['option_type'] = np.where(df['put_call'] == 'CALL', 'call', 'put')
    return df
def plot_delta_exposure_by_expiry(df, current_price):
    """
    Draw a horizontal stacked bar chart of delta exposure per strike
    df: Processed options data with the columns 'strike_price',
        'date_expiration', 'date', 'put_call', 'delta_exposure' and 'ticker'.
    current_price: Price of the underlying, marked with a dashed line.
    One bar segment is drawn per expiration date, for calls and for puts
    separately, and the segments of successive expirations are stacked on
    top of each other. Returns the matplotlib Figure.
    """
    plt.style.use('dark_background')
    fig, ax = plt.subplots(figsize=(15, 12))

    # Colour cycles for calls and puts, one colour per expiration
    call_colors = ['#90EE90', '#32CD32', '#228B22']  # Light to dark green
    put_colors = ['#FFB6C1', '#DC143C', '#8B0000']  # Light to dark red

    # Get unique strike prices and expiration dates
    strike_prices = sorted(df['strike_price'].unique())
    # Normalize to pandas Timestamps: unique() can yield numpy.datetime64
    # values, which have no strftime().
    expiry_dates = [pd.Timestamp(expiry) for expiry in sorted(df['date_expiration'].unique())]

    # One y position per strike price
    y_ticks = np.arange(len(strike_prices))

    # Running totals per strike, used as the left edge of the next segment
    stacked_calls = pd.Series(0.0, index=strike_prices)
    stacked_puts = pd.Series(0.0, index=strike_prices)

    # enumerate() supplies the colour index; sorted() returns a plain list,
    # so the previous expiry_dates.tolist() lookup raised AttributeError.
    for i, expiry in enumerate(expiry_dates):
        expiry_data = df[df['date_expiration'] == expiry]

        calls = expiry_data[expiry_data['put_call'] == 'CALL']
        call_exposure = calls.groupby('strike_price')['delta_exposure'].sum()
        call_by_strike = call_exposure.reindex(strike_prices).fillna(0)

        puts = expiry_data[expiry_data['put_call'] == 'PUT']
        put_exposure = puts.groupby('strike_price')['delta_exposure'].sum()
        put_by_strike = put_exposure.reindex(strike_prices).fillna(0)

        # Plot calls
        if not call_exposure.empty:
            ax.barh(y_ticks, call_by_strike.values,
                    alpha=0.7, left=stacked_calls.values,
                    color=call_colors[i % len(call_colors)],
                    height=0.8)

        # Plot puts
        if not put_exposure.empty:
            ax.barh(y_ticks, put_by_strike.values,
                    alpha=0.7, left=stacked_puts.values,
                    color=put_colors[i % len(put_colors)],
                    height=0.8)

        # Update total exposure
        stacked_calls = stacked_calls + call_by_strike
        stacked_puts = stacked_puts + put_by_strike

    # Add strike price labels
    ax.set_yticks(y_ticks)
    ax.set_yticklabels([f'${price:.2f}' for price in strike_prices])

    # Add current price line at the insertion position among the sorted strikes
    current_price_idx = np.searchsorted(strike_prices, current_price)
    ax.axhline(y=current_price_idx, color='red', linestyle='--', alpha=0.5,
               label=f'Current Price: ${current_price:.2f}')

    # Format x-axis as absolute dollar amounts in thousands or millions
    ax.xaxis.set_major_formatter(plt.FuncFormatter(
        lambda x, p: f'${abs(x/1e6):.1f}M' if abs(x) >= 1e6 else f'${abs(x/1e3):.1f}K'))

    # Add labels and title
    ax.set_title(f'Delta Hedging Exposure by Strike and Expiration\n{df["ticker"].iloc[0]}',
                 pad=20)
    ax.set_xlabel('Delta Exposure ($)')
    ax.set_ylabel('Strike Price ($)')

    # Add grid
    ax.grid(True, alpha=0.2)

    # Add legend for expiration dates (shown in the call colours)
    legend_elements = []
    for i, expiry in enumerate(expiry_dates):
        days_to_expiry = (expiry - df['date'].iloc[0]).days
        legend_elements.append(plt.Rectangle((0, 0), 1, 1,
                                             fc=call_colors[i % len(call_colors)],
                                             alpha=0.7,
                                             label=f'Exp: {expiry.strftime("%Y-%m-%d")} ({days_to_expiry}d)'))
    ax.legend(handles=legend_elements, loc='center left', bbox_to_anchor=(1, 0.5))

    plt.tight_layout()
    return fig
# Script entry point
if __name__ == "__main__":
    # Build a frame from the module-level ticker_data records
    df = pd.DataFrame(ticker_data)
    if df.empty:
        print("No data available for analysis")
    else:
        # Enrich the records with Greeks and exposures
        processed_df = process_options_data_by_expiry(df)
        current_price = float(df['underlying_price'].iloc[0])

        # Render the chart
        fig = plot_delta_exposure_by_expiry(processed_df, current_price)
        plt.show()

        # Report the exposure totals per side and their sum, in millions
        is_call = processed_df['put_call'] == 'CALL'
        is_put = processed_df['put_call'] == 'PUT'
        total_call_exposure = processed_df.loc[is_call, 'delta_exposure'].sum()
        total_put_exposure = processed_df.loc[is_put, 'delta_exposure'].sum()
        net_exposure = total_call_exposure + total_put_exposure

        print("\nDelta Exposure Summary:")
        print(f"Total Call Delta Exposure: ${total_call_exposure/1e6:.2f}M")
        print(f"Total Put Delta Exposure: ${total_put_exposure/1e6:.2f}M")
        print(f"Net Delta Exposure: ${net_exposure/1e6:.2f}M")