diff --git a/app/cron_swap.py b/app/cron_swap.py
index bdbc9fe..a40522f 100644
--- a/app/cron_swap.py
+++ b/app/cron_swap.py
@@ -4,50 +4,53 @@ import glob
 import requests
 import os
 import sqlite3
+import ujson
 from zipfile import ZipFile
 import datetime
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from tqdm import tqdm
+from datetime import datetime, timedelta
+import shutil
 
-# Define some configuration variables
-OUTPUT_PATH = r"./json/swap"
-COMPANIES_PATH = r"./json/swap/companies"
+# Define configuration variables
+OUTPUT_PATH = "./json/swap"
+COMPANIES_PATH = "./json/swap/companies"
 MAX_WORKERS = 4
-CHUNK_SIZE = 1000  # Adjust this value based on your system's RAM
-executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
+CHUNK_SIZE = 5000  # Adjust based on system RAM
+DAYS_TO_PROCESS = 360
 
-# Ensure the companies directory exists
+# Reset the companies directory so stale per-symbol files
+# don't accumulate, then recreate it
+shutil.rmtree(COMPANIES_PATH, ignore_errors=True)
 os.makedirs(COMPANIES_PATH, exist_ok=True)
 
-con = sqlite3.connect('stocks.db')
-cursor = con.cursor()
-cursor.execute("PRAGMA journal_mode = wal")
-cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
-stock_symbols = [row[0] for row in cursor.fetchall()]
+def get_stock_symbols():
+    with sqlite3.connect('stocks.db') as con:
+        cursor = con.cursor()
+        cursor.execute("PRAGMA journal_mode = wal")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
+        total_symbols = [row[0] for row in cursor.fetchall()]
+    return total_symbols
 
-con.close()
+stock_symbols = get_stock_symbols()
 
+# Clean numeric strings (e.g. "1,000,000") and convert them to integers
+def clean_and_convert(series):
+    return pd.to_numeric(series.replace({',': ''}, regex=True).str.extract(r'(\d+)', expand=False), errors='coerce').fillna(0).astype(int)
 
-start = datetime.datetime.today() - datetime.timedelta(days=180)
-end = datetime.datetime.today()
-dates = [start + datetime.timedelta(days=i) for i in range((end - start).days + 1)]
-
-# Generate filenames for each date
-filenames = [
-    f"SEC_CUMULATIVE_EQUITIES_{year}_{month}_{day}.zip"
-    for year, month, day in [
-        (date.strftime("%Y"), date.strftime("%m"), date.strftime("%d"))
-        for date in dates
-    ]
-]
+def generate_filenames():
+    end = datetime.today()
+    start = end - timedelta(days=DAYS_TO_PROCESS)
+    dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
+    return [f"SEC_CUMULATIVE_EQUITIES_{date.strftime('%Y_%m_%d')}.zip" for date in dates]
 
 def download_and_process(filename):
     csv_output_filename = os.path.join(OUTPUT_PATH, filename.replace('.zip', '.csv'))
-    if os.path.exists(csv_output_filename ):
-        print(f"{csv_output_filename} already exists. Skipping download and processing.")
+    if os.path.exists(csv_output_filename):
+        print(f"{csv_output_filename} already exists. Skipping.")
         return
 
     url = f"https://pddata.dtcc.com/ppd/api/report/cumulative/sec/{filename}"
@@ -63,79 +66,130 @@ def download_and_process(filename):
         csv_filename = zip_ref.namelist()[0]
         zip_ref.extractall()
 
-    output_filename = os.path.join(OUTPUT_PATH, f"{csv_filename}")
+    output_filename = os.path.join(OUTPUT_PATH, csv_filename)
+
+    columns_to_keep = [
+        "Underlying Asset ID", "Underlier ID-Leg 1",
+        "Effective Date", "Notional amount-Leg 1",
+        "Expiration Date", "Total notional quantity-Leg 1",
+        "Dissemination Identifier", "Original Dissemination Identifier",
+        "Dissemintation ID", "Original Dissemintation ID",
+        "Primary Asset Class", "Action Type"
+    ]
 
-    # Process the CSV in chunks
     chunk_list = []
-    for chunk in pd.read_csv(csv_filename, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip"):
+    for chunk in pd.read_csv(csv_filename, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip", usecols=lambda x: x in columns_to_keep):
+        # Rename columns if necessary
+        if "Dissemination Identifier" not in chunk.columns:
+            chunk.rename(columns={
+                "Dissemintation ID": "Dissemination Identifier",
+                "Original Dissemintation ID": "Original Dissemination Identifier"
+            }, inplace=True)
+
         chunk_list.append(chunk)
 
-    # Concatenate chunks and save
     pd.concat(chunk_list, ignore_index=True).to_csv(output_filename, index=False)
 
-    # Delete original downloaded files
-    os.remove(filename)
     os.remove(csv_filename)
+
+    print(f"Processed and saved {output_filename}")
 
-tasks = []
-for filename in filenames:
-    tasks.append(executor.submit(download_and_process, filename))
-
-for task in tqdm(as_completed(tasks), total=len(tasks)):
-    pass
-
-files = glob.glob(OUTPUT_PATH + "/" + "*")
 
 def process_and_save_by_ticker():
     csv_files = glob.glob(os.path.join(OUTPUT_PATH, "*.csv"))
 
-    # Initialize DataFrames for each stock symbol
-    stock_data = {symbol: pd.DataFrame() for symbol in stock_symbols}
+    # Sort CSV files by date (assuming filename format is "SEC_CUMULATIVE_EQUITIES_YYYY_MM_DD.csv")
+    sorted_csv_files = sorted(csv_files, key=lambda x: datetime.strptime("_".join(os.path.splitext(os.path.basename(x))[0].split('_')[3:]), "%Y_%m_%d"), reverse=True)
 
-    for file in tqdm(csv_files, desc="Processing files"):
+    # Select only the N latest files
+    latest_csv_files = sorted_csv_files[:100]
+
+    # Create a set of stock symbols for faster lookup
+    stock_symbols_set = set(stock_symbols)
+
+    for file in tqdm(latest_csv_files, desc="Processing files"):
         if not os.path.isfile(file):  # Skip if not a file
             continue
 
         try:
+            # Read the CSV file in chunks
            for chunk in pd.read_csv(file, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip"):
                 if chunk.empty:
                     continue
+
+                # Rename columns if necessary
                 if "Dissemination Identifier" not in chunk.columns:
                     chunk.rename(columns={
                         "Dissemintation ID": "Dissemination Identifier",
                         "Original Dissemintation ID": "Original Dissemination Identifier"
                     }, inplace=True)
 
-                # Filter and append data for each stock symbol
-                for symbol in stock_symbols:
-                    if "Primary Asset Class" in chunk.columns or "Action Type" in chunk.columns:
-                        symbol_data = chunk[chunk["Underlying Asset ID"].str.contains(f"{symbol}.", na=False)]
-                    else:
-                        symbol_data = chunk[chunk["Underlier ID-Leg 1"].str.contains(f"{symbol}.", na=False)]
-
-                    stock_data[symbol] = pd.concat([stock_data[symbol], symbol_data], ignore_index=True)
+                # Determine which column to use for filtering
+                filter_column = "Underlying Asset ID" if "Primary Asset Class" in chunk.columns or "Action Type" in chunk.columns else "Underlier ID-Leg 1"
+                # Extract the symbol from the filter column
+                chunk['symbol'] = chunk[filter_column].str.split('.').str[0]
+
+                # Filter the chunk to include only rows with symbols in our list
+                filtered_chunk = chunk[chunk['symbol'].isin(stock_symbols_set)]
+
+                # If the filtered chunk is not empty, process and save it
+                if not filtered_chunk.empty:
+                    columns_to_keep = ["symbol", "Dissemination Identifier", "Original Dissemination Identifier", "Effective Date", "Notional amount-Leg 1", "Expiration Date", "Total notional quantity-Leg 1"]
+                    filtered_chunk = filtered_chunk[columns_to_keep]
+
+                    # Convert 'Notional amount-Leg 1' and 'Total notional quantity-Leg 1' to integers
+                    filtered_chunk['Notional amount-Leg 1'] = clean_and_convert(filtered_chunk['Notional amount-Leg 1'])
+                    filtered_chunk['Total notional quantity-Leg 1'] = clean_and_convert(filtered_chunk['Total notional quantity-Leg 1'])
+
+                    # Group by symbol and append to respective files
+                    for symbol, group in filtered_chunk.groupby('symbol'):
+                        output_file = os.path.join(COMPANIES_PATH, f"{symbol}.json")
+                        group = group.drop(columns=['symbol'])
+
+                        # Convert DataFrame to list of dictionaries
+                        records = group.to_dict('records')
+
+                        if os.path.exists(output_file):
+                            with open(output_file, 'r+') as f:
+                                data = ujson.load(f)
+                                data.extend(records)
+                                f.seek(0)
+                                ujson.dump(data, f)
+                        else:
+                            with open(output_file, 'w') as f:
+                                ujson.dump(records, f)
+
         except pd.errors.EmptyDataError:
             print(f"Skipping empty file: {file}")
         except Exception as e:
             print(f"Error processing file {file}: {str(e)}")
 
-    # Save data for each stock symbol
-    for symbol, data in stock_data.items():
-        if not data.empty:
-            # Treat "Original Dissemination Identifier" and "Dissemination Identifier" as long integers
-            data["Original Dissemination Identifier"] = data["Original Dissemination Identifier"].astype("Int64")
-            data["Dissemination Identifier"] = data["Dissemination Identifier"].astype("Int64")
-            data = data.drop(columns=["Unnamed: 0"], errors="ignore")
-
-            # Keep only specific columns
-            columns_to_keep = ["Effective Date", "Notional amount-Leg 1", "Expiration Date", "Total notional quantity-Leg 1"]
-            data = data[columns_to_keep]
-
-            # Save to CSV
-            output_file = os.path.join(COMPANIES_PATH, f"{symbol}.csv")
-            data.to_csv(output_file, index=False)
-            print(f"Saved data for {symbol} to {output_file}")
+    # Final processing of each symbol's file
+    for symbol in tqdm(stock_symbols, desc="Final processing"):
+        file_path = os.path.join(COMPANIES_PATH, f"{symbol}.json")
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f:
+                    data = ujson.load(f)
+
+                # Convert to DataFrame for processing
+                df = pd.DataFrame(data)
+                df["Original Dissemination Identifier"] = df["Original Dissemination Identifier"].astype("Int64")
+                df["Dissemination Identifier"] = df["Dissemination Identifier"].astype("Int64")
+
+                # Convert back to list of dictionaries and save
+                processed_data = df.to_dict('records')
+                with open(file_path, 'w') as f:
+                    ujson.dump(processed_data, f)
+
+                print(f"Processed and saved data for {symbol}")
+            except Exception as e:
+                print(f"Error processing {symbol}: {str(e)}")
 
-process_and_save_by_ticker()
\ No newline at end of file
+if __name__ == "__main__":
+    filenames = generate_filenames()
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        list(tqdm(executor.map(download_and_process, filenames), total=len(filenames)))
+    process_and_save_by_ticker()
\ No newline at end of file
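For reviewers, a self-contained sketch of the per-chunk transformation that `process_and_save_by_ticker` performs: extract the ticker from the underlier ID, keep only watched symbols, and normalize the notional strings with `clean_and_convert`. The sample rows and values below are invented for illustration; real DTCC files carry many more columns.

```python
import pandas as pd

def clean_and_convert(series):
    # Drop thousands separators, keep the leading digit run, coerce to int (0 on failure).
    return pd.to_numeric(
        series.replace({',': ''}, regex=True).str.extract(r'(\d+)', expand=False),
        errors='coerce'
    ).fillna(0).astype(int)

# Invented sample rows shaped like the columns the cron job keeps.
chunk = pd.DataFrame({
    "Underlier ID-Leg 1": ["AAPL.N", "MSFT.OQ", "ZZZZ.XX"],
    "Notional amount-Leg 1": ["1,000,000", "250,000+", "7,500"],
    "Total notional quantity-Leg 1": ["5,000", "1,200", "10"],
})
stock_symbols_set = {"AAPL", "MSFT"}

chunk["symbol"] = chunk["Underlier ID-Leg 1"].str.split('.').str[0]
filtered = chunk[chunk["symbol"].isin(stock_symbols_set)].copy()
filtered["Notional amount-Leg 1"] = clean_and_convert(filtered["Notional amount-Leg 1"])
filtered["Total notional quantity-Leg 1"] = clean_and_convert(filtered["Total notional quantity-Leg 1"])

# Each group would be appended to json/swap/companies/<symbol>.json by the cron job.
print(filtered.to_dict("records"))
```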
diff --git a/app/main.py b/app/main.py
index 271fc25..cb6481e 100755
--- a/app/main.py
+++ b/app/main.py
@@ -3174,6 +3174,36 @@ async def get_clinical_trial(data:TickerData, api_key: str = Security(get_api_ke
     redis_client.set(cache_key, compressed_data)
     redis_client.expire(cache_key, 3600*3600)  # Set cache expiration time to 1 day
 
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
+@app.post("/swap-ticker")
+async def get_swap_ticker(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker.upper()
+    cache_key = f"swap-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+
+    try:
+        with open(f"json/swap/companies/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except Exception:
+        res = []
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*3600)
+
     return StreamingResponse(
         io.BytesIO(compressed_data),
         media_type="application/json",
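For completeness, a minimal client-side check of the new `/swap-ticker` route. The base URL and the API-key header name are placeholders (the real header is whatever `get_api_key` validates); because the response carries `Content-Encoding: gzip`, `requests` transparently decompresses it, so `.json()` yields the swap records written by `cron_swap.py`.

```python
import requests

BASE_URL = "http://localhost:8000"         # placeholder; use the deployed host
HEADERS = {"X-API-KEY": "<your-api-key>"}  # header name assumed; match get_api_key

resp = requests.post(
    f"{BASE_URL}/swap-ticker",
    json={"ticker": "aapl"},  # handler upper-cases the ticker before the file lookup
    headers=HEADERS,
)
resp.raise_for_status()

# Content-Encoding: gzip is handled by requests automatically.
swaps = resp.json()
for row in swaps[:5]:
    print(row.get("Effective Date"), row.get("Notional amount-Leg 1"))
```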