add implied volatiltiy

This commit is contained in:
MuslemRahimi 2024-11-07 13:46:16 +01:00
parent c280d40167
commit 1df37cf81a

View File

@ -7,6 +7,13 @@ from dotenv import load_dotenv
from benzinga import financial_data from benzinga import financial_data
from tqdm import tqdm from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import math
from scipy.stats import norm
from scipy.optimize import brentq
load_dotenv() load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY') api_key = os.getenv('BENZINGA_API_KEY')
@ -14,6 +21,28 @@ api_key = os.getenv('BENZINGA_API_KEY')
fin = financial_data.Benzinga(api_key) fin = financial_data.Benzinga(api_key)
risk_free_rate = 0.05
def black_scholes_price(S, K, T, r, sigma, option_type="CALL"):
d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T))
d2 = d1 - sigma * math.sqrt(T)
if option_type == "CALL":
return S * norm.cdf(d1) - K * math.exp(-r * T) * norm.cdf(d2)
elif option_type == "PUT":
return K * math.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
# Implied volatility function
def implied_volatility(S, K, T, r, market_price, option_type="CALL"):
def objective_function(sigma):
return black_scholes_price(S, K, T, r, sigma, option_type) - market_price
# Use brentq to solve for the implied volatility
try:
return brentq(objective_function, 1e-6, 3) # Bounds for volatility
except ValueError:
return None # Return None if there's no solution
def calculate_dte(date_expiration): def calculate_dte(date_expiration):
expiration_date = datetime.strptime(date_expiration, "%Y-%m-%d") expiration_date = datetime.strptime(date_expiration, "%Y-%m-%d")
return (expiration_date - datetime.today()).days return (expiration_date - datetime.today()).days
@ -52,7 +81,60 @@ def options_bubble_data(chunk):
except: except:
break break
res_filtered = [{key: value for key, value in item.items() if key in ['ticker','date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list] res_filtered = [{key: value for key, value in item.items() if key in ['ticker','underlying_price','strike_price','price','date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list]
#================Start computing historical iv60=====================#
# Convert to DataFrame for easier manipulation
df = pd.DataFrame(res_filtered)
# Ensure correct types for dates and numerical fields
df['date'] = pd.to_datetime(df['date'])
df['date_expiration'] = pd.to_datetime(df['date_expiration'])
df['underlying_price'] = pd.to_numeric(df['underlying_price'], errors='coerce')
df['strike_price'] = pd.to_numeric(df['strike_price'], errors='coerce')
df['price'] = pd.to_numeric(df['price'], errors='coerce')
df['volume'] = pd.to_numeric(df['volume'], errors='coerce')
df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce')
df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days
df_30d = df[(df['days_to_expiration'] >= 40) & (df['days_to_expiration'] <= 80)]
# Calculate implied volatility for options in the 30-day range
iv_data = []
for _, option in df_30d.iterrows():
S = option['underlying_price']
K = option['strike_price']
T = option['days_to_expiration'] / 365
market_price = option['price']
option_type = "CALL" if option['put_call'] == "CALL" else "PUT"
# Check for missing values
if pd.notna(S) and pd.notna(K) and pd.notna(T) and pd.notna(market_price):
# Calculate IV
iv = implied_volatility(S, K, T, risk_free_rate, market_price, option_type)
if iv is not None:
iv_data.append({
"date": option['date'],
"IV": iv,
"volume": option['volume']
})
# Create a DataFrame with the calculated IV data
iv_df = pd.DataFrame(iv_data)
# Calculate daily IV60 by averaging IVs (weighted by volume)
def calculate_daily_iv60(group):
weighted_iv = (group["IV"] * group["volume"]).sum() / group["volume"].sum()
return weighted_iv
# Group by date and compute daily IV60
iv60_history = iv_df.groupby("date").apply(calculate_daily_iv60)
# Fill NaN values using forward fill to carry the last valid IV60 forward
iv60_history = iv60_history.ffill()
iv60_history = iv60_history.to_dict()
iv60_dict = {k.strftime('%Y-%m-%d'): v for k, v in iv60_history.items()}
#print(iv60_dict)
#====================================================================#
for option_type in ['CALL', 'PUT']: for option_type in ['CALL', 'PUT']:
for item in res_filtered: for item in res_filtered:
@ -65,31 +147,41 @@ def options_bubble_data(chunk):
pass pass
#Save raw data for each ticker for options page stack bar chart #Save raw data for each ticker for options page stack bar chart
result_list = []
for ticker in chunk: for ticker in chunk:
try: try:
ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == ticker] ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == ticker]
if len(ticker_filtered_data) != 0: if len(ticker_filtered_data) != 0:
#sum up calls and puts for each day for the plot # Sum up calls and puts for each day for the plot
summed_data = {} summed_data = {}
for entry in ticker_filtered_data: for entry in ticker_filtered_data:
volume = int(entry['volume']) volume = int(entry['volume'])
open_interest = int(entry['open_interest']) open_interest = int(entry['open_interest'])
put_call = entry['put_call'] put_call = entry['put_call']
date_str = entry['date']
if entry['date'] not in summed_data: if date_str not in summed_data:
summed_data[entry['date']] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}} summed_data[date_str] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}, 'iv60': None}
summed_data[entry['date']][put_call]['volume'] += volume summed_data[date_str][put_call]['volume'] += volume
summed_data[entry['date']][put_call]['open_interest'] += open_interest summed_data[date_str][put_call]['open_interest'] += open_interest
result_list = [{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT']} for date in summed_data] if date_str in iv60_dict:
#reverse the list summed_data[date_str]['iv60'] = round(iv60_dict[date_str]*100,1)
result_list.extend([{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT'], 'iv60': summed_data[date]['iv60']} for date in summed_data])
# Reverse the list
result_list = result_list[::-1] result_list = result_list[::-1]
with open(f"json/options-flow/company/{ticker}.json", 'w') as file: with open(f"json/options-flow/company/{ticker}.json", 'w') as file:
ujson.dump(result_list, file) ujson.dump(result_list, file)
except: except Exception as e:
print(e)
pass pass
#Save bubble data for each ticker for overview page #Save bubble data for each ticker for overview page
for ticker in chunk: for ticker in chunk:
@ -139,7 +231,7 @@ async def main():
chunk_size = len(total_symbols) // 2000 # Divide the list into N chunks chunk_size = len(total_symbols) // 2000 # Divide the list into N chunks
chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)] chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
#chunks = [['NVDA']]
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=4) as executor: with ThreadPoolExecutor(max_workers=4) as executor:
tasks = [loop.run_in_executor(executor, options_bubble_data, chunk) for chunk in chunks] tasks = [loop.run_in_executor(executor, options_bubble_data, chunk) for chunk in chunks]