diff --git a/app/main.py b/app/main.py index 131d258..2713faf 100755 --- a/app/main.py +++ b/app/main.py @@ -4389,14 +4389,15 @@ async def get_stock_data(data: BulkList, api_key: str = Security(get_api_key)): @app.post("/bulk-download") async def get_data(data: BulkDownload, api_key: str = Security(get_api_key)): - # Ensure tickers are uppercase and unique if needed. + # Ensure tickers are uppercase. tickers = [ticker.upper() for ticker in data.tickers] selected_data_items = [item for item in data.bulkData if item.get("selected") is True] + # Mapping file paths for non-Options data types. DATA_TYPE_PATHS = { - "Price Data": "json/historical-price/max/{ticker}.json", - "Dividends Data": "json/dividends/companies/{ticker}.json", - "Options Data": "json/options-historical-data/companies/{ticker}.json" + "Stock Price": "json/historical-price/max/{ticker}.json", + "Dividends": "json/dividends/companies/{ticker}.json", + "Dark Pool": "json/dark-pool/companies/{ticker}.json", } # Create an in-memory binary stream for the zip archive. @@ -4408,27 +4409,25 @@ async def get_data(data: BulkDownload, api_key: str = Security(get_api_key)): for data_item in selected_data_items: data_type_name = data_item.get("name") - # For Price Data, we need to merge the adjusted price data. + # Special handling for Price Data. if data_type_name == "Price Data": - # Read historical price data. try: with open(f"json/historical-price/max/{ticker}.json", 'rb') as file: res = orjson.loads(file.read()) - except Exception as e: + except Exception: res = [] # Read adjusted price data. try: with open(f"json/historical-price/adj/{ticker}.json", 'rb') as file: adj_res = orjson.loads(file.read()) - except Exception as e: + except Exception: adj_res = [] - # Create a dictionary mapping date (or time) to the corresponding adjusted price entry. - # We assume "date" in adj_res corresponds to "time" in res. + # Map adjusted entries by date. adj_by_date = {entry["date"]: entry for entry in adj_res if "date" in entry} - # Loop over the historical price records and add the adjusted prices if available. + # Merge adjusted data into the regular records. for record in res: date_key = record.get("time") if date_key in adj_by_date: @@ -4438,37 +4437,123 @@ async def get_data(data: BulkDownload, api_key: str = Security(get_api_key)): record["adjLow"] = adj_entry.get("adjLow") record["adjClose"] = adj_entry.get("adjClose") - json_data = res # Use the merged result. + json_data = res + + # Convert and write CSV for Price Data. + csv_buffer = io.StringIO() + csv_writer = csv.writer(csv_buffer) + if json_data and isinstance(json_data, list) and len(json_data) > 0: + headers = list(json_data[0].keys()) + csv_writer.writerow(headers) + for row in json_data: + csv_writer.writerow([row.get(key, "") for key in headers]) + else: + csv_writer.writerow(["No data available"]) + + zip_csv_path = f"{data_type_name}/{ticker}.csv" + zf.writestr(zip_csv_path, csv_buffer.getvalue()) + + # Special handling for Options. + elif data_type_name == "Options": + # Load the options historical data. + try: + with open(f"json/options-historical-data/companies/{ticker}.json", 'rb') as file: + options_data = orjson.loads(file.read()) + except Exception: + options_data = [] + + + + # Add Unusual Activity data under Options/Unusual_Activity. + try: + with open(f"json/unusual-activity/{ticker}.json", 'rb') as file: + unusual_data = orjson.loads(file.read()) + except Exception: + unusual_data = [] + + csv_buffer_unusual = io.StringIO() + csv_writer_unusual = csv.writer(csv_buffer_unusual) + if unusual_data and isinstance(unusual_data, list) and len(unusual_data) > 0: + headers = list(unusual_data[0].keys()) + csv_writer_unusual.writerow(headers) + for row in unusual_data: + csv_writer_unusual.writerow([row.get(key, "") for key in headers]) + else: + csv_writer_unusual.writerow(["No data available"]) + + zip_csv_unusual_path = f"Options/Unusual_Activity/{ticker}.csv" + zf.writestr(zip_csv_unusual_path, csv_buffer_unusual.getvalue()) + + # Also add the historical options data into a separate Historical folder. + csv_buffer_hist = io.StringIO() + csv_writer_hist = csv.writer(csv_buffer_hist) + if options_data and isinstance(options_data, list) and len(options_data) > 0: + headers = list(options_data[0].keys()) + csv_writer_hist.writerow(headers) + for row in options_data: + csv_writer_hist.writerow([row.get(key, "") for key in headers]) + else: + csv_writer_hist.writerow(["No data available"]) + + zip_csv_hist_path = f"Options/Historical/{ticker}.csv" + zf.writestr(zip_csv_hist_path, csv_buffer_hist.getvalue()) + + # --- OI Data Handling --- + # Create two subfolders: one for "Strike" and another for "Expiry". + for category in ["strike", "expiry"]: + try: + with open(f"json/oi/{category}/{ticker}.json", 'rb') as file: + oi_data = orjson.loads(file.read()) + # For "strike", filter data using the 85th percentile threshold. + if category == 'strike' and oi_data and isinstance(oi_data, list): + val_sums = [item["call_oi"] + item["put_oi"] for item in oi_data] + threshold = np.percentile(val_sums, 85) + oi_data = [item for item in oi_data if (item["call_oi"] + item["put_oi"]) >= threshold] + except Exception: + oi_data = [] + + csv_buffer_oi = io.StringIO() + csv_writer_oi = csv.writer(csv_buffer_oi) + if oi_data and isinstance(oi_data, list) and len(oi_data) > 0: + headers = list(oi_data[0].keys()) + csv_writer_oi.writerow(headers) + for row in oi_data: + csv_writer_oi.writerow([row.get(key, "") for key in headers]) + else: + csv_writer_oi.writerow(["No data available"]) + + # Capitalize the folder name. + folder_category = category.capitalize() + zip_csv_oi_path = f"Options/OI/{folder_category}/{ticker}.csv" + zf.writestr(zip_csv_oi_path, csv_buffer_oi.getvalue()) + + + # Handling for other data types. else: - # Fallback to the mapped file path for other data types. file_path_template = DATA_TYPE_PATHS.get(data_type_name) if not file_path_template: continue # Skip if the data type is not mapped. try: with open(file_path_template.format(ticker=ticker), 'rb') as file: json_data = orjson.loads(file.read()) - if data_type_name == 'Dividends Data': - json_data = json_data['history'] + if data_type_name == 'Dividends': + json_data = json_data.get('history', []) json_data = sorted(json_data, key=lambda item: item['date']) - except: + except Exception: json_data = [] - # Convert the JSON data to CSV. - csv_buffer = io.StringIO() - csv_writer = csv.writer(csv_buffer) - - if json_data and isinstance(json_data, list) and len(json_data) > 0: - # Write headers based on the keys of the first record. - headers = list(json_data[0].keys()) - csv_writer.writerow(headers) - for row in json_data: - csv_writer.writerow([row.get(key, "") for key in headers]) - else: - csv_writer.writerow(["No data available"]) - - # Write the CSV content to the zip file under a folder named after the data type. - zip_csv_path = f"{data_type_name}/{ticker}.csv" - zf.writestr(zip_csv_path, csv_buffer.getvalue()) + csv_buffer = io.StringIO() + csv_writer = csv.writer(csv_buffer) + if json_data and isinstance(json_data, list) and len(json_data) > 0: + headers = list(json_data[0].keys()) + csv_writer.writerow(headers) + for row in json_data: + csv_writer.writerow([row.get(key, "") for key in headers]) + else: + csv_writer.writerow(["No data available"]) + + zip_csv_path = f"{data_type_name}/{ticker}.csv" + zf.writestr(zip_csv_path, csv_buffer.getvalue()) memory_file.seek(0) return StreamingResponse( @@ -4477,6 +4562,24 @@ async def get_data(data: BulkDownload, api_key: str = Security(get_api_key)): headers={"Content-Disposition": "attachment; filename=bulk_data.zip"} ) + + + + + + + + + + + + + + + + + + @app.get("/newsletter") async def get_newsletter(): try: diff --git a/app/secondary_cron_job.py b/app/secondary_cron_job.py index 8d6490f..ad27dc2 100755 --- a/app/secondary_cron_job.py +++ b/app/secondary_cron_job.py @@ -16,12 +16,9 @@ def run_pocketbase(): subprocess.run(["python3", "cron_notification_channel.py"]) def run_restart_cache(): - # Update db daily - week = datetime.today().weekday() - if week <= 5: - subprocess.run(["pm2", "restart", "fastapi"]) - subprocess.run(["pm2", "restart", "fastify"]) - subprocess.run(["pm2", "restart", "websocket"]) + subprocess.run(["pm2", "restart", "fastapi"]) + subprocess.run(["pm2", "restart", "fastify"]) + subprocess.run(["pm2", "restart", "websocket"]) def run_json_job(): subprocess.run(["python3", "restart_json.py"]) @@ -36,8 +33,8 @@ def run_cron_price_alert(): def run_refresh_pocketbase(): """Runs cron_pocketbase.py with --refresh at the start of each month.""" - today = datetime.now(berlin_tz) - if today.day == 1: # Check if today is the 1st day of the month + now = datetime.now(berlin_tz) + if now.day == 1: subprocess.run(["python3", "cron_pocketbase.py", "--refresh"]) @@ -49,12 +46,12 @@ def run_threaded(job_func): # Existing scheduled tasks schedule.every().day.at("06:30").do(run_threaded, run_pocketbase).tag('pocketbase_job') -schedule.every().day.at("15:31").do(run_threaded, run_restart_cache) +schedule.every().day.at("15:30").do(run_threaded, run_restart_cache) schedule.every().day.at("23:00").do(run_threaded, run_restart_cache) -schedule.every(3).hours.do(run_threaded, run_json_job).tag('json_job') +schedule.every(2).hours.do(run_threaded, run_json_job).tag('json_job') schedule.every(1).minutes.do(run_threaded, run_cron_price_alert).tag('price_alert_job') -schedule.every().day.at("00:00").do(run_threaded, run_refresh_pocketbase) +schedule.every().day.at("00:30").do(run_threaded, run_refresh_pocketbase) # Keep the scheduler running while True: