diff --git a/app/cron_market_maker.py b/app/cron_market_maker.py
new file mode 100644
index 0000000..2ed4fe6
--- /dev/null
+++ b/app/cron_market_maker.py
@@ -0,0 +1,140 @@
+import ujson
+import asyncio
+import sqlite3
+from tqdm import tqdm
+from datetime import datetime, timedelta
+import os
+from dotenv import load_dotenv
+from finra_api_queries import finra_api_queries
+
+# Load environment variables
+load_dotenv()
+api_key = os.getenv('FINRA_API_KEY')
+api_secret = os.getenv('FINRA_API_SECRET')
+api_token = finra_api_queries.retrieve_api_token(finra_api_key_input=api_key, finra_api_secret_input=api_secret)
+
+# Query the trailing twelve months of weekly summary data
+start_date = datetime.today() - timedelta(365)
+end_date = datetime.today()
+start_date = start_date.strftime("%Y-%m-%d")
+end_date = end_date.strftime("%Y-%m-%d")
+
+dataset_name = "weekly_summary"
+filtered_columns_input = ['issueSymbolIdentifier', 'marketParticipantName', 'totalWeeklyTradeCount', 'totalWeeklyShareQuantity', 'totalNotionalSum', 'initialPublishedDate']
+date_filter_inputs = [{'startDate': start_date, 'endDate': end_date, 'fieldName': 'initialPublishedDate'}]
+
+
+def preserve_title_case(input_string):
+    # Convert the input string to title case, keeping known acronyms uppercase
+    exceptions = ['LLC', 'LP', 'HRT', 'XTX', 'UBS']
+    title_case_string = input_string.title()
+
+    # Split the title case string into words
+    words = title_case_string.split()
+
+    # Restore acronyms and render the standalone word "And" as "&";
+    # the word-level check avoids mangling names like "Anderson"
+    for i, word in enumerate(words):
+        if word.upper() in exceptions:
+            words[i] = word.upper()
+        elif word == 'And':
+            words[i] = '&'
+
+    # Join the words back into a single string
+    return ' '.join(words)
+
+
+async def get_data(ticker):
+    try:
+        filters_input = {'issueSymbolIdentifier': [ticker]}
+
+        # retrieve_dataset is a blocking call; run it in a worker thread so
+        # that batches of tickers can actually overlap on the event loop
+        loop = asyncio.get_event_loop()
+        df = await loop.run_in_executor(None, lambda: finra_api_queries.retrieve_dataset(
+            dataset_name,
+            api_token,
+            filtered_columns=filtered_columns_input,
+            filters=filters_input,
+            date_filter=date_filter_inputs))
+
+        df = df.rename(columns={"initialPublishedDate": "date", "marketParticipantName": "name", "issueSymbolIdentifier": "symbol"})
+        df_copy = df.copy()
+
+        # Create new dataset for the top 10 market makers with the highest activity
+        top_market_makers_df = df_copy.drop(['symbol', 'date'], axis=1)
+        top_market_makers_df = top_market_makers_df.groupby(['name']).mean().reset_index()
+        top_market_makers_df = top_market_makers_df.rename(columns={"totalWeeklyTradeCount": "avgWeeklyTradeCount", "totalWeeklyShareQuantity": "avgWeeklyShareQuantity", "totalNotionalSum": "avgNotionalSum"})
+
+        top_market_makers_list = top_market_makers_df.to_dict('records')
+        top_market_makers_list = sorted(top_market_makers_list, key=lambda x: x['avgNotionalSum'], reverse=True)[0:10]
+        for item in top_market_makers_list:
+            item['name'] = preserve_title_case(item['name'])
+
+        # Create new dataset for historical movements
+        history_df = df_copy.drop(['symbol', 'name'], axis=1)
+        history_df = history_df.groupby(['date']).sum().reset_index()
+        history_data = history_df.to_dict('records')
+
+        return {'topMarketMakers': top_market_makers_list, 'history': history_data}
+
+    except Exception as e:
+        print(f"Error fetching data for {ticker}: {e}")
+        return {}
+
+
+def write_json(path, data):
+    # Write synchronously inside the worker thread so the file handle is
+    # closed properly instead of being leaked by a bare open()
+    with open(path, 'w') as f:
+        ujson.dump(data, f)
+
+
+async def save_json(symbol, data):
+    # Offload file writing to a thread to avoid blocking the event loop
+    loop = asyncio.get_event_loop()
+    path = f"json/market-maker/companies/{symbol}.json"
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    await loop.run_in_executor(None, write_json, path, data)
+
+
+async def process_ticker(ticker):
+    data = await get_data(ticker)
+    if len(data) > 0:
+        await save_json(ticker, data)
+
+
+async def run():
+    con = sqlite3.connect('stocks.db')
+    etf_con = sqlite3.connect('etf.db')
+
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
+    stocks_symbols = [row[0] for row in cursor.fetchall()]
+
+    etf_cursor = etf_con.cursor()
+    etf_cursor.execute("PRAGMA journal_mode = wal")
+    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
+    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
+
+    con.close()
+    etf_con.close()
+
+    total_symbols = stocks_symbols #+ etf_symbols
+
+    tasks = [process_ticker(ticker) for ticker in total_symbols]
+
+    # Run tasks concurrently in batches so only a few blocking FINRA
+    # requests are in flight at once
+    batch_size = 10  # Adjust based on your system's capacity
+    for i in tqdm(range(0, len(tasks), batch_size)):
+        batch = tasks[i:i + batch_size]
+        await asyncio.gather(*batch)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(run())
+    except Exception as e:
+        print(f"An error occurred: {e}")
diff --git a/app/main.py b/app/main.py
index 85aaf82..6679401 100755
--- a/app/main.py
+++ b/app/main.py
@@ -2871,6 +2871,23 @@ async def get_dark_pool(data:TickerData):
     except:
         res = []
 
+    redis_client.set(cache_key, ujson.dumps(res))
+    redis_client.expire(cache_key, 3600*24) # Set cache expiration time to 1 day
+    return res
+
+@app.post("/market-maker")
+async def get_market_maker(data:TickerData):
+    ticker = data.ticker.upper()
+    cache_key = f"market-maker-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return ujson.loads(cached_result)
+    try:
+        with open(f"json/market-maker/companies/{ticker}.json", 'r') as file:
+            res = ujson.load(file)
+    except:
+        res = {}
+
     redis_client.set(cache_key, ujson.dumps(res))
     redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
     return res
\ No newline at end of file
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index 74045ce..8f18108 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -309,6 +309,17 @@ def run_dark_pool():
     ]
     subprocess.run(command)
 
+def run_market_maker():
+    weekday = datetime.today().weekday()
+    if weekday <= 5:  # weekday() is 0 (Monday) through 6 (Sunday); skip Sundays
+        subprocess.run(["python3", "cron_market_maker.py"])
+        command = [
+            "sudo", "rsync", "-avz", "-e", "ssh",
+            "/root/backend/app/json/market-maker",
+            f"root@{useast_ip_address}:/root/backend/app/json"
+        ]
+        subprocess.run(command)
+
 # Create functions to run each schedule in a separate thread
 def run_threaded(job_func):
     job_thread = threading.Thread(target=job_func)
@@ -339,6 +350,9 @@
 schedule.every().day.at("14:00").do(run_threaded, run_cron_var).tag('var_job')
 
 schedule.every().day.at("15:45").do(run_threaded, run_restart_cache)
+
+schedule.every().saturday.at("01:00").do(run_threaded, run_market_maker).tag('market_maker_job')
+
 schedule.every(1).minutes.do(run_threaded, run_cron_portfolio).tag('portfolio_job')
 
 schedule.every(5).minutes.do(run_threaded, run_cron_market_movers).tag('market_movers_job')
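
Reviewer note: the concurrency model in cron_market_maker.py (blocking FINRA calls pushed onto the default thread pool via run_in_executor and awaited in fixed-size batches) can be exercised in isolation with a sketch like the one below. The fetch function, its one-second sleep, and the placeholder tickers are illustrative stand-ins, not part of this change.

import asyncio
import time

def fetch(symbol):
    # Stand-in for a blocking call such as finra_api_queries.retrieve_dataset
    time.sleep(1)
    return f"{symbol}: ok"

async def fetch_async(symbol):
    # run_in_executor moves the blocking call onto a worker thread,
    # so several of these coroutines can overlap on the event loop
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, fetch, symbol)

async def main():
    symbols = ["AAA", "BBB", "CCC", "DDD"]  # placeholder tickers
    batch_size = 2
    for i in range(0, len(symbols), batch_size):
        batch = [fetch_async(s) for s in symbols[i:i + batch_size]]
        # Each batch completes in roughly one second, not batch_size seconds
        print(await asyncio.gather(*batch))

asyncio.run(main())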
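
A quick smoke test of the new /market-maker endpoint could look like the following sketch; the localhost:8000 address and the AAPL ticker are assumptions, and the expected keys mirror what cron_market_maker.py writes per symbol.

import requests

# Hypothetical local smoke test; assumes the FastAPI app is serving on port 8000
# and that cron_market_maker.py has already written json/market-maker/companies/AAPL.json
resp = requests.post("http://localhost:8000/market-maker", json={"ticker": "aapl"})
resp.raise_for_status()
data = resp.json()

# cron_market_maker.py writes two top-level keys per symbol; an unknown
# ticker falls back to an empty dict
print(data.get("topMarketMakers", [])[:3])  # top three names by avgNotionalSum
print(len(data.get("history", [])))         # weekly totals over the past year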