Bugfix: reuse one shared aiohttp session, serialize JSON writes, and process chunks concurrently
This commit is contained in:
parent
c7cad764b3
commit
ee2cab45a8
@ -10,74 +10,96 @@ import time
|
|||||||
load_dotenv()
|
load_dotenv()
|
||||||
api_key = os.getenv('FMP_API_KEY')
|
api_key = os.getenv('FMP_API_KEY')
|
||||||
|
|
||||||
|
|
||||||
async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='title'):
|
async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='title'):
|
||||||
"""
|
"""
|
||||||
Filter out items with specified domains in their URL and remove duplicates based on a specified key.
|
Filter out items with specified domains in their URL and remove duplicates based on a specified key.
|
||||||
|
|
||||||
Args:
|
|
||||||
data (list): List of dictionaries containing item data.
|
|
||||||
excluded_domains (list): List of domain strings to exclude. Defaults to ['prnewswire.com', 'globenewswire.com', 'accesswire.com'].
|
|
||||||
deduplicate_key (str): The key to use for deduplication. Defaults to 'title'.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: Filtered and deduplicated list of items.
|
|
||||||
"""
|
"""
|
||||||
if excluded_domains is None:
|
if excluded_domains is None:
|
||||||
excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
|
excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
|
||||||
|
|
||||||
seen_keys = set()
|
seen_keys = set()
|
||||||
filtered_data = []
|
filtered_data = []
|
||||||
|
|
||||||
for item in data:
|
for item in data:
|
||||||
if not any(domain in item['url'] for domain in excluded_domains):
|
if not any(domain in item['url'] for domain in excluded_domains):
|
||||||
key = item.get(deduplicate_key)
|
key = item.get(deduplicate_key)
|
||||||
if key and key not in seen_keys:
|
if key and key not in seen_keys:
|
||||||
filtered_data.append(item)
|
filtered_data.append(item)
|
||||||
seen_keys.add(key)
|
seen_keys.add(key)
|
||||||
|
|
||||||
return filtered_data
|
return filtered_data
|
||||||
|
|
||||||
async def save_quote_as_json(symbol, data):
|
|
||||||
with open(f"json/market-news/companies/{symbol}.json", 'w') as file:
|
|
||||||
ujson.dump(data, file)
|
|
||||||
|
|
||||||
async def get_data(chunk):
|
async def save_quote_as_json(symbol, data):
|
||||||
|
"""
|
||||||
|
Save data as JSON in a batch to reduce disk I/O
|
||||||
|
"""
|
||||||
|
async with asyncio.Lock(): # Ensure thread-safe writes
|
||||||
|
with open(f"json/market-news/companies/{symbol}.json", 'w') as file:
|
||||||
|
ujson.dump(data, file)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_data(session, chunk):
|
||||||
|
"""
|
||||||
|
Fetch data for a chunk of tickers using a single session
|
||||||
|
"""
|
||||||
company_tickers = ','.join(chunk)
|
company_tickers = ','.join(chunk)
|
||||||
async with aiohttp.ClientSession() as session:
|
url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&page=0&limit=2000&apikey={api_key}'
|
||||||
url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&page=0&limit=2000&apikey={api_key}'
|
|
||||||
async with session.get(url) as response:
|
async with session.get(url) as response:
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
return await response.json()
|
return await response.json()
|
||||||
else:
|
return []
|
||||||
return []
|
|
||||||
|
|
||||||
def get_symbols(db_name, table_name):
|
def get_symbols(db_name, table_name):
|
||||||
|
"""
|
||||||
|
Fetch symbols from the SQLite database
|
||||||
|
"""
|
||||||
with sqlite3.connect(db_name) as con:
|
with sqlite3.connect(db_name) as con:
|
||||||
cursor = con.cursor()
|
cursor = con.cursor()
|
||||||
cursor.execute("PRAGMA journal_mode = wal")
|
cursor.execute("PRAGMA journal_mode = wal")
|
||||||
cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
|
cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
|
||||||
return [row[0] for row in cursor.fetchall()]
|
return [row[0] for row in cursor.fetchall()]
|
||||||
|
|
||||||
|
|
||||||
|
async def process_chunk(session, chunk):
|
||||||
|
"""
|
||||||
|
Process a chunk of symbols
|
||||||
|
"""
|
||||||
|
data = await get_data(session, chunk)
|
||||||
|
tasks = []
|
||||||
|
for symbol in chunk:
|
||||||
|
filtered_data = [item for item in data if item['symbol'] == symbol]
|
||||||
|
filtered_data = await filter_and_deduplicate(filtered_data)
|
||||||
|
if filtered_data:
|
||||||
|
tasks.append(save_quote_as_json(symbol, filtered_data))
|
||||||
|
if tasks:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
"""
|
||||||
|
Main function to coordinate fetching and processing
|
||||||
|
"""
|
||||||
stock_symbols = get_symbols('stocks.db', 'stocks')
|
stock_symbols = get_symbols('stocks.db', 'stocks')
|
||||||
etf_symbols = get_symbols('etf.db', 'etfs')
|
etf_symbols = get_symbols('etf.db', 'etfs')
|
||||||
crypto_symbols = get_symbols('crypto.db', 'cryptos')
|
crypto_symbols = get_symbols('crypto.db', 'cryptos')
|
||||||
total_symbols = stock_symbols + etf_symbols + crypto_symbols
|
total_symbols = stock_symbols + etf_symbols + crypto_symbols
|
||||||
|
|
||||||
chunk_size = len(total_symbols) // 70 # Divide the list into N chunks
|
# Dynamically adjust chunk size
|
||||||
|
chunk_size = 15 # Adjust based on your needs
|
||||||
chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
|
chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
|
||||||
|
|
||||||
for chunk in tqdm(chunks):
|
async with aiohttp.ClientSession() as session:
|
||||||
data = await get_data(chunk)
|
tasks = [process_chunk(session, chunk) for chunk in chunks]
|
||||||
for symbol in chunk:
|
for task in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
|
||||||
filtered_data = [item for item in data if item['symbol'] == symbol]
|
await task
|
||||||
filtered_data = await filter_and_deduplicate(filtered_data)
|
|
||||||
if len(filtered_data) > 0:
|
|
||||||
await save_quote_as_json(symbol, filtered_data)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"An error occurred: {e}")
|
print(f"An error occurred: {e}")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user