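"""Build the POTUS tracker dataset.

Scrapes executive orders from whitehouse.gov (summarized with the OpenAI API),
Truth Social posts from trumpstruth.org, and the Factba.se calendar feed, then
combines them with sector-ETF performance and SPY price history from a local
SQLite database into json/tracker/potus/data.json.
"""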

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
import aiohttp
import asyncio
import orjson
import ujson
from dotenv import load_dotenv
import os
import sqlite3
import pandas as pd
from openai import OpenAI
from datetime import datetime, timedelta
import hashlib


def generate_unique_id(data):
    # Concatenate the title and date to form a stable key
    unique_str = f"{data['title']}-{data['date']}"

    # Hash the key so the same order always maps to the same ID
    unique_id = hashlib.md5(unique_str.encode()).hexdigest()

    return unique_id
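
# Example: generate_unique_id({"title": "Order X", "date": "2025-01-21"}) always
# yields the same 32-character hex digest, so already-processed orders can be
# recognized by filename and skipped on later runs.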


load_dotenv()

openai_api_key = os.getenv('OPENAI_API_KEY')
org_id = os.getenv('OPENAI_ORG')
client = OpenAI(
    api_key=openai_api_key,
    organization=org_id,
)


query_template = """
    SELECT
        date, close
    FROM
        "{symbol}"
    WHERE
        date BETWEEN ? AND ?
"""
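
# Each ticker lives in its own table (e.g. a "SPY" table), so the table name is
# interpolated into the query while the date bounds stay parameterized.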


def save_json(data):
    path = "json/tracker/potus"
    os.makedirs(path, exist_ok=True)

    with open(f"{path}/data.json", "wb") as file:
        file.write(orjson.dumps(data))


def get_summary(data):
    unique_id = generate_unique_id(data)

    # Summaries are cached on disk, one file per executive order
    file_path = f"json/executive-orders/{unique_id}.json"

    # Create the directory if it doesn't exist
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    if os.path.exists(file_path):
        print(f"File {file_path} already exists, skipping summary generation.")
        return

    try:
        data_string = f"Analyze this executive order: {data['description']}"
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Don't use quotes or titles or bullet points. Provide a clear and concise summary of the US president's executive order. To break the section use <br> to make it html compatible. Explain its potential impact on the stock market, indicating whether it is likely to be bullish, bearish, or neutral, and justify your reasoning based on key aspects of the order. Keep it under 600 characters."
                },
                {"role": "user", "content": data_string}
            ],
            max_tokens=1000,
            temperature=0.7
        )

        summary = response.choices[0].message.content
        data['description'] = summary

        # Save the data with the generated summary
        with open(file_path, "w", encoding="utf-8") as file:
            json_str = ujson.dumps(data)
            file.write(json_str)

        return json_str

    except Exception as e:
        print(f"Error generating summary: {str(e)}")
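
# Note: get_summary returns the serialized JSON only on a fresh generation and
# None when the cache file already exists or the API call fails; callers
# re-read the cached file from disk rather than relying on the return value.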


def get_executive_orders():
    url = "https://www.whitehouse.gov/presidential-actions/"

    # Set up headless Selenium WebDriver
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    driver.get(url)

    try:
        # Wait for executive orders list to load
        wait = WebDriverWait(driver, 10)
        orders = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, "ul.wp-block-post-template > li")))

        executive_orders = []

        # First pass to collect basic information
        for order in orders:
            try:
                title_element = order.find_element(By.CSS_SELECTOR, "h2.wp-block-post-title a")
                title = title_element.text.strip()
                link = title_element.get_attribute("href")

                date_element = order.find_element(By.CSS_SELECTOR, "div.wp-block-post-date time")
                date_raw = date_element.get_attribute("datetime").split("T")[0]
                # Round-trip through strptime to validate the date format
                date_formatted = datetime.strptime(date_raw, "%Y-%m-%d").strftime("%Y-%m-%d")

                executive_orders.append({
                    "title": title,
                    "date": date_formatted,
                    "link": link,
                    "description": None  # Initialize description field
                })
            except Exception as e:
                print(f"Error processing an executive order: {e}")

        # Second pass to collect descriptions
        for eo in executive_orders:
            try:
                driver.get(eo['link'])

                # Wait for description content to load
                desc_wait = WebDriverWait(driver, 10)
                description_element = desc_wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "div.entry-content.wp-block-post-content"))
                )

                # Extract and clean text
                eo['description'] = description_element.text.strip()

            except Exception as e:
                print(f"Error fetching description for {eo['link']}: {e}")
                eo['description'] = "Description unavailable"

        return executive_orders

    finally:
        driver.quit()
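
# Each element of the returned list has the shape:
# {"title": ..., "date": "YYYY-MM-DD", "link": ..., "description": ...}
# where description holds the full order text, or "Description unavailable".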


async def get_historical_sector():
    sector_list = ["SPY", "XLB", "XLC", "XLY", "XLP", "XLE", "XLF", "XLV", "XLI", "XLRE", "XLK", "XLU"]
    res_dict = {}

    def calculate_percentage_change(current_price, previous_price):
        if previous_price == 0:
            return 0
        return ((current_price - previous_price) / previous_price) * 100

    def find_closest_date(data, target_date):
        # Find the closest date entry equal to or before the target date
        target_date = datetime.strptime(target_date, '%Y-%m-%d')
        for entry in reversed(data):  # Reverse to search from newest to oldest
            entry_date = datetime.strptime(entry['time'], '%Y-%m-%d')
            if entry_date <= target_date:
                return entry
        return None

    for symbol in sector_list:
        try:
            # Load historical data
            with open(f"json/historical-price/max/{symbol}.json", "r") as file:
                data = orjson.loads(file.read())

            # Load current data for 1D change
            with open(f"json/quote/{symbol}.json", "r") as file:
                current_data = round(orjson.loads(file.read()).get('changesPercentage', 0), 2)

            if not data:
                continue

            # Get the latest price (last item in the list)
            latest_price = data[-1]['close']

            # Calculate dates for different periods
            today = datetime.strptime(data[-1]['time'], '%Y-%m-%d')
            dates = {
                '1W': (today - timedelta(days=7)).strftime('%Y-%m-%d'),
                '1M': (today - timedelta(days=30)).strftime('%Y-%m-%d'),
                '3M': (today - timedelta(days=90)).strftime('%Y-%m-%d'),
                '6M': (today - timedelta(days=180)).strftime('%Y-%m-%d'),
                'Inauguration': '2025-01-20'
            }

            changes = {'1D': current_data}

            # Calculate percentage changes for each period
            for period, target_date in dates.items():
                historical_entry = find_closest_date(data, target_date)
                if historical_entry:
                    change = calculate_percentage_change(latest_price, historical_entry['close'])
                    changes[period] = round(change, 2)
                else:
                    changes[period] = 0

            res_dict[symbol] = changes

        except Exception as e:
            print(f"Error processing {symbol}: {str(e)}")
            continue

    return res_dict
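
# Example result shape, one entry per sector ETF (values are percent changes):
# {"SPY": {"1D": 0.42, "1W": 1.10, "1M": 2.31, "3M": 5.08, "6M": 8.74, "Inauguration": 3.90}, ...}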


async def get_truth_social_post():
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        driver.get("https://trumpstruth.org/?per_page=40")

        wait = WebDriverWait(driver, 20)
        statuses = wait.until(EC.presence_of_all_elements_located((By.CLASS_NAME, 'status')))

        posts_data = []

        for status in statuses:
            try:
                # Extract username
                username = status.find_element(By.CLASS_NAME, 'status-info__account-name').text
            except NoSuchElementException:
                username = "N/A"

            try:
                # Extract date (second meta item)
                meta_items = status.find_elements(By.CLASS_NAME, 'status-info__meta-item')
                date = meta_items[1].text if len(meta_items) >= 2 else "N/A"
            except (NoSuchElementException, IndexError):
                date = "N/A"

            try:
                # Extract content text
                content_element = status.find_element(By.CLASS_NAME, 'status__content')
                content = content_element.text.strip()
            except NoSuchElementException:
                content = ""

            # Extract video URL if present
            video_url = ""
            try:
                video_element = status.find_element(By.CSS_SELECTOR, '.status-attachment--video video')
                video_url = video_element.get_attribute('src')
            except NoSuchElementException:
                pass

            # Extract external link details if present
            external_link = ""
            link_title = ""
            link_description = ""
            try:
                card = status.find_element(By.CLASS_NAME, 'status-card')
                external_link = card.get_attribute('href')
                link_title = card.find_element(By.CLASS_NAME, 'status-card__title').text
                link_description = card.find_element(By.CLASS_NAME, 'status-card__description').text
            except NoSuchElementException:
                pass

            # Extract original post URL
            try:
                original_post_url = status.find_element(By.CLASS_NAME, 'status__external-link').get_attribute('href')
            except NoSuchElementException:
                original_post_url = ""

            posts_data.append({
                'date': date,
                'content': content,
                'videoUrl': video_url,
                'externalLink': external_link,
                'title': link_title,
                'source': original_post_url
            })

        # Keep only text posts: drop video posts, YouTube links, and empty content
        posts_data = [item for item in posts_data if item['videoUrl'] == "" and "youtube" not in item['content'] and item['content'] != ""]
        return posts_data

    finally:
        driver.quit()
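
# Note: get_truth_social_post and get_historical_sector are declared async but
# never await anything, so the Selenium and file I/O calls block the event
# loop. If concurrency ever matters, the blocking work could be moved off the
# loop, e.g. posts = await asyncio.to_thread(blocking_scrape_fn).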


async def get_data():
    post_list = await get_truth_social_post()
    market_dict = await get_historical_sector()

    executive_orders = get_executive_orders()

    executive_orders_summary = []

    # Generate (or skip, if already cached) a summary for every order
    for item in executive_orders:
        try:
            get_summary(item)
        except Exception as e:
            print(e)

    for item in executive_orders:
        try:
            unique_id = generate_unique_id(item)

            # Read the cached summary back from disk
            with open(f"json/executive-orders/{unique_id}.json", "r") as file:
                data = orjson.loads(file.read())

            # Assign sentiment based on keywords in the summary (case-insensitive)
            description = data['description'].lower()
            if 'bullish' in description:
                data['sentiment'] = 'Bullish'
            elif 'bearish' in description:
                data['sentiment'] = 'Bearish'
            else:
                data['sentiment'] = 'Neutral'

            executive_orders_summary.append(data)
        except Exception as e:
            print(f"Error processing item {item}: {e}")

    query = query_template.format(symbol='SPY')

    etf_con = sqlite3.connect('etf.db')
    etf_cursor = etf_con.cursor()
    etf_cursor.execute("PRAGMA journal_mode = wal")

    sp500_list = []
    df = pd.read_sql_query(query, etf_con, params=("2025-01-20", datetime.today().strftime("%Y-%m-%d")))
    if not df.empty:
        # Daily close-to-close change; the first row is NaN and gets dropped
        df['changesPercentage'] = (df['close'].pct_change() * 100).round(2)
        sp500_list = df.dropna().to_dict(orient="records")
    etf_con.close()
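
    # sp500_list records look like
    # {"date": "2025-01-21", "close": 601.23, "changesPercentage": 0.88}
    # (values illustrative) and are matched to calendar entries by date below.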

    url = "https://media-cdn.factba.se/rss/json/trump/calendar-full.json"
    calendar_data = []
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                # Filter out items with None for date or time, then sort newest first
                calendar_data = sorted(
                    (item for item in data if item['date'] is not None and item['time'] is not None),
                    key=lambda x: (x['date'], x['time']),
                    reverse=True
                )
            else:
                print(f"Failed to fetch data. HTTP status code: {response.status}")

    if len(calendar_data) > 0 and len(executive_orders_summary) > 0:
        # Attach the SPY daily change to each calendar entry by date
        for item in calendar_data:
            for price_item in sp500_list:
                if item['date'] == price_item['date']:
                    item['changesPercentage'] = price_item['changesPercentage']
                    break

        res_dict = {'posts': post_list, 'marketPerformance': market_dict, 'history': calendar_data, 'executiveOrders': executive_orders_summary}
        save_json(res_dict)


if __name__ == "__main__":
    asyncio.run(get_data())