update cron jobs

This commit is contained in:
MuslemRahimi 2024-12-29 16:26:41 +01:00
parent c618253767
commit 6e94223d3b
2 changed files with 119 additions and 77 deletions

View File

@ -7,6 +7,8 @@ import sqlite3
from datetime import datetime, timedelta from datetime import datetime, timedelta
import pytz import pytz
from typing import List, Dict from typing import List, Dict
import sqlite3
from tqdm import tqdm
def save_json(data, symbol): def save_json(data, symbol):
@ -38,42 +40,34 @@ def get_last_7_weekdays():
def analyze_dark_pool_levels(trades: List[Dict], def analyze_dark_pool_levels(trades: List[Dict],
size_threshold: float = 0.8, size_threshold: float = 0.8,
price_grouping: float = 1.0) -> Dict: price_grouping: float = 1.0) -> Dict:
# Convert to DataFrame for easier manipulation if not trades or not isinstance(trades, list):
return {}
try:
df = pd.DataFrame(trades) df = pd.DataFrame(trades)
if df.empty:
return {}
# Ensure necessary columns exist
if 'premium' not in df or 'price' not in df or 'size' not in df:
return {}
# Convert premium strings to float values # Convert premium strings to float values
df['premium'] = df['premium'].apply(lambda x: float(str(x).replace(',', ''))) df['premium'] = df['premium'].apply(lambda x: float(str(x).replace(',', '')))
df['price_level'] = (df['price'] / price_grouping).round(1) * price_grouping
# Round prices to group nearby levels
df['price_level'] = (df['price'] / price_grouping).round(2) * price_grouping
# Group by price level and sum volumes
size_by_price = df.groupby('price_level').agg({ size_by_price = df.groupby('price_level').agg({
'size': 'sum', 'size': 'sum',
'premium': 'sum' 'premium': 'sum'
}).reset_index() }).reset_index()
# Calculate volume threshold
min_size = size_by_price['size'].quantile(size_threshold) min_size = size_by_price['size'].quantile(size_threshold)
# Identify significant levels
significant_levels = size_by_price[size_by_price['size'] >= min_size] significant_levels = size_by_price[size_by_price['size'] >= min_size]
# Sort levels by volume to get strongest levels first
significant_levels = significant_levels.sort_values('size', ascending=False) significant_levels = significant_levels.sort_values('size', ascending=False)
# Separate into support and resistance based on current price
current_price = df['price'].iloc[-1] current_price = df['price'].iloc[-1]
support_levels = significant_levels[significant_levels['price_level'] < current_price].to_dict('records')
resistance_levels = significant_levels[significant_levels['price_level'] > current_price].to_dict('records')
support_levels = significant_levels[
significant_levels['price_level'] < current_price
].to_dict('records')
resistance_levels = significant_levels[
significant_levels['price_level'] > current_price
].to_dict('records')
# Calculate additional metrics
metrics = { metrics = {
'avgTradeSize': round(df['size'].mean(), 2), 'avgTradeSize': round(df['size'].mean(), 2),
'totalPrem': round(df['premium'].sum(), 2), 'totalPrem': round(df['premium'].sum(), 2),
@ -82,11 +76,35 @@ def analyze_dark_pool_levels(trades: List[Dict],
price_level = support_levels + resistance_levels price_level = support_levels + resistance_levels
price_level = sorted(price_level, key=lambda x: float(x['price_level'])) price_level = sorted(price_level, key=lambda x: float(x['price_level']))
return { return {
'price_level': price_level, 'price_level': price_level,
'metrics': metrics, 'metrics': metrics,
} }
except Exception as e:
print(f"Error analyzing dark pool levels: {e}")
return {}
def run():
con = sqlite3.connect('stocks.db')
etf_con = sqlite3.connect('etf.db')
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
stocks_symbols = [row[0] for row in cursor.fetchall()]
etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]
con.close()
etf_con.close()
total_symbols = stocks_symbols+ etf_symbols
data = [] data = []
weekdays = get_last_7_weekdays() weekdays = get_last_7_weekdays()
for date in weekdays: for date in weekdays:
@ -97,26 +115,36 @@ for date in weekdays:
except: except:
pass pass
symbol = "GME" for symbol in tqdm(total_symbols):
res_list = [item for item in data if item['ticker'] == symbol] try:
res_list = [item for item in data if isinstance(item, dict) and item.get('ticker', None) == symbol]
dark_pool_levels = analyze_dark_pool_levels( dark_pool_levels = analyze_dark_pool_levels(
trades=res_list, trades=res_list,
size_threshold=0.9, # Look for levels with volume in top 20% size_threshold=0.8, # Look for levels with volume in top 20%
price_grouping=1.0 # Group prices within $1.00 price_grouping=1.0 # Group prices within $1.00
) )
print(dark_pool_levels['metrics']) if dark_pool_levels.get('price_level'): # Ensure there are valid levels
top_5_elements = [
{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']}
for item in sorted(res_list, key=lambda x: float(x.get('premium', 0)), reverse=True)[:5]
]
top_5_elements = [{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} for item in sorted(res_list, key=lambda x: float(x['premium']), reverse=True)[:5]]
# Add rank to each item # Add rank to each item
for rank, item in enumerate(top_5_elements, 1): for rank, item in enumerate(top_5_elements, 1):
item['rank'] = rank item['rank'] = rank
data = {'hottestTrades': top_5_elements, 'priceLevel': dark_pool_levels['price_level'], 'metrics': dark_pool_levels['metrics']} data_to_save = {
'hottestTrades': top_5_elements,
'priceLevel': dark_pool_levels['price_level'],
'metrics': dark_pool_levels['metrics']
}
if len(data) > 0: save_json(data_to_save, symbol)
save_json(data, symbol) except Exception as e:
#print(data) print(f"Error processing {symbol}: {e}")
if __name__ == "__main__":
run()

View File

@ -65,6 +65,19 @@ def run_dark_pool_flow():
if week <= 4 and 8 <= hour < 20: if week <= 4 and 8 <= hour < 20:
run_command(["python3", "cron_dark_pool_flow.py"]) run_command(["python3", "cron_dark_pool_flow.py"])
def run_dark_pool_level():
now = datetime.now(ny_tz)
week = now.weekday()
hour = now.hour
if week <= 4 and 8 <= hour < 20:
run_command(["python3", "cron_dark_pool_level.py"])
def run_dark_pool_ticker():
now = datetime.now(ny_tz)
week = now.weekday()
if week <= 5:
run_command(["python3", "cron_dark_pool_ticker.py"])
def run_fda_calendar(): def run_fda_calendar():
now = datetime.now(ny_tz) now = datetime.now(ny_tz)
week = now.weekday() week = now.weekday()
@ -329,10 +342,10 @@ schedule.every().day.at("05:00").do(run_threaded, run_options_gex).tag('options_
schedule.every().day.at("05:00").do(run_threaded, run_export_price).tag('export_price_job') schedule.every().day.at("05:00").do(run_threaded, run_export_price).tag('export_price_job')
schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job') schedule.every().day.at("06:00").do(run_threaded, run_historical_price).tag('historical_job')
schedule.every().day.at("06:30").do(run_threaded, run_ai_score).tag('ai_score_job') schedule.every().day.at("06:30").do(run_threaded, run_ai_score).tag('ai_score_job')
schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job') schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job')
schedule.every().day.at("08:00").do(run_threaded, run_dark_pool_ticker).tag('dark_pool_ticker_job')
schedule.every().day.at("09:00").do(run_threaded, run_hedge_fund).tag('hedge_fund_job') schedule.every().day.at("09:00").do(run_threaded, run_hedge_fund).tag('hedge_fund_job')
schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job') schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job')
schedule.every().day.at("08:00").do(run_threaded, run_economy_indicator).tag('economy_indicator_job') schedule.every().day.at("08:00").do(run_threaded, run_economy_indicator).tag('economy_indicator_job')
@ -384,6 +397,7 @@ schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_
schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job') schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')
schedule.every(5).minutes.do(run_threaded, run_dark_pool_level).tag('dark_pool_level_job')
schedule.every(10).seconds.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') schedule.every(10).seconds.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')
schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')