backend/app/cron_dark_pool_flow.py
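
"""
Cron job that pulls the current day's dark pool trades from the Intrinio
securities trades API, filters them by notional value, enriches each trade
with stock screener and quote data, and writes the deduplicated feed to
json/dark-pool/feed/data.json.
"""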

import os
import orjson
from dotenv import load_dotenv
from datetime import datetime, timezone
import hashlib
import requests
from tqdm import tqdm

load_dotenv()
api_key = os.getenv('INTRINIO_API_KEY')

today_date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
start_date = today_date
end_date = today_date

# Load the stock screener data once and index it by symbol for fast lookups.
with open("json/stock-screener/data.json", 'rb') as file:
    stock_screener_data = orjson.loads(file.read())
stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}

# Simple in-memory cache so each symbol's quote file is read at most once.
quote_cache = {}


def get_quote_data(symbol):
    """Get quote data for a symbol from its JSON file, caching the result."""
    if symbol in quote_cache:
        return quote_cache[symbol]
    try:
        with open(f"json/quote/{symbol}.json") as file:
            quote_data = orjson.loads(file.read())
        quote_cache[symbol] = quote_data  # Cache the loaded data
        return quote_data
    except FileNotFoundError:
        return None


def save_json(data):
    directory = "json/dark-pool/feed"
    try:
        # Track unique entries based on 'trackingID'
        seen = set()
        unique_data = []
        for item in data:
            identifier = item['trackingID']
            if identifier not in seen:
                seen.add(identifier)
                unique_data.append(item)

        # Sort the data by date, newest first
        latest_data = sorted(
            unique_data,
            key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')),
            reverse=True,
        )

        # Date of the most recent datapoint (falls back to today if data is empty)
        if latest_data:
            first_date = datetime.fromisoformat(latest_data[0]['date'].replace('Z', '+00:00')).strftime('%Y-%m-%d')
        else:
            first_date = datetime.now().strftime('%Y-%m-%d')

        # Ensure the directory exists and write the feed file
        os.makedirs(directory, exist_ok=True)
        with open(directory + "/data.json", 'wb') as file:
            file.write(orjson.dumps(latest_data))
        print(f"Saved {len(latest_data)} unique datapoints")
    except Exception as e:
        print(f"An error occurred while saving data: {e}")


def get_data():
    try:
        unique_trades = {}
        sources = ['utp_delayed', 'cta_a_delayed', 'cta_b_delayed']
        page_size = 50000
        min_size = 2000
        threshold = 1E5  # Minimum notional value (price * size) a trade must exceed

        for source in tqdm(sources):
            try:
                next_page = ''  # Reset pagination for each source
                while True:
                    # Build the URL with the current page (if available)
                    url = (
                        f"https://api-v2.intrinio.com/securities/trades?"
                        f"timezone=UTC&source={source}&start_date={start_date}&end_date={end_date}"
                        f"&page_size={page_size}&min_size={min_size}"
                    )
                    if next_page:
                        url += f"&next_page={next_page}"
                    url += f"&darkpool_only=true&api_key={api_key}"

                    response = requests.get(url)
                    if response.status_code == 200:
                        output = response.json()
                        trades = output.get("trades", [])
                        # Process each trade and keep only one entry per unique key
                        for trade in trades:
                            price = trade.get("price", 0)
                            size = trade.get("size", 0)
                            if price * size > threshold:  # Apply the notional filter
                                unique_key = (
                                    f"{trade.get('symbol')}_"
                                    f"{trade.get('timestamp')}_"
                                    f"{trade.get('price')}_"
                                    f"{trade.get('total_volume')}"
                                )
                                if unique_key not in unique_trades:
                                    unique_trades[unique_key] = trade
                        # Check if there's a next page; if not, exit the loop
                        next_page = output.get("next_page")
                        print(next_page)
                        if not next_page:
                            break
                    else:
                        print(f"Error fetching data from source {source}: {response.status_code} - {response.text}")
                        break
            except Exception as e:
                print(f"Error processing source {source}: {e}")

        return list(unique_trades.values())
    except Exception as e:
        print(f"Error fetching data: {e}")
        return []
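
# Each trade dict returned by get_data() is expected to expose at least the
# fields used below: 'symbol', 'timestamp', 'price', 'size' and 'total_volume'
# (as seen in the Intrinio /securities/trades responses handled above).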


def main():
    # Fetch new dark pool trades from the API
    data = get_data()
    print(len(data))

    res = []
    for item in data:
        symbol = item['symbol']
        # Adjust ticker formatting for BRK-A/BRK-B if needed
        ticker = symbol
        if symbol.lower() == 'brk.b':
            ticker = 'BRK-B'
        elif symbol.lower() == 'brk.a':
            ticker = 'BRK-A'
        try:
            # Use the 'timestamp' to extract the executed date
            timestamp_dt = datetime.fromisoformat(item['timestamp'].replace('Z', '+00:00'))
            executed_date = timestamp_dt.strftime('%Y-%m-%d')

            # Create a short trackingID by hashing symbol and timestamp (MD5)
            raw_tracking_string = f"{symbol}_{timestamp_dt.isoformat()}"
            tracking_id = hashlib.md5(raw_tracking_string.encode('utf-8')).hexdigest()[:10]

            if executed_date == today_date:
                sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
                volume = float(item['total_volume'])
                size = float(item['size'])
                price = round(float(item['price']), 2)
                quote_data = get_quote_data(symbol) or {}
                size_volume_ratio = round((size / volume) * 100, 2) if volume else 0
                size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)

                res.append({
                    'ticker': ticker,
                    'date': item['timestamp'],
                    'price': price,
                    'size': size,
                    'volume': volume,
                    'premium': round(size * price, 2),  # notional value of the trade
                    'sector': sector,
                    'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
                    'sizeVolRatio': size_volume_ratio,
                    'sizeAvgVolRatio': size_avg_volume_ratio,
                    'trackingID': tracking_id
                })
        except Exception as e:
            print(f"Error processing {symbol}: {e}")

    # Save the collected datapoints to the feed file
    if res:
        save_json(res)


if __name__ == '__main__':
    main()
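
# Note: the "cron_" prefix suggests this script is meant to run as a scheduled
# job; a hypothetical crontab entry (paths are placeholders) might look like:
#   */10 * * * * cd /path/to/backend/app && python cron_dark_pool_flow.py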