diff --git a/fastify/app.js b/fastify/app.js index e09eec2..ea97d53 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -86,122 +86,6 @@ function formatTimestampNewYork(timestamp) { .replace(",", ""); } -fastify.register(async function (fastify) { - fastify.get( - "/realtime-data", - { websocket: true }, - (connection, req) => { - - let jsonData; - let sendInterval; - let symbol; - let isSend = false; - - - // Function to send data to the client - const sendData = async () => { - if (!symbol) return; // Check if symbol is defined - const filePath = path.join(__dirname, `../app/json/websocket/companies/${symbol}.json`); - - try { - if (fs.existsSync(filePath)) { - const fileData = fs.readFileSync(filePath, "utf8"); - jsonData = JSON.parse(fileData); - - // Logic to send data if certain conditions are met - if ( - jsonData?.lp != null && - jsonData?.t != null && - ["Q", "T"].includes(jsonData?.type) && - connection.socket.readyState === WebSocket.OPEN && - !isSend - ) { - // Calculate the average price - const avgPrice = - (parseFloat(jsonData.ap) + - parseFloat(jsonData.bp) + - parseFloat(jsonData.lp)) / - 3; - - connection.socket.send( - JSON.stringify({ - bp: jsonData?.bp, - ap: jsonData?.ap, - lp: jsonData?.lp?.toFixed(2), - avgPrice: avgPrice?.toFixed(2), // Add the computed average price - type: jsonData?.type, - time: formatTimestampNewYork(jsonData?.t), - }) - ); - - isSend = true; - setTimeout(() => { - isSend = false; - }, 500); // Reset isSend after 500ms - } - } else { - console.error("File not found:", filePath); - clearInterval(sendInterval); - connection.socket.close(); - console.error("Connection closed"); - } - } catch (err) { - console.error("Error sending data to client:", err); - clearInterval(sendInterval); - connection.socket.close(); - } -}; - - - // Start receiving messages from the client - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8")?.toUpperCase(); - console.log("Received message from client:", symbol); - - // Send data immediately upon receiving a symbol - sendData(); - - // Start periodic data sending if not already started - if (!sendInterval) { - sendInterval = setInterval(sendData, 1000); - } - }); - - // Handle client disconnect - connection.socket.on("close", () => { - console.log("Client disconnected"); - clearInterval(sendInterval); - removeProcessListeners(); - }); - - // Handle server crash cleanup - const closeHandler = () => { - console.log("Server is closing. Cleaning up resources..."); - clearInterval(sendInterval); - connection.socket.close(); - removeProcessListeners(); - }; - - // Add close handler to process events - process.on("exit", closeHandler); - process.on("SIGINT", closeHandler); - process.on("SIGTERM", closeHandler); - process.on("uncaughtException", closeHandler); - process.on("unhandledRejection", closeHandler); - - // Function to remove process listeners to avoid memory leaks - const removeProcessListeners = () => { - process.off("exit", closeHandler); - process.off("SIGINT", closeHandler); - process.off("SIGTERM", closeHandler); - process.off("uncaughtException", closeHandler); - process.off("unhandledRejection", closeHandler); - }; - } - ); -}); - - fastify.register(async function (fastify) { fastify.get( @@ -357,7 +241,7 @@ fastify.register(async function (fastify) { fastify.register(async function (fastify) { fastify.get( - "/multiple-realtime-data", + "/price-data", { websocket: true }, (connection, req) => { let tickers = []; @@ -374,11 +258,23 @@ fastify.register(async function (fastify) { __dirname, `../app/json/websocket/companies/${symbol}.json` ); - - try { +try { if (fs?.existsSync(filePath)) { const fileData = fs?.readFileSync(filePath, "utf8"); - const jsonData = JSON?.parse(fileData); + + if (!fileData) { + console.error(`File is empty for ticker: ${symbol}`); + continue; + } + + let jsonData; + try { + jsonData = JSON?.parse(fileData); + } catch (parseError) { + console.error(`Invalid JSON format for ticker: ${symbol}`, parseError); + console.error(`File content: ${fileData}`); + continue; + } // Only send data if conditions are met and data has changed if ( @@ -389,33 +285,35 @@ fastify.register(async function (fastify) { ["Q", "T"].includes(jsonData?.type) && connection.socket.readyState === WebSocket.OPEN ) { - // Calculate the average price - const avgPrice = - ((parseFloat(jsonData.ap) + - parseFloat(jsonData.bp) + - parseFloat(jsonData.lp)) / - 3); + const avgPrice = ( + parseFloat(jsonData?.ap) + + parseFloat(jsonData?.bp) + + parseFloat(jsonData?.lp) + ) / 3; - // Check if the current data is different from the last sent data - const currentDataSignature = `${jsonData?.lp}`; + const finalPrice = Math.abs(avgPrice - jsonData?.bp) / jsonData?.bp > 0.05 + ? jsonData.bp + : avgPrice; + + const currentDataSignature = `${jsonData?.bp}`; const lastSentSignature = lastSentData[symbol]; if (currentDataSignature !== lastSentSignature) { - // Collect data to send dataToSend?.push({ - symbol, // Include the ticker symbol in the sent data + symbol, ap: jsonData?.ap, bp: jsonData?.bp, lp: jsonData?.lp, - avgPrice: avgPrice, // Add the computed average price + avgPrice: finalPrice, + type: jsonData?.type, + time: formatTimestampNewYork(jsonData?.t), }); - // Update the last sent data for this ticker lastSentData[symbol] = currentDataSignature; } } } else { - //console.error("File not found for ticker:", symbol); + console.error("File not found for ticker:", symbol); } } catch (err) { console.error("Error processing data for ticker:", symbol, err); @@ -485,6 +383,8 @@ fastify.register(async function (fastify) { ); }); + + // Function to start the server function startServer() { if (!serverRunning) {