From 3bf04eeb1aa4661256ba626c622b8277e848c167 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Mon, 18 Nov 2024 17:37:48 +0100 Subject: [PATCH] update websocket --- app/cron_websocket.py | 94 ++++++++++++++++++ fastify/app.js | 198 ++++++++++++++++++------------------- fastify/feedback/server.js | 24 ----- 3 files changed, 193 insertions(+), 123 deletions(-) create mode 100644 app/cron_websocket.py delete mode 100755 fastify/feedback/server.js diff --git a/app/cron_websocket.py b/app/cron_websocket.py new file mode 100644 index 0000000..d7ba52a --- /dev/null +++ b/app/cron_websocket.py @@ -0,0 +1,94 @@ +import asyncio +import websockets +import orjson +import os +import logging +from pathlib import Path +from typing import Dict, Any +from dotenv import load_dotenv + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s: %(message)s' +) +logger = logging.getLogger(__name__) + +class WebSocketStockTicker: + def __init__(self, api_key: str, uri: str = "wss://websockets.financialmodelingprep.com"): + self.api_key = api_key + self.uri = uri + self.output_dir = Path('json/websocket/companies') + self.output_dir.mkdir(parents=True, exist_ok=True) + + self.login_payload = { + "event": "login", + "data": {"apiKey": self.api_key} + } + + self.subscribe_payload = { + "event": "subscribe", + "data": {"ticker": ["*"]} + } + + async def _safe_write(self, file_path: Path, data: Dict[str, Any]) -> None: + """Safely write data to file with error handling.""" + try: + with open(file_path, 'wb') as f: + f.write(orjson.dumps(data)) + except IOError as e: + logger.error(f"File write error for {file_path}: {e}") + + async def _process_message(self, message: str) -> None: + """Process and store individual WebSocket messages.""" + try: + data = orjson.loads(message) + + if 's' in data: + symbol = data['s'].upper() + safe_symbol = ''.join(c for c in symbol if c.isalnum() or c in ['-', '_']) + file_path = self.output_dir / f"{safe_symbol}.json" + + await self._safe_write(file_path, data) + logger.info(f"Processed data for {safe_symbol}") + + except orjson.JSONDecodeError: + logger.warning(f"Invalid JSON received: {message}") + except Exception as e: + logger.error(f"Error processing message: {e}") + + async def connect(self) -> None: + """Establish WebSocket connection with auto-reconnect.""" + while True: + try: + async with websockets.connect(self.uri, ping_interval=30) as websocket: + # Login and subscribe + await websocket.send(orjson.dumps(self.login_payload)) + await asyncio.sleep(2) + await websocket.send(orjson.dumps(self.subscribe_payload)) + + # Handle incoming messages + async for message in websocket: + await self._process_message(message) + + except (websockets.exceptions.ConnectionClosedError, + websockets.exceptions.WebSocketException) as e: + logger.warning(f"WebSocket error: {e}. Reconnecting in 5 seconds...") + await asyncio.sleep(5) + except Exception as e: + logger.error(f"Unexpected error: {e}. Reconnecting in 5 seconds...") + await asyncio.sleep(5) + +async def main(): + load_dotenv() + api_key = os.getenv('FMP_API_KEY') + + if not api_key: + logger.error("API Key not found. Please set FMP_API_KEY in .env file.") + return + + ticker = WebSocketStockTicker(api_key) + await ticker.connect() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/fastify/app.js b/fastify/app.js index 39b9e53..05c0066 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -72,7 +72,6 @@ fastify.register(require("./create-post-image/server"), { sharp }); fastify.register(require("./delete-comment/server"), { pb }); fastify.register(require("./delete-post/server"), { pb }); fastify.register(require("./leaderboard/server"), { pb }); -fastify.register(require("./feedback/server"), { pb }); fastify.register(require("./edit-name-watchlist/server"), { pb }); fastify.register(require("./create-strategy/server"), { pb }); fastify.register(require("./delete-strategy/server"), { pb }); @@ -127,111 +126,112 @@ function formatTimestampNewYork(timestamp) { } fastify.register(async function (fastify) { - fastify.get("/realtime-data", { websocket: true }, (connection, req) => { - // Send a welcome message to the client + fastify.get( + "/realtime-data", + { websocket: true }, + (connection, req) => { - //connection.socket.send('hi from server'); + let jsonData; + let sendInterval; + let symbol; + let isSend = false; - // Listen for incoming messages from the client - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8"); - console.log("Received message from client:", symbol); + + // Function to send data to the client + const sendData = async () => { + if (!symbol) return; // Check if symbol is defined + const filePath = path.join(__dirname, `../app/json/websocket/companies/${symbol}.json`); - // If you want to dynamically update the subscription based on client's message - updateSubscription(); - }); - - //====================== - const login = { - event: "login", - data: { - apiKey: fmpAPIKey, - }, - }; - - const subscribe = { - event: "subscribe", - data: { - ticker: "", // Initial value; will be updated dynamically - }, - }; - - function updateSubscription() { - subscribe.data.ticker = symbol; - } - - // Create a new WebSocket instance for your backend - const ws = new WebSocket("wss://websockets.financialmodelingprep.com"); - - // Handle WebSocket connection open - - ws.on("open", function open() { - ws.send(JSON.stringify(login)); - wait(2000); //2 seconds in milliseconds - ws.send(JSON.stringify(subscribe)); - }); - - // Handle WebSocket errors - ws.on("error", function (error) { - console.error("WebSocket error:", error); - // Handle the error gracefully, you might want to notify the client or log it. - // For now, let's close the connection if an error occurs - connection.socket.close(); - }); - - ws.on("message", function (data, flags) { - const stringData = data.toString("utf-8"); - - try { - const jsonData = JSON.parse(stringData); - - // Check if bpData is a number, not equal to zero, and jsonData properties are not null/undefined - if ( - //jsonData?.bp != null && - //jsonData?.ap != null && - jsonData?.lp != null && - jsonData?.t != null && - ["Q", "T"]?.includes(jsonData?.type) && - connection.socket.readyState === WebSocket.OPEN && - !isSend - ) { - connection.socket.send( - JSON.stringify({ - bp: jsonData.bp, - ap: jsonData.ap, - lp: jsonData.lp?.toFixed(2), - type: jsonData.type, - time: formatTimestampNewYork(jsonData?.t), - }) - ); - isSend = true; - setTimeout(() => { - isSend = false; - }, 500); - } - } catch (error) { - console.error("Error parsing JSON:", error); - } - }); - - //====================== - - // Handle client disconnect - connection.socket.on("close", () => { - console.log("Client disconnected"); - connection?.socket?.close(); - // Check if the WebSocket is open before trying to close it - if (ws.readyState === WebSocket.OPEN) { try { - ws.close(); - } catch (e) { - console.error("Error while closing WebSocket:", e); + if (fs.existsSync(filePath)) { + const fileData = fs.readFileSync(filePath, "utf8"); + jsonData = JSON.parse(fileData); + + // Logic to send data if certain conditions are met + if ( + jsonData?.lp != null && + jsonData?.t != null && + ["Q", "T"].includes(jsonData?.type) && + connection.socket.readyState === WebSocket.OPEN && + !isSend + ) { + connection.socket.send( + JSON.stringify({ + bp: jsonData?.bp, + ap: jsonData?.ap, + lp: jsonData?.lp?.toFixed(2), + type: jsonData?.type, + time: formatTimestampNewYork(jsonData?.t), + }) + ); + isSend = true; + setTimeout(() => { + isSend = false; + }, 500); // Reset isSend after 500ms + } + } else { + console.error("File not found:", filePath); + clearInterval(sendInterval); + connection.socket.close(); + console.error("Connection closed"); + } + } catch (err) { + console.error("Error sending data to client:", err); + clearInterval(sendInterval); + connection.socket.close(); } - } - }); - }); + }; + + // Start receiving messages from the client + connection.socket.on("message", (message) => { + symbol = message.toString("utf-8")?.toUpperCase(); + console.log("Received message from client:", symbol); + + // Send data immediately upon receiving a symbol + sendData(); + + // Start periodic data sending if not already started + if (!sendInterval) { + sendInterval = setInterval(sendData, 5000); + } + }); + + // Handle client disconnect + connection.socket.on("close", () => { + console.log("Client disconnected"); + clearInterval(sendInterval); + removeProcessListeners(); + }); + + // Handle server crash cleanup + const closeHandler = () => { + console.log("Server is closing. Cleaning up resources..."); + clearInterval(sendInterval); + connection.socket.close(); + removeProcessListeners(); + }; + + // Add close handler to process events + process.on("exit", closeHandler); + process.on("SIGINT", closeHandler); + process.on("SIGTERM", closeHandler); + process.on("uncaughtException", closeHandler); + process.on("unhandledRejection", closeHandler); + + // Function to remove process listeners to avoid memory leaks + const removeProcessListeners = () => { + process.off("exit", closeHandler); + process.off("SIGINT", closeHandler); + process.off("SIGTERM", closeHandler); + process.off("uncaughtException", closeHandler); + process.off("unhandledRejection", closeHandler); + }; + } + ); }); + + fastify.register(async function (fastify) { fastify.get( "/realtime-crypto-data", diff --git a/fastify/feedback/server.js b/fastify/feedback/server.js deleted file mode 100755 index 7668bab..0000000 --- a/fastify/feedback/server.js +++ /dev/null @@ -1,24 +0,0 @@ -// Declare a route -module.exports = function (fastify, opts, done) { - - const pb = opts.pb; - - fastify.post('/feedback', async (request, reply) => { - const data = request.body; - - try { - await pb.collection("feedback").create(data) - output = 'success'; - } - catch(e) { - //console.log(e) - output = 'failure'; - } - - - reply.send({ items: output }) - - }); - - done(); -}; \ No newline at end of file