bugfixing

This commit is contained in:
MuslemRahimi 2025-01-25 21:12:17 +01:00
parent 66806f6c0b
commit 608dbeeb38
2 changed files with 85 additions and 56 deletions

View File

@ -1,72 +1,100 @@
import ujson import json
import asyncio from selenium import webdriver
import aiohttp from selenium.webdriver.common.by import By
import os from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import sqlite3 import sqlite3
from tqdm import tqdm import ujson
from dotenv import load_dotenv from dotenv import load_dotenv
from datetime import datetime import os
import requests
# Load environment variables
load_dotenv() load_dotenv()
today = datetime.today().date() url = os.getenv('FDA_CALENDAR')
api_key = os.getenv('UNUSUAL_WHALES_API_KEY') def save_json(data):
with open(f"json/fda-calendar/data.json", 'wb') as file:
url = "https://api.unusualwhales.com/api/market/fda-calendar" ujson.dumps(data, file)
headers = {
"Accept": "application/json, text/plain",
"Authorization": api_key
}
async def save_json(data):
with open(f"json/fda-calendar/data.json", 'w') as file:
ujson.dump(data, file)
async def get_data():
def main():
# Set up Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# Initialize WebDriver
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
# Connect to the database to get stock symbols
con = sqlite3.connect('stocks.db') con = sqlite3.connect('stocks.db')
cursor = con.cursor() cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal") cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks") cursor.execute("SELECT DISTINCT symbol FROM stocks")
stock_symbols = [row[0] for row in cursor.fetchall()] stock_symbols = [row[0].strip() for row in cursor.fetchall()] # Ensure symbols are stripped
con.close() con.close()
try: try:
response = requests.get(url, headers=headers) # Navigate to FDA calendar
data = response.json()['data'] driver.get(url)
data = [
entry for entry in data # Wait for the table to load
if datetime.strptime(entry['start_date'], '%Y-%m-%d').date() >= today WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "table.flow-full-table"))
)
# Extract table data
entries = []
rows = driver.find_elements(By.CSS_SELECTOR, "table.flow-full-table tbody tr")
for row in rows:
cols = row.find_elements(By.TAG_NAME, "td")
if len(cols) >=6: # Check for minimum required columns
try:
# Extract ticker from the anchor tag, stripping whitespace
ticker_element = cols[0].find_element(By.TAG_NAME, "a")
ticker = ticker_element.text.strip() if ticker_element else ""
ticker = ticker or None # Set to None if empty after strip
except:
ticker = None # If no anchor tag found
# Extract other fields, converting empty strings to None
date = cols[1].text.strip() or None
drug = cols[2].text.strip() or None
indication = cols[3].text.strip() or None
status = cols[4].text.strip() or None
description = cols[5].text.strip() or None
entry = {
"ticker": ticker,
"date": date,
"drug": drug,
"indication": indication,
"status": status,
"description": description
}
entries.append(entry)
# Filter entries to include only those with tickers present in the database
filtered_entries = [
entry for entry in entries
if entry['ticker'] is not None and entry['ticker'] in stock_symbols
] ]
res_list = []
for item in data: if filtered_entries:
try: save_json(filtered_entries)
symbol = item['ticker'] print("Successfully scraped FDA calendar data")
if symbol in stock_symbols:
res_list.append({**item})
except:
pass
return data
except Exception as e: except Exception as e:
print(f"Error fetching data: {e}") print(f"Error during scraping: {str(e)}")
return [] finally:
driver.quit()
async def run():
data = await get_data()
if len(data) > 0:
await save_json(data)
if __name__ == "__main__": if __name__ == "__main__":
try: main()
asyncio.run(run())
except Exception as e:
print(f"An error occurred: {e}")

View File

@ -110,8 +110,7 @@ def run_options_jobs():
def run_fda_calendar(): def run_fda_calendar():
now = datetime.now(ny_tz) now = datetime.now(ny_tz)
week = now.weekday() week = now.weekday()
hour = now.hour if week <= 5:
if week <= 4 and 8 <= hour < 20:
run_command(["python3", "cron_fda_calendar.py"]) run_command(["python3", "cron_fda_calendar.py"])
def run_cron_insider_trading(): def run_cron_insider_trading():
@ -378,6 +377,9 @@ schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag(
schedule.every().day.at("08:30").do(run_threaded, run_dividends).tag('dividends_job') schedule.every().day.at("08:30").do(run_threaded, run_dividends).tag('dividends_job')
schedule.every().day.at("09:00").do(run_threaded, run_shareholders).tag('shareholders_job') schedule.every().day.at("09:00").do(run_threaded, run_shareholders).tag('shareholders_job')
schedule.every().day.at("09:30").do(run_threaded, run_profile).tag('profile_job') schedule.every().day.at("09:30").do(run_threaded, run_profile).tag('profile_job')
schedule.every().day.at("10:00").do(run_threaded, run_fda_calendar).tag('fda_job')
#schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job') #schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job')
#schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job') #schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job')
@ -420,7 +422,6 @@ schedule.every(2).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job') schedule.every(1).hours.do(run_threaded, run_company_news).tag('company_news_job')
schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job') schedule.every(3).hours.do(run_threaded, run_press_releases).tag('press_release_job')
#schedule.every(1).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')
schedule.every(20).minutes.do(run_threaded, run_options_stats).tag('options_stats_job') schedule.every(20).minutes.do(run_threaded, run_options_stats).tag('options_stats_job')