add market makers endpoint & cron job

This commit is contained in:
parent 10f138ac61
commit c31c627503

129	app/cron_market_maker.py	Normal file

@@ -0,0 +1,129 @@
import ujson
import asyncio
import aiohttp
import sqlite3
from tqdm import tqdm
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
from finra_api_queries import finra_api_queries

# Load environment variables
load_dotenv()
api_key = os.getenv('FINRA_API_KEY')
api_secret = os.getenv('FINRA_API_SECRET')
api_token = finra_api_queries.retrieve_api_token(finra_api_key_input=api_key, finra_api_secret_input=api_secret)

# Query the trailing 12 months of weekly summary data
start_date = datetime.today() - timedelta(365)
end_date = datetime.today()
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")

dataset_name = "weekly_summary"
filtered_columns_input = ['issueSymbolIdentifier', 'marketParticipantName', 'totalWeeklyTradeCount', 'totalWeeklyShareQuantity', 'totalNotionalSum', 'initialPublishedDate']
date_filter_inputs = [{'startDate': start_date, 'endDate': end_date, 'fieldName': 'initialPublishedDate'}]


def preserve_title_case(input_string):
    # Convert the input string to title case, keeping known acronyms upper-case
    exceptions = ['LLC', 'LP', 'HRT', 'XTX', 'UBS']
    title_case_string = input_string.title()

    # Split the title case string into words
    words = title_case_string.split()

    # Check each word against the exceptions list and replace if necessary
    for i, word in enumerate(words):
        if word.upper() in exceptions:
            words[i] = word.upper()

    # Join the words back into a single string
    result_string = ' '.join(words)

    return result_string.replace('And', '&')
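
For reference, a quick check of the helper above (the participant name is made up for illustration):

    >>> preserve_title_case("JANE STREET CAPITAL AND CO LLC")
    'Jane Street Capital & Co LLC'

Note that the final .replace('And', '&') matches substrings too, so a title-cased word like 'Anderson' would come out as '&erson'.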


async def get_data(ticker):
    try:
        filters_input = {'issueSymbolIdentifier': [ticker]}

        df = finra_api_queries.retrieve_dataset(
            dataset_name,
            api_token,
            filtered_columns=filtered_columns_input,
            filters=filters_input,
            date_filter=date_filter_inputs)

        df = df.rename(columns={"initialPublishedDate": "date", "marketParticipantName": "name", "issueSymbolIdentifier": "symbol"})
        df_copy = df.copy()

        # Create new dataset for the top 10 market makers with the highest activity
        top_market_makers_df = df_copy.drop(['symbol', 'date'], axis=1)
        top_market_makers_df = top_market_makers_df.groupby(['name']).mean().reset_index()
        top_market_makers_df = top_market_makers_df.rename(columns={"totalWeeklyTradeCount": "avgWeeklyTradeCount", "totalWeeklyShareQuantity": "avgWeeklyShareQuantity", "totalNotionalSum": "avgNotionalSum"})

        top_market_makers_list = top_market_makers_df.to_dict('records')
        top_market_makers_list = sorted(top_market_makers_list, key=lambda x: x['avgNotionalSum'], reverse=True)[:10]
        for item in top_market_makers_list:
            item['name'] = preserve_title_case(item['name'])

        # Create new dataset for historical movements
        history_df = df_copy.drop(['symbol', 'name'], axis=1)
        history_df = history_df.groupby(['date']).sum().reset_index()
        history_data = history_df.to_dict('records')

        return {'topMarketMakers': top_market_makers_list, 'history': history_data}

    except Exception as e:
        print(f"Error fetching data for {ticker}: {e}")
        return {}
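
The JSON written per symbol then has this shape (values are illustrative, not real data):

    {
        "topMarketMakers": [
            {"name": "Citadel Securities LLC", "avgWeeklyTradeCount": 120000.0,
             "avgWeeklyShareQuantity": 4500000.0, "avgNotionalSum": 9.8e8},
            ...
        ],
        "history": [
            {"date": "2024-01-08", "totalWeeklyTradeCount": 350000,
             "totalWeeklyShareQuantity": 12000000, "totalNotionalSum": 2.4e9},
            ...
        ]
    }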


async def save_json(symbol, data):
    # Offload the blocking file write to a thread pool so it doesn't block the event loop
    loop = asyncio.get_event_loop()
    path = f"json/market-maker/companies/{symbol}.json"
    os.makedirs(os.path.dirname(path), exist_ok=True)

    def write_file():
        # Use a context manager so the file handle is closed after writing
        with open(path, 'w') as file:
            ujson.dump(data, file)

    await loop.run_in_executor(None, write_file)
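
If the aiofiles package were available, the same write could be done without the executor (a sketch, not what this commit uses):

    import aiofiles

    async def save_json(symbol, data):
        path = f"json/market-maker/companies/{symbol}.json"
        os.makedirs(os.path.dirname(path), exist_ok=True)
        async with aiofiles.open(path, 'w') as file:
            await file.write(ujson.dumps(data))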


async def process_ticker(ticker):
    data = await get_data(ticker)
    if len(data) > 0:
        await save_json(ticker, data)


async def run():
    con = sqlite3.connect('stocks.db')
    etf_con = sqlite3.connect('etf.db')

    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
    stocks_symbols = [row[0] for row in cursor.fetchall()]

    etf_cursor = etf_con.cursor()
    etf_cursor.execute("PRAGMA journal_mode = wal")
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]

    con.close()
    etf_con.close()

    total_symbols = stocks_symbols  # + etf_symbols (ETFs currently excluded)

    async with aiohttp.ClientSession() as session:
        tasks = []
        for ticker in total_symbols:
            tasks.append(process_ticker(ticker))

        # Run tasks concurrently in batches to avoid too many open connections
        batch_size = 10  # Adjust based on your system's capacity
        for i in tqdm(range(0, len(tasks), batch_size)):
            batch = tasks[i:i + batch_size]
            await asyncio.gather(*batch)
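
Fixed-size batches wait for the slowest task in each batch before starting the next one; an asyncio.Semaphore keeps a constant number of tasks in flight instead. A minimal sketch of that alternative (bounded is a made-up wrapper name):

    semaphore = asyncio.Semaphore(10)  # same concurrency cap as batch_size

    async def bounded(ticker):
        async with semaphore:
            await process_ticker(ticker)

    await asyncio.gather(*(bounded(t) for t in total_symbols))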


if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as e:
        print(f"An error occurred: {e}")

17	app/main.py

@@ -2874,3 +2874,20 @@ async def get_dark_pool(data:TickerData):
    redis_client.set(cache_key, ujson.dumps(res))
    redis_client.expire(cache_key, 3600*24)  # Set cache expiration time to 1 day
    return res


@app.post("/market-maker")
async def get_market_maker(data: TickerData):
    ticker = data.ticker.upper()
    cache_key = f"market-maker-{ticker}"
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return ujson.loads(cached_result)
    try:
        with open(f"json/market-maker/companies/{ticker}.json", 'r') as file:
            res = ujson.load(file)
    except Exception:
        res = {}

    redis_client.set(cache_key, ujson.dumps(res))
    redis_client.expire(cache_key, 3600*24)  # Set cache expiration time to 1 day
    return res
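
A minimal client call against the new endpoint (assuming the app runs locally on port 8000; host and port are placeholders):

    import requests

    r = requests.post("http://localhost:8000/market-maker", json={"ticker": "AAPL"})
    print(r.json())  # {'topMarketMakers': [...], 'history': [...]}, or {} if no JSON file exists yet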
@@ -309,6 +309,17 @@ def run_dark_pool():
    ]
    subprocess.run(command)


def run_market_maker():
    # weekday(): Monday=0 ... Sunday=6, so this runs on any day except Sunday
    week = datetime.today().weekday()
    if week <= 5:
        subprocess.run(["python3", "cron_market_maker.py"])
        command = [
            "sudo", "rsync", "-avz", "-e", "ssh",
            "/root/backend/app/json/market-maker",
            f"root@{useast_ip_address}:/root/backend/app/json"
        ]
        subprocess.run(command)

# Create functions to run each schedule in a separate thread
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)

@@ -339,6 +350,9 @@ schedule.every().day.at("14:00").do(run_threaded, run_cron_var).tag('var_job')

schedule.every().day.at("15:45").do(run_threaded, run_restart_cache)

schedule.every().saturday.at("01:00").do(run_threaded, run_market_maker).tag('market_maker_job')

schedule.every(1).minutes.do(run_threaded, run_cron_portfolio).tag('portfolio_job')
schedule.every(5).minutes.do(run_threaded, run_cron_market_movers).tag('market_movers_job')
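
Because the job is tagged, it can be cancelled later without touching the other schedules, e.g.:

    schedule.clear('market_maker_job')  # removes only the market-maker job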