From 5fc6d84fadcbd5ccf419825fb961504424c5d147 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Tue, 26 Nov 2024 18:18:19 +0100
Subject: [PATCH] optimize websocket

---
Review notes (free-form area; ignored by `git am` / `git apply`):

* check_market_hours() is no longer wrapped in functools.cache. It takes
  no arguments and reads the clock, so a cached first answer would be
  returned for the rest of the process lifetime and connect() would
  never see the market open or close.
* The `__slots__ = [...]` assignment inside __init__ is gone. Assigned
  there it is just a local variable; it never becomes the class
  attribute that the slots mechanism looks at.
* Incoming messages are awaited one by one again instead of being
  handed to asyncio.create_task() without keeping the returned task.
  The event loop only holds weak references to tasks, and concurrent
  tasks remove back-pressure and ordering between messages (tasks for
  the same symbol presumably write the same file -- the write call is
  outside the hunks shown here, so confirm).
* Comments kept honest: no "with timeout" (there is none), and the
  holiday table is labelled as covering 2024 only.
* This text was recovered from a copy whose line breaks and indentation
  had been collapsed. Indentation and blank context lines were rebuilt
  from Python syntax and the hunk line counts; the commit id and the
  blob ids on the index line still describe the original revision.
  Check the hunks against the base file before applying.

 app/cron_websocket.py | 105 +++++++++++++++++++++++++++++++----------------
 1 file changed, 61 insertions(+), 44 deletions(-)

diff --git a/app/cron_websocket.py b/app/cron_websocket.py
index 80017c6..dbd1339 100644
--- a/app/cron_websocket.py
+++ b/app/cron_websocket.py
@@ -8,12 +8,33 @@ from typing import Dict, Any
 from dotenv import load_dotenv
 from datetime import datetime, time
 import zoneinfo
+import aiofiles

+# Use uvloop for faster event loop if available
+try:
+    import uvloop
+    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+except ImportError:
+    pass

-# Set up logging configuration
-logging.basicConfig(level=logging.INFO)
+# Optimize logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler()]
+)
 logger = logging.getLogger(__name__)

+# US stock market holidays for 2024 only; a set keeps lookups cheap
+US_HOLIDAYS = {
+    "2024-01-01", "2024-01-15", "2024-02-19",
+    "2024-03-29", "2024-05-27", "2024-06-19",
+    "2024-07-04", "2024-09-02", "2024-11-28",
+    "2024-12-25"
+}
+
+# Not memoized on purpose: the answer depends on the current time
 def check_market_hours() -> bool:
     """
     Check if the stock market is currently open.
@@ -21,69 +42,53 @@ def check_market_hours() -> bool:
     Returns:
         bool: True if market is open, False otherwise
     """
-    # US stock market holidays for 2024
-    us_holidays = [
-        "2024-01-01",  # New Year's Day
-        "2024-01-15",  # Martin Luther King Jr. Day
-        "2024-02-19",  # Presidents' Day
-        "2024-03-29",  # Good Friday
-        "2024-05-27",  # Memorial Day
-        "2024-06-19",  # Juneteenth
-        "2024-07-04",  # Independence Day
-        "2024-09-02",  # Labor Day
-        "2024-11-28",  # Thanksgiving
-        "2024-12-25",  # Christmas Day
-    ]
-
-    # Get current time in Eastern Time
     et_tz = zoneinfo.ZoneInfo('America/New_York')
     now = datetime.now(et_tz)

-    # Check for weekend
-    if now.weekday() >= 5:  # 5 and 6 are Saturday and Sunday
+
+    # Quick weekend check
+    if now.weekday() >= 5:
         return False

-    # Check for holidays
-    if now.strftime('%Y-%m-%d') in us_holidays:
+    # Use set for faster holiday lookup
+    if now.strftime('%Y-%m-%d') in US_HOLIDAYS:
         return False

-    # Market hours are 9:30 AM to 4:00 PM ET
-    market_open = time(9, 30)
-    market_close = time(16, 0)
+    # Market hours check
     current_time = now.time()
