diff --git a/app/cron_dark_pool_level.py b/app/cron_dark_pool_level.py
new file mode 100644
index 0000000..efe7d6f
--- /dev/null
+++ b/app/cron_dark_pool_level.py
@@ -0,0 +1,122 @@
+import os
+import pandas as pd
+import numpy as np
+import orjson
+from dotenv import load_dotenv
+import sqlite3
+from datetime import datetime, timedelta
+import pytz
+from typing import List, Dict
+
+
+def save_json(data, symbol):
+    def convert_numpy(obj):
+        if isinstance(obj, np.generic):
+            return obj.item()  # Convert numpy scalar to Python scalar
+        raise TypeError(f"Type is not JSON serializable: {type(obj)}")
+
+    directory = "json/dark-pool/price-level"
+    os.makedirs(directory, exist_ok=True)  # Ensure the directory exists
+    with open(f"{directory}/{symbol}.json", 'wb') as file:  # Use binary mode for orjson
+        file.write(orjson.dumps(data, default=convert_numpy))
+
+# Function to get the last 7 weekdays
+def get_last_7_weekdays():
+    today = datetime.today()
+    weekdays = []
+
+    # Start from today and go back until we have 7 weekdays
+    while len(weekdays) < 7:
+        if today.weekday() < 5:  # Monday to Friday are weekdays (0-4)
+            weekdays.append(today)
+        today -= timedelta(days=1)
+
+    weekdays = [item.strftime("%Y-%m-%d") for item in weekdays]
+    return weekdays
+
+
+def analyze_dark_pool_levels(trades: List[Dict],
+                             size_threshold: float = 0.8,
+                             price_grouping: float = 1.0) -> Dict:
+    # Convert to DataFrame for easier manipulation
+    df = pd.DataFrame(trades)
+
+    # Convert premium strings to float values
+    df['premium'] = df['premium'].apply(lambda x: float(str(x).replace(',', '')))
+
+    # Round prices to group nearby levels
+    df['price_level'] = (df['price'] / price_grouping).round(2) * price_grouping
+
+    # Group by price level and sum volumes
+    size_by_price = df.groupby('price_level').agg({
+        'size': 'sum',
+        'premium': 'sum'
+    }).reset_index()
+
+    # Calculate volume threshold
+    min_size = size_by_price['size'].quantile(size_threshold)
+
+    # Identify significant levels
+    significant_levels = size_by_price[size_by_price['size'] >= min_size]
+
+    # Sort levels by volume to get strongest levels first
+    significant_levels = significant_levels.sort_values('size', ascending=False)
+
+    # Separate into support and resistance based on current price
+    current_price = df['price'].iloc[-1]
+
+    support_levels = significant_levels[
+        significant_levels['price_level'] < current_price
+    ].to_dict('records')
+
+    resistance_levels = significant_levels[
+        significant_levels['price_level'] > current_price
+    ].to_dict('records')
+
+    # Calculate additional metrics
+    metrics = {
+        'avgTradeSize': round(df['size'].mean(), 2),
+        'totalPrem': round(df['premium'].sum(), 2),
+        'avgPremTrade': round(df['premium'].mean(), 2)
+    }
+
+    price_level = support_levels + resistance_levels
+    price_level = sorted(price_level, key=lambda x: float(x['price_level']))
+    return {
+        'price_level': price_level,
+        'metrics': metrics,
+    }
+
+data = []
+weekdays = get_last_7_weekdays()
+for date in weekdays:
+    try:
+        with open(f"json/dark-pool/historical-flow/{date}.json", "r") as file:
+            raw_data = orjson.loads(file.read())
+            data += raw_data
+    except:
+        pass
+
+symbol = "GME"
+res_list = [item for item in data if item['ticker'] == symbol]
+
+
+dark_pool_levels = analyze_dark_pool_levels(
+    trades=res_list,
+    size_threshold=0.9,   # Look for levels with volume in the top 10%
+    price_grouping=1.0    # Group prices within $1.00
+)
+
+print(dark_pool_levels['metrics'])
+
+
+top_5_elements = [{k: v for k, v in item.items() if k not in ['ticker', 'sector', 'assetType']} for item in sorted(res_list, key=lambda x: float(x['premium']), reverse=True)[:5]]
+# Add rank to each item
+for rank, item in enumerate(top_5_elements, 1):
+    item['rank'] = rank
+
+data = {'hottestTrades': top_5_elements, 'priceLevel': dark_pool_levels['price_level'], 'metrics': dark_pool_levels['metrics']}
+
+if len(data) > 0:
+    save_json(data, symbol)
+#print(data)
\ No newline at end of file
diff --git a/app/cron_dark_pool.py b/app/cron_dark_pool_ticker.py
similarity index 100%
rename from app/cron_dark_pool.py
rename to app/cron_dark_pool_ticker.py
diff --git a/app/main.py b/app/main.py
index 12a7da5..3df00e0 100755
--- a/app/main.py
+++ b/app/main.py
@@ -3359,57 +3359,63 @@ async def get_most_shorted_stocks(api_key: str = Security(get_api_key)):
     redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
     return res


-@app.get("/most-retail-volume")
-async def get_most_retail_volume(api_key: str = Security(get_api_key)):
-    cache_key = f"most-retail-volume"
-    cached_result = redis_client.get(cache_key)
-    if cached_result:
-        return orjson.loads(cached_result)
-    try:
-        with open(f"json/retail-volume/data.json", 'rb') as file:
-            res = orjson.loads(file.read())
-    except:
-        res = []
-    redis_client.set(cache_key, orjson.dumps(res))
-    redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
-    return res
-
-
-@app.post("/retail-volume")
-async def get_retail_volume(data:TickerData, api_key: str = Security(get_api_key)):
-    ticker = data.ticker.upper()
-    cache_key = f"retail-volume-{ticker}"
-    cached_result = redis_client.get(cache_key)
-    if cached_result:
-        return orjson.loads(cached_result)
-    try:
-        with open(f"json/retail-volume/companies/{ticker}.json", 'rb') as file:
-            res = orjson.loads(file.read())
-    except:
-        res = {}
-
-    redis_client.set(cache_key, orjson.dumps(res))
-    redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
-    return res
-
-@app.post("/dark-pool")
+@app.post("/historical-dark-pool")
 async def get_dark_pool(data:TickerData, api_key: str = Security(get_api_key)):
     ticker = data.ticker.upper()
-    cache_key = f"dark-pool-{ticker}"
+    cache_key = f"historical-dark-pool-{ticker}"
     cached_result = redis_client.get(cache_key)
     if cached_result:
-        return orjson.loads(cached_result)
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+
     try:
         with open(f"json/dark-pool/companies/{ticker}.json", 'rb') as file:
             res = orjson.loads(file.read())
     except:
         res = []

-    redis_client.set(cache_key, orjson.dumps(res))
-    redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
-    return res
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*60)
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
+
+@app.post("/dark-pool-level")
+async def get_dark_pool_level(data:TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker.upper()
+    cache_key = f"dark-pool-level-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+
+    try:
+        with open(f"json/dark-pool/price-level/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = []
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 60*5)
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )


 @app.get("/dark-pool-flow")
 async def get_dark_pool_flow(api_key: str = Security(get_api_key)):
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index 671ffa5..3744a7c 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -384,7 +384,7 @@ schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_

 schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')

-schedule.every(1).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')
+schedule.every(10).seconds.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')

 schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')
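
For reference, the level detection in app/cron_dark_pool_level.py works by bucketing trade prices, summing size per bucket, and keeping only buckets whose total size clears a quantile cutoff. Below is a minimal standalone sketch of that step on synthetic trades; the values are illustrative, not real dark-pool prints.

import pandas as pd

# Synthetic trades in the shape the cron job reads from
# json/dark-pool/historical-flow/<date>.json (illustrative values only).
trades = [
    {"price": 20.10, "size": 1_000,  "premium": "20,100"},
    {"price": 20.10, "size": 55_000, "premium": "1,105,500"},
    {"price": 21.00, "size": 800,    "premium": "16,800"},
    {"price": 22.00, "size": 60_000, "premium": "1,320,000"},
    {"price": 21.50, "size": 1_200,  "premium": "25,800"},
]

df = pd.DataFrame(trades)
df["premium"] = df["premium"].apply(lambda x: float(str(x).replace(",", "")))

# Same aggregation as analyze_dark_pool_levels: bucket prices, sum size and
# premium per bucket, then keep buckets at or above the chosen size quantile.
price_grouping = 1.0
df["price_level"] = (df["price"] / price_grouping).round(2) * price_grouping
size_by_price = df.groupby("price_level").agg({"size": "sum", "premium": "sum"}).reset_index()
min_size = size_by_price["size"].quantile(0.9)  # size_threshold=0.9 keeps roughly the top 10% of levels
print(size_by_price[size_by_price["size"] >= min_size])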
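
The reworked /historical-dark-pool route and the new /dark-pool-level route both return gzip-compressed JSON through StreamingResponse with a Content-Encoding: gzip header. The following client sketch is a hedged illustration only: the base URL and the API-key header name are assumptions, since the header consumed by Security(get_api_key) is not shown in this diff.

import requests

BASE_URL = "http://localhost:8000"         # assumed deployment address
HEADERS = {"X-API-KEY": "<your-api-key>"}  # assumed header name for get_api_key

resp = requests.post(f"{BASE_URL}/dark-pool-level", json={"ticker": "GME"}, headers=HEADERS)
resp.raise_for_status()

# requests/urllib3 transparently decompress gzip based on Content-Encoding,
# so .json() yields the payload written by cron_dark_pool_level.py:
# {'hottestTrades': [...], 'priceLevel': [...], 'metrics': {...}}
payload = resp.json()
# The endpoint falls back to an empty list when no price-level file exists for the ticker.
if isinstance(payload, dict):
    print(payload["metrics"])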