update cron job

This commit is contained in:
MuslemRahimi 2025-03-07 14:58:54 +01:00
parent 14c189add6
commit fa6b523d5c
2 changed files with 70 additions and 25 deletions

View File

@ -9,7 +9,12 @@ import pytz
from typing import List, Dict from typing import List, Dict
import sqlite3 import sqlite3
from tqdm import tqdm from tqdm import tqdm
import time
from collections import defaultdict
from utils.helper import check_market_hours
utc = pytz.utc
ny_tz = pytz.timezone("America/New_York")
def save_json(data, symbol): def save_json(data, symbol):
def convert_numpy(obj): def convert_numpy(obj):
@ -84,6 +89,39 @@ def analyze_dark_pool_levels(trades: List[Dict],
return {} return {}
def today_trend(data):
filtered_list = []
result = []
for item in data:
try:
# Convert date to NY timezone
dt_utc = datetime.fromisoformat(item['date'][:-6]).replace(tzinfo=utc)
dt_ny = dt_utc.astimezone(ny_tz)
# Define trading hours (9:30 AM - 4:00 PM NY time)
market_open = dt_ny.replace(hour=9, minute=30, second=0, microsecond=0)
market_close = dt_ny.replace(hour=16, minute=0, second=0, microsecond=0)
if market_open <= dt_ny <= market_close: # Filter valid times
filtered_list.append({
'size': item['size'],
'date': dt_ny.strftime("%Y-%m-%d %H:%M") # Format as HH:MM
})
except:
pass
filtered_list.sort(key=lambda x: datetime.strptime(x['date'], "%Y-%m-%d %H:%M"))
summed_data = defaultdict(float)
for entry in filtered_list:
try:
summed_data[entry['date']] += entry['size']
except:
pass
result = [{'date': date, 'totalSize': size} for date, size in summed_data.items()]
return result
def run(): def run():
con = sqlite3.connect('stocks.db') con = sqlite3.connect('stocks.db')
@ -105,34 +143,41 @@ def run():
total_symbols = stocks_symbols+ etf_symbols total_symbols = stocks_symbols+ etf_symbols
with open(f"json/dark-pool/feed/data.json", "r") as file: with open(f"json/dark-pool/feed/data.json", "r") as file:
raw_data = orjson.loads(file.read()) raw_data = orjson.loads(file.read())
for symbol in tqdm(total_symbols):
try:
res_list = [item for item in raw_data if isinstance(item, dict) and item['ticker'] == symbol]
dark_pool_levels = analyze_dark_pool_levels(
trades=res_list,
size_threshold=0.8, # Look for levels with volume in top 20%
price_grouping=1.0 # Group prices within $1.00
)
if dark_pool_levels.get('price_level'): # Ensure there are valid levels market_status = check_market_hours()
top_5_elements = [ if market_status:
{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} for symbol in tqdm(total_symbols):
for item in sorted(res_list, key=lambda x: float(x.get('premium', 0)), reverse=True)[:5] try:
] res_list = [item for item in raw_data if isinstance(item, dict) and item['ticker'] == symbol]
trend_list = today_trend(res_list)
dark_pool_levels = analyze_dark_pool_levels(
trades=res_list,
size_threshold=0.8,
price_grouping=1.0
)
if dark_pool_levels.get('price_level'): # Ensure there are valid levels
top_5_elements = [
{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']}
for item in sorted(res_list, key=lambda x: float(x.get('premium', 0)), reverse=True)[:5]
]
# Add rank to each item # Add rank to each item
for rank, item in enumerate(top_5_elements, 1): for rank, item in enumerate(top_5_elements, 1):
item['rank'] = rank item['rank'] = rank
data_to_save = { data_to_save = {
'hottestTrades': top_5_elements, 'hottestTrades': top_5_elements,
'priceLevel': dark_pool_levels['price_level'], 'priceLevel': dark_pool_levels['price_level'],
'metrics': dark_pool_levels['metrics'] 'trend': trend_list,
} 'metrics': dark_pool_levels['metrics']
}
save_json(data_to_save, symbol) save_json(data_to_save, symbol)
except Exception as e: except Exception as e:
print(f"Error processing {symbol}: {e}") print(f"Error processing {symbol}: {e}")

View File

@ -416,7 +416,7 @@ schedule.every(5).minutes.do(run_threaded, run_list).tag('stock_list_job')
schedule.every(30).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job') schedule.every(15).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job')
schedule.every(10).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') schedule.every(10).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')
schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')