From fa6b523d5c2cfedab8baa0c8825d51f38fb41df0 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 7 Mar 2025 14:58:54 +0100 Subject: [PATCH] update cron job --- app/cron_dark_pool_level.py | 93 +++++++++++++++++++++++++++---------- app/primary_cron_job.py | 2 +- 2 files changed, 70 insertions(+), 25 deletions(-) diff --git a/app/cron_dark_pool_level.py b/app/cron_dark_pool_level.py index be4fa66..3ff0c0b 100644 --- a/app/cron_dark_pool_level.py +++ b/app/cron_dark_pool_level.py @@ -9,7 +9,12 @@ import pytz from typing import List, Dict import sqlite3 from tqdm import tqdm +import time +from collections import defaultdict +from utils.helper import check_market_hours +utc = pytz.utc +ny_tz = pytz.timezone("America/New_York") def save_json(data, symbol): def convert_numpy(obj): @@ -84,6 +89,39 @@ def analyze_dark_pool_levels(trades: List[Dict], return {} +def today_trend(data): + filtered_list = [] + result = [] + for item in data: + try: + # Convert date to NY timezone + dt_utc = datetime.fromisoformat(item['date'][:-6]).replace(tzinfo=utc) + dt_ny = dt_utc.astimezone(ny_tz) + + # Define trading hours (9:30 AM - 4:00 PM NY time) + market_open = dt_ny.replace(hour=9, minute=30, second=0, microsecond=0) + market_close = dt_ny.replace(hour=16, minute=0, second=0, microsecond=0) + + if market_open <= dt_ny <= market_close: # Filter valid times + filtered_list.append({ + 'size': item['size'], + 'date': dt_ny.strftime("%Y-%m-%d %H:%M") # Format as HH:MM + }) + except: + pass + + filtered_list.sort(key=lambda x: datetime.strptime(x['date'], "%Y-%m-%d %H:%M")) + + summed_data = defaultdict(float) + for entry in filtered_list: + try: + summed_data[entry['date']] += entry['size'] + except: + pass + + result = [{'date': date, 'totalSize': size} for date, size in summed_data.items()] + + return result def run(): con = sqlite3.connect('stocks.db') @@ -105,34 +143,41 @@ def run(): total_symbols = stocks_symbols+ etf_symbols with open(f"json/dark-pool/feed/data.json", "r") as file: raw_data = orjson.loads(file.read()) - for symbol in tqdm(total_symbols): - try: - res_list = [item for item in raw_data if isinstance(item, dict) and item['ticker'] == symbol] - dark_pool_levels = analyze_dark_pool_levels( - trades=res_list, - size_threshold=0.8, # Look for levels with volume in top 20% - price_grouping=1.0 # Group prices within $1.00 - ) - if dark_pool_levels.get('price_level'): # Ensure there are valid levels - top_5_elements = [ - {k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} - for item in sorted(res_list, key=lambda x: float(x.get('premium', 0)), reverse=True)[:5] - ] + market_status = check_market_hours() + if market_status: + for symbol in tqdm(total_symbols): + try: + res_list = [item for item in raw_data if isinstance(item, dict) and item['ticker'] == symbol] + + trend_list = today_trend(res_list) + + dark_pool_levels = analyze_dark_pool_levels( + trades=res_list, + size_threshold=0.8, + price_grouping=1.0 + ) + + if dark_pool_levels.get('price_level'): # Ensure there are valid levels + top_5_elements = [ + {k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} + for item in sorted(res_list, key=lambda x: float(x.get('premium', 0)), reverse=True)[:5] + ] - # Add rank to each item - for rank, item in enumerate(top_5_elements, 1): - item['rank'] = rank + # Add rank to each item + for rank, item in enumerate(top_5_elements, 1): + item['rank'] = rank - data_to_save = { - 'hottestTrades': top_5_elements, - 'priceLevel': dark_pool_levels['price_level'], - 'metrics': dark_pool_levels['metrics'] - } + data_to_save = { + 'hottestTrades': top_5_elements, + 'priceLevel': dark_pool_levels['price_level'], + 'trend': trend_list, + 'metrics': dark_pool_levels['metrics'] + } - save_json(data_to_save, symbol) - except Exception as e: - print(f"Error processing {symbol}: {e}") + save_json(data_to_save, symbol) + except Exception as e: + print(f"Error processing {symbol}: {e}") diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index b318ecb..cfc15b9 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -416,7 +416,7 @@ schedule.every(5).minutes.do(run_threaded, run_list).tag('stock_list_job') -schedule.every(30).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job') +schedule.every(15).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job') schedule.every(10).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')