backend/app/cron_options_net_flow.py
2024-09-11 22:40:39 +02:00

146 lines
5.5 KiB
Python

import sqlite3
from datetime import datetime, timedelta, date
import ujson
import os
import numpy as np
from dotenv import load_dotenv
from benzinga import financial_data
from collections import defaultdict
from tqdm import tqdm
# Pull variables from a local .env file (if any) into the process environment
# so the API key lookup below can find it.
load_dotenv()
# os.getenv returns None when BENZINGA_API_KEY is not set.
api_key = os.getenv('BENZINGA_API_KEY')
# Module-wide Benzinga client; get_data() uses it for every request.
fin = financial_data.Benzinga(api_key)
def save_json(symbol, data):
    """Serialize ``data`` as JSON into the per-symbol net-flow file.

    The file is ``json/options-net-flow/companies/<symbol>.json`` (relative to
    the current working directory) and is overwritten if it already exists.
    """
    target_path = f"json/options-net-flow/companies/{symbol}.json"
    with open(target_path, 'w') as handle:
        ujson.dump(data, handle)
def calculate_moving_average(data, window_size):
    """Return the simple moving average of ``data`` as a list of floats.

    Each output value is the mean of ``window_size`` consecutive inputs, so
    the result has ``len(data) - window_size + 1`` entries; it is empty when
    the window is longer than the input.
    """
    running_total = np.cumsum(np.array(data, dtype=float))
    # Running total at the last element of every window ...
    window_end_totals = running_total[window_size - 1:]
    # ... minus the running total just before the window starts (0 for the
    # very first window) leaves the sum of the window itself.
    before_window_totals = np.concatenate(([0], running_total[:-window_size]))
    window_sums = window_end_totals - before_window_totals
    return (window_sums / window_size).tolist()
def calculate_net_flow(data, interval_minutes=60):
    """Aggregate option trades into net call/put premium per time interval.

    Args:
        data: Iterable of trade dicts with the keys ``date`` (``YYYY-MM-DD``),
            ``time`` (``HH:MM:SS``), ``cost_basis``, ``underlying_price``,
            ``put_call`` and ``execution_estimate``.
        interval_minutes: Bucket width in minutes, aligned to midnight.
            The default of 60 reproduces the hourly buckets this function
            has always produced (the previous ``minute // 120 * 120``
            expression always evaluated to 0, i.e. "top of the hour").

    Returns:
        A list of ``{'date', 'netCall', 'netPut'}`` dicts sorted by ``date``,
        where ``date`` is the interval start formatted as
        ``YYYY-MM-DD HH:MM:SS`` and the two values are integer premium sums.

    Raises:
        KeyError, ValueError: if a record lacks ``date``/``time`` or they do
            not match the expected format.
        ValueError: if ``interval_minutes`` is not a positive integer.
    """
    if not isinstance(interval_minutes, int) or interval_minutes < 1:
        raise ValueError("interval_minutes must be a positive integer")

    timestamp_format = '%Y-%m-%d %H:%M:%S'

    # Stage 1: signed premium per exact trade timestamp.
    per_timestamp = defaultdict(lambda: {'netCall': 0, 'netPut': 0})
    for item in data:
        date_time = datetime.strptime(f"{item['date']} {item['time']}", timestamp_format)
        try:
            premium = float(item['cost_basis'])
            # The price is not part of the output, but a record whose price
            # cannot be parsed has never been counted; keep validating it.
            float(item['underlying_price'])
            put_call = item['put_call']
            execution = item['execution_estimate']
        except (KeyError, TypeError, ValueError):
            # Malformed record: skip it instead of aborting the whole symbol.
            continue

        # Touch the bucket even when the trade adds nothing (e.g. a mid-market
        # execution) so the interval still appears in the output.
        flows = per_timestamp[date_time]
        if put_call == 'CALL':
            # Calls at the ask add to netCall, calls at the bid subtract.
            if execution == 'AT_ASK':
                flows['netCall'] += premium
            elif execution == 'AT_BID':
                flows['netCall'] -= premium
        elif put_call == 'PUT':
            # Puts use the opposite sign: at the ask subtracts, at the bid adds.
            if execution == 'AT_ASK':
                flows['netPut'] -= premium
            elif execution == 'AT_BID':
                flows['netPut'] += premium

    # Stage 2: roll the per-timestamp totals up into fixed-width intervals.
    # Each timestamp total is truncated to int before summing, which keeps the
    # emitted numbers identical to what this function produced before.
    per_interval = defaultdict(lambda: {'netCall': 0, 'netPut': 0})
    for date_time, flows in per_timestamp.items():
        minutes_since_midnight = date_time.hour * 60 + date_time.minute
        floored = minutes_since_midnight - minutes_since_midnight % interval_minutes
        interval_start = date_time.replace(
            hour=floored // 60, minute=floored % 60, second=0, microsecond=0
        )
        per_interval[interval_start]['netCall'] += int(flows['netCall'])
        per_interval[interval_start]['netPut'] += int(flows['netPut'])

    # Sort on the datetime keys directly; no need to re-parse formatted strings.
    return [
        {
            'date': interval_start.strftime(timestamp_format),
            'netCall': per_interval[interval_start]['netCall'],
            'netPut': per_interval[interval_start]['netPut'],
        }
        for interval_start in sorted(per_interval)
    ]
