From 47cecb16d3d55cd6aa234b118662822f854882a5 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Wed, 25 Dec 2024 12:49:51 +0100
Subject: [PATCH] update dark pool flow cron job

---
 app/cron_dark_pool_flow.py | 243 +++++++++++++++++++++----------------
 app/main.py                |  14 +++
 app/test.py                |  22 +++-
 3 files changed, 171 insertions(+), 108 deletions(-)

diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py
index ff3e316..0ff349d 100644
--- a/app/cron_dark_pool_flow.py
+++ b/app/cron_dark_pool_flow.py
@@ -1,123 +1,160 @@
-
-from datetime import timedelta
-from GetStartEndDate import GetStartEndDate
-from concurrent.futures import ThreadPoolExecutor, TimeoutError
-import intrinio_sdk as intrinio
-import ujson
-import sqlite3
-
-from dotenv import load_dotenv
 import os
-
+import orjson
+from dotenv import load_dotenv
+import sqlite3
+from datetime import datetime
+import requests
 
 load_dotenv()
-API_KEY = os.getenv('INTRINIO_API_KEY')
+api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
 
-intrinio.ApiClient().set_api_key(API_KEY)
-intrinio.ApiClient().allow_retries(True)
+querystring = {"limit": "200"}
+url = "https://api.unusualwhales.com/api/darkpool/recent"
+headers = {
+    "Accept": "application/json, text/plain",
+    "Authorization": api_key
+}
 
-def save_json(data):
-    with open(f"json/dark-pool/flow/data.json", 'w') as file:
-        ujson.dump(data, file)
+with open("json/stock-screener/data.json", 'rb') as file:
+    stock_screener_data = orjson.loads(file.read())
+stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}
+
+quote_cache = {}
+
+def get_quote_data(symbol):
+    """Get quote data for a symbol from its JSON file, with in-memory caching."""
+    if symbol in quote_cache:
+        return quote_cache[symbol]
+    try:
+        with open(f"json/quote/{symbol}.json") as file:
+            quote_data = orjson.loads(file.read())
+        quote_cache[symbol] = quote_data  # Cache the loaded data
+        return quote_data
+    except FileNotFoundError:
+        return None
+
+def load_json(file_path):
+    """Load existing JSON data from file."""
+    if os.path.exists(file_path):
+        try:
+            with open(file_path, 'r') as file:
+                return orjson.loads(file.read())
+        except (ValueError, IOError):
+            print(f"Warning: Could not read or parse {file_path}. Starting with an empty list.")
+    return []
+
+def save_latest_ratings(combined_data, json_file_path, limit=2000):
+    try:
+        # Track unique entries by their trackingID
+        seen = set()
+        unique_data = []
+
+        for item in combined_data:
+            identifier = item['trackingID']
+            if identifier not in seen:
+                seen.add(identifier)
+                unique_data.append(item)
+
+        # Sort the data by date, newest first
+        sorted_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)
+
+        # Keep only the latest `limit` entries
+        latest_data = sorted_data[:limit]
+
+        # Save the trimmed and deduplicated data to the JSON file
+        with open(json_file_path, 'wb') as file:
+            file.write(orjson.dumps(latest_data))
+
+        print(f"Saved {len(latest_data)} unique and latest entries to {json_file_path}.")
+    except Exception as e:
+        print(f"An error occurred while saving data: {e}")
 
-identifier = 'INTC'
-source = 'utp_delayed'
-start_date, end_date = GetStartEndDate().run()
-start_date = start_date.strftime("%Y-%m-%d")
-end_date = end_date.strftime("%Y-%m-%d")
-start_time = ''
-end_time = ''
-timezone = 'UTC'
-page_size = 1000
-darkpool_only = True
-min_size = 100
-count = 0
 
 def get_data():
-    data = []
-    count = 0
+    try:
+        response = requests.get(url, headers=headers, params=querystring)
+        response.raise_for_status()
+        return response.json().get('data', [])
+    except Exception as e:
+        print(f"Error fetching data: {e}")
+        return []
 
-    while True:
-        if count == 0:
-            next_page = ''
-        try:
-            response = intrinio.SecurityApi().get_security_trades_by_symbol(
-                identifier, source, start_date=start_date, start_time=start_time,
-                end_date=end_date, end_time=end_time, timezone=timezone,
-                page_size=page_size, darkpool_only=darkpool_only, min_size=min_size,
-                next_page=next_page
-            )
-
-            filtered_entries = [entry.__dict__ for entry in response.trades]
-
-            data.extend(filtered_entries)
-            next_page = response.next_page
-
-            if not next_page:
-                break
-            count += 1
-            print(f'Current length {len(data)}')
-
-        except Exception as e:
-            print(e)
-            break
-
-    return data
-
-
-def run():
+def main():
+    # Collect stock and ETF symbols from the databases
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
-    cursor.execute("SELECT DISTINCT symbol, name FROM stocks")
-    stocks = cursor.fetchall()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
+    stock_symbols = [row[0] for row in cursor.fetchall()]
+
+    etf_con = sqlite3.connect('etf.db')
+    etf_cursor = etf_con.cursor()
+    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
+    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
+    total_symbols = stock_symbols + etf_symbols
     con.close()
-    symbol_name_map = {row[0]: row[1] for row in stocks}
-    stock_symbols = list(symbol_name_map.keys())
+    etf_con.close()
 
+    json_file_path = 'json/dark-pool/feed/data.json'
+    existing_data = load_json(json_file_path)
+    # Collect the trackingIDs already on disk so only new prints are appended
+    existing_keys = {item.get('trackingID') for item in existing_data}
 
     data = get_data()
-    filtered_data = [entry for entry in data if entry['_symbol'] in stock_symbols]
-
-
-    filtered_data = [
-        {
-            'symbol': entry['_symbol'],
-            'name': symbol_name_map[entry['_symbol']],
-            'date': (entry['_timestamp']-timedelta(hours=4)).isoformat(),
-            'price': entry['_price'],
-            'total_volume': entry['_total_volume'],
-            'size': entry['_size']
-        }
-        for entry in filtered_data
-    ]
-    sorted_data = sorted(filtered_data, key=lambda x: x['date'])
-
-    previous_total_volume = None
+    # Prepare results with only new data
+    res = []
+    for item in data:
+        # Normalize Berkshire share classes to the dashed symbols used internally
+        symbol = item['ticker']
+        if symbol.lower() == 'brk.b':
+            item['ticker'] = symbol = 'BRK-B'
+        elif symbol.lower() == 'brk.a':
+            item['ticker'] = symbol = 'BRK-A'
+        if symbol in total_symbols:
+            quote_data = get_quote_data(symbol)
+            asset_type = 'Stock' if symbol in stock_symbols else 'ETF'
 
-    for entry in sorted_data:
-        if previous_total_volume is not None:
-            entry["volume"] = int(entry["total_volume"]) - previous_total_volume
-        else:
-            entry["volume"] = int(entry["total_volume"]) #if you prefer to keep the first volume as is
-        previous_total_volume = int(entry["total_volume"])
+            try:
+                # Skip prints that are already in the file
+                if item['tracking_id'] not in existing_keys:
+                    sector = stock_screener_data_dict.get(symbol, {}).get('sector')
 
-    sorted_data = sorted(sorted_data, key=lambda x: x['date'], reverse=True)
+                    volume = float(item['volume'])
+                    size = float(item['size'])
+
+                    daily_volume_percentage = round((size / volume) * 100, 2)
+                    # Guard against a missing quote file or a missing/zero avgVolume
+                    avg_volume = (quote_data or {}).get('avgVolume') or 1
+                    avg_volume_percentage = round((size / avg_volume) * 100, 2)
+                    res.append({
+                        'ticker': item['ticker'],
+                        'date': item['executed_at'],
+                        'price': round(float(item['price']), 2),
+                        'size': item['size'],
+                        'volume': volume,
+                        'premium': item['premium'],
+                        'sector': sector,
+                        'assetType': asset_type,
+                        'dailyVolumePercentage': daily_volume_percentage,
+                        'avgVolumePercentage': avg_volume_percentage,
+                        'trackingID': item['tracking_id']
+                    })
+            except Exception as e:
+                print(f"Error processing {symbol}: {e}")
+
+    # Append the new prints to the existing data and persist
+    combined_data = existing_data + res
+    save_latest_ratings(combined_data, json_file_path)
 
-    if len(sorted_data) > 0:
-        save_json(sorted_data)
-
-
-if __name__ == "__main__":
-    executor = ThreadPoolExecutor(max_workers=1)
-    future = executor.submit(run)
-    try:
-        # Wait for the result with a timeout of 300 seconds (5 minutes)
-        future.result(timeout=1000)
-    except TimeoutError:
-        print("The operation timed out.")
-    except Exception as e:
-        print(f"An error occurred: {e}")
-    finally:
-        executor.shutdown()
\ No newline at end of file
+if __name__ == '__main__':
+    main()
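For reviewers: the dedupe-sort-trim behavior of save_latest_ratings reduces to the sketch below, a minimal standalone Python example with synthetic records (the trackingID and date values are invented):

    from datetime import datetime

    # Synthetic records shaped like the cron job's output entries.
    records = [
        {'trackingID': 'a1', 'date': '2024-12-25T10:00:00Z'},
        {'trackingID': 'a1', 'date': '2024-12-25T10:00:00Z'},  # duplicate, dropped
        {'trackingID': 'b2', 'date': '2024-12-25T11:30:00Z'},
    ]

    # Deduplicate by trackingID, keeping the first occurrence.
    seen, unique = set(), []
    for item in records:
        if item['trackingID'] not in seen:
            seen.add(item['trackingID'])
            unique.append(item)

    # Sort newest first with the same key as save_latest_ratings; the real
    # function then truncates to the most recent `limit` entries.
    unique.sort(key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)
    print([r['trackingID'] for r in unique])  # ['b2', 'a1']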
diff --git a/app/main.py b/app/main.py
index 6d5b359..c06773a 100755
--- a/app/main.py
+++ b/app/main.py
@@ -2935,6 +2935,20 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
         headers={"Content-Encoding": "gzip"}
     )
 
+@app.get("/dark-pool-flow-feed")
+async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
+    try:
+        with open("json/dark-pool/feed/data.json", 'rb') as file:
+            res_list = orjson.loads(file.read())
+    except Exception:
+        res_list = []
+    data = orjson.dumps(res_list)
+    compressed_data = gzip.compress(data)
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
 
 @app.get("/options-zero-dte")
 async def get_options_flow_feed(api_key: str = Security(get_api_key)):
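The new /dark-pool-flow-feed route serves the cron job's output pre-compressed. A client could consume it along these lines (a sketch only: the base URL is assumed, and the auth header name depends on how get_api_key is defined, which this patch does not show):

    import requests

    BASE_URL = "http://localhost:8000"  # assumed deployment address
    API_KEY = "your-api-key"            # placeholder credential

    resp = requests.get(
        f"{BASE_URL}/dark-pool-flow-feed",
        headers={"X-API-KEY": API_KEY},  # header name is an assumption
    )
    resp.raise_for_status()
    trades = resp.json()  # requests transparently decodes the gzip Content-Encoding
    print(f"{len(trades)} dark pool prints")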
diff --git a/app/test.py b/app/test.py
index 11dd5b9..560ece6 100644
--- a/app/test.py
+++ b/app/test.py
@@ -1,8 +1,20 @@
 import requests
-from bs4 import BeautifulSoup
+from dotenv import load_dotenv
+import os
 
-url = "https://twitter.com/search?q=%24AAPL&src=typed_query"
-response = requests.get(url)
-soup = BeautifulSoup(response.content, 'html.parser')
+load_dotenv()
+api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
 
-print(soup)
\ No newline at end of file
+querystring = {"limit": "200"}
+
+url = "https://api.unusualwhales.com/api/darkpool/recent"
+
+headers = {
+    "Accept": "application/json, text/plain",
+    "Authorization": api_key
+}
+
+response = requests.get(url, headers=headers, params=querystring)
+
+print(len(response.json()['data']))
+print(response.json()['data'][0])
\ No newline at end of file
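For reference, each element of response.json()['data'] is expected to carry at least the fields the cron job reads; a record might look roughly like this (field names taken from the code above, all values invented):

    sample = {
        "ticker": "AAPL",
        "price": "254.31",
        "size": 250000,
        "volume": 4000000,
        "premium": "63577500.00",
        "executed_at": "2024-12-24T18:30:12Z",
        "tracking_id": 71984388012345,
    }

    # Derived as in cron_dark_pool_flow.py: the print's size as a share of day volume.
    print(round(float(sample["size"]) / float(sample["volume"]) * 100, 2))  # 6.25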