backend/app/cron_company_news.py
2024-08-09 11:15:43 +02:00

85 lines
3.1 KiB
Python

import ujson
import asyncio
import aiohttp
import sqlite3
from tqdm import tqdm
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv('FMP_API_KEY')
async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='title'):
"""
Filter out items with specified domains in their URL and remove duplicates based on a specified key.
Args:
data (list): List of dictionaries containing item data.
excluded_domains (list): List of domain strings to exclude. Defaults to ['prnewswire.com', 'globenewswire.com', 'accesswire.com'].
deduplicate_key (str): The key to use for deduplication. Defaults to 'title'.
Returns:
list: Filtered and deduplicated list of items.
"""
if excluded_domains is None:
excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
seen_keys = set()
filtered_data = []
for item in data:
if not any(domain in item['url'] for domain in excluded_domains):
key = item.get(deduplicate_key)
if key and key not in seen_keys:
filtered_data.append(item)
seen_keys.add(key)
return filtered_data
async def fetch_news(session, url):
async with session.get(url) as response:
return await response.json()
async def save_news(data, symbol):
#os.makedirs("json/market-news/companies", exist_ok=True)
with open(f"json/market-news/companies/{symbol}.json", 'w') as file:
ujson.dump(data, file)
async def process_symbols(symbols):
limit = 200
chunk_size = 50 # Adjust this value based on API limitations
async with aiohttp.ClientSession() as session:
for i in tqdm(range(0, len(symbols), chunk_size)):
chunk = symbols[i:i+chunk_size]
company_tickers = ','.join(chunk)
url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&limit={limit}&apikey={api_key}'
data = await fetch_news(session, url)
custom_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
data = await filter_and_deduplicate(data, excluded_domains=custom_domains)
tasks = [save_news(data, symbol) for symbol in chunk]
await asyncio.gather(*tasks)
def get_symbols(db_name, table_name):
with sqlite3.connect(db_name) as con:
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
return [row[0] for row in cursor.fetchall()]
async def main():
stock_symbols = get_symbols('stocks.db', 'stocks')
etf_symbols = get_symbols('etf.db', 'etfs')
crypto_symbols = get_symbols('crypto.db', 'cryptos')
total_symbols = stock_symbols + etf_symbols + crypto_symbols
await process_symbols(total_symbols)
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f"An error occurred: {e}")