bugfixing websocket

This commit is contained in:
MuslemRahimi 2024-12-03 17:20:28 +01:00
parent e4681370ad
commit d5cf64199d

View File

@ -86,122 +86,6 @@ function formatTimestampNewYork(timestamp) {
.replace(",", "");
}
fastify.register(async function (fastify) {
fastify.get(
"/realtime-data",
{ websocket: true },
(connection, req) => {
let jsonData;
let sendInterval;
let symbol;
let isSend = false;
// Function to send data to the client
const sendData = async () => {
if (!symbol) return; // Check if symbol is defined
const filePath = path.join(__dirname, `../app/json/websocket/companies/${symbol}.json`);
try {
if (fs.existsSync(filePath)) {
const fileData = fs.readFileSync(filePath, "utf8");
jsonData = JSON.parse(fileData);
// Logic to send data if certain conditions are met
if (
jsonData?.lp != null &&
jsonData?.t != null &&
["Q", "T"].includes(jsonData?.type) &&
connection.socket.readyState === WebSocket.OPEN &&
!isSend
) {
// Calculate the average price
const avgPrice =
(parseFloat(jsonData.ap) +
parseFloat(jsonData.bp) +
parseFloat(jsonData.lp)) /
3;
connection.socket.send(
JSON.stringify({
bp: jsonData?.bp,
ap: jsonData?.ap,
lp: jsonData?.lp?.toFixed(2),
avgPrice: avgPrice?.toFixed(2), // Add the computed average price
type: jsonData?.type,
time: formatTimestampNewYork(jsonData?.t),
})
);
isSend = true;
setTimeout(() => {
isSend = false;
}, 500); // Reset isSend after 500ms
}
} else {
console.error("File not found:", filePath);
clearInterval(sendInterval);
connection.socket.close();
console.error("Connection closed");
}
} catch (err) {
console.error("Error sending data to client:", err);
clearInterval(sendInterval);
connection.socket.close();
}
};
// Start receiving messages from the client
connection.socket.on("message", (message) => {
symbol = message.toString("utf-8")?.toUpperCase();
console.log("Received message from client:", symbol);
// Send data immediately upon receiving a symbol
sendData();
// Start periodic data sending if not already started
if (!sendInterval) {
sendInterval = setInterval(sendData, 1000);
}
});
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
clearInterval(sendInterval);
removeProcessListeners();
});
// Handle server crash cleanup
const closeHandler = () => {
console.log("Server is closing. Cleaning up resources...");
clearInterval(sendInterval);
connection.socket.close();
removeProcessListeners();
};
// Add close handler to process events
process.on("exit", closeHandler);
process.on("SIGINT", closeHandler);
process.on("SIGTERM", closeHandler);
process.on("uncaughtException", closeHandler);
process.on("unhandledRejection", closeHandler);
// Function to remove process listeners to avoid memory leaks
const removeProcessListeners = () => {
process.off("exit", closeHandler);
process.off("SIGINT", closeHandler);
process.off("SIGTERM", closeHandler);
process.off("uncaughtException", closeHandler);
process.off("unhandledRejection", closeHandler);
};
}
);
});
fastify.register(async function (fastify) {
fastify.get(
@ -357,7 +241,7 @@ fastify.register(async function (fastify) {
fastify.register(async function (fastify) {
fastify.get(
"/multiple-realtime-data",
"/price-data",
{ websocket: true },
(connection, req) => {
let tickers = [];
@ -374,11 +258,23 @@ fastify.register(async function (fastify) {
__dirname,
`../app/json/websocket/companies/${symbol}.json`
);
try {
if (fs?.existsSync(filePath)) {
const fileData = fs?.readFileSync(filePath, "utf8");
const jsonData = JSON?.parse(fileData);
if (!fileData) {
console.error(`File is empty for ticker: ${symbol}`);
continue;
}
let jsonData;
try {
jsonData = JSON?.parse(fileData);
} catch (parseError) {
console.error(`Invalid JSON format for ticker: ${symbol}`, parseError);
console.error(`File content: ${fileData}`);
continue;
}
// Only send data if conditions are met and data has changed
if (
@ -389,33 +285,35 @@ fastify.register(async function (fastify) {
["Q", "T"].includes(jsonData?.type) &&
connection.socket.readyState === WebSocket.OPEN
) {
// Calculate the average price
const avgPrice =
((parseFloat(jsonData.ap) +
parseFloat(jsonData.bp) +
parseFloat(jsonData.lp)) /
3);
const avgPrice = (
parseFloat(jsonData?.ap) +
parseFloat(jsonData?.bp) +
parseFloat(jsonData?.lp)
) / 3;
// Check if the current data is different from the last sent data
const currentDataSignature = `${jsonData?.lp}`;
const finalPrice = Math.abs(avgPrice - jsonData?.bp) / jsonData?.bp > 0.05
? jsonData.bp
: avgPrice;
const currentDataSignature = `${jsonData?.bp}`;
const lastSentSignature = lastSentData[symbol];
if (currentDataSignature !== lastSentSignature) {
// Collect data to send
dataToSend?.push({
symbol, // Include the ticker symbol in the sent data
symbol,
ap: jsonData?.ap,
bp: jsonData?.bp,
lp: jsonData?.lp,
avgPrice: avgPrice, // Add the computed average price
avgPrice: finalPrice,
type: jsonData?.type,
time: formatTimestampNewYork(jsonData?.t),
});
// Update the last sent data for this ticker
lastSentData[symbol] = currentDataSignature;
}
}
} else {
//console.error("File not found for ticker:", symbol);
console.error("File not found for ticker:", symbol);
}
} catch (err) {
console.error("Error processing data for ticker:", symbol, err);
@ -485,6 +383,8 @@ fastify.register(async function (fastify) {
);
});
// Function to start the server
function startServer() {
if (!serverRunning) {