From 6e94223d3bcdbb2352821f415931cbed41c0f70e Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Sun, 29 Dec 2024 16:26:41 +0100
Subject: [PATCH] update cron jobs

---
 app/cron_dark_pool_level.py | 180 +++++++++++++++++++++---------------
 app/primary_cron_job.py     |  17 +++-
 2 files changed, 120 insertions(+), 77 deletions(-)

diff --git a/app/cron_dark_pool_level.py b/app/cron_dark_pool_level.py
index efe7d6f..851a516 100644
--- a/app/cron_dark_pool_level.py
+++ b/app/cron_dark_pool_level.py
@@ -7,6 +7,7 @@ import sqlite3
 from datetime import datetime, timedelta
 import pytz
 from typing import List, Dict
+from tqdm import tqdm
 
 
 def save_json(data, symbol):
@@ -36,87 +37,114 @@ def get_last_7_weekdays():
 
 
 def analyze_dark_pool_levels(trades: List[Dict],
-                             size_threshold: float = 0.8,
-                             price_grouping: float = 1.0) -> Dict:
-    # Convert to DataFrame for easier manipulation
-    df = pd.DataFrame(trades)
-
-    # Convert premium strings to float values
-    df['premium'] = df['premium'].apply(lambda x: float(str(x).replace(',', '')))
-
-    # Round prices to group nearby levels
-    df['price_level'] = (df['price'] / price_grouping).round(2) * price_grouping
-
-    # Group by price level and sum volumes
-    size_by_price = df.groupby('price_level').agg({
-        'size': 'sum',
-        'premium': 'sum'
-    }).reset_index()
-
-    # Calculate volume threshold
-    min_size = size_by_price['size'].quantile(size_threshold)
-
-    # Identify significant levels
-    significant_levels = size_by_price[size_by_price['size'] >= min_size]
-
-    # Sort levels by volume to get strongest levels first
-    significant_levels = significant_levels.sort_values('size', ascending=False)
-
-    # Separate into support and resistance based on current price
-    current_price = df['price'].iloc[-1]
-
-    support_levels = significant_levels[
-        significant_levels['price_level'] < current_price
-    ].to_dict('records')
-
-    resistance_levels = significant_levels[
-        significant_levels['price_level'] > current_price
-    ].to_dict('records')
-
-    # Calculate additional metrics
-    metrics = {
-        'avgTradeSize': round(df['size'].mean(),2),
-        'totalPrem': round(df['premium'].sum(),2),
-        'avgPremTrade': round(df['premium'].mean(),2)
-    }
-
-    price_level = support_levels+resistance_levels
-    price_level = sorted(price_level, key=lambda x: float(x['price_level']))
-    return {
-        'price_level': price_level,
-        'metrics': metrics,
-    }
-
-data = []
-weekdays = get_last_7_weekdays()
-for date in weekdays:
+                             size_threshold: float = 0.8,
+                             price_grouping: float = 1.0) -> Dict:
+    if not trades or not isinstance(trades, list):
+        return {}
     try:
-        with open(f"json/dark-pool/historical-flow/{date}.json", "r") as file:
-            raw_data = orjson.loads(file.read())
-            data +=raw_data
-    except:
-        pass
+        df = pd.DataFrame(trades)
+        if df.empty:
+            return {}
 
-symbol = "GME"
-res_list = [item for item in data if item['ticker'] == symbol]
+        # Ensure necessary columns exist
+        if 'premium' not in df or 'price' not in df or 'size' not in df:
+            return {}
+
+        # Convert premium strings to float values
+        df['premium'] = df['premium'].apply(lambda x: float(str(x).replace(',', '')))
+        df['price_level'] = (df['price'] / price_grouping).round(1) * price_grouping
+
+        size_by_price = df.groupby('price_level').agg({
+            'size': 'sum',
+            'premium': 'sum'
+        }).reset_index()
+
+        min_size = size_by_price['size'].quantile(size_threshold)
+        significant_levels = size_by_price[size_by_price['size'] >= min_size]
+        significant_levels = significant_levels.sort_values('size', ascending=False)
+
+        current_price = df['price'].iloc[-1]
+
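+        # Split the significant levels into support (below the latest trade price) and resistance (above it)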
+        support_levels = significant_levels[significant_levels['price_level'] < current_price].to_dict('records')
+        resistance_levels = significant_levels[significant_levels['price_level'] > current_price].to_dict('records')
+
+        metrics = {
+            'avgTradeSize': round(df['size'].mean(), 2),
+            'totalPrem': round(df['premium'].sum(), 2),
+            'avgPremTrade': round(df['premium'].mean(), 2)
+        }
+
+        price_level = support_levels + resistance_levels
+        price_level = sorted(price_level, key=lambda x: float(x['price_level']))
+
+        return {
+            'price_level': price_level,
+            'metrics': metrics,
+        }
+    except Exception as e:
+        print(f"Error analyzing dark pool levels: {e}")
+        return {}
 
-dark_pool_levels = analyze_dark_pool_levels(
-    trades=res_list,
-    size_threshold=0.9,  # Look for levels with volume in top 20%
-    price_grouping=1.0   # Group prices within $1.00
-)
-print(dark_pool_levels['metrics'])
+def run():
+    con = sqlite3.connect('stocks.db')
+    etf_con = sqlite3.connect('etf.db')
+
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
+    stocks_symbols = [row[0] for row in cursor.fetchall()]
+
+    etf_cursor = etf_con.cursor()
+    etf_cursor.execute("PRAGMA journal_mode = wal")
+    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
+    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
+
+    con.close()
+    etf_con.close()
+
+    total_symbols = stocks_symbols + etf_symbols
+    data = []
+    weekdays = get_last_7_weekdays()
+    for date in weekdays:
+        try:
+            with open(f"json/dark-pool/historical-flow/{date}.json", "r") as file:
+                raw_data = orjson.loads(file.read())
+                data += raw_data
+        except Exception:
+            pass
+
+    for symbol in tqdm(total_symbols):
+        try:
+            res_list = [item for item in data if isinstance(item, dict) and item.get('ticker') == symbol]
+            dark_pool_levels = analyze_dark_pool_levels(
+                trades=res_list,
+                size_threshold=0.8,  # Look for levels with volume in the top 20%
+                price_grouping=1.0   # Bucket width used to group nearby price levels
+            )
+
+            if dark_pool_levels.get('price_level'):  # Ensure there are valid levels
+                top_5_elements = [
+                    {k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']}
+                    for item in sorted(res_list, key=lambda x: float(str(x.get('premium', 0)).replace(',', '')), reverse=True)[:5]
+                ]
+
+                # Add rank to each item
+                for rank, item in enumerate(top_5_elements, 1):
+                    item['rank'] = rank
+
+                data_to_save = {
+                    'hottestTrades': top_5_elements,
+                    'priceLevel': dark_pool_levels['price_level'],
+                    'metrics': dark_pool_levels['metrics']
+                }
+
+                save_json(data_to_save, symbol)
+        except Exception as e:
+            print(f"Error processing {symbol}: {e}")
 
-top_5_elements = [{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} for item in sorted(res_list, key=lambda x: float(x['premium']), reverse=True)[:5]]
-# Add rank to each item
-for rank, item in enumerate(top_5_elements, 1):
-    item['rank'] = rank
-data = {'hottestTrades': top_5_elements, 'priceLevel': dark_pool_levels['price_level'], 'metrics': dark_pool_levels['metrics']}
-
-if len(data) > 0:
-    save_json(data, symbol)
-#print(data)
\ No newline at end of file
+if __name__ == "__main__":
+    run()
\ No newline at end of file
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index 3744a7c..76f542b 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -65,6 +65,20 @@ def run_dark_pool_flow():
     if week <= 4 and 8 <= hour < 20:
         run_command(["python3", "cron_dark_pool_flow.py"])
 
+def run_dark_pool_level():
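+    # Run on weekdays only (Mon=0 to Fri=4) during 08:00-20:00 New York time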
+    now = datetime.now(ny_tz)
+    week = now.weekday()
+    hour = now.hour
+    if week <= 4 and 8 <= hour < 20:
+        run_command(["python3", "cron_dark_pool_level.py"])
+
+def run_dark_pool_ticker():
+    now = datetime.now(ny_tz)
+    week = now.weekday()
+    if week <= 5:
+        run_command(["python3", "cron_dark_pool_ticker.py"])
+
 def run_fda_calendar():
     now = datetime.now(ny_tz)
     week = now.weekday()
@@ -329,10 +343,10 @@ schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_
 schedule.every().day.at("05:00").do(run_threaded, run_export_price).tag('export_price_job')
 schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job')
 
-
 schedule.every().day.at("06:30").do(run_threaded, run_ai_score).tag('ai_score_job')
 schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job')
+schedule.every().day.at("08:00").do(run_threaded, run_dark_pool_ticker).tag('dark_pool_ticker_job')
 schedule.every().day.at("09:00").do(run_threaded, run_hedge_fund).tag('hedge_fund_job')
 schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job')
 schedule.every().day.at("08:00").do(run_threaded, run_economy_indicator).tag('economy_indicator_job')
@@ -384,6 +398,7 @@ schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_
 
 schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')
 
+schedule.every(5).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job')
 schedule.every(10).seconds.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')
 schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')