116 lines
3.7 KiB
Python
116 lines
3.7 KiB
Python
import asyncio
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
import ast
|
|
import orjson
|
|
from tqdm import tqdm
|
|
import aiohttp
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import sqlite3
|
|
import re
|
|
import os
|
|
|
|
|
|
|
|
# Database connection and symbol retrieval
|
|
def get_total_symbols(stocks_db='stocks.db', etf_db='etf.db'):
    """Return all symbols to process: stock tickers followed by ETF tickers.

    Stock symbols containing a '.' are excluded by the query. Each database is
    switched to WAL journal mode before it is read.

    Args:
        stocks_db: Path to the SQLite database containing the ``stocks`` table.
        etf_db: Path to the SQLite database containing the ``etfs`` table.

    Returns:
        A list of distinct stock symbols followed by distinct ETF symbols.
        A ticker present in both tables appears twice.
    """
    from contextlib import closing

    # ``with sqlite3.connect(...)`` only commits/rolls back a transaction; it
    # does NOT close the connection. ``closing`` guarantees the handle is released.
    with closing(sqlite3.connect(stocks_db)) as con:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stocks_symbols = [row[0] for row in cursor.fetchall()]

    with closing(sqlite3.connect(etf_db)) as etf_con:
        etf_cursor = etf_con.cursor()
        etf_cursor.execute("PRAGMA journal_mode = wal")
        etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
        etf_symbols = [row[0] for row in etf_cursor.fetchall()]

    return stocks_symbols + etf_symbols
|
|
|
|
|
|
async def save_json(data, symbol, directory="json/unusual-activity"):
    """Write ``data`` as JSON to ``<directory>/<symbol>.json``.

    ``data`` is serialized with orjson *before* any file is opened, so a
    serialization error can no longer truncate an existing output file. The
    bytes go to a ``.tmp`` sibling that is then moved over the target with
    ``os.replace``, so an interrupted run leaves the previous file intact.

    Args:
        data: Any orjson-serializable object.
        symbol: Ticker used as the file name.
        directory: Output directory; created if missing.

    Note: although declared ``async``, the file I/O here is blocking.
    """
    payload = orjson.dumps(data)
    os.makedirs(directory, exist_ok=True)
    path = f"{directory}/{symbol}.json"
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'wb') as file:
        file.write(payload)
    # os.replace overwrites the destination in one step (atomic on POSIX).
    os.replace(tmp_path, path)
|
|
|
|
|
|
def _collect_large_trades(path, min_premium, unique_data):
    """Add trades from the JSON list at ``path`` with ``cost_basis >= min_premium`` to ``unique_data`` keyed by ``id``."""
    try:
        with open(path, "rb") as file:
            data = orjson.loads(file.read())
    except FileNotFoundError:
        # No file for this date (presumably a non-trading day) -- expected, skip quietly.
        return
    except (OSError, ValueError) as e:
        # ValueError covers orjson.JSONDecodeError (malformed/truncated JSON).
        print(f"Error loading {path}: {e}")
        return

    if not isinstance(data, list):
        print(f"Error loading {path}: expected a JSON list, got {type(data).__name__}")
        return

    for item in data:
        if not isinstance(item, dict) or "id" not in item:
            continue
        premium = item.get("cost_basis")
        # Filter per item so one malformed trade does not discard the whole file.
        if isinstance(premium, (int, float)) and premium >= min_premium:
            unique_data[item["id"]] = item  # later reads overwrite earlier ones with the same id


async def get_dataset(days=365, min_premium=1E6):
    """Collect large options-flow trades, de-duplicated by trade ``id``.

    Reads one historical flow file per calendar day for the ``days`` days
    before today (today itself excluded), then the live feed file. A trade is
    kept only if it has an ``id`` and a numeric ``cost_basis`` of at least
    ``min_premium``. When an ``id`` repeats, the last copy read wins, so the
    live feed overrides historical entries.

    Missing files are skipped silently; unreadable or malformed files are
    reported with ``print`` and skipped.

    Args:
        days: Number of past calendar days to scan.
        min_premium: Minimum ``cost_basis`` for a trade to be kept.

    Returns:
        A list of trade dicts in first-seen ``id`` order.
    """
    today = datetime.today()
    start_date = today - timedelta(days=days)
    date_list = [(start_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days)]

    unique_data = {}
    for date in tqdm(date_list):
        _collect_large_trades(
            f"json/options-historical-data/flow-data/{date}.json", min_premium, unique_data
        )
    _collect_large_trades("json/options-flow/feed/data.json", min_premium, unique_data)

    return list(unique_data.values())
|
|
|
|
|
|
async def get_data(symbol, data):
    """Write ``symbol``'s trades from ``data`` to its unusual-activity JSON file.

    Every trade whose ``ticker`` equals ``symbol`` is reshaped into the output
    schema (renamed keys). A trade that raises while being read (for example a
    missing field) is reported with ``print`` and skipped. Rows are ordered by
    ``date`` descending, and nothing is written when no trade matches.
    """
    # (output key, source key) pairs, listed in the order the fields are read.
    field_map = (
        ('date', 'date'),
        ('premium', 'cost_basis'),
        ('sentiment', 'sentiment'),
        ('executionEst', 'execution_estimate'),
        ('price', 'underlying_price'),
        ('unusualType', 'option_activity_type'),
        ('size', 'size'),
        ('oi', 'open_interest'),
        ('optionSymbol', 'option_symbol'),
        ('strike', 'strike_price'),
        ('expiry', 'date_expiration'),
        ('optionType', 'put_call'),
    )

    if len(data) == 0:
        return

    matches = []
    for trade in data:
        try:
            if trade['ticker'] == symbol:
                matches.append({out_key: trade[src_key] for out_key, src_key in field_map})
        except Exception as err:
            print(err)

    matches.sort(key=lambda row: row['date'], reverse=True)

    if matches:
        await save_json(matches, symbol)
|
|
|
|
|
|
|
|
async def main():
    """Regenerate the unusual-activity JSON file for every known symbol.

    The trade dataset is bucketed by ticker once, so each symbol only scans
    its own trades. Previously every symbol rescanned the whole dataset:
    O(symbols x trades), plus one KeyError print per ticker-less item per
    symbol. Duplicate symbols (e.g. listed in both databases) are processed
    once. A failure for one symbol is reported and does not stop the run.
    """
    total_symbols = get_total_symbols()
    data = await get_dataset()

    trades_by_ticker = {}
    for item in data:
        ticker = item.get('ticker') if isinstance(item, dict) else None
        # Symbols come from TEXT columns, so only string tickers are bucketed
        # (presumably all real tickers are strings -- confirm against the feed).
        if isinstance(ticker, str):
            trades_by_ticker.setdefault(ticker, []).append(item)

    # dict.fromkeys dedupes while keeping the original symbol order.
    for symbol in tqdm(list(dict.fromkeys(total_symbols))):
        try:
            await get_data(symbol, trades_by_ticker.get(symbol, []))
        except Exception as e:
            print(f"Error processing {symbol}: {e}")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async pipeline on a fresh event loop.
    asyncio.run(main())
|