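"""Build the dark pool trade feed.

Fetches delayed off-exchange (dark pool) trades for the current UTC day from the
Intrinio securities/trades API, keeps prints with a notional value above $1M and a
minimum reported size of 2,000, enriches them with sector and quote data from local
JSON files, and writes the deduplicated feed to json/dark-pool/feed/data.json.

Requires INTRINIO_API_KEY to be set in the environment (loaded from .env).
"""
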
import os
import time
import sqlite3
import hashlib
from datetime import datetime, timezone

import orjson
import pandas as pd
import pytz
import requests
from dateutil.parser import isoparse
from dotenv import load_dotenv
from tqdm import tqdm

from utils.helper import load_latest_json

load_dotenv()

api_key = os.getenv('INTRINIO_API_KEY')

today_date = datetime.now(timezone.utc).strftime('%Y-%m-%d')

start_date = today_date
end_date = today_date

with open("json/stock-screener/data.json", 'rb') as file:
    stock_screener_data = orjson.loads(file.read())
stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}

quote_cache = {}

def get_quote_data(symbol):
    """Get quote data for a symbol from its JSON file."""
    if symbol in quote_cache:
        return quote_cache[symbol]
    try:
        with open(f"json/quote/{symbol}.json") as file:
            quote_data = orjson.loads(file.read())
            quote_cache[symbol] = quote_data  # Cache the loaded data
            return quote_data
    except FileNotFoundError:
        return None

def save_json(data):
    directory = "json/dark-pool/feed"
    try:
        # Track unique entries by their 'trackingID'
        seen = set()
        unique_data = []

        for item in data:
            identifier = item['trackingID']
            if identifier not in seen:
                seen.add(identifier)
                unique_data.append(item)

        # Sort the data by date, newest first
        latest_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)

        # Date of the most recent trade (falls back to today if the data is empty)
        if latest_data:
            first_date = datetime.fromisoformat(latest_data[0]['date'].replace('Z', '+00:00')).strftime('%Y-%m-%d')
        else:
            first_date = datetime.now().strftime('%Y-%m-%d')

        # Ensure the output directory exists
        os.makedirs(directory, exist_ok=True)

        # Save the deduplicated, sorted data
        with open(directory + "/data.json", 'wb') as file:
            file.write(orjson.dumps(latest_data))

        print(f"Saved {len(latest_data)} unique datapoints")
    except Exception as e:
        print(f"An error occurred while saving data: {e}")

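# Note: the response handling below reflects the fields this script expects from the
# Intrinio endpoint: a "trades" list with per-trade symbol/timestamp/price/size/
# total_volume values and a "next_page" cursor that is empty on the last page.
# Trades are deduplicated across sources and pages with a composite key.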
def get_data():
    try:
        unique_trades = {}
        sources = ['utp_delayed', 'cta_a_delayed', 'cta_b_delayed']
        page_size = 50000
        min_size = 2000
        threshold = 1E6  # Minimum notional value (price * size) to keep a trade

        for source in tqdm(sources):
            try:
                next_page = ''  # Reset pagination for each source
                while True:
                    # Build the URL with the current page (if available)
                    url = (
                        f"https://api-v2.intrinio.com/securities/trades?"
                        f"timezone=UTC&source={source}&start_date={start_date}&end_date={end_date}"
                        f"&page_size={page_size}&min_size={min_size}"
                    )
                    if next_page:
                        url += f"&next_page={next_page}"
                    url += f"&darkpool_only=true&api_key={api_key}"

                    response = requests.get(url)
                    if response.status_code == 200:
                        output = response.json()
                        trades = output.get("trades", [])

                        # Process each trade and keep only the first occurrence of each unique key
                        for trade in trades:
                            price = trade.get("price", 0)
                            size = trade.get("size", 0)

                            if price * size > threshold:  # Apply the notional filter
                                unique_key = (
                                    f"{trade.get('symbol')}_"
                                    f"{trade.get('timestamp')}_"
                                    f"{trade.get('price')}_"
                                    f"{trade.get('total_volume')}"
                                )
                                if unique_key not in unique_trades:
                                    unique_trades[unique_key] = trade

                        # Check if there's a next page; if not, exit the loop
                        next_page = output.get("next_page")
                        print(next_page)
                        if not next_page:
                            break
                    else:
                        print(f"Error fetching data from source {source}: {response.status_code} - {response.text}")
                        break
            except Exception as e:
                print(f"Error processing source {source}: {e}")

        return list(unique_trades.values())

    except Exception as e:
        print(f"Error fetching data: {e}")
        return []

def main():
    # Fetch new data from the API
    data = get_data()
    print(len(data))
    res = []

    for item in data:
        symbol = item['symbol']
        # Adjust ticker formatting for BRK-A/BRK-B if needed
        ticker = symbol
        if symbol.lower() == 'brk.b':
            ticker = 'BRK-B'
        elif symbol.lower() == 'brk.a':
            ticker = 'BRK-A'
        try:
            # Use the 'timestamp' to extract the executed date
            timestamp_dt = datetime.fromisoformat(item['timestamp'])
            executed_date = timestamp_dt.strftime('%Y-%m-%d')

            # Create a unique trackingID from the symbol and timestamp using an MD5 hash
            raw_tracking_string = f"{symbol}_{timestamp_dt.isoformat()}"
            tracking_id = hashlib.md5(raw_tracking_string.encode('utf-8')).hexdigest()[:10]

            # Only keep trades executed today
            if executed_date == today_date:
                sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
                volume = float(item['total_volume'])
                size = float(item['size'])
                price = round(float(item['price']), 2)
                quote_data = get_quote_data(symbol) or {}
                size_volume_ratio = round((size / volume) * 100, 2) if volume else 0
                # Fall back to an average volume of 1 if quote data is missing
                size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)

                res.append({
                    'ticker': ticker,
                    'date': item['timestamp'],
                    'price': price,
                    'size': size,
                    'volume': volume,
                    'premium': round(size * price, 2),  # Notional value of the trade
                    'sector': sector,
                    # Symbols present in the screener data are treated as stocks, all others as ETFs
                    'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
                    'sizeVolRatio': size_volume_ratio,
                    'sizeAvgVolRatio': size_avg_volume_ratio,
                    'trackingID': tracking_id
                })
        except Exception as e:
            print(f"Error processing {symbol}: {e}")

    # Deduplicate, sort, and save the processed trades
    if res:
        save_json(res)

if __name__ == '__main__':
    main()