diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py
index 28def05..0f026d6 100644
--- a/app/cron_dark_pool_flow.py
+++ b/app/cron_dark_pool_flow.py
@@ -8,17 +8,18 @@ import pytz
 import requests # Add missing import
 from dateutil.parser import isoparse
 from utils.helper import load_latest_json
-
+import time
+import hashlib
+import requests
+from tqdm import tqdm
 load_dotenv()
-api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
+api_key = os.getenv('INTRINIO_API_KEY')
 
-querystring = {"limit": "200"}
-url = "https://api.unusualwhales.com/api/darkpool/recent"
-headers = {
-    "Accept": "application/json, text/plain",
-    "Authorization": api_key
-}
+today_date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
+
+start_date = today_date
+end_date = today_date
 
 with open(f"json/stock-screener/data.json", 'rb') as file:
     stock_screener_data = orjson.loads(file.read())
@@ -39,7 +40,8 @@ def get_quote_data(symbol):
         return None
 
 
-def save_to_daily_file(data, directory):
+def save_json(data):
+    directory = "json/dark-pool/feed"
     try:
         # Create a set to track unique entries based on a combination of 'ticker' and 'trackingID'
         seen = set()
@@ -60,16 +62,15 @@ def save_to_daily_file(data, directory):
         else:
             first_date = datetime.now().strftime('%Y-%m-%d') # Fallback in case data is empty
 
-        json_file_path = os.path.join(directory, f"{first_date}.json")
 
         # Ensure the directory exists
        os.makedirs(directory, exist_ok=True)
 
-        # Save the data to the dated JSON file
-        with open(json_file_path, 'wb') as file:
+        # Save the data to data.json in the feed directory
+        with open(os.path.join(directory, "data.json"), 'wb') as file:
             file.write(orjson.dumps(latest_data))
 
-        print(f"Saved {len(latest_data)} unique and latest ratings to {json_file_path}.")
+        print(f"Saved {len(latest_data)} unique datapoints")
 
     except Exception as e:
         print(f"An error occurred while saving data: {e}")
@@ -77,65 +78,122 @@ def save_to_daily_file(data, directory):
 
 def get_data():
     try:
-        response = requests.get(url, headers=headers, params=querystring)
-        return response.json().get('data', [])
+        unique_trades = {}
+        sources = ['utp_delayed', 'cta_a_delayed', 'cta_b_delayed']
+        page_size = 50000
+        min_size = 2000
+        threshold = 1E6 # Minimum notional value (price * size) for a trade to be kept
+
+        for source in tqdm(sources):
+            try:
+                next_page = '' # Reset for each source
+                while True:
+                    # Build the URL with the current page (if available)
+                    url = (
+                        f"https://api-v2.intrinio.com/securities/trades?"
+                        f"timezone=UTC&source={source}&start_date={start_date}&end_date={end_date}"
+                        f"&page_size={page_size}&min_size={min_size}"
+                    )
+                    if next_page:
+                        url += f"&next_page={next_page}"
+                    url += f"&darkpool_only=true&api_key={api_key}"
+
+                    response = requests.get(url)
+                    if response.status_code == 200:
+                        output = response.json()
+                        trades = output.get("trades", [])
+
+                        # Process each trade and maintain uniqueness
+                        for trade in trades:
+                            price = trade.get("price", 0)
+                            size = trade.get("size", 0)
+
+                            if price * size > threshold: # Apply filtering condition
+                                unique_key = (
+                                    f"{trade.get('symbol')}_"
+                                    f"{trade.get('timestamp')}_"
+                                    f"{trade.get('price')}_"
+                                    f"{trade.get('total_volume')}"
+                                )
+                                if unique_key not in unique_trades:
+                                    unique_trades[unique_key] = trade
+
+                        # Check if there's a next page; if not, exit the loop
+                        next_page = output.get("next_page")
+                        print(next_page)
+                        if not next_page:
+                            break
+                    else:
+                        print(f"Error fetching data from source {source}: {response.status_code} - {response.text}")
+                        break
+            except Exception as e:
+                print(f"Error processing source {source}: {e}")
+                pass
+
+        return list(unique_trades.values())
+
     except Exception as e:
         print(f"Error fetching data: {e}")
         return []
