# backend/app/cron_options_historical_flow.py
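"""Cron job that backfills historical options flow data from the Benzinga API.

For every weekday from 2023-01-01 through today, it fetches all pages of
options activity, cleans and normalizes the records, and writes them to
json/options-historical-data/flow-data/<date>.json. Days whose file already
exists on disk are skipped, so the backfill can be safely re-run.
"""
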
import os
import sqlite3
import concurrent.futures
from datetime import datetime, timedelta

import ujson
from benzinga import financial_data
from dotenv import load_dotenv
# Load API key from .env
load_dotenv()
api_key = os.getenv('BENZINGA_API_KEY')
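# Fail fast if the key is missing; every Benzinga request would otherwise be rejected
if not api_key:
    raise RuntimeError("BENZINGA_API_KEY is not set in the environment")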
fin = financial_data.Benzinga(api_key)
# Connect to the databases and load the ticker universes
stock_con = sqlite3.connect('stocks.db')
stock_cursor = stock_con.cursor()
stock_cursor.execute("SELECT DISTINCT symbol FROM stocks")
stock_symbols = {row[0] for row in stock_cursor.fetchall()}  # set for O(1) membership checks

etf_con = sqlite3.connect('etf.db')
etf_cursor = etf_con.cursor()
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = {row[0] for row in etf_cursor.fetchall()}

# Close the database connections
stock_con.close()
etf_con.close()
# Define start and end dates for historical data
start_date = datetime.strptime('2023-01-01', '%Y-%m-%d')
end_date = datetime.now()
# Directory to save the JSON files
output_dir = "json/options-historical-data/flow-data/"
# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)
# Fetch one page of options activity for a given day
def process_page(page, date):
    try:
        data = fin.options_activity(date_from=date, date_to=date, page=page, pagesize=1000)
        data = ujson.loads(fin.output(data))['option_activity']
        return data
    except Exception as e:
        print(f"Error fetching page {page} for {date}: {e}")
        return []
# Fetch, clean, and persist one day of options flow
def process_day(date_str):
    # Skip days that were already downloaded, so the backfill is resumable
    file_path = os.path.join(output_dir, f"{date_str}.json")
    if os.path.exists(file_path):
        return

    res_list = []
    max_workers = 6  # Adjust max_workers to control parallelism

    # Fetch up to 130 pages (pagesize=1000 each) concurrently for the given day
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_page = {executor.submit(process_page, page, date_str): page for page in range(130)}
        for future in concurrent.futures.as_completed(future_to_page):
            page = future_to_page[future]
            try:
                res_list += future.result()
            except Exception as e:
                print(f"Exception occurred on page {page}: {e}")
                break
    # Drop verbose fields, then normalize and type-cast each record
    res_list = [{key: value for key, value in item.items() if key not in ('description_extended', 'updated')} for item in res_list]
    filtered_list = []
    for item in res_list:
        try:
            if item['underlying_price'] != '':
                ticker = item['ticker']
                # Normalize dotted share-class tickers to dash notation
                if ticker == 'BRK.A':
                    ticker = 'BRK-A'
                elif ticker == 'BRK.B':
                    ticker = 'BRK-B'
                put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts'
                asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '')
                item['underlying_type'] = asset_type
                item['put_call'] = put_call
                item['ticker'] = ticker
                item['price'] = round(float(item['price']), 2)
                item['strike_price'] = round(float(item['strike_price']), 2)
                item['cost_basis'] = round(float(item['cost_basis']), 2)
                item['underlying_price'] = round(float(item['underlying_price']), 2)
                item['option_activity_type'] = item['option_activity_type'].capitalize()
                item['sentiment'] = item['sentiment'].capitalize()
                item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title()
                item['tradeCount'] = item['trade_count']
                filtered_list.append(item)
        except (KeyError, ValueError, TypeError, AttributeError):
            # Skip records with missing or malformed fields
            pass
    # Sort the records by trade time, newest first
    filtered_list = sorted(filtered_list, key=lambda x: x['time'], reverse=True)

    # Save the data to a JSON file named after the date
    if len(filtered_list) > 0:
        with open(file_path, 'w') as file:
            ujson.dump(filtered_list, file)
# Iterate through each weekday from start_date to today
current_date = start_date
while current_date <= end_date:
    # Only weekdays (Monday=0 ... Friday=4); markets are closed on weekends
    if current_date.weekday() < 5:
        date_str = current_date.strftime("%Y-%m-%d")
        process_day(date_str)
    # Move to the next day
    current_date += timedelta(days=1)
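
# Example crontab entry (an assumption, not part of the repo: run nightly
# after U.S. market close, server-local time; adjust the path to your checkout):
# 30 22 * * 1-5  cd /path/to/backend/app && python cron_options_historical_flow.py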