From cf7ca97687906e440e6cb7c5f503674fd9d5ee23 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 10 Jul 2024 16:48:54 +0200 Subject: [PATCH] temporary fix --- app/cron_dark_pool_flow.py | 39 ++++++++++++------- app/primary_cron_job.py | 2 +- app/test.py | 78 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 3 +- 4 files changed, 107 insertions(+), 15 deletions(-) create mode 100644 app/test.py diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py index 3c33e62..fb08509 100644 --- a/app/cron_dark_pool_flow.py +++ b/app/cron_dark_pool_flow.py @@ -1,17 +1,14 @@ import time -from datetime import datetime +from datetime import datetime, timedelta from GetStartEndDate import GetStartEndDate - +from concurrent.futures import ThreadPoolExecutor, TimeoutError import intrinio_sdk as intrinio import ujson import sqlite3 -import pytz from dotenv import load_dotenv import os -ny_tz = pytz.timezone('America/New_York') - load_dotenv() api_key = os.getenv('INTRINIO_API_KEY') @@ -37,21 +34,31 @@ count = 0 def get_data(): data = [] - count = 0 + #count = 0 + next_page = '' + try: + response = intrinio.SecurityApi().get_security_trades(source, start_date=start_date, start_time=start_time, end_date=end_date, end_time=end_time, timezone=timezone, page_size=page_size, darkpool_only=darkpool_only, min_size=min_size, next_page=next_page) + data = response.trades + except Exception as e: + print(e) + + ''' while True: if count == 0: next_page = '' try: response = intrinio.SecurityApi().get_security_trades(source, start_date=start_date, start_time=start_time, end_date=end_date, end_time=end_time, timezone=timezone, page_size=page_size, darkpool_only=darkpool_only, min_size=min_size, next_page=next_page) data += response.trades - + print(data) next_page = response.next_page - if not next_page or count == 10: + if not next_page or count == 0: break count +=1 - except: - pass + except Exception as e: + print(e) + break + ''' return data def run(): @@ -65,7 +72,6 @@ def run(): stock_symbols = list(symbol_name_map.keys()) data = get_data() - print(len(data)) # Convert each SecurityTrades object to a dictionary data_dicts = [entry.__dict__ for entry in data] # Filter the data @@ -74,7 +80,7 @@ def run(): { 'symbol': entry['_symbol'], 'name': symbol_name_map[entry['_symbol']], - 'date': entry['_timestamp'].astimezone(ny_tz).isoformat(), + 'date': (entry['_timestamp']).isoformat(), 'price': entry['_price'], 'volume': entry['_total_volume'], 'size': entry['_size'] @@ -87,7 +93,14 @@ def run(): if __name__ == "__main__": + executor = ThreadPoolExecutor(max_workers=1) + future = executor.submit(run) try: - run() + # Wait for the result with a timeout of 300 seconds (5 minutes) + future.result(timeout=300) + except TimeoutError: + print("The operation timed out.") except Exception as e: print(f"An error occurred: {e}") + finally: + executor.shutdown() \ No newline at end of file diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 24781c1..e5b03b5 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -477,7 +477,7 @@ schedule.every(15).minutes.do(run_threaded, run_cron_heatmap).tag('heatmap_job') schedule.every(1).minutes.do(run_threaded, run_cron_quote).tag('quote_job') schedule.every(1).minutes.do(run_threaded, run_cron_price_alert).tag('price_alert_job') schedule.every(15).minutes.do(run_threaded, run_market_moods).tag('market_moods_job') -schedule.every(20).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') +#schedule.every(20).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job') schedule.every(2).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job') schedule.every(3).hours.do(run_threaded, run_json_job).tag('json_job') diff --git a/app/test.py b/app/test.py new file mode 100644 index 0000000..3d3cb87 --- /dev/null +++ b/app/test.py @@ -0,0 +1,78 @@ +import time +from datetime import datetime +from GetStartEndDate import GetStartEndDate +from tqdm import tqdm +import concurrent.futures + +import intrinio_sdk as intrinio +import ujson +import sqlite3 +import pytz + +from dotenv import load_dotenv +import os +from threading import Lock + +ny_tz = pytz.timezone('America/New_York') + +load_dotenv() +api_key = os.getenv('INTRINIO_API_KEY') + +intrinio.ApiClient().set_api_key(api_key) +intrinio.ApiClient().allow_retries(True) + +def save_json(data): + with open(f"json/dark-pool/flow/data.json", 'w') as file: + ujson.dump(data, file) + +source = 'cta_a_delayed' +start_date = '' +end_date = '' +start_time = '' +end_time = '' +timezone = 'UTC' +page_size = 100 +darkpool_only = True +min_size = 100 +next_page = '' + +api_call_counter = 0 +lock = Lock() + +def get_data(symbol): + global api_call_counter + try: + response = intrinio.SecurityApi().get_security_trades_by_symbol( + identifier=symbol, source=source, start_date=start_date, start_time=start_time, + end_date=end_date, end_time=end_time, timezone=timezone, page_size=page_size, + darkpool_only=darkpool_only, min_size=min_size, next_page=next_page + ) + data = response.trades + + with lock: + api_call_counter += 1 + if api_call_counter % 1600 == 0: + #print("API call limit reached, sleeping for 60 seconds...") + time.sleep(60) + + except: + pass + +def run(): + con = sqlite3.connect('stocks.db') + cursor = con.cursor() + cursor.execute("SELECT DISTINCT symbol, name FROM stocks") + stocks = cursor.fetchall() + con.close() + + symbol_name_map = {row[0]: row[1] for row in stocks} + stock_symbols = list(symbol_name_map.keys()) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + list(tqdm(executor.map(get_data, stock_symbols), total=len(stock_symbols))) + +if __name__ == "__main__": + try: + run() + except Exception as e: + print(f"An error occurred: {e}") diff --git a/requirements.txt b/requirements.txt index a990d0b..ecd7fa4 100755 --- a/requirements.txt +++ b/requirements.txt @@ -33,4 +33,5 @@ python-dateutil backtesting ujson faker -finnhub-python \ No newline at end of file +finnhub-python +intrinio_sdk \ No newline at end of file