backend/app/cron_potus_tracker.py
MuslemRahimi cce53995b6 bugfixing
2025-02-24 17:54:42 +01:00

317 lines
12 KiB
Python

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from geopy.geocoders import Nominatim
import aiohttp
import asyncio
import orjson
import ujson
from dotenv import load_dotenv
import os
import sqlite3
import pandas as pd
from openai import OpenAI
from datetime import datetime, timedelta
import hashlib
def generate_unique_id(data):
# Concatenate the title and date to form a string
unique_str = f"{data['title']}-{data['date']}"
# Hash the concatenated string to ensure uniqueness
unique_id = hashlib.md5(unique_str.encode()).hexdigest()
return unique_id
load_dotenv()
geolocator = Nominatim(user_agent="myGeocodingApp/1.0 (your-email@example.com)")
openai_api_key = os.getenv('OPENAI_API_KEY')
org_id = os.getenv('OPENAI_ORG')
client = OpenAI(
api_key=openai_api_key,
organization=org_id,
)
query_template = """
SELECT
date, close
FROM
"{symbol}"
WHERE
date BETWEEN ? AND ?
"""
def save_json(data):
path = "json/tracker/potus"
os.makedirs(path, exist_ok=True)
with open(f"{path}/data.json", "wb") as file:
file.write(orjson.dumps(data))
# Set up the Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless") # Run browser in headless mode
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
# Replace 'path/to/chromedriver' with your actual chromedriver path
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
url ="https://www.whitehouse.gov/presidential-actions/"
driver.get(url)
def get_summary(data):
unique_id = generate_unique_id(data) # Assuming this function exists
# Check if the file exists
file_path = f"json/executive-orders/{unique_id}.json"
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(file_path), exist_ok=True)
if os.path.exists(file_path):
print(f"File {file_path} already exists, skipping summary generation.")
return
try:
data_string = f"Analyze this executive order: {data['description']}"
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "Don't use quotes or titles or bullet points. Provide a clear and concise summary of the US president's executive order. To break the section use <br> to make it html compatible. Explain its potential impact on the stock market, indicating whether it is likely to be bullish, bearish, or neutral, and justify your reasoning based on key aspects of the order. Keep it under 600 characters."
},
{"role": "user", "content": data_string}
],
max_tokens=1000,
temperature=0.7
)
summary = response.choices[0].message.content
data['description'] = summary
# Save the data with the generated summary
with open(file_path, "w", encoding="utf-8") as file:
json_str = ujson.dumps(data)
file.write(json_str)
return json_str
except Exception as e:
print(f"Error generating summary: {str(e)}")
def get_executive_orders():
url = "https://www.whitehouse.gov/presidential-actions/"
# Set up headless Selenium WebDriver
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
driver.get(url)
try:
# Wait for executive orders list to load
wait = WebDriverWait(driver, 10)
orders = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, "ul.wp-block-post-template > li")))
executive_orders = []
# First pass to collect basic information
for order in orders:
try:
title_element = order.find_element(By.CSS_SELECTOR, "h2.wp-block-post-title a")
title = title_element.text.strip()
link = title_element.get_attribute("href")
date_element = order.find_element(By.CSS_SELECTOR, "div.wp-block-post-date time")
date_raw = date_element.get_attribute("datetime").split("T")[0]
date_formatted = datetime.strptime(date_raw, "%Y-%m-%d").strftime("%Y-%m-%d")
executive_orders.append({
"title": title,
"date": date_formatted,
"link": link,
"description": None # Initialize description field
})
except Exception as e:
print(f"Error processing an executive order: {e}")
# Second pass to collect descriptions
for eo in executive_orders:
try:
driver.get(eo['link'])
# Wait for description content to load
desc_wait = WebDriverWait(driver, 10)
description_element = desc_wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div.entry-content.wp-block-post-content"))
)
# Extract and clean text
eo['description'] = description_element.text.strip()
except Exception as e:
print(f"Error fetching description for {eo['link']}: {e}")
eo['description'] = "Description unavailable"
return executive_orders
finally:
driver.quit()
async def get_historical_sector():
sector_list = ["SPY","XLB", "XLC", "XLY", "XLP", "XLE", "XLF", "XLV", "XLI", "XLRE", "XLK", "XLU"]
res_dict = {}
def calculate_percentage_change(current_price, previous_price):
if previous_price == 0:
return 0
return ((current_price - previous_price) / previous_price) * 100
def find_closest_date(data, target_date):
# Find the closest date entry equal to or before the target date
target_date = datetime.strptime(target_date, '%Y-%m-%d')
for entry in reversed(data): # Reverse to search from newest to oldest
entry_date = datetime.strptime(entry['time'], '%Y-%m-%d')
if entry_date <= target_date:
return entry
return None
for symbol in sector_list:
try:
# Load historical data
with open(f"json/historical-price/max/{symbol}.json", "r") as file:
data = orjson.loads(file.read())
# Load current data for 1D change
with open(f"json/quote/{symbol}.json", "r") as file:
current_data = round(orjson.loads(file.read()).get('changesPercentage', 0),2)
if not data:
continue
# Get the latest price (last item in the list)
latest_price = data[-1]['close']
# Calculate dates for different periods
today = datetime.strptime(data[-1]['time'], '%Y-%m-%d')
dates = {
'1W': (today - timedelta(days=7)).strftime('%Y-%m-%d'),
'1M': (today - timedelta(days=30)).strftime('%Y-%m-%d'),
'3M': (today - timedelta(days=90)).strftime('%Y-%m-%d'),
'6M': (today - timedelta(days=180)).strftime('%Y-%m-%d'),
'Inauguration': '2025-01-20'
}
changes = {'1D': current_data}
# Calculate percentage changes for each period
for period, target_date in dates.items():
historical_entry = find_closest_date(data, target_date)
if historical_entry:
change = calculate_percentage_change(latest_price, historical_entry['close'])
changes[period] = round(change, 2)
else:
changes[period] = 0
res_dict[symbol] = changes
except Exception as e:
print(f"Error processing {symbol}: {str(e)}")
continue
return res_dict
async def get_data():
market_dict = await get_historical_sector()
executive_orders = get_executive_orders()
executive_orders_summary = []
for item in executive_orders:
try:
data = get_summary(item)
except Exception as e:
print(e)
for item in executive_orders:
try:
unique_id = generate_unique_id(item)
# Open and read the JSON file
with open(f"json/executive-orders/{unique_id}.json", "r") as file:
data = orjson.loads(file.read())
# Assign sentiment based on words in the description
if 'bullish' in data['description']:
data['sentiment'] = 'Bullish'
elif 'bearish' in data['description']:
data['sentiment'] = 'Bearish'
else:
data['sentiment'] = 'Neutral'
executive_orders_summary.append(data)
except Exception as e:
print(f"Error processing item {item}: {e}")
query = query_template.format(symbol='SPY')
etf_con = sqlite3.connect('etf.db')
etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
df = pd.read_sql_query(query, etf_con, params=("2025-01-20", datetime.today().strftime("%Y-%m-%d")))
if not df.empty:
df['changesPercentage'] = (df['close'].pct_change() * 100).round(2)
sp500_list = df.dropna().to_dict(orient="records") # Drop NaN values and convert to list
etf_con.close()
url = "https://media-cdn.factba.se/rss/json/trump/calendar-full.json"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
# Filter out items with None for date or time, then sort
data = sorted(
(item for item in data if item['date'] is not None and item['time'] is not None),
key=lambda x: (x['date'], x['time']),
reverse=True
)
else:
print(f"Failed to fetch data. HTTP status code: {response.status}")
if len(data) > 0 and len(executive_orders_summary) > 0:
for item in data:
for price_item in sp500_list:
if item['date'] == price_item['date']:
item['changesPercentage'] = price_item['changesPercentage']
break
res_dict = {'marketPerformance': market_dict, 'history': data, 'executiveOrders': executive_orders_summary}
save_json(res_dict)
asyncio.run(get_data())