update dark pool flow cron job
This commit is contained in:
parent 047e2def23
commit 47cecb16d3

@@ -1,123 +1,160 @@
from datetime import datetime, timedelta
from GetStartEndDate import GetStartEndDate
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import intrinio_sdk as intrinio
import ujson
import orjson
import sqlite3
import os
import pandas as pd
import pytz
import requests
from dotenv import load_dotenv
from dateutil.parser import isoparse

load_dotenv()
API_KEY = os.getenv('INTRINIO_API_KEY')
api_key = os.getenv('UNUSUAL_WHALES_API_KEY')

intrinio.ApiClient().set_api_key(API_KEY)
intrinio.ApiClient().allow_retries(True)

querystring = {"limit": "200"}
url = "https://api.unusualwhales.com/api/darkpool/recent"
headers = {
    "Accept": "application/json, text/plain",
    "Authorization": api_key
}

def save_json(data):
    with open("json/dark-pool/flow/data.json", 'w') as file:
        ujson.dump(data, file)


with open("json/stock-screener/data.json", 'rb') as file:
    stock_screener_data = orjson.loads(file.read())
stock_screener_data_dict = {item['symbol']: item for item in stock_screener_data}

quote_cache = {}

def get_quote_data(symbol):
    """Get quote data for a symbol from JSON file"""
    if symbol in quote_cache:
        return quote_cache[symbol]
    try:
        with open(f"json/quote/{symbol}.json") as file:
            quote_data = orjson.loads(file.read())
            quote_cache[symbol] = quote_data  # Cache the loaded data
            return quote_data
    except FileNotFoundError:
        return None

def load_json(file_path):
    """Load existing JSON data from file."""
    if os.path.exists(file_path):
        try:
            with open(file_path, 'r') as file:
                return orjson.loads(file.read())
        except (ValueError, IOError):
            print(f"Warning: Could not read or parse {file_path}. Starting with an empty list.")
    return []

def save_latest_ratings(combined_data, json_file_path, limit=2000):
    try:
        # Track unique entries by trackingID
        seen = set()
        unique_data = []
        for item in combined_data:
            identifier = f"{item['trackingID']}"
            if identifier not in seen:
                seen.add(identifier)
                unique_data.append(item)

        # Sort the data by date, newest first
        sorted_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)

        # Keep only the latest `limit` entries
        latest_data = sorted_data[:limit]

        # Save the trimmed and deduplicated data to the JSON file
        with open(json_file_path, 'wb') as file:
            file.write(orjson.dumps(latest_data))

        print(f"Saved {len(latest_data)} unique and latest ratings to {json_file_path}.")
    except Exception as e:
        print(f"An error occurred while saving data: {e}")

# Parameters for the Intrinio security-trades query
identifier = 'INTC'
source = 'utp_delayed'
start_date, end_date = GetStartEndDate().run()
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")
start_time = ''
end_time = ''
timezone = 'UTC'
page_size = 1000
darkpool_only = True
min_size = 100
count = 0

def get_data():
    data = []
    count = 0
    try:
        response = requests.get(url, headers=headers, params=querystring)
        return response.json().get('data', [])
    except Exception as e:
        print(f"Error fetching data: {e}")
        return []

    # Intrinio pagination path (unreachable after the early returns above)
    while True:
        if count == 0:
            next_page = ''
        try:
            response = intrinio.SecurityApi().get_security_trades_by_symbol(
                identifier, source, start_date=start_date, start_time=start_time,
                end_date=end_date, end_time=end_time, timezone=timezone,
                page_size=page_size, darkpool_only=darkpool_only, min_size=min_size,
                next_page=next_page
            )

            filtered_entries = [entry.__dict__ for entry in response.trades]
            data.extend(filtered_entries)
            next_page = response.next_page

            if not next_page:
                break
            count += 1
            print(f'Current length {len(data)}')

        except Exception as e:
            print(e)
            break

    return data
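
# Shape of one element of the list returned by get_data(): these are the fields the
# processing loop in main() reads. Values and types below are illustrative only.
#
#   {
#       "ticker": "INTC",
#       "price": "43.21",
#       "size": 2500,
#       "volume": 1850000,
#       "premium": "108025.00",
#       "executed_at": "2024-05-01T14:30:00Z",
#       "tracking_id": 1234567890
#   }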


def main():
    # Load the list of stock symbols and company names from the stocks database
    con = sqlite3.connect('stocks.db')
    cursor = con.cursor()
    cursor.execute("SELECT DISTINCT symbol, name FROM stocks")
    stocks = cursor.fetchall()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
    stock_symbols = [row[0] for row in cursor.fetchall()]

    etf_con = sqlite3.connect('etf.db')
    etf_cursor = etf_con.cursor()
    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
    total_symbols = stock_symbols + etf_symbols
    con.close()
    symbol_name_map = {row[0]: row[1] for row in stocks}
    stock_symbols = list(symbol_name_map.keys())
    etf_con.close()

    json_file_path = 'json/dark-pool/feed/data.json'
    existing_data = load_json(json_file_path)
    # Transform existing data into a set of unique trackingIDs
    existing_keys = {item.get('trackingID', None) for item in existing_data}

    data = get_data()

    # Build the dark pool flow series from Intrinio-style trade entries (fields prefixed with '_')
    filtered_data = [entry for entry in data if entry['_symbol'] in stock_symbols]
    filtered_data = [
        {
            'symbol': entry['_symbol'],
            'name': symbol_name_map[entry['_symbol']],
            'date': (entry['_timestamp'] - timedelta(hours=4)).isoformat(),
            'price': entry['_price'],
            'total_volume': entry['_total_volume'],
            'size': entry['_size']
        }
        for entry in filtered_data
    ]

    sorted_data = sorted(filtered_data, key=lambda x: x['date'])

    # Derive per-entry volume as the change in running total volume
    previous_total_volume = None
    for entry in sorted_data:
        if previous_total_volume is not None:
            entry["volume"] = int(entry["total_volume"]) - previous_total_volume
        else:
            entry["volume"] = int(entry["total_volume"])  # first entry keeps its total volume as-is
        previous_total_volume = int(entry["total_volume"])

    sorted_data = sorted(sorted_data, key=lambda x: x['date'], reverse=True)

    # Prepare results with only new data
    res = []
    for item in data:
        symbol = item['ticker']
        if symbol.lower() == 'brk.b':
            item['ticker'] = 'BRK-B'
            symbol = item['ticker']
        if symbol.lower() == 'brk.a':
            item['ticker'] = 'BRK-A'
            symbol = item['ticker']
        if symbol in total_symbols:
            quote_data = get_quote_data(symbol)
            if symbol in stock_symbols:
                asset_type = 'Stock'
            else:
                asset_type = 'ETF'

            try:
                # Check if the data is already in the file
                if item['tracking_id'] not in existing_keys:
                    try:
                        sector = stock_screener_data_dict[symbol].get('sector', None)
                    except Exception:
                        sector = None

                    volume = float(item['volume'])
                    size = float(item['size'])

                    # Print size as a percentage of the day's volume and of the average volume
                    daily_volume_percentage = round((size / volume) * 100, 2)
                    avg_volume_percentage = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
                    res.append({
                        'ticker': item['ticker'],
                        'date': item['executed_at'],
                        'price': round(float(item['price']), 2),
                        'size': item['size'],
                        'volume': volume,
                        'premium': item['premium'],
                        'sector': sector,
                        'assetType': asset_type,
                        'dailyVolumePercentage': daily_volume_percentage,
                        'avgVolumePercentage': avg_volume_percentage,
                        'trackingID': item['tracking_id']
                    })
            except Exception as e:
                print(f"Error processing {symbol}: {e}")

    # Append new data to existing data and combine
    combined_data = existing_data + res
    # Save the updated data
    save_latest_ratings(combined_data, json_file_path)

    if len(sorted_data) > 0:
        save_json(sorted_data)


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(main)
    try:
        # Wait for the result with a timeout of 1000 seconds
        future.result(timeout=1000)
    except TimeoutError:
        print("The operation timed out.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        executor.shutdown()
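
# Note on outputs: save_json() writes the aggregated flow series to
# json/dark-pool/flow/data.json, while save_latest_ratings() maintains the rolling
# json/dark-pool/feed/data.json file that the /dark-pool-flow-feed endpoint in
# app/main.py (below) serves.
#
# Scheduling sketch (hypothetical path, script name, and cadence, for illustration only):
#   */5 * * * *  cd /path/to/project && python3 dark_pool_flow_job.py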

14  app/main.py

@@ -2935,6 +2935,20 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
        headers={"Content-Encoding": "gzip"}
    )

@app.get("/dark-pool-flow-feed")
async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
    try:
        with open("json/dark-pool/feed/data.json", 'rb') as file:
            res_list = orjson.loads(file.read())
    except:
        res_list = []

    data = orjson.dumps(res_list)
    compressed_data = gzip.compress(data)
    return StreamingResponse(
        io.BytesIO(compressed_data),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )
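
# Example client usage (illustrative sketch; the header used to pass the API key depends on
# how get_api_key is defined and is assumed here to be "X-API-KEY"):
#
#   import requests
#   r = requests.get("http://localhost:8000/dark-pool-flow-feed",
#                    headers={"X-API-KEY": "<your key>"})
#   feed = r.json()  # requests transparently decompresses the gzipped body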

@app.get("/options-zero-dte")
async def get_options_flow_feed(api_key: str = Security(get_api_key)):

22  app/test.py

@@ -1,8 +1,20 @@
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import os

url = "https://twitter.com/search?q=%24AAPL&src=typed_query"
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
load_dotenv()
api_key = os.getenv('UNUSUAL_WHALES_API_KEY')

print(soup)
querystring = {"limit": "200"}

url = "https://api.unusualwhales.com/api/darkpool/recent"

headers = {
    "Accept": "application/json, text/plain",
    "Authorization": api_key
}

response = requests.get(url, headers=headers, params=querystring)

print(len(response.json()['data']))
print(response.json()['data'][0])