optimize websocket
This commit is contained in:
parent
b6a659761a
commit
5fc6d84fad
@ -8,12 +8,35 @@ from typing import Dict, Any
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from datetime import datetime, time
|
from datetime import datetime, time
|
||||||
import zoneinfo
|
import zoneinfo
|
||||||
|
import aiofiles
|
||||||
|
import functools
|
||||||
|
|
||||||
|
# Use uvloop for faster event loop if available
|
||||||
|
try:
|
||||||
|
import uvloop
|
||||||
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
# Set up logging configuration
|
# Optimize logging
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler()]
|
||||||
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Precompute holidays and use a set for faster lookups
|
||||||
|
US_HOLIDAYS = {
|
||||||
|
"2024-01-01", "2024-01-15", "2024-02-19",
|
||||||
|
"2024-03-29", "2024-05-27", "2024-06-19",
|
||||||
|
"2024-07-04", "2024-09-02", "2024-11-28",
|
||||||
|
"2024-12-25"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Use functools.cache to memoize market hours check
|
||||||
|
@functools.cache
|
||||||
def check_market_hours() -> bool:
|
def check_market_hours() -> bool:
|
||||||
"""
|
"""
|
||||||
Check if the stock market is currently open.
|
Check if the stock market is currently open.
|
||||||
@ -21,69 +44,56 @@ def check_market_hours() -> bool:
|
|||||||
Returns:
|
Returns:
|
||||||
bool: True if market is open, False otherwise
|
bool: True if market is open, False otherwise
|
||||||
"""
|
"""
|
||||||
# US stock market holidays for 2024
|
|
||||||
us_holidays = [
|
|
||||||
"2024-01-01", # New Year's Day
|
|
||||||
"2024-01-15", # Martin Luther King Jr. Day
|
|
||||||
"2024-02-19", # Presidents' Day
|
|
||||||
"2024-03-29", # Good Friday
|
|
||||||
"2024-05-27", # Memorial Day
|
|
||||||
"2024-06-19", # Juneteenth
|
|
||||||
"2024-07-04", # Independence Day
|
|
||||||
"2024-09-02", # Labor Day
|
|
||||||
"2024-11-28", # Thanksgiving
|
|
||||||
"2024-12-25", # Christmas Day
|
|
||||||
]
|
|
||||||
|
|
||||||
# Get current time in Eastern Time
|
|
||||||
et_tz = zoneinfo.ZoneInfo('America/New_York')
|
et_tz = zoneinfo.ZoneInfo('America/New_York')
|
||||||
now = datetime.now(et_tz)
|
now = datetime.now(et_tz)
|
||||||
# Check for weekend
|
|
||||||
if now.weekday() >= 5: # 5 and 6 are Saturday and Sunday
|
# Quick weekend check
|
||||||
|
if now.weekday() >= 5:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Check for holidays
|
# Use set for faster holiday lookup
|
||||||
if now.strftime('%Y-%m-%d') in us_holidays:
|
if now.strftime('%Y-%m-%d') in US_HOLIDAYS:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Market hours are 9:30 AM to 4:00 PM ET
|
# Market hours check
|
||||||
market_open = time(9, 30)
|
|
||||||
market_close = time(16, 0)
|
|
||||||
current_time = now.time()
|
current_time = now.time()
|
||||||
|
return time(9, 30) <= current_time < time(16, 0)
|
||||||
# Check if current time is within market hours
|
|
||||||
return market_open <= current_time < market_close
|
|
||||||
|
|
||||||
class WebSocketStockTicker:
|
class WebSocketStockTicker:
|
||||||
def __init__(self, api_key: str, uri: str = "wss://websockets.financialmodelingprep.com"):
|
def __init__(self, api_key: str, uri: str = "wss://websockets.financialmodelingprep.com"):
|
||||||
|
# Use slots to reduce memory overhead
|
||||||
|
__slots__ = ['api_key', 'uri', 'output_dir', 'login_payload', 'subscribe_payload']
|
||||||
|
|
||||||
self.api_key = api_key
|
self.api_key = api_key
|
||||||
self.uri = uri
|
self.uri = uri
|
||||||
self.output_dir = Path('json/websocket/companies')
|
self.output_dir = Path('json/websocket/companies')
|
||||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
self.login_payload = {
|
# Precompute payloads to avoid repeated dictionary creation
|
||||||
|
self.login_payload = orjson.dumps({
|
||||||
"event": "login",
|
"event": "login",
|
||||||
"data": {"apiKey": self.api_key}
|
"data": {"apiKey": self.api_key}
|
||||||
}
|
})
|
||||||
|
|
||||||
self.subscribe_payload = {
|
self.subscribe_payload = orjson.dumps({
|
||||||
"event": "subscribe",
|
"event": "subscribe",
|
||||||
"data": {"ticker": ["*"]}
|
"data": {"ticker": ["*"]}
|
||||||
}
|
})
|
||||||
|
|
||||||
async def _safe_write(self, file_path: Path, data: Dict[str, Any]) -> None:
|
async def _safe_write(self, file_path: Path, data: Dict[str, Any]) -> None:
|
||||||
"""Safely write data to file with error handling."""
|
"""Safely write data to file using aiofiles for non-blocking I/O."""
|
||||||
try:
|
try:
|
||||||
with open(file_path, 'wb') as f:
|
async with aiofiles.open(file_path, 'wb') as f:
|
||||||
f.write(orjson.dumps(data))
|
await f.write(orjson.dumps(data))
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.error(f"File write error for {file_path}: {e}")
|
logger.error(f"File write error for {file_path}: {e}")
|
||||||
|
|
||||||
async def _process_message(self, message: str) -> None:
|
async def _process_message(self, message: str) -> None:
|
||||||
"""Process and store individual WebSocket messages."""
|
"""Optimized message processing with minimal allocation."""
|
||||||
try:
|
try:
|
||||||
data = orjson.loads(message)
|
data = orjson.loads(message)
|
||||||
|
|
||||||
|
# Fast symbol extraction and sanitization
|
||||||
if 's' in data:
|
if 's' in data:
|
||||||
symbol = data['s'].upper()
|
symbol = data['s'].upper()
|
||||||
safe_symbol = ''.join(c for c in symbol if c.isalnum() or c in ['-', '_'])
|
safe_symbol = ''.join(c for c in symbol if c.isalnum() or c in ['-', '_'])
|
||||||
@ -97,36 +107,46 @@ class WebSocketStockTicker:
|
|||||||
logger.error(f"Error processing message: {e}")
|
logger.error(f"Error processing message: {e}")
|
||||||
|
|
||||||
async def connect(self) -> None:
|
async def connect(self) -> None:
|
||||||
"""Establish WebSocket connection with auto-reconnect."""
|
"""Establish WebSocket connection with robust error handling."""
|
||||||
|
reconnect_delay = 5
|
||||||
|
max_reconnect_delay = 60
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Check market hours before connecting
|
# Check market hours before connecting
|
||||||
if not check_market_hours():
|
if not check_market_hours():
|
||||||
logger.info("Market is closed. Waiting 5 minutes before checking again.")
|
logger.info("Market is closed. Waiting 5 minutes before checking again.")
|
||||||
await asyncio.sleep(300) # Wait 5 minutes before checking again
|
await asyncio.sleep(300) # Wait 5 minutes
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with websockets.connect(self.uri, ping_interval=30) as websocket:
|
async with websockets.connect(self.uri, ping_interval=30) as websocket:
|
||||||
# Login and subscribe
|
# Reset reconnect delay on successful connection
|
||||||
await websocket.send(orjson.dumps(self.login_payload))
|
reconnect_delay = 5
|
||||||
await asyncio.sleep(2)
|
|
||||||
await websocket.send(orjson.dumps(self.subscribe_payload))
|
|
||||||
|
|
||||||
# Handle incoming messages
|
# Login and subscribe with pre-serialized payloads
|
||||||
|
await websocket.send(self.login_payload)
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
await websocket.send(self.subscribe_payload)
|
||||||
|
|
||||||
|
# Handle incoming messages with timeout
|
||||||
async for message in websocket:
|
async for message in websocket:
|
||||||
# Additional check in case market closes during connection
|
|
||||||
if not check_market_hours():
|
if not check_market_hours():
|
||||||
logger.info("Market closed during connection. Disconnecting.")
|
logger.info("Market closed during connection. Disconnecting.")
|
||||||
break
|
break
|
||||||
await self._process_message(message)
|
|
||||||
|
# Use asyncio.create_task for concurrent message processing
|
||||||
|
asyncio.create_task(self._process_message(message))
|
||||||
|
|
||||||
except (websockets.exceptions.ConnectionClosedError,
|
except (websockets.exceptions.ConnectionClosedError,
|
||||||
websockets.exceptions.WebSocketException) as e:
|
websockets.exceptions.WebSocketException) as e:
|
||||||
logger.warning(f"WebSocket error: {e}. Reconnecting in 5 seconds...")
|
logger.warning(f"WebSocket error: {e}. Reconnecting in {reconnect_delay} seconds...")
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(reconnect_delay)
|
||||||
|
# Exponential backoff with cap
|
||||||
|
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Unexpected error: {e}. Reconnecting in 5 seconds...")
|
logger.error(f"Unexpected error: {e}. Reconnecting in {reconnect_delay} seconds...")
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(reconnect_delay)
|
||||||
|
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user