From b5a939d0d676f5681947e834c054dd8d444164f5 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 6 Sep 2024 23:39:55 +0200 Subject: [PATCH] add options gex --- app/cron_options_bubble.py | 2 +- app/cron_options_flow.py | 2 +- app/cron_options_gex.py | 129 +++++++++++++++++++++++++++++++++++++ app/main.py | 28 ++++++++ app/primary_cron_job.py | 10 +++ 5 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 app/cron_options_gex.py diff --git a/app/cron_options_bubble.py b/app/cron_options_bubble.py index 5132db7..6093c4c 100755 --- a/app/cron_options_bubble.py +++ b/app/cron_options_bubble.py @@ -44,7 +44,7 @@ def options_bubble_data(chunk): res_list = [] for page in range(0, 500): try: - data = fin.options_activity(company_tickers=company_tickers, page=page, pagesize=500, date_from=start_date_str, date_to=end_date_str) + data = fin.options_activity(company_tickers=company_tickers, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str) data = ujson.loads(fin.output(data))['option_activity'] res_list += data except: diff --git a/app/cron_options_flow.py b/app/cron_options_flow.py index 45df2c2..3778edd 100755 --- a/app/cron_options_flow.py +++ b/app/cron_options_flow.py @@ -78,7 +78,7 @@ max_workers = 6 # Fetch pages concurrently with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_page = {executor.submit(process_page, page): page for page in range(20)} + future_to_page = {executor.submit(process_page, page): page for page in range(150)} for future in concurrent.futures.as_completed(future_to_page): page = future_to_page[future] try: diff --git a/app/cron_options_gex.py b/app/cron_options_gex.py new file mode 100644 index 0000000..c1e9d7e --- /dev/null +++ b/app/cron_options_gex.py @@ -0,0 +1,129 @@ +import numpy as np +from scipy.stats import norm +from datetime import datetime, date, timedelta +import pandas as pd +from benzinga import financial_data +import ujson +import sqlite3 +import os +from dotenv import load_dotenv + +# Load API key from environment +load_dotenv() +api_key = os.getenv('BENZINGA_API_KEY') +fin = financial_data.Benzinga(api_key) + +def save_json(symbol, data): + with open(f'json/options-gex/companies/{symbol}.json', 'w') as file: + ujson.dump(data, file) + +def calculate_volatility(prices_df): + prices_df = prices_df.sort_values(by='date') + prices_df['return'] = prices_df['close'].pct_change() + returns = prices_df['return'].dropna() + return returns.std() * np.sqrt(252) + +def black_scholes_d1(S, K, T, r, sigma): + try: + if sigma <= 0 or np.sqrt(T) <= 0: + return 0 + return (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T)) + except ZeroDivisionError: + return 0 + +def black_scholes_d2(S, K, T, r, sigma): + return black_scholes_d1(S, K, T, r, sigma) - sigma * np.sqrt(T) + +def delta(S, K, T, r, sigma, option_type='CALL'): + d1 = black_scholes_d1(S, K, T, r, sigma) + return norm.cdf(d1) if option_type == 'CALL' else norm.cdf(d1) - 1 + +def gamma(S, K, T, r, sigma): + try: + d1 = black_scholes_d1(S, K, T, r, sigma) + return norm.pdf(d1) / (S * sigma * np.sqrt(T)) if S > 0 and sigma > 0 and np.sqrt(T) > 0 else 0 + except ZeroDivisionError: + return 0 + +def compute_gex(option_data, r=0.05, sigma=0.2): + S = float(option_data['underlying_price']) + K = float(option_data['strike_price']) + size = float(option_data['open_interest']) + expiration_date = datetime.strptime(option_data['date_expiration'], "%Y-%m-%d") + timestamp = datetime.strptime(option_data['date'], "%Y-%m-%d") + T = (expiration_date - timestamp).days / 365.0 + if T <= 0: + return 0, timestamp.date() + + option_type = option_data['put_call'] + delta_value = delta(S, K, T, r, sigma, option_type) + gamma_value = gamma(S, K, T, r, sigma) + notional = size * S + gex = gamma_value * notional * delta_value + return gex, timestamp.date() + +def compute_daily_gex(option_data_list, volatility): + gex_data = [] + for option_data in option_data_list: + gex, trade_date = compute_gex(option_data, sigma=volatility) + if gex != 0: + gex_data.append({'date': trade_date, 'gex': gex}) + + gex_df = pd.DataFrame(gex_data) + daily_gex = gex_df.groupby('date')['gex'].sum().reset_index() + daily_gex['gex'] = round(daily_gex['gex'], 0) + daily_gex['date'] = daily_gex['date'].astype(str) + return daily_gex + +def get_data(ticker): + res_list = [] + page = 0 + while True: + try: + data = fin.options_activity(date_from=start_date_str, date_to=end_date_str, company_tickers=ticker, page=page, pagesize=1000) + data = ujson.loads(fin.output(data))['option_activity'] + filtered_data = [{key: value for key, value in item.items() if key not in ['description_extended', 'updated']} for item in data] + res_list += filtered_data + page += 1 + except Exception as e: + print(f"Error retrieving data for {ticker}: {e}") + break + return res_list + +# Define date range +end_date = date.today() +start_date = end_date - timedelta(180) +end_date_str = end_date.strftime('%Y-%m-%d') +start_date_str = start_date.strftime('%Y-%m-%d') + +# Connect to SQLite database +stock_con = sqlite3.connect('stocks.db') +stock_cursor = stock_con.cursor() +stock_cursor.execute("PRAGMA journal_mode = wal") +stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%' AND marketCap >= 1E9") +stock_symbols = [row[0] for row in stock_cursor.fetchall()] + +query_template = """ + SELECT date, close + FROM "{ticker}" + WHERE date BETWEEN ? AND ? +""" + +# Process each symbol +for ticker in stock_symbols: + try: + query = query_template.format(ticker=ticker) + df_price = pd.read_sql_query(query, stock_con, params=(start_date_str, end_date_str)).round(2) + volatility = calculate_volatility(df_price) + + ticker_data = get_data(ticker) + daily_gex = compute_daily_gex(ticker_data, volatility) + daily_gex = daily_gex.merge(df_price, on='date', how='inner') + + if not daily_gex.empty: + save_json(ticker, daily_gex.to_dict('records')) + except: + pass + +# Close the database connection +stock_con.close() diff --git a/app/main.py b/app/main.py index d027c52..d1091b3 100755 --- a/app/main.py +++ b/app/main.py @@ -2534,6 +2534,34 @@ async def get_options_flow_ticker(data:TickerData, api_key: str = Security(get_a ) +@app.post("/options-gex-ticker") +async def get_options_flow_ticker(data:TickerData, api_key: str = Security(get_api_key)): + ticker = data.ticker.upper() + cache_key = f"options-gex-{ticker}" + + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + try: + with open(f"json/options-gex/companies/{ticker}.json", 'rb') as file: + res_list = orjson.loads(file.read()) + except: + res_list = [] + + data = orjson.dumps(res_list) + compressed_data = gzip.compress(data) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 5 min + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + ''' @app.post("/options-flow-feed") async def get_options_flow_feed(data: LastOptionId, api_key: str = Security(get_api_key)): diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 088f11d..2cbb132 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -455,6 +455,15 @@ def run_options_net_flow(): ] run_command(command) +def run_options_gex(): + run_command(["python3", "cron_options_gex.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/options-gex", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + run_command(command) + def run_government_contract(): run_command(["python3", "cron_government_contract.py"]) command = [ @@ -553,6 +562,7 @@ schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("03:00").do(run_threaded, run_dark_pool) schedule.every().day.at("04:00").do(run_threaded, run_options_net_flow).tag('options_net_flow_job') +schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_gex_job') schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job') schedule.every().day.at("06:30").do(run_threaded, run_pocketbase).tag('pocketbase_job')