From 0c7b8e393eb9c7db8e76fd837d4c53f287936907 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Wed, 18 Sep 2024 22:56:16 +0200
Subject: [PATCH] Add options historical flow

Add a cron script that backfills daily options flow data from the
Benzinga API into per-date JSON files, expose those files through a
new /options-historical-flow endpoint, and hook the script into the
primary cron job.

---
 app/cron_options_historical_flow.py | 130 ++++++++++++++++++++++++++++
 app/main.py                         |  33 +++++++
 app/primary_cron_job.py             |   3 +
 3 files changed, 166 insertions(+)
 create mode 100644 app/cron_options_historical_flow.py
Skipping...") + return + + res_list = [] + max_workers = 6 # Adjust max_workers to control parallelism + + # Fetch pages concurrently for the given day + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_page = {executor.submit(process_page, page, date_str): page for page in range(130)} + for future in concurrent.futures.as_completed(future_to_page): + page = future_to_page[future] + try: + page_list = future.result() + res_list += page_list + except Exception as e: + print(f"Exception occurred: {e}") + break + + # Filter and clean the data + res_list = [{key: value for key, value in item.items() if key not in ['description_extended', 'updated']} for item in res_list] + filtered_list = [] + + for item in res_list: + try: + if item['underlying_price'] != '': + ticker = item['ticker'] + if ticker == 'BRK.A': + ticker = 'BRK-A' + elif ticker == 'BRK.B': + ticker = 'BRK-B' + + put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts' + + asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '') + + item['underlying_type'] = asset_type.lower() + item['put_call'] = put_call + item['ticker'] = ticker + item['price'] = round(float(item['price']), 2) + item['strike_price'] = round(float(item['strike_price']), 2) + item['cost_basis'] = round(float(item['cost_basis']), 2) + item['underlying_price'] = round(float(item['underlying_price']), 2) + item['option_activity_type'] = item['option_activity_type'].capitalize() + item['sentiment'] = item['sentiment'].capitalize() + item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title() + item['tradeCount'] = item['trade_count'] + + filtered_list.append(item) + except: + pass + + # Sort the list by time in reverse order + filtered_list = sorted(filtered_list, key=lambda x: x['time'], reverse=True) + + # Save the data to a JSON file named after the date + if len(filtered_list) > 0: + with open(file_path, 'w') as file: + ujson.dump(filtered_list, file) + + #print(f"Data saved for {date_str}") + +# Iterate through each weekday from the start_date to today +current_date = start_date +while current_date <= end_date: + # Check if it's a weekday (Monday=0, Sunday=6) + if current_date.weekday() < 5: # Monday to Friday + date_str = current_date.strftime("%Y-%m-%d") + process_day(date_str) + + # Move to the next day + current_date += timedelta(days=1) + diff --git a/app/main.py b/app/main.py index c7aa708..3a1ccc5 100755 --- a/app/main.py +++ b/app/main.py @@ -300,6 +300,11 @@ class TransactionId(BaseModel): class InfoText(BaseModel): parameter: str +class HistoricalDate(BaseModel): + date: str + + + # Replace NaN values with None in the resulting JSON object def replace_nan_inf_with_none(obj): if isinstance(obj, list): @@ -2708,6 +2713,34 @@ async def get_options_chain(data:TransactionId, api_key: str = Security(get_api_ headers={"Content-Encoding": "gzip"} ) + +@app.post("/options-historical-flow") +async def get_options_chain(data:HistoricalDate, api_key: str = Security(get_api_key)): + selected_date = data.date + print(selected_date) + cache_key = f"options-historical-flow-{selected_date}" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + try: + with open(f"json/options-historical-data/flow-data/{selected_date}.json", 'rb') as file: + res_list = orjson.loads(file.read()) + except: + res_list = [] + data = 
diff --git a/app/main.py b/app/main.py
index c7aa708..3a1ccc5 100755
--- a/app/main.py
+++ b/app/main.py
@@ -300,6 +300,11 @@ class TransactionId(BaseModel):
 class InfoText(BaseModel):
     parameter: str
 
+class HistoricalDate(BaseModel):
+    date: str
+
+
+
 # Replace NaN values with None in the resulting JSON object
 def replace_nan_inf_with_none(obj):
     if isinstance(obj, list):
@@ -2708,6 +2713,34 @@ async def get_options_chain(data:TransactionId, api_key: str = Security(get_api_
         headers={"Content-Encoding": "gzip"}
     )
 
+
+@app.post("/options-historical-flow")
+async def get_options_historical_flow(data: HistoricalDate, api_key: str = Security(get_api_key)):
+    selected_date = data.date
+    print(selected_date)
+    cache_key = f"options-historical-flow-{selected_date}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+    try:
+        with open(f"json/options-historical-data/flow-data/{selected_date}.json", 'rb') as file:
+            res_list = orjson.loads(file.read())
+    except Exception:
+        res_list = []  # no data file for this date
+    response_data = orjson.dumps(res_list)
+    compressed_data = gzip.compress(response_data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*3600)  # past dates are static, so cache for an extended period
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
 '''
 @app.post("/options-flow-feed")
 async def get_options_flow_feed(data: LastOptionId, api_key: str = Security(get_api_key)):
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index f4425ba..0529668 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -459,6 +459,8 @@ def run_options_gex():
     ]
     run_command(command)
 
+    run_command(["python3", "cron_options_historical_flow.py"])
+
     command = [
         "sudo", "rsync", "-avz", "-e", "ssh",
         "/root/backend/app/json/options-historical-data",
@@ -473,6 +475,7 @@ def run_options_gex():
     ]
     run_command(command)
 
+
 def run_government_contract():
     run_command(["python3", "cron_government_contract.py"])
     command = [
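A minimal client sketch for exercising the new endpoint. The route and request body follow the patch; the base URL and the X-API-KEY header name are assumptions, since Security(get_api_key) does not show how the key is transported. requests decodes the body transparently because the response sets Content-Encoding: gzip:

    import requests

    BASE_URL = "http://localhost:8000"  # assumed dev host
    API_KEY = "your-api-key"            # header name below is an assumption

    resp = requests.post(
        f"{BASE_URL}/options-historical-flow",
        json={"date": "2024-09-18"},      # matches the HistoricalDate model
        headers={"X-API-KEY": API_KEY},   # hypothetical auth header
    )
    resp.raise_for_status()
    flow = resp.json()  # gzip is decoded automatically via Content-Encoding
    print(f"{len(flow)} records returned")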
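One design note on the new model: the endpoint interpolates data.date directly into a file path, so HistoricalDate is the only guard against malformed input. A possible tightening, sketched under the assumption that main.py uses pydantic v1-style validators:

    from datetime import datetime
    from pydantic import BaseModel, validator

    class HistoricalDate(BaseModel):
        date: str

        @validator("date")
        def must_be_iso_date(cls, v):
            # Reject anything that is not YYYY-MM-DD before it reaches the filesystem
            datetime.strptime(v, "%Y-%m-%d")
            return v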