From 6207b52859e0e36e7246711fcd8a538873f35b8e Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 9 Oct 2024 21:07:51 +0200 Subject: [PATCH] bugfixing options flow --- app/cron_options_flow.py | 51 +++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/app/cron_options_flow.py b/app/cron_options_flow.py index 44079d2..b0dd99d 100755 --- a/app/cron_options_flow.py +++ b/app/cron_options_flow.py @@ -1,14 +1,11 @@ -import time -from benzinga import financial_data -import ujson -import numpy as np -import sqlite3 import asyncio -from datetime import datetime, timedelta -import concurrent.futures +import ujson +import sqlite3 +import os +from datetime import datetime from GetStartEndDate import GetStartEndDate from dotenv import load_dotenv -import os +from benzinga import financial_data # Load environment variables load_dotenv() @@ -34,30 +31,20 @@ start_date_1d, end_date_1d = GetStartEndDate().run() start_date = start_date_1d.strftime("%Y-%m-%d") end_date = end_date_1d.strftime("%Y-%m-%d") -# Process a page of option activity -def process_page(page): +# Asynchronous wrapper for fin.options_activity +async def fetch_options_activity(page): try: - data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000) - data = ujson.loads(fin.output(data))['option_activity'] - return data + data = await asyncio.to_thread(fin.options_activity, date_from=start_date, date_to=end_date, page=page, pagesize=1000) + return ujson.loads(fin.output(data))['option_activity'] except Exception as e: - print(f"Error on page {page}: {e}") + print(f"Exception on page {page}: {e}") return [] -# Fetch and process pages concurrently -def fetch_options_data(max_pages=130, max_workers=6): - res_list = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_page = {executor.submit(process_page, page): page for page in range(max_pages)} - for future in concurrent.futures.as_completed(future_to_page): - page = future_to_page[future] - try: - page_data = future.result() - res_list.extend(page_data) - except Exception as e: - print(f"Exception on page {page}: {e}") - break - return res_list +# Asynchronous function to fetch multiple pages +async def fetch_all_pages(max_pages=130): + tasks = [fetch_options_activity(page) for page in range(max_pages)] + results = await asyncio.gather(*tasks) + return [item for sublist in results for item in sublist] # Clean and filter the fetched data def clean_and_filter_data(res_list): @@ -94,9 +81,9 @@ def clean_and_filter_data(res_list): return filtered_list # Main execution flow -if __name__ == "__main__": +async def main(): # Fetch and process option data - options_data = fetch_options_data() + options_data = await fetch_all_pages() # Clean and filter the data filtered_data = clean_and_filter_data(options_data) @@ -110,3 +97,7 @@ if __name__ == "__main__": ujson.dump(sorted_data, file) print(f"Data successfully written to {output_file}") + +# Run the async event loop +if __name__ == "__main__": + asyncio.run(main())