from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from geopy.geocoders import Nominatim
import aiohttp
import asyncio
import orjson
from dotenv import load_dotenv
import os

load_dotenv()
geolocator = Nominatim(user_agent="myGeocodingApp/1.0 (your-email@example.com)")


def save_json(data):
    """Serialize *data* with orjson to json/tracker/potus/data.json, creating the directory if needed."""
    path = "json/tracker/potus"
    os.makedirs(path, exist_ok=True)
    with open(f"{path}/data.json", "wb") as file:
        file.write(orjson.dumps(data))


# Headless Chrome setup (module level, as in the original script — the driver
# is shared by get_bills() and quit there when scraping finishes).
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")

service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)

# Target URL comes from the environment; POTUS_TRACKER must be set in .env.
url = os.getenv('POTUS_TRACKER')
driver.get(url)


def get_bills():
    """Scrape legislation items from the already-loaded POTUS tracker page.

    Returns a list of dicts with keys ``badge``, ``title``, ``description``
    and ``time``. When an item has a "Read More" popup, its full text replaces
    the teaser description. The shared driver is always quit on exit.
    """
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "legislation-container"))
        )
        container = driver.find_element(By.ID, "legislation-container")
        items = container.find_elements(By.CLASS_NAME, "legislation-item")

        data = []
        for item in items:
            badge = item.find_element(By.CLASS_NAME, "badge").text
            header = item.find_element(By.CLASS_NAME, "legislation-header").text
            description = item.find_element(By.CLASS_NAME, "legislation-description").text

            # Relative timestamp is optional on the page.
            time_ago_elements = item.find_elements(By.CLASS_NAME, "datetime-ago")
            time_ago = time_ago_elements[0].text if time_ago_elements else None

            # If a "Read More" button exists, open the popup and take the full
            # bill text as the description, then close the popup again.
            read_more_buttons = item.find_elements(By.CLASS_NAME, "read-more-btn")
            if read_more_buttons:
                read_more_buttons[0].click()
                WebDriverWait(driver, 10).until(
                    EC.visibility_of_element_located((By.ID, "popup-container"))
                )
                description = driver.find_element(By.ID, "popup-content").text
                driver.find_element(By.ID, "popup-close-button").click()

            data.append({
                "badge": badge,
                "title": header,
                "description": description,
                "time": time_ago,
            })

        return data
    finally:
        driver.quit()


async def get_data():
    """Fetch the POTUS calendar feed, geocode the latest location and persist everything.

    Writes ``{'city', 'lon', 'lat', 'history', 'billData'}`` via save_json().
    """
    bill_data = get_bills()

    url = "https://media-cdn.factba.se/rss/json/trump/calendar-full.json"
    # Fix: initialize so a non-200 response no longer leaves `data` undefined
    # (the original raised NameError below on fetch failure).
    data = []
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                raw = await response.json()
                # Drop entries missing date/time, newest first.
                data = sorted(
                    (item for item in raw if item['date'] is not None and item['time'] is not None),
                    key=lambda x: (x['date'], x['time']),
                    reverse=True
                )
            else:
                print(f"Failed to fetch data. HTTP status code: {response.status}")

    if data and bill_data:
        details = data[0]['details']
        location = data[0]['location']

        # Fix: defaults so the output is well-defined even when neither address
        # geocodes (the original raised NameError in that case).
        city = None
        latitude = None
        longitude = None

        # Try the event details first, then the location field.
        # Fix: use a separate `candidate` instead of rebinding `location`,
        # which the original mutated while iterating over it.
        for address in [details, location]:
            if any(place in address for place in ["White House", "Blair House", "Washington DC"]):
                candidate = "Washington, DC"
            else:
                candidate = address  # Otherwise, use the full address string

            location_data = geolocator.geocode(candidate)
            # Fix: check for a geocoding miss BEFORE dereferencing the result
            # (the original read .address first and crashed on None).
            if location_data:
                city = location_data.address.split(',', 1)[0]
                latitude = location_data.latitude
                longitude = location_data.longitude
                print(f"Latitude: {latitude}, Longitude: {longitude}")
                break

        res_dict = {'city': city, 'lon': longitude, 'lat': latitude, 'history': data, 'billData': bill_data}
        save_json(res_dict)


if __name__ == "__main__":
    # Guarded entry point: the original ran asyncio.run() at import time,
    # which (combined with the module-level driver.get) made the module
    # unsafe to import from other code.
    asyncio.run(get_data())
@app.get("/potus-tracker")
async def get_data(api_key: str = Security(get_api_key)):
    """Serve the POTUS tracker payload as gzip-compressed JSON.

    Reads json/tracker/potus/data.json (written by cron_potus_tracker.py),
    caches the compressed bytes in Redis for 15 minutes, and streams them
    with a Content-Encoding: gzip header.
    """
    cache_key = "potus-tracker"  # fix: was an f-string with no placeholders
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return StreamingResponse(
            io.BytesIO(cached_result),
            media_type="application/json",
            headers={"Content-Encoding": "gzip"}
        )

    try:
        with open("json/tracker/potus/data.json", 'rb') as file:
            res = orjson.loads(file.read())
    except (FileNotFoundError, orjson.JSONDecodeError):
        # Fix: narrowed from a bare `except:` — only a missing or corrupt
        # data file degrades to an empty payload; other errors surface.
        res = {}

    data = orjson.dumps(res)
    compressed_data = gzip.compress(data)

    # Fix: atomic set-with-TTL instead of separate set() + expire() calls,
    # which could leave the key without an expiry if the second call failed.
    redis_client.set(cache_key, compressed_data, ex=60 * 15)

    return StreamingResponse(
        io.BytesIO(compressed_data),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )