optimize options flow job

This commit is contained in:
MuslemRahimi 2024-09-13 16:14:06 +02:00
parent 841877746f
commit 6e2096a473
2 changed files with 37 additions and 34 deletions

View File

@@ -35,10 +35,40 @@ def process_page(page):
try: try:
data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000) data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000)
data = ujson.loads(fin.output(data))['option_activity'] data = ujson.loads(fin.output(data))['option_activity']
filtered_data = [{key: value for key, value in item.items() if key not in ['description_extended','updated']} for item in data]
time.sleep(1) return data
page_list = [] except Exception as e:
for item in filtered_data: print(e)
return []
# Assuming fin, stock_symbols, and etf_symbols are defined elsewhere
res_list = []
# Adjust max_workers to control the degree of parallelism
max_workers = 6
# Fetch pages concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page = {executor.submit(process_page, page): page for page in range(130)}
for future in concurrent.futures.as_completed(future_to_page):
page = future_to_page[future]
try:
page_list = future.result()
res_list += page_list
except Exception as e:
print(f"Exception occurred: {e}")
break
# res_list now contains the aggregated results from all pages
#print(res_list)
def custom_key(item):
return item['time']
res_list = [{key: value for key, value in item.items() if key not in ['description_extended','updated']} for item in res_list]
filtered_list = []
for item in res_list:
try:
if item['underlying_price'] != '': if item['underlying_price'] != '':
ticker = item['ticker'] ticker = item['ticker']
if ticker == 'BRK.A': if ticker == 'BRK.A':
@@ -62,41 +92,14 @@ def process_page(page):
item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title() item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title()
item['tradeCount'] = item['trade_count'] item['tradeCount'] = item['trade_count']
page_list.append(item) filtered_list.append(item)
except:
pass
return page_list filtered_list = sorted(filtered_list, key=custom_key, reverse =True)
except Exception as e:
print(e)
return []
# Assuming fin, stock_symbols, and etf_symbols are defined elsewhere
res_list = []
# Adjust max_workers to control the degree of parallelism
max_workers = 6
# Fetch pages concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_page = {executor.submit(process_page, page): page for page in range(150)}
for future in concurrent.futures.as_completed(future_to_page):
page = future_to_page[future]
try:
page_list = future.result()
res_list += page_list
except Exception as e:
print(f"Exception occurred: {e}")
break
# res_list now contains the aggregated results from all pages
#print(res_list)
def custom_key(item):
return item['time']
res_list = sorted(res_list, key=custom_key, reverse =True)
with open(f"json/options-flow/feed/data.json", 'w') as file: with open(f"json/options-flow/feed/data.json", 'w') as file:
ujson.dump(res_list, file) ujson.dump(filtered_list, file)
stock_con.close() stock_con.close()
etf_con.close() etf_con.close()

View File

@@ -564,7 +564,7 @@ def run_threaded(job_func):
job_thread.start() job_thread.start()
# Schedule the job to run # Schedule the job to run
'''
schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job') schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job')
schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job)
schedule.every().day.at("03:00").do(run_threaded, run_dark_pool) schedule.every().day.at("03:00").do(run_threaded, run_dark_pool)
@@ -630,7 +630,7 @@ schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')
'''
schedule.every(20).seconds.do(run_threaded, run_if_not_running(run_cron_options_flow, 'options_flow_job')).tag('options_flow_job') schedule.every(20).seconds.do(run_threaded, run_if_not_running(run_cron_options_flow, 'options_flow_job')).tag('options_flow_job')