bugfixing
This commit is contained in:
parent
1e8f725a6c
commit
db3b406646
@ -19,10 +19,10 @@ async def save_price_data(symbol, data):
|
|||||||
ujson.dump(data, file)
|
ujson.dump(data, file)
|
||||||
|
|
||||||
|
|
||||||
async def fetch_and_save_symbols_data(symbols):
|
async def fetch_and_save_symbols_data(symbols, semaphore):
|
||||||
tasks = []
|
tasks = []
|
||||||
for symbol in symbols:
|
for symbol in symbols:
|
||||||
task = asyncio.create_task(get_todays_data(symbol))
|
task = asyncio.create_task(get_todays_data(symbol, semaphore))
|
||||||
tasks.append(task)
|
tasks.append(task)
|
||||||
responses = await asyncio.gather(*tasks)
|
responses = await asyncio.gather(*tasks)
|
||||||
|
|
||||||
@ -30,7 +30,7 @@ async def fetch_and_save_symbols_data(symbols):
|
|||||||
if len(response) > 0:
|
if len(response) > 0:
|
||||||
await save_price_data(symbol, response)
|
await save_price_data(symbol, response)
|
||||||
|
|
||||||
async def get_todays_data(ticker):
|
async def get_todays_data(ticker, semaphore):
|
||||||
# Assuming GetStartEndDate().run() returns today's start and end datetime objects
|
# Assuming GetStartEndDate().run() returns today's start and end datetime objects
|
||||||
start_date_1d, end_date_1d = GetStartEndDate().run()
|
start_date_1d, end_date_1d = GetStartEndDate().run()
|
||||||
|
|
||||||
@ -48,18 +48,18 @@ async def get_todays_data(ticker):
|
|||||||
current_date = start_date_1d
|
current_date = start_date_1d
|
||||||
target_time = time(9, 30)
|
target_time = time(9, 30)
|
||||||
|
|
||||||
|
# Use semaphore to limit concurrent connections
|
||||||
|
async with semaphore:
|
||||||
# Async HTTP request
|
# Async HTTP request
|
||||||
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
responses = await asyncio.gather(session.get(url))
|
async with session.get(url) as response:
|
||||||
|
|
||||||
for response in responses:
|
|
||||||
try:
|
try:
|
||||||
json_data = await response.json()
|
json_data = await response.json()
|
||||||
# Create DataFrame and reverse order if needed
|
# Create DataFrame and reverse order if needed
|
||||||
df_1d = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True)
|
df_1d = pd.DataFrame(json_data).iloc[::-1].reset_index(drop=True)
|
||||||
|
|
||||||
# Filter out rows not matching today's date.
|
# Filter out rows not matching today's date.
|
||||||
# If the column is "date":
|
|
||||||
df_1d = df_1d[df_1d['date'].str.startswith(today_str)]
|
df_1d = df_1d[df_1d['date'].str.startswith(today_str)]
|
||||||
|
|
||||||
# If you want to rename "date" to "time", do that after filtering:
|
# If you want to rename "date" to "time", do that after filtering:
|
||||||
@ -74,30 +74,23 @@ async def get_todays_data(ticker):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# The following block handles non-weekend logic and appends additional rows if needed.
|
|
||||||
'''
|
|
||||||
if current_weekday not in (5, 6):
|
|
||||||
if current_date.time() >= target_time:
|
|
||||||
extract_date = current_date.strftime('%Y-%m-%d')
|
|
||||||
end_time = pd.to_datetime(f'{extract_date} 16:00:00')
|
|
||||||
new_index = pd.date_range(start=df_1d['time'].iloc[-1], end=end_time, freq='1min')
|
|
||||||
|
|
||||||
remaining_df = pd.DataFrame(index=new_index, columns=['open', 'high', 'low', 'close'])
|
|
||||||
remaining_df = remaining_df.reset_index().rename(columns={"index": "time"})
|
|
||||||
remaining_df['time'] = remaining_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S')
|
|
||||||
remaining_df = remaining_df.set_index('time')
|
|
||||||
|
|
||||||
# Concatenate the remaining_df (skipping the first row as in your original code)
|
|
||||||
df_1d = pd.concat([df_1d, remaining_df[1::]], ignore_index=True)
|
|
||||||
'''
|
|
||||||
# Convert DataFrame back to JSON list format
|
# Convert DataFrame back to JSON list format
|
||||||
df_1d = ujson.loads(df_1d.to_json(orient="records"))
|
df_1d = ujson.loads(df_1d.to_json(orient="records"))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(f"Error processing data for {ticker}: {e}")
|
||||||
|
df_1d = []
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Connection error for {ticker}: {e}")
|
||||||
df_1d = []
|
df_1d = []
|
||||||
|
|
||||||
return df_1d
|
return df_1d
|
||||||
|
|
||||||
async def run():
|
async def run():
|
||||||
|
# Create a semaphore to limit the number of concurrent connections
|
||||||
|
# Adjust this number based on your system's limits
|
||||||
|
connection_limit = 50
|
||||||
|
semaphore = asyncio.Semaphore(connection_limit)
|
||||||
|
|
||||||
con = sqlite3.connect('stocks.db')
|
con = sqlite3.connect('stocks.db')
|
||||||
etf_con = sqlite3.connect('etf.db')
|
etf_con = sqlite3.connect('etf.db')
|
||||||
|
|
||||||
@ -132,15 +125,17 @@ async def run():
|
|||||||
|
|
||||||
total_symbols = stocks_symbols + etf_symbols + index_symbols
|
total_symbols = stocks_symbols + etf_symbols + index_symbols
|
||||||
|
|
||||||
chunk_size = 500
|
# Reduce chunk size to avoid too many concurrent requests
|
||||||
|
chunk_size = 100
|
||||||
for i in range(0, len(total_symbols), chunk_size):
|
for i in range(0, len(total_symbols), chunk_size):
|
||||||
symbols_chunk = total_symbols[i:i+chunk_size]
|
symbols_chunk = total_symbols[i:i+chunk_size]
|
||||||
await fetch_and_save_symbols_data(symbols_chunk)
|
await fetch_and_save_symbols_data(symbols_chunk, semaphore)
|
||||||
print('sleeping...')
|
print(f'Completed chunk {i//chunk_size + 1} of {(len(total_symbols) + chunk_size - 1) // chunk_size}')
|
||||||
await asyncio.sleep(30) # Wait for 60 seconds between chunks
|
# No need to sleep as much since we're using a semaphore to control concurrency
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(run())
|
asyncio.run(run())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(f"Main error: {e}")
|
||||||
Loading…
x
Reference in New Issue
Block a user