diff --git a/fastify/app.js b/fastify/app.js index 5ffd9e2..a2ed1e6 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -343,6 +343,125 @@ fastify.register(async function (fastify) { ); }); + +fastify.register(async function (fastify) { + fastify.get( + "/multiple-realtime-data", + { websocket: true }, + (connection, req) => { + let tickers = []; + let sendInterval; + // Mapping for each ticker's `isSend` status to avoid duplicate sends + const tickerStatus = {}; + + // Function to send data for all tickers as a list + const sendData = async () => { + const dataToSend = []; + + // Iterate over tickers and collect data + for (const symbol of tickers) { + const filePath = path.join( + __dirname, + `../app/json/websocket/companies/${symbol}.json` + ); + + try { + if (fs.existsSync(filePath)) { + const fileData = fs.readFileSync(filePath, "utf8"); + const jsonData = JSON.parse(fileData); + + // Only send data if conditions are met + if ( + jsonData?.ap != null && + jsonData?.t != null && + ["Q", "T"].includes(jsonData?.type) && + connection.socket.readyState === WebSocket.OPEN && + !tickerStatus[symbol] + ) { + // Collect data to send later + dataToSend.push({ + symbol, // Include the ticker symbol in the sent data + ap: jsonData?.ap, + }); + + // Set ticker as "sent" and reset after 500ms + tickerStatus[symbol] = true; + setTimeout(() => { + tickerStatus[symbol] = false; + }, 500); + } + } else { + console.error("File not found for ticker:", symbol); + } + } catch (err) { + console.error("Error sending data for ticker:", symbol, err); + } + } + + // Send all collected data as a single message + if (dataToSend.length > 0 && connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(JSON.stringify(dataToSend)); + } + }; + + // Start receiving messages from the client + connection.socket.on("message", (message) => { + try { + // Parse message as JSON to get tickers array + tickers = JSON.parse(message.toString("utf-8")); + console.log("Received tickers from client:", tickers); + + // Initialize ticker status for each symbol + tickers.forEach((ticker) => { + tickerStatus[ticker] = false; + }); + + // Start periodic data sending if not already started + if (!sendInterval) { + sendInterval = setInterval(sendData, 5000); + } + } catch (err) { + console.error("Failed to parse tickers from client message:", err); + } + }); + + // Handle client disconnect + connection.socket.on("close", () => { + console.log("Client disconnected"); + clearInterval(sendInterval); + removeProcessListeners(); + }); + + // Handle server crash cleanup + const closeHandler = () => { + console.log("Server is closing. Cleaning up resources..."); + clearInterval(sendInterval); + connection.socket.close(); + removeProcessListeners(); + }; + + // Add close handler to process events + process.on("exit", closeHandler); + process.on("SIGINT", closeHandler); + process.on("SIGTERM", closeHandler); + process.on("uncaughtException", closeHandler); + process.on("unhandledRejection", closeHandler); + + // Function to remove process listeners to avoid memory leaks + const removeProcessListeners = () => { + process.off("exit", closeHandler); + process.off("SIGINT", closeHandler); + process.off("SIGTERM", closeHandler); + process.off("uncaughtException", closeHandler); + process.off("unhandledRejection", closeHandler); + }; + } + ); +}); + + + + // Function to start the server function startServer() { if (!serverRunning) {