add swap data endpoint

MuslemRahimi 2024-07-21 01:43:33 +02:00
parent c3e115fd12
commit 35739930d1
2 changed files with 152 additions and 68 deletions

View File

@@ -4,50 +4,53 @@ import glob
import requests
import os
import sqlite3
import ujson
from zipfile import ZipFile
import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from datetime import datetime, timedelta
import shutil

# Define configuration variables
OUTPUT_PATH = "./json/swap"
COMPANIES_PATH = "./json/swap/companies"
MAX_WORKERS = 4
CHUNK_SIZE = 5000  # Adjust based on system RAM
DAYS_TO_PROCESS = 360

# Ensure directories exist
# Remove the directory
shutil.rmtree('json/swap/companies')
os.makedirs(COMPANIES_PATH, exist_ok=True)

def get_stock_symbols():
    with sqlite3.connect('stocks.db') as con:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
        total_symbols = [row[0] for row in cursor.fetchall()]
    return total_symbols

stock_symbols = get_stock_symbols()

# Function to clean and convert to numeric values
def clean_and_convert(series):
    return pd.to_numeric(series.replace({',': ''}, regex=True).str.extract(r'(\d+)', expand=False), errors='coerce').fillna(0).astype(int)

def generate_filenames():
    end = datetime.today()
    start = end - timedelta(days=DAYS_TO_PROCESS)
    dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    return [f"SEC_CUMULATIVE_EQUITIES_{date.strftime('%Y_%m_%d')}.zip" for date in dates]

def download_and_process(filename):
    csv_output_filename = os.path.join(OUTPUT_PATH, filename.replace('.zip', '.csv'))
    if os.path.exists(csv_output_filename):
        print(f"{csv_output_filename} already exists. Skipping.")
        return

    url = f"https://pddata.dtcc.com/ppd/api/report/cumulative/sec/{filename}"
@@ -63,79 +66,130 @@ def download_and_process(filename):
        csv_filename = zip_ref.namelist()[0]
        zip_ref.extractall()

    output_filename = os.path.join(OUTPUT_PATH, csv_filename)

    columns_to_keep = [
        "Underlying Asset ID", "Underlier ID-Leg 1",
        "Effective Date", "Notional amount-Leg 1",
        "Expiration Date", "Total notional quantity-Leg 1",
        "Dissemination Identifier", "Original Dissemination Identifier",
        "Dissemintation ID", "Original Dissemintation ID",
        "Primary Asset Class", "Action Type"
    ]

    # Process the CSV in chunks
    chunk_list = []
    for chunk in pd.read_csv(csv_filename, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip", usecols=lambda x: x in columns_to_keep):
        # Rename columns if necessary
        if "Dissemination Identifier" not in chunk.columns:
            chunk.rename(columns={
                "Dissemintation ID": "Dissemination Identifier",
                "Original Dissemintation ID": "Original Dissemination Identifier"
            }, inplace=True)
        chunk_list.append(chunk)

    # Concatenate chunks and save
    pd.concat(chunk_list, ignore_index=True).to_csv(output_filename, index=False)

    # Delete original downloaded files
    os.remove(filename)
    os.remove(csv_filename)
    print(f"Processed and saved {output_filename}")

def process_and_save_by_ticker():
    csv_files = glob.glob(os.path.join(OUTPUT_PATH, "*.csv"))

    # Sort CSV files by date (assuming filename format is "SEC_CUMULATIVE_EQUITIES_YYYY_MM_DD.csv")
    sorted_csv_files = sorted(csv_files, key=lambda x: datetime.strptime("_".join(os.path.splitext(os.path.basename(x))[0].split('_')[3:]), "%Y_%m_%d"), reverse=True)

    # Select only the N latest files
    latest_csv_files = sorted_csv_files[:100]

    # Create a set of stock symbols for faster lookup
    stock_symbols_set = set(stock_symbols)

    for file in tqdm(latest_csv_files, desc="Processing files"):
        if not os.path.isfile(file):  # Skip if not a file
            continue
        try:
            # Read the CSV file in chunks
            for chunk in pd.read_csv(file, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip"):
                if chunk.empty:
                    continue

                # Rename columns if necessary
                if "Dissemination Identifier" not in chunk.columns:
                    chunk.rename(columns={
                        "Dissemintation ID": "Dissemination Identifier",
                        "Original Dissemintation ID": "Original Dissemination Identifier"
                    }, inplace=True)

                # Determine which column to use for filtering
                filter_column = "Underlying Asset ID" if "Primary Asset Class" in chunk.columns or "Action Type" in chunk.columns else "Underlier ID-Leg 1"

                # Extract the symbol from the filter column
                chunk['symbol'] = chunk[filter_column].str.split('.').str[0]

                # Filter the chunk to include only rows with symbols in our list
                filtered_chunk = chunk[chunk['symbol'].isin(stock_symbols_set)]

                # If the filtered chunk is not empty, process and save it
                if not filtered_chunk.empty:
                    columns_to_keep = ["symbol", "Effective Date", "Notional amount-Leg 1", "Expiration Date", "Total notional quantity-Leg 1"]
                    filtered_chunk = filtered_chunk[columns_to_keep]

                    # Convert 'Notional amount-Leg 1' and 'Total notional quantity-Leg 1' to integers
                    filtered_chunk['Notional amount-Leg 1'] = clean_and_convert(filtered_chunk['Notional amount-Leg 1'])
                    filtered_chunk['Total notional quantity-Leg 1'] = clean_and_convert(filtered_chunk['Total notional quantity-Leg 1'])

                    # Group by symbol and append to respective files
                    for symbol, group in filtered_chunk.groupby('symbol'):
                        output_file = os.path.join(COMPANIES_PATH, f"{symbol}.json")
                        group = group.drop(columns=['symbol'])

                        # Convert DataFrame to list of dictionaries
                        records = group.to_dict('records')

                        if os.path.exists(output_file):
                            with open(output_file, 'r+') as f:
                                data = ujson.load(f)
                                data.extend(records)
                                f.seek(0)
                                ujson.dump(data, f)
                        else:
                            with open(output_file, 'w') as f:
                                ujson.dump(records, f)
        except pd.errors.EmptyDataError:
            print(f"Skipping empty file: {file}")
        except Exception as e:
            print(f"Error processing file {file}: {str(e)}")

    # Final processing of each symbol's file
    for symbol in tqdm(stock_symbols, desc="Final processing"):
        file_path = os.path.join(COMPANIES_PATH, f"{symbol}.json")
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r') as f:
                    data = ujson.load(f)

                # Convert to DataFrame for processing
                df = pd.DataFrame(data)
                df["Original Dissemination Identifier"] = df["Original Dissemination Identifier"].astype("Int64")
                df["Dissemination Identifier"] = df["Dissemination Identifier"].astype("Int64")

                # Convert back to list of dictionaries and save
                processed_data = df.to_dict('records')
                with open(file_path, 'w') as f:
                    ujson.dump(processed_data, f)

                print(f"Processed and saved data for {symbol}")
            except Exception as e:
                print(f"Error processing {symbol}: {str(e)}")

if __name__ == "__main__":
    filenames = generate_filenames()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        list(tqdm(executor.map(download_and_process, filenames), total=len(filenames)))
    process_and_save_by_ticker()

View File

@@ -3179,3 +3179,33 @@ async def get_clinical_trial(data:TickerData, api_key: str = Security(get_api_ke
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )
@app.post("/swap-ticker")
async def get_swap_ticker(data: TickerData, api_key: str = Security(get_api_key)):
    ticker = data.ticker.upper()
    cache_key = f"swap-{ticker}"

    cached_result = redis_client.get(cache_key)
    if cached_result:
        return StreamingResponse(
            io.BytesIO(cached_result),
            media_type="application/json",
            headers={"Content-Encoding": "gzip"}
        )

    try:
        with open(f"json/swap/companies/{ticker}.json", 'rb') as file:
            res = orjson.loads(file.read())
    except:
        res = []

    data = orjson.dumps(res)
    compressed_data = gzip.compress(data)

    redis_client.set(cache_key, compressed_data)
    redis_client.expire(cache_key, 3600*3600)

    return StreamingResponse(
        io.BytesIO(compressed_data),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )
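For reference, a minimal client-side sketch of calling the new endpoint; the base URL and the API-key header name are assumptions and depend on how get_api_key is configured in the deployment.

import requests

# Hypothetical values: adjust the base URL and API-key header to the actual deployment.
BASE_URL = "http://localhost:8000"
headers = {"X-API-KEY": "your-api-key"}

r = requests.post(f"{BASE_URL}/swap-ticker", json={"ticker": "AAPL"}, headers=headers)
r.raise_for_status()
print(r.json()[:3])  # first few swap records for the ticker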