diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py new file mode 100644 index 0000000..3c33e62 --- /dev/null +++ b/app/cron_dark_pool_flow.py @@ -0,0 +1,93 @@ +import time +from datetime import datetime +from GetStartEndDate import GetStartEndDate + +import intrinio_sdk as intrinio +import ujson +import sqlite3 +import pytz + +from dotenv import load_dotenv +import os + +ny_tz = pytz.timezone('America/New_York') + + +load_dotenv() +api_key = os.getenv('INTRINIO_API_KEY') + +intrinio.ApiClient().set_api_key(api_key) +intrinio.ApiClient().allow_retries(True) + +def save_json(data): + with open(f"json/dark-pool/flow/data.json", 'w') as file: + ujson.dump(data, file) + + +source = 'cta_a_delayed' +start_date, end_date = GetStartEndDate().run() +start_time = '' +end_time = '' +timezone = 'UTC' +page_size = 1000 +darkpool_only = True +min_size = 100 +count = 0 + + +def get_data(): + data = [] + count = 0 + while True: + if count == 0: + next_page = '' + try: + response = intrinio.SecurityApi().get_security_trades(source, start_date=start_date, start_time=start_time, end_date=end_date, end_time=end_time, timezone=timezone, page_size=page_size, darkpool_only=darkpool_only, min_size=min_size, next_page=next_page) + data += response.trades + + next_page = response.next_page + if not next_page or count == 10: + break + count +=1 + except: + pass + + return data + +def run(): + con = sqlite3.connect('stocks.db') + cursor = con.cursor() + cursor.execute("SELECT DISTINCT symbol, name FROM stocks") + stocks = cursor.fetchall() + con.close() + + symbol_name_map = {row[0]: row[1] for row in stocks} + stock_symbols = list(symbol_name_map.keys()) + data = get_data() + + print(len(data)) + # Convert each SecurityTrades object to a dictionary + data_dicts = [entry.__dict__ for entry in data] + # Filter the data + filtered_data = [entry for entry in data_dicts if entry['_symbol'] in stock_symbols] + res = [ + { + 'symbol': entry['_symbol'], + 'name': symbol_name_map[entry['_symbol']], + 'date': entry['_timestamp'].astimezone(ny_tz).isoformat(), + 'price': entry['_price'], + 'volume': entry['_total_volume'], + 'size': entry['_size'] + } + for entry in filtered_data + ] + + if len(res) > 0: + save_json(res) + + +if __name__ == "__main__": + try: + run() + except Exception as e: + print(f"An error occurred: {e}") diff --git a/app/main.py b/app/main.py index 64810af..9ed2a2e 100755 --- a/app/main.py +++ b/app/main.py @@ -2611,7 +2611,7 @@ async def get_wiim(data:TickerData): try: with open(f"json/wiim/company/{ticker}.json", 'r') as file: - res = ujson.load(file)[:10] + res = ujson.load(file)[:5] except: res = [] @@ -2921,6 +2921,35 @@ async def get_dark_pool(data:TickerData): redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day return res + +@app.get("/dark-pool-flow") +async def get_dark_pool_flow(): + cache_key = f"dark-flow-flow" + + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + try: + with open(f"json/dark-pool/flow/data.json", 'r') as file: + res = ujson.load(file) + except: + res = [] + + data = ujson.dumps(res).encode('utf-8') + compressed_data = gzip.compress(data) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 60*15) # Set cache expiration time to 15 min + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + @app.post("/market-maker") async def get_market_maker(data:TickerData): ticker = data.ticker.upper() diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index cba01c5..24781c1 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -333,6 +333,19 @@ def run_dark_pool(): ] run_command(command) + +def run_dark_pool_flow(): + week = datetime.today().weekday() + if week <= 5: + run_command(["python3", "cron_dark_pool_flow.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/dark-pool/flow", + f"root@{useast_ip_address}:/root/backend/app/json/dark-pool" + ] + run_command(command) + + def run_market_maker(): week = datetime.today().weekday() if week <= 5: @@ -436,9 +449,9 @@ schedule.every().day.at("10:00").do(run_threaded, run_shareholders).tag('shareho schedule.every().day.at("10:15").do(run_threaded, run_share_statistics).tag('share_statistics_job') schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job') schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job') -schedule.every().day.at("11:30").do(run_threaded, run_retail_volume).tag('retail_volume_job') +schedule.every().day.at("03:00").do(run_threaded, run_retail_volume).tag('retail_volume_job') schedule.every().day.at("11:45").do(run_threaded, run_clinical_trial).tag('clinical_trial_job') -schedule.every().day.at("12:00").do(run_threaded, run_implied_volatility).tag('implied_volatility_job') +schedule.every().day.at("02:00").do(run_threaded, run_implied_volatility).tag('implied_volatility_job') schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job') @@ -450,7 +463,7 @@ schedule.every().day.at("14:00").do(run_threaded, run_cron_var).tag('var_job') schedule.every().day.at("15:45").do(run_threaded, run_restart_cache) schedule.every(2).days.at("01:00").do(run_borrowed_share).tag('borrowed_share_job') -schedule.every().saturday.at("01:00").do(run_threaded, run_market_maker).tag('markt_maker_job') +schedule.every(2).days.at("01:00").do(run_threaded, run_market_maker).tag('markt_maker_job') schedule.every().saturday.at("05:00").do(run_threaded, run_ownership_stats).tag('ownership_stats_job') @@ -464,6 +477,8 @@ schedule.every(15).minutes.do(run_threaded, run_cron_heatmap).tag('heatmap_job') schedule.every(1).minutes.do(run_threaded, run_cron_quote).tag('quote_job') schedule.every(1).minutes.do(run_threaded, run_cron_price_alert).tag('price_alert_job') schedule.every(15).minutes.do(run_threaded, run_market_moods).tag('market_moods_job') +schedule.every(20).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') + schedule.every(2).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job') schedule.every(3).hours.do(run_threaded, run_json_job).tag('json_job') schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job') diff --git a/app/quant-analysis/daily_return.png b/app/quant-analysis/daily_return.png index 8661585..7f73983 100644 Binary files a/app/quant-analysis/daily_return.png and b/app/quant-analysis/daily_return.png differ diff --git a/app/quant-analysis/histogram.png b/app/quant-analysis/histogram.png index ee73df5..efe18c7 100644 Binary files a/app/quant-analysis/histogram.png and b/app/quant-analysis/histogram.png differ diff --git a/app/quant-analysis/simulation.png b/app/quant-analysis/simulation.png index 346105a..3e75cb2 100644 Binary files a/app/quant-analysis/simulation.png and b/app/quant-analysis/simulation.png differ