From db3b406646cf0b6df6851d961ac4a5224e5c4e02 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 4 Apr 2025 20:01:12 +0200 Subject: [PATCH] bugfixing --- app/cron_one_day_price.py | 103 ++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 54 deletions(-) diff --git a/app/cron_one_day_price.py b/app/cron_one_day_price.py index 1c7c84f..78e80bd 100755 --- a/app/cron_one_day_price.py +++ b/app/cron_one_day_price.py @@ -19,10 +19,10 @@ async def save_price_data(symbol, data): ujson.dump(data, file) -async def fetch_and_save_symbols_data(symbols): +async def fetch_and_save_symbols_data(symbols, semaphore): tasks = [] for symbol in symbols: - task = asyncio.create_task(get_todays_data(symbol)) + task = asyncio.create_task(get_todays_data(symbol, semaphore)) tasks.append(task) responses = await asyncio.gather(*tasks) @@ -30,7 +30,7 @@ async def fetch_and_save_symbols_data(symbols): if len(response) > 0: await save_price_data(symbol, response) -async def get_todays_data(ticker): +async def get_todays_data(ticker, semaphore): # Assuming GetStartEndDate().run() returns today's start and end datetime objects start_date_1d, end_date_1d = GetStartEndDate().run() @@ -48,56 +48,49 @@ async def get_todays_data(ticker): current_date = start_date_1d target_time = time(9, 30) - # Async HTTP request - async with aiohttp.ClientSession() as session: - responses = await asyncio.gather(session.get(url)) - - for response in responses: - try: - json_data = await response.json() - # Create DataFrame and reverse order if needed - df_1d = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True) - - # Filter out rows not matching today's date. - # If the column is "date": - df_1d = df_1d[df_1d['date'].str.startswith(today_str)] - - # If you want to rename "date" to "time", do that after filtering: - df_1d = df_1d.drop(['volume'], axis=1) - df_1d = df_1d.round(2).rename(columns={"date": "time"}) - - # Update the first row 'close' with previousClose from your stored json if available - try: - with open(f"json/quote/{ticker}.json", 'r') as file: - res = ujson.load(file) - df_1d.loc[df_1d.index[0], 'close'] = res['previousClose'] - except Exception as e: - pass - - # The following block handles non-weekend logic and appends additional rows if needed. - ''' - if current_weekday not in (5, 6): - if current_date.time() >= target_time: - extract_date = current_date.strftime('%Y-%m-%d') - end_time = pd.to_datetime(f'{extract_date} 16:00:00') - new_index = pd.date_range(start=df_1d['time'].iloc[-1], end=end_time, freq='1min') + # Use semaphore to limit concurrent connections + async with semaphore: + # Async HTTP request + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + try: + json_data = await response.json() + # Create DataFrame and reverse order if needed + df_1d = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True) - remaining_df = pd.DataFrame(index=new_index, columns=['open', 'high', 'low', 'close']) - remaining_df = remaining_df.reset_index().rename(columns={"index": "time"}) - remaining_df['time'] = remaining_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S') - remaining_df = remaining_df.set_index('time') + # Filter out rows not matching today's date. + df_1d = df_1d[df_1d['date'].str.startswith(today_str)] - # Concatenate the remaining_df (skipping the first row as in your original code) - df_1d = pd.concat([df_1d, remaining_df[1::]], ignore_index=True) - ''' - # Convert DataFrame back to JSON list format - df_1d = ujson.loads(df_1d.to_json(orient="records")) - except Exception as e: - print(e) - df_1d = [] + # If you want to rename "date" to "time", do that after filtering: + df_1d = df_1d.drop(['volume'], axis=1) + df_1d = df_1d.round(2).rename(columns={"date": "time"}) + + # Update the first row 'close' with previousClose from your stored json if available + try: + with open(f"json/quote/{ticker}.json", 'r') as file: + res = ujson.load(file) + df_1d.loc[df_1d.index[0], 'close'] = res['previousClose'] + except Exception as e: + pass + + # Convert DataFrame back to JSON list format + df_1d = ujson.loads(df_1d.to_json(orient="records")) + except Exception as e: + print(f"Error processing data for {ticker}: {e}") + df_1d = [] + except Exception as e: + print(f"Connection error for {ticker}: {e}") + df_1d = [] return df_1d + async def run(): + # Create a semaphore to limit the number of concurrent connections + # Adjust this number based on your system's limits + connection_limit = 50 + semaphore = asyncio.Semaphore(connection_limit) + con = sqlite3.connect('stocks.db') etf_con = sqlite3.connect('etf.db') @@ -130,17 +123,19 @@ async def run(): stocks_symbols = sorted(stocks_symbols, key=lambda s: market_caps[s], reverse=True) stocks_symbols = sorted(stocks_symbols, key=lambda x: '.' in x) - total_symbols = stocks_symbols+ etf_symbols + index_symbols + total_symbols = stocks_symbols + etf_symbols + index_symbols - chunk_size = 500 + # Reduce chunk size to avoid too many concurrent requests + chunk_size = 100 for i in range(0, len(total_symbols), chunk_size): symbols_chunk = total_symbols[i:i+chunk_size] - await fetch_and_save_symbols_data(symbols_chunk) - print('sleeping...') - await asyncio.sleep(30) # Wait for 60 seconds between chunks + await fetch_and_save_symbols_data(symbols_chunk, semaphore) + print(f'Completed chunk {i//chunk_size + 1} of {(len(total_symbols) + chunk_size - 1) // chunk_size}') + # No need to sleep as much since we're using a semaphore to control concurrency + await asyncio.sleep(5) try: asyncio.run(run()) except Exception as e: - print(e) \ No newline at end of file + print(f"Main error: {e}") \ No newline at end of file