From 48d9a0a5081e0b498606c17b97dd58d4cb6262ca Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Mon, 16 Sep 2024 21:46:58 +0200 Subject: [PATCH] bugfixing websocket error --- fastify/app.js | 274 ++++++++++++++++++++++++------------------------- 1 file changed, 135 insertions(+), 139 deletions(-) diff --git a/fastify/app.js b/fastify/app.js index eec50cb..d207142 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -123,20 +123,10 @@ let sendInterval; fastify.register(async function (fastify) { fastify.get("/realtime-data", { websocket: true }, (connection, req) => { - // Send a welcome message to the client + let symbol = ""; + let isConnectionActive = true; + let ws; - //connection.socket.send('hi from server'); - - // Listen for incoming messages from the client - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8"); - console.log("Received message from client:", symbol); - - // If you want to dynamically update the subscription based on client's message - updateSubscription(); - }); - - //====================== const login = { event: "login", data: { @@ -147,69 +137,72 @@ fastify.register(async function (fastify) { const subscribe = { event: "subscribe", data: { - ticker: "", // Initial value; will be updated dynamically + ticker: "", }, }; function updateSubscription() { subscribe.data.ticker = symbol; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(subscribe)); + } } - // Create a new WebSocket instance for your backend - const ws = new WebSocket("wss://websockets.financialmodelingprep.com"); + function connectToFMP() { + ws = new WebSocket("wss://websockets.financialmodelingprep.com"); - // Handle WebSocket connection open + ws.on("open", function open() { + console.log("Connected to FMP WebSocket"); + ws.send(JSON.stringify(login)); + setTimeout(() => updateSubscription(), 2000); + }); - ws.on("open", function open() { - ws.send(JSON.stringify(login)); - wait(2000); //2 seconds in milliseconds - ws.send(JSON.stringify(subscribe)); - }); + ws.on("error", function (error) { + console.error("FMP WebSocket error:", error); + }); - // Handle WebSocket errors - ws.on("error", function (error) { - console.error("WebSocket error:", error); - // Handle the error gracefully, you might want to notify the client or log it. - // For now, let's close the connection if an error occurs - connection.socket.close(); - }); + ws.on("close", function () { + console.log("FMP WebSocket closed. Attempting to reconnect..."); + setTimeout(connectToFMP, 5000); + }); - ws.on("message", function (data, flags) { - const stringData = data.toString("utf-8"); + ws.on("message", function (data) { + if (!isConnectionActive) return; - try { - const jsonData = JSON.parse(stringData); - const bpData = jsonData.bp; + const stringData = data.toString("utf-8"); + try { + const jsonData = JSON.parse(stringData); + const bpData = jsonData.bp; - // Check if bpData is a number and not equal to zero - if (typeof bpData === "number" && bpData !== 0) { - if (connection.socket.readyState === WebSocket.OPEN && !isSend) { - connection.socket.send(JSON.stringify({ bp: bpData })); - isSend = true; - setTimeout(() => { - isSend = false; - }, 800); + if (typeof bpData === "number" && bpData !== 0) { + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(JSON.stringify({ bp: bpData })); + } } + } catch (error) { + console.error("Error parsing JSON:", error); } - } catch (error) { - console.error("Error parsing JSON:", error); - } + }); + } + + connectToFMP(); + + connection.socket.on("message", (message) => { + symbol = message.toString("utf-8"); + console.log("Received message from client:", symbol); + updateSubscription(); }); - //====================== - - // Handle client disconnect connection.socket.on("close", () => { console.log("Client disconnected"); - connection?.socket?.close(); - // Check if the WebSocket is open before trying to close it - if (ws.readyState === WebSocket.OPEN) { - try { - ws.close(); - } catch (e) { - console.error("Error while closing WebSocket:", e); - } - } + isConnectionActive = false; + if (ws) ws.close(); + }); + + connection.socket.on("error", (error) => { + console.error("WebSocket error:", error); + isConnectionActive = false; + if (ws) ws.close(); }); }); }); @@ -219,18 +212,10 @@ fastify.register(async function (fastify) { "/realtime-crypto-data", { websocket: true }, (connection, req) => { - // Send a welcome message to the client + let symbol = ""; + let isConnectionActive = true; + let ws; - // Listen for incoming messages from the client - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8"); - console.log("Received message from client:", symbol); - - // If you want to dynamically update the subscription based on client's message - updateSubscription(); - }); - - //====================== const login = { event: "login", data: { @@ -241,64 +226,65 @@ fastify.register(async function (fastify) { const subscribe = { event: "subscribe", data: { - ticker: "", // Initial value; will be updated dynamically + ticker: "", }, }; function updateSubscription() { subscribe.data.ticker = symbol; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(subscribe)); + } } - // Create a new WebSocket instance for your backend - const ws = new WebSocket("wss://crypto.financialmodelingprep.com"); + function connectToCryptoFMP() { + ws = new WebSocket("wss://crypto.financialmodelingprep.com"); - // Handle WebSocket connection open + ws.on("open", function open() { + console.log("Connected to Crypto FMP WebSocket"); + ws.send(JSON.stringify(login)); + setTimeout(() => updateSubscription(), 2000); + }); - ws.on("open", function open() { - ws.send(JSON.stringify(login)); - wait(2000); //2 seconds in milliseconds - ws.send(JSON.stringify(subscribe)); + ws.on("error", function (error) { + console.error("Crypto FMP WebSocket error:", error); + }); + + ws.on("close", function () { + console.log( + "Crypto FMP WebSocket closed. Attempting to reconnect..." + ); + setTimeout(connectToCryptoFMP, 5000); + }); + + ws.on("message", function (data) { + if (!isConnectionActive) return; + + const stringData = data.toString("utf-8"); + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(stringData); + } + }); + } + + connectToCryptoFMP(); + + connection.socket.on("message", (message) => { + symbol = message.toString("utf-8"); + console.log("Received message from client:", symbol); + updateSubscription(); }); - // Handle WebSocket errors - ws.on("error", function (error) { - console.error("WebSocket error:", error); - // Handle the error gracefully, you might want to notify the client or log it. - // For now, let's close the connection if an error occurs - connection.socket.close(); - }); - - // Handle WebSocket messages - ws.on("message", function (data, flags) { - const stringData = data.toString("utf-8"); - - if (connection.socket.readyState === WebSocket.OPEN && !isSend) { - connection.socket.send(stringData); - //console.log(stringData) - isSend = true; - setTimeout(() => { - isSend = false; - }, 800); - - //wait(2000); - } - //wait(2000); - }); - - //====================== - - // Handle client disconnect connection.socket.on("close", () => { console.log("Client disconnected"); - connection?.socket?.close(); - // Check if the WebSocket is open before trying to close it - if (ws.readyState === WebSocket.OPEN) { - try { - ws.close(); - } catch (e) { - console.error("Error while closing WebSocket:", e); - } - } + isConnectionActive = false; + if (ws) ws.close(); + }); + + connection.socket.on("error", (error) => { + console.error("WebSocket error:", error); + isConnectionActive = false; + if (ws) ws.close(); }); } ); @@ -312,9 +298,12 @@ fastify.register(async function (fastify) { let jsonData; let sendInterval; let clientIds = []; + let isConnectionActive = true; // Function to send filtered data to the client const sendData = async () => { + if (!isConnectionActive) return; + const filePath = path.join( __dirname, "../app/json/options-flow/feed/data.json" @@ -324,34 +313,28 @@ fastify.register(async function (fastify) { const fileData = fs.readFileSync(filePath, "utf8"); jsonData = JSON.parse(fileData); - // Do nothing if clientIds is empty if (clientIds.length === 0) { console.log("Client IDs list is empty, doing nothing."); - return; // Exit function if clientIds is empty + return; } - // Filter out elements whose ids are not in clientIds const filteredData = jsonData.filter( (item) => !clientIds.includes(item.id) ); - // Send the filtered data back to the client - connection.socket.send(JSON.stringify(filteredData)); + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(JSON.stringify(filteredData)); + } } else { console.error("File not found:", filePath); clearInterval(sendInterval); - connection?.socket?.close(); - console.error("Connection closed"); - throw new Error("This is an intentional uncaught exception!"); } } catch (err) { console.error("Error sending data to client:", err); + // Don't close the connection here, just log the error } }; - // Send data to the client initially - sendData(); - // Start sending data periodically sendInterval = setInterval(sendData, 5000); @@ -360,10 +343,7 @@ fastify.register(async function (fastify) { try { const parsedMessage = JSON.parse(message); if (parsedMessage?.ids) { - //console.log("Received ids from client:", parsedMessage.ids); - clientIds = parsedMessage.ids; // Update the ids list from the client - } else { - //console.log("No ids received in the message"); + clientIds = parsedMessage.ids; } } catch (error) { console.error("Error parsing incoming message:", error); @@ -373,26 +353,42 @@ fastify.register(async function (fastify) { // Handle client disconnect connection.socket.on("close", () => { console.log("Client disconnected"); + isConnectionActive = false; clearInterval(sendInterval); }); - // Handle server crash cleanup - const closeHandler = () => { - console.log("Server is closing. Cleaning up resources..."); + // Handle WebSocket errors + connection.socket.on("error", (error) => { + console.error("WebSocket error:", error); + isConnectionActive = false; clearInterval(sendInterval); - connection.socket.close(); - }; + // Don't close the connection here, let the client handle reconnection + }); - // Add close handler to process event - process.on("exit", closeHandler); - process.on("SIGINT", closeHandler); - process.on("SIGTERM", closeHandler); - process.on("uncaughtException", closeHandler); - process.on("unhandledRejection", closeHandler); + // Send initial data + sendData(); } ); }); +fastify.setErrorHandler((error, request, reply) => { + console.error("Server error:", error); + reply.status(500).send({ error: "Internal Server Error" }); +}); + +// Graceful shutdown +process.on("SIGINT", async () => { + console.log("Received SIGINT. Shutting down gracefully..."); + await fastify.close(); + process.exit(0); +}); + +process.on("SIGTERM", async () => { + console.log("Received SIGTERM. Shutting down gracefully..."); + await fastify.close(); + process.exit(0); +}); + // Function to start the server function startServer() { if (!serverRunning) {