add press release cron job
parent 38418c895f
commit 7574c13647
@@ -30,7 +30,7 @@ async def filter_and_deduplicate(data, excluded_domains=None, deduplicate_key='t
     Filter out items with specified domains in their URL and remove duplicates based on a specified key.
     """
     if excluded_domains is None:
-        excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com', 'youtube.com']
+        excluded_domains = ['prnewswire.com', 'globenewswire.com', 'accesswire.com']
     seen_keys = set()
     filtered_data = []
     for item in data:
@@ -55,7 +55,7 @@ async def get_data(session, chunk, rate_limiter):
     """
     await rate_limiter.acquire()
     company_tickers = ','.join(chunk)
-    url = f'https://financialmodelingprep.com/api/v3/stock_news?tickers={company_tickers}&page=0&limit=50&apikey={api_key}'
+    url = f'https://financialmodelingprep.com/stable/news/stock?symbols={company_tickers}&limit=100&apikey={api_key}'

     async with session.get(url) as response:
         if response.status == 200:
@@ -97,12 +97,12 @@ async def main():
     etf_symbols = get_symbols('etf.db', 'etfs')
     crypto_symbols = get_symbols('crypto.db', 'cryptos')
     total_symbols = stock_symbols + etf_symbols + crypto_symbols

     #total_symbols = ['AAPL']
     # Dynamically adjust chunk size
-    chunk_size = 10 # Adjust based on your needs
+    chunk_size = 1 # Adjust based on your needs
     chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]

-    rate_limiter = RateLimiter(rate_limit=200, sleep_time=60)
+    rate_limiter = RateLimiter(rate_limit=300, sleep_time=60)

     async with aiohttp.ClientSession() as session:
         tasks = [process_chunk(session, chunk, rate_limiter) for chunk in chunks]
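Note: the loop that follows this hunk is not shown in full. A minimal sketch of the filter/dedupe pattern it sets up (hypothetical helper name, not the repository's exact code):

def filter_and_dedupe_sketch(data, excluded_domains, deduplicate_key):
    # Illustration only: drop items whose URL matches an excluded domain,
    # then drop items whose deduplicate_key value has already been seen.
    seen_keys = set()
    filtered_data = []
    for item in data:
        url = item.get('url', '')
        key = item.get(deduplicate_key)
        if any(domain in url for domain in excluded_domains):
            continue
        if key in seen_keys:
            continue
        seen_keys.add(key)
        filtered_data.append(item)
    return filtered_data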
97  app/cron_press_releases.py  (new file)
@@ -0,0 +1,97 @@
+import ujson
+import asyncio
+import aiohttp
+import sqlite3
+from tqdm import tqdm
+from dotenv import load_dotenv
+import os
+import time
+
+load_dotenv()
+api_key = os.getenv('FMP_API_KEY')
+
+class RateLimiter:
+    def __init__(self, rate_limit=200, sleep_time=60):
+        self.rate_limit = rate_limit
+        self.sleep_time = sleep_time
+        self.request_count = 0
+        self.lock = asyncio.Lock()
+
+    async def acquire(self):
+        async with self.lock:
+            self.request_count += 1
+            if self.request_count >= self.rate_limit:
+                print(f"Processed {self.rate_limit} requests. Sleeping for {self.sleep_time} seconds...")
+                await asyncio.sleep(self.sleep_time)
+                self.request_count = 0
+
+
+async def save_json(symbol, data):
+    """
+    Save data as JSON in a batch to reduce disk I/O
+    """
+    async with asyncio.Lock():  # Ensure thread-safe writes
+        with open(f"json/market-news/press-releases/{symbol}.json", 'w') as file:
+            ujson.dump(data, file)
+
+async def get_data(session, chunk, rate_limiter):
+    """
+    Fetch data for a chunk of tickers using a single session
+    """
+    await rate_limiter.acquire()
+    company_tickers = ','.join(chunk)
+    url = f'https://financialmodelingprep.com/stable/news/press-releases?symbols={company_tickers}&limit=50&apikey={api_key}'
+
+    async with session.get(url) as response:
+        if response.status == 200:
+            return await response.json()
+        return []
+
+def get_symbols(db_name, table_name):
+    """
+    Fetch symbols from the SQLite database
+    """
+    with sqlite3.connect(db_name) as con:
+        cursor = con.cursor()
+        cursor.execute("PRAGMA journal_mode = wal")
+        cursor.execute(f"SELECT DISTINCT symbol FROM {table_name} WHERE symbol NOT LIKE '%.%'")
+        return [row[0] for row in cursor.fetchall()]
+
+async def process_chunk(session, chunk, rate_limiter):
+    """
+    Process a chunk of symbols
+    """
+    data = await get_data(session, chunk, rate_limiter)
+    tasks = []
+    for symbol in chunk:
+        try:
+            filtered_data = [item for item in data if item['symbol'] == symbol]
+            if filtered_data:
+                tasks.append(save_json(symbol, filtered_data))
+        except Exception as e:
+            print(e)
+    if tasks:
+        await asyncio.gather(*tasks)
+
+async def main():
+    """
+    Main function to coordinate fetching and processing
+    """
+    total_symbols = get_symbols('stocks.db', 'stocks')
+    #total_symbols = ['AAPL']
+    # Dynamically adjust chunk size
+    chunk_size = 1 # Adjust based on your needs
+    chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
+
+    rate_limiter = RateLimiter(rate_limit=300, sleep_time=60)
+
+    async with aiohttp.ClientSession() as session:
+        tasks = [process_chunk(session, chunk, rate_limiter) for chunk in chunks]
+        for task in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
+            await task
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except Exception as e:
+        print(f"An error occurred: {e}")
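The new cron job writes one JSON file per symbol under json/market-news/press-releases/. A quick, hypothetical spot-check of its output (AAPL chosen only as an example symbol):

import ujson

with open("json/market-news/press-releases/AAPL.json") as file:
    releases = ujson.load(file)
print(f"{len(releases)} press releases cached")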
33  app/main.py
@@ -754,7 +754,38 @@ async def stock_news(data: TickerData, api_key: str = Security(get_api_key)):
     data = orjson.dumps(res)
     compressed_data = gzip.compress(data)
     redis_client.set(cache_key, compressed_data)
-    redis_client.expire(cache_key, 60*5)
+    redis_client.expire(cache_key, 60*30)

     return StreamingResponse(
         io.BytesIO(compressed_data),
         media_type="application/json",
         headers={"Content-Encoding": "gzip"}
     )


+@app.post("/stock-press-release")
+async def stock_news(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker.upper()
+    cache_key = f"press-releases-{ticker}"
+
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"})
+
+
+    try:
+        with open(f"json/market-news/press-releases/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except:
+        res = []
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 60*60)
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
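For reference, the new /stock-press-release route could be exercised roughly as below; the base URL and the API-key header name are assumptions about the deployment, the JSON body matches the TickerData model used above, and requests transparently decompresses the gzip-encoded response:

import requests

resp = requests.post(
    "http://localhost:8000/stock-press-release",  # assumed local deployment
    json={"ticker": "AAPL"},                      # TickerData payload
    headers={"X-API-KEY": "<your-key>"},          # header name is an assumption
)
print(resp.status_code, resp.json()[:1])          # gzip body is auto-decoded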
@@ -120,11 +120,16 @@ def run_cron_market_news():
     if week <= 4:
         run_command(["python3", "cron_market_news.py"])

-def run_cron_company_news():
+def run_company_news():
     week = datetime.today().weekday()
     if week <= 4:
         run_command(["python3", "cron_company_news.py"])

+def run_press_releases():
+    week = datetime.today().weekday()
+    if week <= 4:
+        run_command(["python3", "cron_press_releases.py"])
+
 def run_cron_heatmap():
     run_command(["python3", "cron_heatmap.py"])

@@ -360,7 +365,9 @@ schedule.every(3).hours.do(run_threaded, run_options_net_flow).tag('options_net_
 #schedule.every(4).hours.do(run_threaded, run_share_statistics).tag('share_statistics_job')

 schedule.every(1).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
-schedule.every(1).hours.do(run_threaded, run_cron_company_news).tag('company_news_job')
+schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job')
+schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job')



 schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')
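run_threaded and run_command are helpers defined elsewhere in this file; the schedule calls above assume run_threaded launches each job in its own thread so a long-running cron script does not block the scheduler loop. A sketch of that pattern (not the repository's exact code):

import threading

def run_threaded(job_func):
    # Fire-and-forget thread so the schedule loop keeps ticking.
    threading.Thread(target=job_func).start()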
11  app/test.py
@@ -1,6 +1,5 @@
-with open("json/stock-screener/data.json", 'rb') as file:
-    try:
-        data = file.read()
-        print(data[14807230:14807250]) # Print the problematic section
-    except Exception as e:
-        print(f"Error reading file: {e}")
+import requests
+
+url = "https://api.stocktwits.com/api/2/streams/symbol/AAPL.json?filter=top"
+response = requests.get(url)
+print(response)
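If the request succeeds, the payload could be inspected further; assuming the usual StockTwits stream shape with a top-level 'messages' list (an assumption, not verified here):

if response.status_code == 200:
    messages = response.json().get("messages", [])
    print(f"{len(messages)} messages returned")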