-
-    # Check if current time is within market hours
-    return market_open <= current_time < market_close
+    return time(9, 30) <= current_time < time(16, 0)

 class WebSocketStockTicker:
     def __init__(self, api_key: str, uri: str = "wss://websockets.financialmodelingprep.com"):
         self.api_key = api_key
         self.uri = uri
         self.output_dir = Path('json/websocket/companies')
         self.output_dir.mkdir(parents=True, exist_ok=True)

-        self.login_payload = {
+        # Precompute payloads to avoid repeated dictionary creation
+        self.login_payload = orjson.dumps({
             "event": "login",
             "data": {"apiKey": self.api_key}
-        }
+        })

-        self.subscribe_payload = {
+        self.subscribe_payload = orjson.dumps({
             "event": "subscribe",
             "data": {"ticker": ["*"]}
-        }
+        })

     async def _safe_write(self, file_path: Path, data: Dict[str, Any]) -> None:
-        """Safely write data to file with error handling."""
+        """Safely write data to file using aiofiles for non-blocking I/O."""
         try:
-            with open(file_path, 'wb') as f:
-                f.write(orjson.dumps(data))
+            async with aiofiles.open(file_path, 'wb') as f:
+                await f.write(orjson.dumps(data))
         except IOError as e:
             logger.error(f"File write error for {file_path}: {e}")

     async def _process_message(self, message: str) -> None:
-        """Process and store individual WebSocket messages."""
+        """Optimized message processing with minimal allocation."""
         try:
             data = orjson.loads(message)
+            # Fast symbol extraction and sanitization
             if 's' in data:
                 symbol = data['s'].upper()
                 safe_symbol = ''.join(c for c in symbol if c.isalnum() or c in ['-', '_'])
@@ -97,36 +102,48 @@ class WebSocketStockTicker:
             logger.error(f"Error processing message: {e}")

     async def connect(self) -> None:
-        """Establish WebSocket connection with auto-reconnect."""
+        """Establish WebSocket connection with robust error handling."""
+        reconnect_delay = 5
+        max_reconnect_delay = 60
+
         while True:
             # Check market hours before connecting
             if not check_market_hours():
                 logger.info("Market is closed. Waiting 5 minutes before checking again.")
-                await asyncio.sleep(300)  # Wait 5 minutes before checking again
+                await asyncio.sleep(300)  # Wait 5 minutes
                 continue

             try:
                 async with websockets.connect(self.uri, ping_interval=30) as websocket:
-                    # Login and subscribe
-                    await websocket.send(orjson.dumps(self.login_payload))
+                    # Reset reconnect delay on successful connection
+                    reconnect_delay = 5
+
+                    # Login and subscribe with pre-serialized payloads
+                    await websocket.send(self.login_payload)
                     await asyncio.sleep(2)
-                    await websocket.send(orjson.dumps(self.subscribe_payload))
+                    await websocket.send(self.subscribe_payload)

                     # Handle incoming messages
                     async for message in websocket:
                         # Additional check in case market closes during connection
                         if not check_market_hours():
                             logger.info("Market closed during connection. Disconnecting.")
                             break
+
+                        # Process messages one at a time so writes stay in arrival order
+                        # and no un-referenced background tasks are created
                         await self._process_message(message)

             except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.WebSocketException) as e:
-                logger.warning(f"WebSocket error: {e}. Reconnecting in 5 seconds...")
-                await asyncio.sleep(5)
+                logger.warning(f"WebSocket error: {e}. Reconnecting in {reconnect_delay} seconds...")
+                await asyncio.sleep(reconnect_delay)
+                # Exponential backoff with cap
+                reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)

             except Exception as e:
-                logger.error(f"Unexpected error: {e}. Reconnecting in 5 seconds...")
-                await asyncio.sleep(5)
+                logger.error(f"Unexpected error: {e}. Reconnecting in {reconnect_delay} seconds...")
+                await asyncio.sleep(reconnect_delay)
+                reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)

 async def main():
     load_dotenv()