backend/app/cron_market_maker.py
2024-06-25 19:51:07 +02:00

130 lines
4.9 KiB
Python

import ujson
import asyncio
import aiohttp
import sqlite3
from tqdm import tqdm
from datetime import datetime,timedelta
import os
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
from finra_api_queries import finra_api_queries
# Load environment variables, then exchange the FINRA credentials for an API
# token once at import time.
load_dotenv()
api_key = os.getenv('FINRA_API_KEY')
api_secret = os.getenv('FINRA_API_SECRET')
api_token = finra_api_queries.retrieve_api_token(
    finra_api_key_input=api_key,
    finra_api_secret_input=api_secret,
)

# Query window: the trailing 365 days up to today, as YYYY-MM-DD strings.
start_date = (datetime.today() - timedelta(365)).strftime("%Y-%m-%d")
end_date = datetime.today().strftime("%Y-%m-%d")

# Dataset name, selected columns and date filter used by get_data().
dataset_name = "weekly_summary"
filtered_columns_input = [
    'issueSymbolIdentifier',
    'marketParticipantName',
    'totalWeeklyTradeCount',
    'totalWeeklyShareQuantity',
    'totalNotionalSum',
    'initialPublishedDate',
]
date_filter_inputs = [
    {
        'startDate': start_date,
        'endDate': end_date,
        'fieldName': 'initialPublishedDate',
    },
]
def preserve_title_case(input_string):
    """Return *input_string* in title case, tuned for market-maker names.

    * Every word is title-cased and runs of whitespace collapse to one space.
    * The acronyms LLC, LP, HRT, XTX and UBS are kept fully upper-case.
    * The standalone word "And" is replaced by "&".

    Only the whole word "And" is replaced, so names that merely start with
    those letters (e.g. "Anderson") are left intact.
    """
    import re  # local import: ``re`` is not imported at file level

    exceptions = {'LLC', 'LP', 'HRT', 'XTX', 'UBS'}
    words = [
        word.upper() if word.upper() in exceptions else word
        for word in input_string.title().split()
    ]
    # \b anchors keep the replacement from eating the prefix of longer words.
    return re.sub(r'\bAnd\b', '&', ' '.join(words))
async def get_data(ticker):
    """Fetch the FINRA weekly-summary rows for *ticker* and aggregate them.

    The synchronous ``finra_api_queries.retrieve_dataset`` call is run in the
    default executor so it does not block the event loop; this lets the
    tickers gathered in one batch be fetched concurrently.

    Returns a dict with two keys:

    * ``topMarketMakers`` - at most 10 records, one per participant name,
      holding the mean of each weekly metric (``avgWeeklyTradeCount``,
      ``avgWeeklyShareQuantity``, ``avgNotionalSum``), sorted by
      ``avgNotionalSum`` descending, with names passed through
      ``preserve_title_case``.
    * ``history`` - one record per ``date`` with the metrics summed over all
      participants.

    Any exception is printed and an empty dict is returned instead.
    """
    try:
        filters_input = {'issueSymbolIdentifier': [ticker]}
        loop = asyncio.get_running_loop()
        # NOTE(review): assumes retrieve_dataset returns a pandas DataFrame
        # and is safe to call from a worker thread - confirm.
        df = await loop.run_in_executor(
            None,
            lambda: finra_api_queries.retrieve_dataset(
                dataset_name,
                api_token,
                filtered_columns=filtered_columns_input,
                filters=filters_input,
                date_filter=date_filter_inputs,
            ),
        )
        df = df.rename(columns={
            "initialPublishedDate": "date",
            "marketParticipantName": "name",
            "issueSymbolIdentifier": "symbol",
        })

        # Top 10 market makers by average notional sum. ``drop`` returns a
        # new frame, so no defensive copy of ``df`` is needed.
        top_market_makers_df = (
            df.drop(['symbol', 'date'], axis=1)
            .groupby(['name'])
            .mean()
            .reset_index()
            .rename(columns={
                "totalWeeklyTradeCount": "avgWeeklyTradeCount",
                "totalWeeklyShareQuantity": "avgWeeklyShareQuantity",
                "totalNotionalSum": "avgNotionalSum",
            })
        )
        top_market_makers_list = sorted(
            top_market_makers_df.to_dict('records'),
            key=lambda x: x['avgNotionalSum'],
            reverse=True,
        )[:10]
        for item in top_market_makers_list:
            item['name'] = preserve_title_case(item['name'])

        # Historical movements: totals per publication date.
        history_df = (
            df.drop(['symbol', 'name'], axis=1)
            .groupby(['date'])
            .sum()
            .reset_index()
        )
        history_data = history_df.to_dict('records')

        return {'topMarketMakers': top_market_makers_list, 'history': history_data}
    except Exception as e:
        # Best effort: one failing ticker must not abort the whole run.
        print(f"Error fetching data for {ticker}: {e}")
        return {}
async def save_json(symbol, data):
    """Write *data* as JSON to ``json/market-maker/companies/<symbol>.json``.

    The directory is created if missing. The file is opened, written and
    closed inside a worker thread so the event loop is not blocked and the
    file handle is always released (and its contents flushed) when the write
    finishes or fails.
    """
    path = f"json/market-maker/companies/{symbol}.json"

    def _write():
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as file:
            ujson.dump(data, file)

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _write)
async def process_ticker(ticker):
    """Fetch the data for *ticker* and save it, skipping empty results."""
    payload = await get_data(ticker)
    if not payload:
        return
    await save_json(ticker, payload)
def _fetch_symbols(db_path, query):
    """Return the first column of *query* run against the SQLite file *db_path*."""
    con = sqlite3.connect(db_path)
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute(query)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Close even when the query raises so the connection is not leaked.
        con.close()


async def run():
    """Process every selected stock and ETF symbol in batches of 10.

    Symbols come from ``stocks.db`` (market cap >= 1E9, no '.' in the symbol)
    followed by all symbols in ``etf.db``. Each batch is awaited with
    ``asyncio.gather`` before the next one starts; an exception raised by a
    ticker propagates to the caller.
    """
    stocks_symbols = _fetch_symbols(
        'stocks.db',
        "SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'",
    )
    etf_symbols = _fetch_symbols('etf.db', "SELECT DISTINCT symbol FROM etfs")
    total_symbols = stocks_symbols + etf_symbols

    # Run tasks concurrently in batches to bound the number of in-flight
    # requests. Coroutines are created per batch so that a failure does not
    # leave never-awaited coroutines behind.
    batch_size = 10  # Adjust based on your system's capacity
    for i in tqdm(range(0, len(total_symbols), batch_size)):
        batch = total_symbols[i:i + batch_size]
        await asyncio.gather(*(process_ticker(ticker) for ticker in batch))
if __name__ == "__main__":
    # Script entry point: run the job and report any failure on stdout
    # rather than letting the exception propagate.
    try:
        asyncio.run(run())
    except Exception as exc:
        print(f"An error occurred: {exc}")