update potus tracker with ai
This commit is contained in:
parent
14c4bbd916
commit
eff6c1b981
@ -9,14 +9,35 @@ from geopy.geocoders import Nominatim
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import orjson
|
||||
import ujson
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
import sqlite3
|
||||
import pandas as pd
|
||||
from openai import OpenAI
|
||||
from datetime import datetime
|
||||
import hashlib
|
||||
|
||||
def generate_unique_id(data):
|
||||
# Concatenate the title and date to form a string
|
||||
unique_str = f"{data['title']}-{data['date']}"
|
||||
|
||||
# Hash the concatenated string to ensure uniqueness
|
||||
unique_id = hashlib.md5(unique_str.encode()).hexdigest()
|
||||
|
||||
return unique_id
|
||||
|
||||
|
||||
load_dotenv()
|
||||
geolocator = Nominatim(user_agent="myGeocodingApp/1.0 (your-email@example.com)")
|
||||
|
||||
openai_api_key = os.getenv('OPENAI_API_KEY')
|
||||
org_id = os.getenv('OPENAI_ORG')
|
||||
client = OpenAI(
|
||||
api_key=openai_api_key,
|
||||
organization=org_id,
|
||||
)
|
||||
|
||||
|
||||
query_template = """
|
||||
SELECT
|
||||
@ -45,90 +66,149 @@ chrome_options.add_argument("--no-sandbox")
|
||||
service = Service(ChromeDriverManager().install())
|
||||
driver = webdriver.Chrome(service=service, options=chrome_options)
|
||||
|
||||
# Open the URL
|
||||
url = os.getenv('POTUS_TRACKER')
|
||||
url ="https://www.whitehouse.gov/presidential-actions/"
|
||||
driver.get(url)
|
||||
|
||||
def get_bills():
|
||||
|
||||
def get_summary(data):
|
||||
unique_id = generate_unique_id(data) # Assuming this function exists
|
||||
|
||||
# Check if the file exists
|
||||
file_path = f"json/executive-orders/{unique_id}.json"
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
|
||||
if os.path.exists(file_path):
|
||||
print(f"File {file_path} already exists, skipping summary generation.")
|
||||
return
|
||||
|
||||
try:
|
||||
# Wait for the page to load
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.ID, "legislation-container"))
|
||||
data_string = f"Analyze this executive order: {data['description']}"
|
||||
response = client.chat.completions.create(
|
||||
model="gpt-4o-mini",
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Don't use quotes or titles or bullet points. Provide a clear and concise summary of the US president's executive order. To break the section use <br> to make it html compatible. Explain its potential impact on the stock market, indicating whether it is likely to be bullish, bearish, or neutral, and justify your reasoning based on key aspects of the order. Keep it under 600 characters."
|
||||
},
|
||||
{"role": "user", "content": data_string}
|
||||
],
|
||||
max_tokens=1000,
|
||||
temperature=0.7
|
||||
)
|
||||
|
||||
# Locate the legislation container
|
||||
legislation_container = driver.find_element(By.ID, "legislation-container")
|
||||
legislation_items = legislation_container.find_elements(By.CLASS_NAME, "legislation-item")
|
||||
|
||||
# Extract data
|
||||
data = []
|
||||
for item in legislation_items:
|
||||
# Badge
|
||||
badge = item.find_element(By.CLASS_NAME, "badge").text
|
||||
|
||||
# Header (Title)
|
||||
header = item.find_element(By.CLASS_NAME, "legislation-header").text
|
||||
|
||||
# Description
|
||||
description = item.find_element(By.CLASS_NAME, "legislation-description").text
|
||||
|
||||
# Time Ago (if present)
|
||||
time_ago_element = item.find_elements(By.CLASS_NAME, "datetime-ago")
|
||||
time_ago = time_ago_element[0].text if time_ago_element else None
|
||||
|
||||
# Meta Info (e.g., status)
|
||||
meta_info_elements = item.find_elements(By.CLASS_NAME, "legislation-meta")
|
||||
meta_info = []
|
||||
if meta_info_elements:
|
||||
for meta_item in meta_info_elements[0].find_elements(By.TAG_NAME, "div"):
|
||||
meta_info.append(meta_item.text.strip())
|
||||
|
||||
# Check if there's a "Read More" button to click
|
||||
read_more_buttons = item.find_elements(By.CLASS_NAME, "read-more-btn") # Now using correct class
|
||||
if read_more_buttons:
|
||||
print("Found 'Read More' button, clicking it...")
|
||||
# Click the "Read More" button
|
||||
read_more_buttons[0].click()
|
||||
|
||||
# Wait for the popup to become visible
|
||||
#print("Waiting for the popup to appear...")
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.visibility_of_element_located((By.ID, "popup-container")) # Wait until popup is visible
|
||||
)
|
||||
|
||||
# Extract content from the popup
|
||||
#print("Popup appeared, extracting content...")
|
||||
popup_title = driver.find_element(By.ID, "popup-title").text
|
||||
popup_content = driver.find_element(By.ID, "popup-content").text
|
||||
|
||||
|
||||
# Add the popup content and URL to the description (optional)
|
||||
description = f"{popup_content}"
|
||||
|
||||
# Close the popup (optional)
|
||||
close_button = driver.find_element(By.ID, "popup-close-button")
|
||||
close_button.click()
|
||||
#print("Popup closed.")
|
||||
|
||||
# Append data to list
|
||||
data.append({
|
||||
"badge": badge,
|
||||
"title": header,
|
||||
"description": description,
|
||||
"time": time_ago,
|
||||
})
|
||||
|
||||
# Print scraped data
|
||||
|
||||
return data
|
||||
summary = response.choices[0].message.content
|
||||
data['description'] = summary
|
||||
|
||||
# Save the data with the generated summary
|
||||
with open(file_path, "w", encoding="utf-8") as file:
|
||||
json_str = ujson.dumps(data)
|
||||
file.write(json_str)
|
||||
|
||||
return json_str
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating summary: {str(e)}")
|
||||
|
||||
|
||||
def get_executive_orders():
|
||||
url = "https://www.whitehouse.gov/presidential-actions/"
|
||||
|
||||
# Set up headless Selenium WebDriver
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--headless")
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
|
||||
service = Service(ChromeDriverManager().install())
|
||||
driver = webdriver.Chrome(service=service, options=chrome_options)
|
||||
|
||||
driver.get(url)
|
||||
|
||||
try:
|
||||
# Wait for executive orders list to load
|
||||
wait = WebDriverWait(driver, 10)
|
||||
orders = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, "ul.wp-block-post-template > li")))
|
||||
|
||||
executive_orders = []
|
||||
|
||||
# First pass to collect basic information
|
||||
for order in orders:
|
||||
try:
|
||||
title_element = order.find_element(By.CSS_SELECTOR, "h2.wp-block-post-title a")
|
||||
title = title_element.text.strip()
|
||||
link = title_element.get_attribute("href")
|
||||
|
||||
date_element = order.find_element(By.CSS_SELECTOR, "div.wp-block-post-date time")
|
||||
date_raw = date_element.get_attribute("datetime").split("T")[0]
|
||||
date_formatted = datetime.strptime(date_raw, "%Y-%m-%d").strftime("%Y-%m-%d")
|
||||
|
||||
executive_orders.append({
|
||||
"title": title,
|
||||
"date": date_formatted,
|
||||
"link": link,
|
||||
"description": None # Initialize description field
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"Error processing an executive order: {e}")
|
||||
|
||||
# Second pass to collect descriptions
|
||||
for eo in executive_orders:
|
||||
try:
|
||||
driver.get(eo['link'])
|
||||
|
||||
# Wait for description content to load
|
||||
desc_wait = WebDriverWait(driver, 10)
|
||||
description_element = desc_wait.until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, "div.entry-content.wp-block-post-content"))
|
||||
)
|
||||
|
||||
# Extract and clean text
|
||||
eo['description'] = description_element.text.strip()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error fetching description for {eo['link']}: {e}")
|
||||
eo['description'] = "Description unavailable"
|
||||
|
||||
return executive_orders
|
||||
|
||||
finally:
|
||||
# Close the driver
|
||||
driver.quit()
|
||||
|
||||
|
||||
|
||||
async def get_data():
|
||||
bill_data = get_bills()
|
||||
executive_orders = get_executive_orders()
|
||||
|
||||
executive_orders_summary = []
|
||||
|
||||
for item in executive_orders:
|
||||
try:
|
||||
data = get_summary(item)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
for item in executive_orders:
|
||||
try:
|
||||
unique_id = generate_unique_id(item)
|
||||
|
||||
# Open and read the JSON file
|
||||
with open(f"json/executive-orders/{unique_id}.json", "r") as file:
|
||||
data = orjson.loads(file.read())
|
||||
|
||||
# Assign sentiment based on words in the description
|
||||
if 'bullish' in data['description']:
|
||||
data['sentiment'] = 'Bullish'
|
||||
elif 'bearish' in data['description']:
|
||||
data['sentiment'] = 'Bearish'
|
||||
else:
|
||||
data['sentiment'] = 'Neutral'
|
||||
|
||||
executive_orders_summary.append(data)
|
||||
except Exception as e:
|
||||
print(f"Error processing item {item}: {e}")
|
||||
|
||||
|
||||
|
||||
query = query_template.format(symbol='SPY')
|
||||
|
||||
etf_con = sqlite3.connect('etf.db')
|
||||
@ -158,7 +238,7 @@ async def get_data():
|
||||
else:
|
||||
print(f"Failed to fetch data. HTTP status code: {response.status}")
|
||||
|
||||
if len(data) > 0 and len(bill_data) > 0:
|
||||
if len(data) > 0 and len(executive_orders_summary) > 0:
|
||||
# Latest location
|
||||
details = data[0]['details']
|
||||
location = data[0]['location']
|
||||
@ -193,7 +273,8 @@ async def get_data():
|
||||
if item['date'] == price_item['date']:
|
||||
item['changesPercentage'] = price_item['changesPercentage']
|
||||
break
|
||||
res_dict = {'returnSince': return_since,'city': city, 'lon': longitude, 'lat': latitude, 'history': data, 'billData': bill_data}
|
||||
res_dict = {'returnSince': return_since,'city': city, 'lon': longitude, 'lat': latitude, 'history': data, 'executiveOrders': executive_orders_summary}
|
||||
save_json(res_dict)
|
||||
|
||||
|
||||
asyncio.run(get_data())
|
||||
Loading…
x
Reference in New Issue
Block a user