def get_data(symbol):
    """Download recent options activity for ``symbol`` and save its net flow.

    Requests the last 10 days of options activity from the Benzinga client
    page by page (1000 records per page, at most 1000 pages), keeps only the
    fields needed for the net-flow calculation and only records whose
    ``ticker`` equals ``symbol``. When more than 100 such records exist, the
    aggregated net flow is written with ``save_json``.

    Any error is printed rather than raised, so one failing symbol does not
    stop the caller's loop. Returns ``None``.
    """
    wanted_keys = ('ticker', 'time', 'date', 'execution_estimate',
                   'underlying_price', 'put_call', 'cost_basis')
    try:
        end_date = date.today()
        start_date = end_date - timedelta(days=10)
        end_date_str = end_date.strftime('%Y-%m-%d')
        start_date_str = start_date.strftime('%Y-%m-%d')

        res_list = []
        for page in range(0, 1000):
            try:
                response = fin.options_activity(company_tickers=symbol, page=page, pagesize=1000, date_from=start_date_str, date_to=end_date_str)
                page_items = ujson.loads(fin.output(response))['option_activity']
                if not page_items:
                    # An empty page means there is nothing left to fetch;
                    # stop instead of issuing the remaining requests.
                    break
                res_list += page_items
            except Exception:
                # A failed request or a payload without 'option_activity'
                # ends pagination. Catching Exception (not a bare except)
                # lets KeyboardInterrupt/SystemExit stop the script.
                break

        res_filtered = [
            {key: value for key, value in item.items() if key in wanted_keys}
            for item in res_list
        ]

        # Keep only the records that belong to the requested ticker.
        ticker_filtered_data = [entry for entry in res_filtered if entry['ticker'] == symbol]

        # Skip thinly traded symbols: require more than 100 records.
        if len(ticker_filtered_data) > 100:
            net_flow_data = calculate_net_flow(ticker_filtered_data)
            if net_flow_data:
                save_json(symbol, net_flow_data)
    except Exception as e:
        # Best-effort per symbol: report and move on.
        print(e)
def _load_symbols(db_path, query):
    """Return the first column of every row that ``query`` yields on ``db_path``."""
    con = sqlite3.connect(db_path)
    try:
        cursor = con.cursor()
        cursor.execute(query)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query fails.
        con.close()


# Script driver: collect stock symbols (market cap above 500E6, no dotted
# tickers) plus every ETF symbol, then refresh the net-flow file for each.
try:
    stock_symbols = _load_symbols(
        'stocks.db',
        "SELECT DISTINCT symbol FROM stocks WHERE marketCap >500E6 AND symbol NOT LIKE '%.%'",
    )
    etf_symbols = _load_symbols('etf.db', "SELECT DISTINCT symbol FROM etfs")

    total_symbols = stock_symbols + etf_symbols
    for symbol in tqdm(total_symbols):
        get_data(symbol)
except Exception as e:
    # Report database or iteration failures without a traceback.
    print(e)