diff --git a/app/cron_options_flow.py b/app/cron_options_flow.py index a61e6b5..1fe7eef 100755 --- a/app/cron_options_flow.py +++ b/app/cron_options_flow.py @@ -35,36 +35,8 @@ def process_page(page): try: data = fin.options_activity(date_from=start_date, date_to=end_date, page=page, pagesize=1000) data = ujson.loads(fin.output(data))['option_activity'] - filtered_data = [{key: value for key, value in item.items() if key not in ['description_extended','updated']} for item in data] - time.sleep(1) - page_list = [] - for item in filtered_data: - if item['underlying_price'] != '': - ticker = item['ticker'] - if ticker == 'BRK.A': - ticker = 'BRK-A' - elif ticker == 'BRK.B': - ticker = 'BRK-B' - put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts' - - asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '') - - item['underlying_type'] = asset_type.lower() - item['put_call'] = put_call - item['ticker'] = ticker - item['price'] = round(float(item['price']), 2) - item['strike_price'] = round(float(item['strike_price']), 2) - item['cost_basis'] = round(float(item['cost_basis']), 2) - item['underlying_price'] = round(float(item['underlying_price']), 2) - item['option_activity_type'] = item['option_activity_type'].capitalize() - item['sentiment'] = item['sentiment'].capitalize() - item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title() - item['tradeCount'] = item['trade_count'] - - page_list.append(item) - - return page_list + return data except Exception as e: print(e) return [] @@ -78,7 +50,7 @@ max_workers = 6 # Fetch pages concurrently with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_page = {executor.submit(process_page, page): page for page in range(150)} + future_to_page = {executor.submit(process_page, page): page for page in range(130)} for future in concurrent.futures.as_completed(future_to_page): page = future_to_page[future] try: @@ -93,10 +65,41 @@ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: def custom_key(item): return item['time'] -res_list = sorted(res_list, key=custom_key, reverse =True) +res_list = [{key: value for key, value in item.items() if key not in ['description_extended','updated']} for item in res_list] +filtered_list = [] +for item in res_list: + try: + if item['underlying_price'] != '': + ticker = item['ticker'] + if ticker == 'BRK.A': + ticker = 'BRK-A' + elif ticker == 'BRK.B': + ticker = 'BRK-B' + + put_call = 'Calls' if item['put_call'] == 'CALL' else 'Puts' + + asset_type = 'stock' if ticker in stock_symbols else ('etf' if ticker in etf_symbols else '') + + item['underlying_type'] = asset_type.lower() + item['put_call'] = put_call + item['ticker'] = ticker + item['price'] = round(float(item['price']), 2) + item['strike_price'] = round(float(item['strike_price']), 2) + item['cost_basis'] = round(float(item['cost_basis']), 2) + item['underlying_price'] = round(float(item['underlying_price']), 2) + item['option_activity_type'] = item['option_activity_type'].capitalize() + item['sentiment'] = item['sentiment'].capitalize() + item['execution_estimate'] = item['execution_estimate'].replace('_', ' ').title() + item['tradeCount'] = item['trade_count'] + + filtered_list.append(item) + except: + pass + +filtered_list = sorted(filtered_list, key=custom_key, reverse =True) with open(f"json/options-flow/feed/data.json", 'w') as file: - ujson.dump(res_list, file) + ujson.dump(filtered_list, file) stock_con.close() etf_con.close() \ No newline at end of file diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 6d59398..01c25ae 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -564,7 +564,7 @@ def run_threaded(job_func): job_thread.start() # Schedule the job to run - +''' schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job') schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("03:00").do(run_threaded, run_dark_pool) @@ -630,7 +630,7 @@ schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') - +''' schedule.every(20).seconds.do(run_threaded, run_if_not_running(run_cron_options_flow, 'options_flow_job')).tag('options_flow_job')