diff --git a/app/cron_swap.py b/app/cron_swap.py
new file mode 100644
index 0000000..93a784e
--- /dev/null
+++ b/app/cron_swap.py
@@ -0,0 +1,128 @@
+import pandas as pd
+import numpy as np
+import glob
+import requests
+import os
+from zipfile import ZipFile
+import datetime
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from tqdm import tqdm
+
+# Configuration
+OUTPUT_PATH = r"./json/swap"
+COMPANIES_PATH = r"./json/swap/companies"
+MAX_WORKERS = 1
+CHUNK_SIZE = 1000  # Adjust this value based on your system's RAM
+executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
+
+# Ensure the companies directory exists
+os.makedirs(COMPANIES_PATH, exist_ok=True)
+
+# List of stock symbols you're interested in
+stock_symbols = ['AAPL', 'GME', 'AMD']  # Add more symbols as needed
+
+# Build the date range: the last 30 days up to today
+start = datetime.datetime.today() - datetime.timedelta(days=30)
+end = datetime.datetime.today()
+dates = [start + datetime.timedelta(days=i) for i in range((end - start).days + 1)]
+
+# Generate a DTCC report filename for each date
+filenames = [
+    f"SEC_CUMULATIVE_EQUITIES_{date.strftime('%Y')}_{date.strftime('%m')}_{date.strftime('%d')}.zip"
+    for date in dates
+]
+
+
+def download_and_process(filename):
+    url = f"https://pddata.dtcc.com/ppd/api/report/cumulative/sec/{filename}"
+    req = requests.get(url)
+    if req.status_code != 200:
+        print(f"Failed to download {url}")
+        return
+
+    with open(filename, "wb") as f:
+        f.write(req.content)
+
+    with ZipFile(filename, "r") as zip_ref:
+        csv_filename = zip_ref.namelist()[0]
+        zip_ref.extractall()
+
+    output_filename = os.path.join(OUTPUT_PATH, csv_filename)
+
+    # Re-write the CSV chunk by chunk so a large file never has to sit in RAM whole
+    first_chunk = True
+    for chunk in pd.read_csv(csv_filename, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip"):
+        chunk.to_csv(output_filename, index=False, mode="w" if first_chunk else "a", header=first_chunk)
+        first_chunk = False
+
+    # Delete the downloaded zip and the extracted CSV
+    os.remove(filename)
+    os.remove(csv_filename)
+
+
+tasks = [executor.submit(download_and_process, filename) for filename in filenames]
+
+for task in tqdm(as_completed(tasks), total=len(tasks)):
+    try:
+        task.result()  # Surface exceptions raised inside the workers
+    except Exception as e:
+        print(f"Download task failed: {e}")
+
+
+def process_and_save_by_ticker():
+    csv_files = glob.glob(os.path.join(OUTPUT_PATH, "*.csv"))
+
+    # Collect matching chunks per symbol and concatenate once at the end
+    stock_data = {symbol: [] for symbol in stock_symbols}
+
+    for file in tqdm(csv_files, desc="Processing files"):
+        if not os.path.isfile(file):  # Skip if not a file
+            continue
+        try:
+            for chunk in pd.read_csv(file, chunksize=CHUNK_SIZE, low_memory=False, on_bad_lines="skip"):
+                if chunk.empty:
+                    continue
+                # Older report layouts misspell the identifier columns; normalize the names
+                if "Dissemination Identifier" not in chunk.columns:
+                    chunk.rename(columns={
+                        "Dissemintation ID": "Dissemination Identifier",
+                        "Original Dissemintation ID": "Original Dissemination Identifier"
+                    }, inplace=True)
+
+                # Filter rows for each symbol; the underlier column differs by layout
+                for symbol in stock_symbols:
+                    if "Primary Asset Class" in chunk.columns or "Action Type" in chunk.columns:
+                        symbol_data = chunk[chunk["Underlying Asset ID"].str.contains(f"{symbol}.", na=False, regex=False)]
+                    else:
+                        symbol_data = chunk[chunk["Underlier ID-Leg 1"].str.contains(f"{symbol}.", na=False, regex=False)]
+
+                    if not symbol_data.empty:
+                        stock_data[symbol].append(symbol_data)
+
+        except pd.errors.EmptyDataError:
+            print(f"Skipping empty file: {file}")
+        except Exception as e:
+            print(f"Error processing file {file}: {str(e)}")
{str(e)}") + + # Save data for each stock symbol + for symbol, data in stock_data.items(): + if not data.empty: + # Treat "Original Dissemination Identifier" and "Dissemination Identifier" as long integers + data["Original Dissemination Identifier"] = data["Original Dissemination Identifier"].astype("Int64") + data["Dissemination Identifier"] = data["Dissemination Identifier"].astype("Int64") + data = data.drop(columns=["Unnamed: 0"], errors="ignore") + + # Keep only specific columns + columns_to_keep = ["Effective Date", "Notional amount-Leg 1", "Expiration Date", "Total notional quantity-Leg 1"] + data = data[columns_to_keep] + + # Save to CSV + output_file = os.path.join(COMPANIES_PATH, f"{symbol}.csv") + data.to_csv(output_file, index=False) + print(f"Saved data for {symbol} to {output_file}") + + +process_and_save_by_ticker() \ No newline at end of file