From 1df37cf81a152c6e293ae01e606e72b2ed6c5955 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Thu, 7 Nov 2024 13:46:16 +0100 Subject: [PATCH] add implied volatiltiy --- app/cron_options_bubble.py | 114 +++++++++++++++++++++++++++++++++---- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/app/cron_options_bubble.py b/app/cron_options_bubble.py index 262b287..6cb51b6 100755 --- a/app/cron_options_bubble.py +++ b/app/cron_options_bubble.py @@ -7,6 +7,13 @@ from dotenv import load_dotenv from benzinga import financial_data from tqdm import tqdm from concurrent.futures import ThreadPoolExecutor +import pandas as pd +import math +from scipy.stats import norm +from scipy.optimize import brentq + + + load_dotenv() api_key = os.getenv('BENZINGA_API_KEY') @@ -14,6 +21,28 @@ api_key = os.getenv('BENZINGA_API_KEY') fin = financial_data.Benzinga(api_key) +risk_free_rate = 0.05 + +def black_scholes_price(S, K, T, r, sigma, option_type="CALL"): + d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T)) + d2 = d1 - sigma * math.sqrt(T) + if option_type == "CALL": + return S * norm.cdf(d1) - K * math.exp(-r * T) * norm.cdf(d2) + elif option_type == "PUT": + return K * math.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1) + +# Implied volatility function +def implied_volatility(S, K, T, r, market_price, option_type="CALL"): + def objective_function(sigma): + return black_scholes_price(S, K, T, r, sigma, option_type) - market_price + + # Use brentq to solve for the implied volatility + try: + return brentq(objective_function, 1e-6, 3) # Bounds for volatility + except ValueError: + return None # Return None if there's no solution + + def calculate_dte(date_expiration): expiration_date = datetime.strptime(date_expiration, "%Y-%m-%d") return (expiration_date - datetime.today()).days @@ -52,7 +81,60 @@ def options_bubble_data(chunk): except: break - res_filtered = [{key: value for key, value in item.items() if key in ['ticker','date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list] + res_filtered = [{key: value for key, value in item.items() if key in ['ticker','underlying_price','strike_price','price','date', 'date_expiration', 'put_call', 'volume', 'open_interest']} for item in res_list] + + #================Start computing historical iv60=====================# + # Convert to DataFrame for easier manipulation + df = pd.DataFrame(res_filtered) + + # Ensure correct types for dates and numerical fields + df['date'] = pd.to_datetime(df['date']) + df['date_expiration'] = pd.to_datetime(df['date_expiration']) + df['underlying_price'] = pd.to_numeric(df['underlying_price'], errors='coerce') + df['strike_price'] = pd.to_numeric(df['strike_price'], errors='coerce') + df['price'] = pd.to_numeric(df['price'], errors='coerce') + df['volume'] = pd.to_numeric(df['volume'], errors='coerce') + df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce') + + df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days + df_30d = df[(df['days_to_expiration'] >= 40) & (df['days_to_expiration'] <= 80)] + # Calculate implied volatility for options in the 30-day range + iv_data = [] + for _, option in df_30d.iterrows(): + S = option['underlying_price'] + K = option['strike_price'] + T = option['days_to_expiration'] / 365 + market_price = option['price'] + option_type = "CALL" if option['put_call'] == "CALL" else "PUT" + + # Check for missing values + if pd.notna(S) and pd.notna(K) and pd.notna(T) and pd.notna(market_price): + # Calculate IV + iv = implied_volatility(S, K, T, risk_free_rate, market_price, option_type) + if iv is not None: + iv_data.append({ + "date": option['date'], + "IV": iv, + "volume": option['volume'] + }) + + # Create a DataFrame with the calculated IV data + iv_df = pd.DataFrame(iv_data) + + # Calculate daily IV60 by averaging IVs (weighted by volume) + def calculate_daily_iv60(group): + weighted_iv = (group["IV"] * group["volume"]).sum() / group["volume"].sum() + return weighted_iv + + # Group by date and compute daily IV60 + iv60_history = iv_df.groupby("date").apply(calculate_daily_iv60) + + # Fill NaN values using forward fill to carry the last valid IV60 forward + iv60_history = iv60_history.ffill() + iv60_history = iv60_history.to_dict() + iv60_dict = {k.strftime('%Y-%m-%d'): v for k, v in iv60_history.items()} + #print(iv60_dict) + #====================================================================# for option_type in ['CALL', 'PUT']: for item in res_filtered: @@ -65,31 +147,41 @@ def options_bubble_data(chunk): pass #Save raw data for each ticker for options page stack bar chart + result_list = [] for ticker in chunk: try: ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == ticker] if len(ticker_filtered_data) != 0: - #sum up calls and puts for each day for the plot + # Sum up calls and puts for each day for the plot summed_data = {} for entry in ticker_filtered_data: volume = int(entry['volume']) open_interest = int(entry['open_interest']) put_call = entry['put_call'] + date_str = entry['date'] - if entry['date'] not in summed_data: - summed_data[entry['date']] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}} + if date_str not in summed_data: + summed_data[date_str] = {'CALL': {'volume': 0, 'open_interest': 0}, 'PUT': {'volume': 0, 'open_interest': 0}, 'iv60': None} - summed_data[entry['date']][put_call]['volume'] += volume - summed_data[entry['date']][put_call]['open_interest'] += open_interest - - result_list = [{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT']} for date in summed_data] - #reverse the list + summed_data[date_str][put_call]['volume'] += volume + summed_data[date_str][put_call]['open_interest'] += open_interest + + if date_str in iv60_dict: + summed_data[date_str]['iv60'] = round(iv60_dict[date_str]*100,1) + + result_list.extend([{'date': date, 'CALL': summed_data[date]['CALL'], 'PUT': summed_data[date]['PUT'], 'iv60': summed_data[date]['iv60']} for date in summed_data]) + + # Reverse the list result_list = result_list[::-1] + with open(f"json/options-flow/company/{ticker}.json", 'w') as file: ujson.dump(result_list, file) - except: + except Exception as e: + print(e) pass + + #Save bubble data for each ticker for overview page for ticker in chunk: @@ -139,7 +231,7 @@ async def main(): chunk_size = len(total_symbols) // 2000 # Divide the list into N chunks chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)] - + #chunks = [['NVDA']] loop = asyncio.get_running_loop() with ThreadPoolExecutor(max_workers=4) as executor: tasks = [loop.run_in_executor(executor, options_bubble_data, chunk) for chunk in chunks]