-def main():
-    # Directory for saving daily historical flow data
-    historical_directory = 'json/dark-pool/historical-flow'
-
-    # Load the latest JSON file from the directory
-    existing_data = load_latest_json(historical_directory, find=False)
-    existing_keys = {item.get('trackingID', None) for item in existing_data}
+
+
+
+def main():
     # Fetch new data from the API
     data = get_data()
-
+    print(len(data))
     res = []
-    today_date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
 
     for item in data:
-        symbol = item['ticker']
+        symbol = item['symbol']
+        # Adjust ticker formatting for BRK-A/BRK-B if needed
+        ticker = symbol
         if symbol.lower() == 'brk.b':
-            item['ticker'] = 'BRK-B'
-        if symbol.lower() == 'brk.a':
-            item['ticker'] = 'BRK-A'
+            ticker = 'BRK-B'
+        elif symbol.lower() == 'brk.a':
+            ticker = 'BRK-A'
 
         try:
-            executed_date = item['executed_at'][:10] # Extract date in YYYY-MM-DD format
-            if item['tracking_id'] not in existing_keys and executed_date == today_date:
+            # Use the datetime 'timestamp' to extract the executed date
+            timestamp_dt = datetime.fromisoformat(item['timestamp'])
+            executed_date = timestamp_dt.strftime('%Y-%m-%d')
+
+            # Create a unique trackingID using hashlib (MD5)
+            raw_tracking_string = f"{symbol}_{timestamp_dt.isoformat()}"
+            tracking_id = hashlib.md5(raw_tracking_string.encode('utf-8')).hexdigest()[:10]
+
+            if executed_date == today_date:
                 sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
-                volume = float(item['volume'])
+                volume = float(item['total_volume'])
                 size = float(item['size'])
+                price = round(float(item['price']), 2)
                 quote_data = get_quote_data(symbol) or {}
-                size_volume_ratio = round((size / volume) * 100, 2)
+                size_volume_ratio = round((size / volume) * 100, 2) if volume else 0
                 size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
+
                 res.append({
-                    'ticker': item['ticker'],
-                    'date': item['executed_at'],
-                    'price': round(float(item['price']), 2),
-                    'size': item['size'],
+                    'ticker': ticker,
+                    'date': item['timestamp'],
+                    'price': price,
+                    'size': size,
                     'volume': volume,
-                    'premium': item['premium'],
+                    'premium': round(size * price, 2), # notional value of the trade (size * price)
                     'sector': sector,
                     'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
                     'sizeVolRatio': size_volume_ratio,
                     'sizeAvgVolRatio': size_avg_volume_ratio,
-                    'trackingID': item['tracking_id']
+                    'trackingID': tracking_id
                 })
        except Exception as e:
            print(f"Error processing {symbol}: {e}")
 
-    # Combine new data with existing data
-    combined_data = existing_data + res
-    if combined_data:
-        # Save the combined data to a daily file
-        save_to_daily_file(combined_data, historical_directory)
-
-if __name__ == '__main__':
-    main()
+    if res:
+        # Save the new feed data to json/dark-pool/feed/data.json
+        save_json(res)
+
+if __name__ == '__main__':
+    main()
diff --git a/app/main.py b/app/main.py
index 0a95bc9..be69854 100755
--- a/app/main.py
+++ b/app/main.py
@@ -3009,7 +3009,7 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
 
 @app.get("/dark-pool-flow-feed")
 async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
-    cache_key = f"dark-pooll-flow-feed"
+    cache_key = "dark-pool-flow-feed"
     cached_result = redis_client.get(cache_key)
     if cached_result:
         return StreamingResponse(
@@ -3019,9 +3019,8 @@ async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
     try:
-        directory = "json/dark-pool/historical-flow"
-        res_list = load_latest_json(directory)
-        res_list = [item for item in res_list if float(item['premium']) > 500_000]
+        with open("json/dark-pool/feed/data.json", "r") as file:
+            res_list = orjson.loads(file.read())
     except:
         res_list = []