import os
import sqlite3
import time

import numpy as np
import orjson
from tqdm import tqdm

con = sqlite3.connect('stocks.db')
etf_con = sqlite3.connect('etf.db')

cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
#cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap > 1E9")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
stocks_symbols = [row[0] for row in cursor.fetchall()]

etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
#etf_cursor.execute("SELECT DISTINCT symbol FROM etfs WHERE marketCap > 1E9")
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]

index_symbols = ["^SPX", "^VIX"]

con.close()
etf_con.close()


def get_tickers_from_directory(directory: str):
    try:
        # Ensure the directory exists
        if not os.path.exists(directory):
            raise FileNotFoundError(f"The directory '{directory}' does not exist.")

        # Get all tickers from filenames
        return [file.replace(".json", "") for file in os.listdir(directory) if file.endswith(".json")]

    except Exception as e:
        print(f"An error occurred: {e}")
        return []
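
# Usage sketch: pointed at the output directory used by save_json below, this
# would return the tickers already processed (example result hypothetical).
# >>> get_tickers_from_directory("json/implied-volatility")  # e.g. ['AAPL', 'SPY', ...]

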
def convert_to_serializable(obj):
    """Recursively convert numpy scalars, arrays, lists, and dicts to JSON-serializable Python types."""
    if isinstance(obj, np.float64):
        return float(obj)
    elif isinstance(obj, (np.int64, np.int32)):
        return int(obj)
    elif isinstance(obj, (list, np.ndarray)):
        return [convert_to_serializable(item) for item in obj]
    elif isinstance(obj, dict):
        return {key: convert_to_serializable(value) for key, value in obj.items()}
    else:
        return obj
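
# Quick illustration (hypothetical values): numpy scalars and arrays become
# plain Python types, so orjson can dump the result without OPT_SERIALIZE_NUMPY.
# >>> convert_to_serializable({"iv": np.float64(0.25), "vals": np.array([1, 2])})
# {'iv': 0.25, 'vals': [1, 2]}

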
def save_json(data, symbol):
    directory_path = "json/implied-volatility"
    os.makedirs(directory_path, exist_ok=True)  # Ensure the directory exists

    # Convert numpy types to JSON-serializable types
    serializable_data = convert_to_serializable(data)

    with open(f"{directory_path}/{symbol}.json", 'wb') as file:  # Use binary mode for orjson
        file.write(orjson.dumps(serializable_data))
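
# Usage sketch (hypothetical payload): writes json/implied-volatility/AAPL.json.
# >>> save_json([{"date": "2024-01-02", "iv": np.float64(0.25)}], "AAPL")

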
def is_outlier(value, values, n_sigmas=3):
    """
    Detect if a value is an outlier using the z-score method.

    Args:
        value: The value to check
        values: List of values to compare against
        n_sigmas: Number of standard deviations to use as threshold (default: 3)

    Returns:
        bool: True if the value is an outlier, False otherwise
    """
    if value is None or not values:
        return False

    values = [v for v in values if v is not None]
    if not values:
        return False

    mean = np.mean(values)
    std = np.std(values)

    if std == 0:
        return False

    z_score = abs((value - mean) / std)
    return z_score > n_sigmas
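
# Worked example (hypothetical numbers): for values [1, 1, 1, 1, 10], the mean
# is 2.8 and the population std is 3.6, so 10 has a z-score of exactly 2.0.
# >>> is_outlier(10, [1, 1, 1, 1, 10])                # False at the default 3 sigmas
# >>> is_outlier(10, [1, 1, 1, 1, 10], n_sigmas=1.5)  # True

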
def clean_iv_data(data):
    """
    Clean IV data by handling outliers.

    Args:
        data: List of dictionaries containing IV values

    Returns:
        List of dictionaries with cleaned IV values
    """
    # Build a cleaned copy of the records
    cleaned_data = []

    window_size = 20  # Rolling window size for outlier detection

    for i, item in enumerate(data):
        cleaned_item = item.copy()
        iv = item.get('iv')

        if iv is not None:
            # Get a window of IV values centered around the current point
            start_idx = max(0, i - window_size // 2)
            end_idx = min(len(data), i + window_size // 2)
            window_values = [data[j].get('iv') for j in range(start_idx, end_idx)]

            # Check if the current IV is an outlier
            if is_outlier(iv, window_values):
                # Replace outlier with the median of nearby non-outlier values
                non_outlier_values = [
                    v for v in window_values
                    if v is not None and not is_outlier(v, window_values)
                ]

                if non_outlier_values:
                    cleaned_item['iv'] = round(np.median(non_outlier_values), 2)
                else:
                    cleaned_item['iv'] = None
            else:
                cleaned_item['iv'] = round(iv, 2)

        cleaned_data.append(cleaned_item)

    return cleaned_data
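
# Behavior sketch on a toy series (hypothetical data): a single IV spike inside
# an otherwise flat window gets replaced by the median of its neighbors.
# >>> series = [{'iv': 0.20}] * 10 + [{'iv': 5.0}] + [{'iv': 0.20}] * 10
# >>> clean_iv_data(series)[10]['iv']  # -> 0.2 (the 5.0 spike is smoothed out)

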
def compute_realized_volatility(data, window_size=20):
    """
    Compute the realized volatility of stock prices over a rolling window.
    Realized volatility is the annualized standard deviation of log returns of stock prices.
    """
    # First clean the IV data
    data = clean_iv_data(data)

    # Sort data by date (oldest first)
    data = sorted(data, key=lambda x: x['date'])

    # Extract stock prices; use .get() to handle missing keys
    prices = [item.get('price') for item in data]

    # Compute log returns of stock prices, skipping None values.
    # Note: there is one fewer return than there are prices.
    log_returns = []
    for i in range(1, len(prices)):
        if prices[i] is not None and prices[i - 1] is not None and prices[i - 1] != 0:
            log_returns.append(np.log(prices[i] / prices[i - 1]))
        else:
            log_returns.append(None)  # Price missing or invalid

    # Compute realized volatility using a rolling window
    realized_volatility = []
    for i in range(len(log_returns)):
        if i < window_size - 1:
            # Not enough data for the window, append None
            realized_volatility.append(None)
        else:
            # Collect valid log returns in the window
            window_returns = [
                log_returns[j]
                for j in range(i - window_size + 1, i + 1)
                if log_returns[j] is not None
            ]

            if len(window_returns) >= window_size:
                # Zero-mean estimator of daily volatility over the window
                rv_daily = np.sqrt(np.sum(np.square(window_returns)) / window_size)
                # Annualize assuming 252 trading days per year
                realized_volatility.append(rv_daily * np.sqrt(252))
            else:
                # Not enough valid data in the window, append None
                realized_volatility.append(None)

    # Shift realized volatility FORWARD by window_size days to align with IV from window_size days ago
    realized_volatility = realized_volatility[window_size - 1:] + [None] * (window_size - 1)

    # Pad with one trailing None so the list lines up with data
    # (there is one fewer log return than there are records)
    realized_volatility.append(None)

    # Create the resulting list
    rv_list = []
    for i in range(len(data)):
        rv_list.append({
            "date": data[i]["date"],
            "price": data[i].get("price"),
            "changesPercentage": data[i].get("changesPercentage"),
            "putCallRatio": data[i].get("putCallRatio"),
            "total_open_interest": data[i].get("total_open_interest"),
            "changesPercentageOI": data[i].get("changesPercentageOI"),
            "iv": data[i].get("iv"),
            "rv": round(realized_volatility[i], 2) if realized_volatility[i] is not None else None
        })

    # Sort the final list by date in descending order
    rv_list = sorted(rv_list, key=lambda x: x['date'], reverse=True)
    return rv_list
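
# Notes on the data contract (inferred from the .get() calls above): each input
# record is expected to look roughly like
#     {"date": "...", "price": ..., "iv": ..., "putCallRatio": ..., ...}
# and the rolling RV computed above is the zero-mean estimator
#     rv = sqrt(252 * sum(r_j**2) / window_size)
# over the log returns r_j, shifted forward so each row's rv lines up with the
# iv quoted window_size trading days earlier.

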
if __name__ == '__main__':
    directory_path = "json/implied-volatility"
    total_symbols = stocks_symbols + etf_symbols + index_symbols

    for symbol in tqdm(total_symbols):
        try:
            with open(f"json/options-historical-data/companies/{symbol}.json", "r") as file:
                data = orjson.loads(file.read())
            rv_list = compute_realized_volatility(data)

            if rv_list:
                save_json(rv_list, symbol)
        except Exception:
            # Skip symbols without an options-history file or with malformed data
            pass