add multiple realtime data websocket

This commit is contained in:
MuslemRahimi 2024-11-25 22:20:41 +01:00
parent 42f9db45d3
commit 33de38905e

View File

@ -343,6 +343,125 @@ fastify.register(async function (fastify) {
); );
}); });
fastify.register(async function (fastify) {
fastify.get(
"/multiple-realtime-data",
{ websocket: true },
(connection, req) => {
let tickers = [];
let sendInterval;
// Mapping for each ticker's `isSend` status to avoid duplicate sends
const tickerStatus = {};
// Function to send data for all tickers as a list
const sendData = async () => {
const dataToSend = [];
// Iterate over tickers and collect data
for (const symbol of tickers) {
const filePath = path.join(
__dirname,
`../app/json/websocket/companies/${symbol}.json`
);
try {
if (fs.existsSync(filePath)) {
const fileData = fs.readFileSync(filePath, "utf8");
const jsonData = JSON.parse(fileData);
// Only send data if conditions are met
if (
jsonData?.ap != null &&
jsonData?.t != null &&
["Q", "T"].includes(jsonData?.type) &&
connection.socket.readyState === WebSocket.OPEN &&
!tickerStatus[symbol]
) {
// Collect data to send later
dataToSend.push({
symbol, // Include the ticker symbol in the sent data
ap: jsonData?.ap,
});
// Set ticker as "sent" and reset after 500ms
tickerStatus[symbol] = true;
setTimeout(() => {
tickerStatus[symbol] = false;
}, 500);
}
} else {
console.error("File not found for ticker:", symbol);
}
} catch (err) {
console.error("Error sending data for ticker:", symbol, err);
}
}
// Send all collected data as a single message
if (dataToSend.length > 0 && connection.socket.readyState === WebSocket.OPEN) {
connection.socket.send(JSON.stringify(dataToSend));
}
};
// Start receiving messages from the client
connection.socket.on("message", (message) => {
try {
// Parse message as JSON to get tickers array
tickers = JSON.parse(message.toString("utf-8"));
console.log("Received tickers from client:", tickers);
// Initialize ticker status for each symbol
tickers.forEach((ticker) => {
tickerStatus[ticker] = false;
});
// Start periodic data sending if not already started
if (!sendInterval) {
sendInterval = setInterval(sendData, 5000);
}
} catch (err) {
console.error("Failed to parse tickers from client message:", err);
}
});
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
clearInterval(sendInterval);
removeProcessListeners();
});
// Handle server crash cleanup
const closeHandler = () => {
console.log("Server is closing. Cleaning up resources...");
clearInterval(sendInterval);
connection.socket.close();
removeProcessListeners();
};
// Add close handler to process events
process.on("exit", closeHandler);
process.on("SIGINT", closeHandler);
process.on("SIGTERM", closeHandler);
process.on("uncaughtException", closeHandler);
process.on("unhandledRejection", closeHandler);
// Function to remove process listeners to avoid memory leaks
const removeProcessListeners = () => {
process.off("exit", closeHandler);
process.off("SIGINT", closeHandler);
process.off("SIGTERM", closeHandler);
process.off("uncaughtException", closeHandler);
process.off("unhandledRejection", closeHandler);
};
}
);
});
// Function to start the server // Function to start the server
function startServer() { function startServer() {
if (!serverRunning) { if (!serverRunning) {