update dark pool cron job
commit dc9be95ab0 (parent 24409f0009)
@@ -7,6 +7,8 @@ from datetime import datetime
 import pytz
 import requests # Add missing import
 from dateutil.parser import isoparse
+from utils.helper import load_latest_json
+

 load_dotenv()
 api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
@@ -36,43 +38,21 @@ def get_quote_data(symbol):
     except FileNotFoundError:
         return None


-def load_json(file_path):
-    """Load existing JSON data from file."""
-    if os.path.exists(file_path):
-        try:
-            with open(file_path, 'r') as file:
-                return orjson.loads(file.read())
-        except (ValueError, IOError):
-            print(f"Warning: Could not read or parse {file_path}. Starting with an empty list.")
-    return []
-
-def save_latest_ratings(combined_data, json_file_path, limit=2000):
+def save_to_daily_file(data, directory):
+    """Save data to a daily JSON file."""
     try:
-        # Create a set to track unique entries based on a combination of 'ticker' and 'date'
-        seen = set()
-        unique_data = []
-
-        for item in combined_data:
-            identifier = f"{item['trackingID']}"
-            if identifier not in seen:
-                seen.add(identifier)
-                unique_data.append(item)
-
-        # Sort the data by date
-        sorted_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)
-
-        # Keep only the latest `limit` entries
-        latest_data = sorted_data[:limit]
-
-        # Save the trimmed and deduplicated data to the JSON file
-        with open(json_file_path, 'wb') as file:
-            file.write(orjson.dumps(latest_data))
-
-        print(f"Saved {len(latest_data)} unique and latest ratings to {json_file_path}.")
+        # Ensure the directory exists
+        os.makedirs(directory, exist_ok=True)
+        # Generate filename based on today's date
+        date_str = datetime.now().strftime('%Y-%m-%d')
+        file_path = os.path.join(directory, f"{date_str}.json")
+        # Save data to the file
+        with open(file_path, 'wb') as file:
+            file.write(orjson.dumps(data))
+        print(f"{len(data)} datapoints successfully saved to {file_path}")
     except Exception as e:
-        print(f"An error occurred while saving data: {e}")
+        print(f"Error saving data to file: {e}")



 def get_data():
     try:
@@ -83,78 +63,51 @@ def get_data():
         return []

 def main():
-    # Load environment variables
-    con = sqlite3.connect('stocks.db')
-    cursor = con.cursor()
-    cursor.execute("PRAGMA journal_mode = wal")
-    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
-    stock_symbols = [row[0] for row in cursor.fetchall()]
-
-    etf_con = sqlite3.connect('etf.db')
-    etf_cursor = etf_con.cursor()
-    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
-    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
-    total_symbols = stock_symbols + etf_symbols
-    con.close()
-    etf_con.close()
-
-    json_file_path = 'json/dark-pool/feed/data.json'
-    existing_data = load_json(json_file_path)
-    # Transform existing data into a set of unique trackingIDs
-    existing_keys = {item.get('trackingID',None) for item in existing_data}
+    # Directory for saving daily historical flow data
+    historical_directory = 'json/dark-pool/historical-flow'
+
+    # Load the latest JSON file from the directory
+    existing_data = load_latest_json(historical_directory)
+    existing_keys = {item.get('trackingID', None) for item in existing_data}
+
+    # Fetch new data from the API
     data = get_data()

-    # Prepare results with only new data
     res = []
     for item in data:
         symbol = item['ticker']
         if symbol.lower() == 'brk.b':
             item['ticker'] = 'BRK-B'
-            symbol = item['ticker']
         if symbol.lower() == 'brk.a':
             item['ticker'] = 'BRK-A'
-            symbol = item['ticker']
-        if symbol in total_symbols:
-            quote_data = get_quote_data(symbol)
-            if symbol in stock_symbols:
-                asset_type = 'Stock'
-            else:
-                asset_type = 'ETF'
-            try:
-                # Check if the data is already in the file
-                if item['tracking_id'] not in existing_keys:
-                    try:
-                        sector = stock_screener_data_dict[symbol].get('sector', None)
-                    except:
-                        sector = ""
-
-                    volume = float(item['volume'])
-                    size = float(item['size'])
-
-                    size_volume_ratio = round((size / volume) * 100, 2)
-                    size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
-                    res.append({
-                        'ticker': item['ticker'],
-                        'date': item['executed_at'],
-                        'price': round(float(item['price']),2),
-                        'size': item['size'],
-                        'volume': volume,
-                        'premium': item['premium'],
-                        'sector': sector,
-                        'assetType': asset_type,
-                        'sizeVolRatio': size_volume_ratio,
-                        'sizeAvgVolRatio': size_avg_volume_ratio,
-                        'trackingID': item['tracking_id']
-                    })
-            except Exception as e:
-                print(f"Error processing {symbol}: {e}")
-
-    # Append new data to existing data and combine
+        try:
+            if item['tracking_id'] not in existing_keys:
+                sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
+                volume = float(item['volume'])
+                size = float(item['size'])
+                quote_data = get_quote_data(symbol) or {}
+                size_volume_ratio = round((size / volume) * 100, 2)
+                size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
+                res.append({
+                    'ticker': item['ticker'],
+                    'date': item['executed_at'],
+                    'price': round(float(item['price']), 2),
+                    'size': item['size'],
+                    'volume': volume,
+                    'premium': item['premium'],
+                    'sector': sector,
+                    'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
+                    'sizeVolRatio': size_volume_ratio,
+                    'sizeAvgVolRatio': size_avg_volume_ratio,
+                    'trackingID': item['tracking_id']
+                })
+        except Exception as e:
+            print(f"Error processing {symbol}: {e}")
+
+    # Combine new data with existing data
     combined_data = existing_data + res
-    # Save the updated data
-    save_latest_ratings(combined_data, json_file_path)
+
+    # Save the combined data to a daily file
+    save_to_daily_file(combined_data, historical_directory)

 if __name__ == '__main__':
     main()
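
For orientation, here is a minimal sketch of the load / dedupe / save cycle the cron job performs against the daily files. load_latest_json is the helper from utils.helper shown in this commit; the inline save_to_daily_file stand-in and the placeholder record are illustrative only and not part of the repository.

import os
import orjson
from datetime import datetime
from utils.helper import load_latest_json  # helper added in this commit


def save_to_daily_file(data, directory):
    """Illustrative stand-in mirroring the cron script: write data to <directory>/<YYYY-MM-DD>.json."""
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, f"{datetime.now().strftime('%Y-%m-%d')}.json")
    with open(file_path, 'wb') as file:
        file.write(orjson.dumps(data))


directory = 'json/dark-pool/historical-flow'  # path used in the diff

# Load what the most recent daily file already holds and remember its tracking IDs.
existing = load_latest_json(directory)
seen = {item.get('trackingID') for item in existing}

# Placeholder for one record as returned by the API (get_data() in the cron script).
incoming = [{'tracking_id': 'abc123', 'ticker': 'AAPL', 'size': 500, 'volume': 1_000_000}]

# Keep only records not stored yet, then persist the merged list under today's date.
fresh = [item for item in incoming if item['tracking_id'] not in seen]
save_to_daily_file(existing + fresh, directory)
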
app/main.py (10 changed lines)
@@ -39,6 +39,7 @@ from slowapi.util import get_remote_address
 from slowapi.errors import RateLimitExceeded
 from functools import partial
 from datetime import datetime
+from utils.helper import load_latest_json

 # DB constants & context manager

@@ -2940,13 +2941,12 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
         headers={"Content-Encoding": "gzip"}
     )


 @app.get("/dark-pool-flow-feed")
 async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
-    try:
-        with open(f"json/dark-pool/feed/data.json", 'rb') as file:
-            res_list = orjson.loads(file.read())
-    except:
-        res_list = []
+    directory = "json/dark-pool/historical-flow"
+    res_list = load_latest_json(directory)
     data = orjson.dumps(res_list)
     compressed_data = gzip.compress(data)
     return StreamingResponse(
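
For context, a minimal client-side sketch for consuming the /dark-pool-flow-feed route above. The base URL and the API-key header name are assumptions, not taken from this repository; adjust them to the actual deployment.

import requests

BASE_URL = "http://localhost:8000"          # assumed local deployment
headers = {"X-API-KEY": "your-api-key"}     # hypothetical header name for get_api_key

# requests transparently decompresses the gzip body advertised via Content-Encoding.
resp = requests.get(f"{BASE_URL}/dark-pool-flow-feed", headers=headers, timeout=30)
resp.raise_for_status()
feed = resp.json()  # list loaded from the latest daily JSON file
print(len(feed), "dark pool prints")
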
Binary file not shown.
@@ -1,4 +1,6 @@
 from datetime import datetime, timedelta, time
+import os
+import orjson
 import pytz

 def check_market_hours():
@@ -28,3 +30,30 @@ def check_market_hours():
         return True #"Market hours."
     else:
         return False #"Market is closed."
+
+
+def load_latest_json(directory: str):
+    """Load the latest JSON file from a directory based on the filename (assumed to be a date)."""
+    try:
+        latest_file = None
+        latest_date = None
+
+        # Iterate over files in the directory
+        for filename in os.listdir(directory):
+            if filename.endswith('.json'):
+                # Extract date from filename (assumed format 'YYYY-MM-DD.json')
+                file_date = filename.split('.')[0]
+
+                if latest_date is None or file_date > latest_date:
+                    latest_date = file_date
+                    latest_file = filename
+
+        if not latest_file:
+            return []  # No files found
+
+        latest_file_path = os.path.join(directory, latest_file)
+        with open(latest_file_path, 'rb') as file:
+            return orjson.loads(file.read())
+    except Exception as e:
+        print(f"Error loading latest JSON file: {e}")
+        return []
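
Because the daily filenames are zero-padded ISO dates, the plain string comparison file_date > latest_date in the helper above matches chronological order. A quick illustration with sample names:

# Zero-padded ISO dates sort chronologically even as plain strings.
names = ['2024-09-30', '2024-10-01', '2024-01-15']
print(max(names))  # '2024-10-01' -> the file the helper would pick
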