bugfixing websocket error

This commit is contained in:
MuslemRahimi 2024-09-16 21:46:58 +02:00
parent 1479cc127c
commit 48d9a0a508

View File

@ -123,20 +123,10 @@ let sendInterval;
fastify.register(async function (fastify) { fastify.register(async function (fastify) {
fastify.get("/realtime-data", { websocket: true }, (connection, req) => { fastify.get("/realtime-data", { websocket: true }, (connection, req) => {
// Send a welcome message to the client let symbol = "";
let isConnectionActive = true;
let ws;
//connection.socket.send('hi from server');
// Listen for incoming messages from the client
connection.socket.on("message", (message) => {
symbol = message.toString("utf-8");
console.log("Received message from client:", symbol);
// If you want to dynamically update the subscription based on client's message
updateSubscription();
});
//======================
const login = { const login = {
event: "login", event: "login",
data: { data: {
@ -147,69 +137,72 @@ fastify.register(async function (fastify) {
const subscribe = { const subscribe = {
event: "subscribe", event: "subscribe",
data: { data: {
ticker: "", // Initial value; will be updated dynamically ticker: "",
}, },
}; };
function updateSubscription() { function updateSubscription() {
subscribe.data.ticker = symbol; subscribe.data.ticker = symbol;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(subscribe));
}
} }
// Create a new WebSocket instance for your backend function connectToFMP() {
const ws = new WebSocket("wss://websockets.financialmodelingprep.com"); ws = new WebSocket("wss://websockets.financialmodelingprep.com");
// Handle WebSocket connection open ws.on("open", function open() {
console.log("Connected to FMP WebSocket");
ws.send(JSON.stringify(login));
setTimeout(() => updateSubscription(), 2000);
});
ws.on("open", function open() { ws.on("error", function (error) {
ws.send(JSON.stringify(login)); console.error("FMP WebSocket error:", error);
wait(2000); //2 seconds in milliseconds });
ws.send(JSON.stringify(subscribe));
});
// Handle WebSocket errors ws.on("close", function () {
ws.on("error", function (error) { console.log("FMP WebSocket closed. Attempting to reconnect...");
console.error("WebSocket error:", error); setTimeout(connectToFMP, 5000);
// Handle the error gracefully, you might want to notify the client or log it. });
// For now, let's close the connection if an error occurs
connection.socket.close();
});
ws.on("message", function (data, flags) { ws.on("message", function (data) {
const stringData = data.toString("utf-8"); if (!isConnectionActive) return;
try { const stringData = data.toString("utf-8");
const jsonData = JSON.parse(stringData); try {
const bpData = jsonData.bp; const jsonData = JSON.parse(stringData);
const bpData = jsonData.bp;
// Check if bpData is a number and not equal to zero if (typeof bpData === "number" && bpData !== 0) {
if (typeof bpData === "number" && bpData !== 0) { if (connection.socket.readyState === WebSocket.OPEN) {
if (connection.socket.readyState === WebSocket.OPEN && !isSend) { connection.socket.send(JSON.stringify({ bp: bpData }));
connection.socket.send(JSON.stringify({ bp: bpData })); }
isSend = true;
setTimeout(() => {
isSend = false;
}, 800);
} }
} catch (error) {
console.error("Error parsing JSON:", error);
} }
} catch (error) { });
console.error("Error parsing JSON:", error); }
}
connectToFMP();
connection.socket.on("message", (message) => {
symbol = message.toString("utf-8");
console.log("Received message from client:", symbol);
updateSubscription();
}); });
//======================
// Handle client disconnect
connection.socket.on("close", () => { connection.socket.on("close", () => {
console.log("Client disconnected"); console.log("Client disconnected");
connection?.socket?.close(); isConnectionActive = false;
// Check if the WebSocket is open before trying to close it if (ws) ws.close();
if (ws.readyState === WebSocket.OPEN) { });
try {
ws.close(); connection.socket.on("error", (error) => {
} catch (e) { console.error("WebSocket error:", error);
console.error("Error while closing WebSocket:", e); isConnectionActive = false;
} if (ws) ws.close();
}
}); });
}); });
}); });
@ -219,18 +212,10 @@ fastify.register(async function (fastify) {
"/realtime-crypto-data", "/realtime-crypto-data",
{ websocket: true }, { websocket: true },
(connection, req) => { (connection, req) => {
// Send a welcome message to the client let symbol = "";
let isConnectionActive = true;
let ws;
// Listen for incoming messages from the client
connection.socket.on("message", (message) => {
symbol = message.toString("utf-8");
console.log("Received message from client:", symbol);
// If you want to dynamically update the subscription based on client's message
updateSubscription();
});
//======================
const login = { const login = {
event: "login", event: "login",
data: { data: {
@ -241,64 +226,65 @@ fastify.register(async function (fastify) {
const subscribe = { const subscribe = {
event: "subscribe", event: "subscribe",
data: { data: {
ticker: "", // Initial value; will be updated dynamically ticker: "",
}, },
}; };
function updateSubscription() { function updateSubscription() {
subscribe.data.ticker = symbol; subscribe.data.ticker = symbol;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(subscribe));
}
} }
// Create a new WebSocket instance for your backend function connectToCryptoFMP() {
const ws = new WebSocket("wss://crypto.financialmodelingprep.com"); ws = new WebSocket("wss://crypto.financialmodelingprep.com");
// Handle WebSocket connection open ws.on("open", function open() {
console.log("Connected to Crypto FMP WebSocket");
ws.send(JSON.stringify(login));
setTimeout(() => updateSubscription(), 2000);
});
ws.on("open", function open() { ws.on("error", function (error) {
ws.send(JSON.stringify(login)); console.error("Crypto FMP WebSocket error:", error);
wait(2000); //2 seconds in milliseconds });
ws.send(JSON.stringify(subscribe));
ws.on("close", function () {
console.log(
"Crypto FMP WebSocket closed. Attempting to reconnect..."
);
setTimeout(connectToCryptoFMP, 5000);
});
ws.on("message", function (data) {
if (!isConnectionActive) return;
const stringData = data.toString("utf-8");
if (connection.socket.readyState === WebSocket.OPEN) {
connection.socket.send(stringData);
}
});
}
connectToCryptoFMP();
connection.socket.on("message", (message) => {
symbol = message.toString("utf-8");
console.log("Received message from client:", symbol);
updateSubscription();
}); });
// Handle WebSocket errors
ws.on("error", function (error) {
console.error("WebSocket error:", error);
// Handle the error gracefully, you might want to notify the client or log it.
// For now, let's close the connection if an error occurs
connection.socket.close();
});
// Handle WebSocket messages
ws.on("message", function (data, flags) {
const stringData = data.toString("utf-8");
if (connection.socket.readyState === WebSocket.OPEN && !isSend) {
connection.socket.send(stringData);
//console.log(stringData)
isSend = true;
setTimeout(() => {
isSend = false;
}, 800);
//wait(2000);
}
//wait(2000);
});
//======================
// Handle client disconnect
connection.socket.on("close", () => { connection.socket.on("close", () => {
console.log("Client disconnected"); console.log("Client disconnected");
connection?.socket?.close(); isConnectionActive = false;
// Check if the WebSocket is open before trying to close it if (ws) ws.close();
if (ws.readyState === WebSocket.OPEN) { });
try {
ws.close(); connection.socket.on("error", (error) => {
} catch (e) { console.error("WebSocket error:", error);
console.error("Error while closing WebSocket:", e); isConnectionActive = false;
} if (ws) ws.close();
}
}); });
} }
); );
@ -312,9 +298,12 @@ fastify.register(async function (fastify) {
let jsonData; let jsonData;
let sendInterval; let sendInterval;
let clientIds = []; let clientIds = [];
let isConnectionActive = true;
// Function to send filtered data to the client // Function to send filtered data to the client
const sendData = async () => { const sendData = async () => {
if (!isConnectionActive) return;
const filePath = path.join( const filePath = path.join(
__dirname, __dirname,
"../app/json/options-flow/feed/data.json" "../app/json/options-flow/feed/data.json"
@ -324,34 +313,28 @@ fastify.register(async function (fastify) {
const fileData = fs.readFileSync(filePath, "utf8"); const fileData = fs.readFileSync(filePath, "utf8");
jsonData = JSON.parse(fileData); jsonData = JSON.parse(fileData);
// Do nothing if clientIds is empty
if (clientIds.length === 0) { if (clientIds.length === 0) {
console.log("Client IDs list is empty, doing nothing."); console.log("Client IDs list is empty, doing nothing.");
return; // Exit function if clientIds is empty return;
} }
// Filter out elements whose ids are not in clientIds
const filteredData = jsonData.filter( const filteredData = jsonData.filter(
(item) => !clientIds.includes(item.id) (item) => !clientIds.includes(item.id)
); );
// Send the filtered data back to the client if (connection.socket.readyState === WebSocket.OPEN) {
connection.socket.send(JSON.stringify(filteredData)); connection.socket.send(JSON.stringify(filteredData));
}
} else { } else {
console.error("File not found:", filePath); console.error("File not found:", filePath);
clearInterval(sendInterval); clearInterval(sendInterval);
connection?.socket?.close();
console.error("Connection closed");
throw new Error("This is an intentional uncaught exception!");
} }
} catch (err) { } catch (err) {
console.error("Error sending data to client:", err); console.error("Error sending data to client:", err);
// Don't close the connection here, just log the error
} }
}; };
// Send data to the client initially
sendData();
// Start sending data periodically // Start sending data periodically
sendInterval = setInterval(sendData, 5000); sendInterval = setInterval(sendData, 5000);
@ -360,10 +343,7 @@ fastify.register(async function (fastify) {
try { try {
const parsedMessage = JSON.parse(message); const parsedMessage = JSON.parse(message);
if (parsedMessage?.ids) { if (parsedMessage?.ids) {
//console.log("Received ids from client:", parsedMessage.ids); clientIds = parsedMessage.ids;
clientIds = parsedMessage.ids; // Update the ids list from the client
} else {
//console.log("No ids received in the message");
} }
} catch (error) { } catch (error) {
console.error("Error parsing incoming message:", error); console.error("Error parsing incoming message:", error);
@ -373,26 +353,42 @@ fastify.register(async function (fastify) {
// Handle client disconnect // Handle client disconnect
connection.socket.on("close", () => { connection.socket.on("close", () => {
console.log("Client disconnected"); console.log("Client disconnected");
isConnectionActive = false;
clearInterval(sendInterval); clearInterval(sendInterval);
}); });
// Handle server crash cleanup // Handle WebSocket errors
const closeHandler = () => { connection.socket.on("error", (error) => {
console.log("Server is closing. Cleaning up resources..."); console.error("WebSocket error:", error);
isConnectionActive = false;
clearInterval(sendInterval); clearInterval(sendInterval);
connection.socket.close(); // Don't close the connection here, let the client handle reconnection
}; });
// Add close handler to process event // Send initial data
process.on("exit", closeHandler); sendData();
process.on("SIGINT", closeHandler);
process.on("SIGTERM", closeHandler);
process.on("uncaughtException", closeHandler);
process.on("unhandledRejection", closeHandler);
} }
); );
}); });
fastify.setErrorHandler((error, request, reply) => {
console.error("Server error:", error);
reply.status(500).send({ error: "Internal Server Error" });
});
// Graceful shutdown
process.on("SIGINT", async () => {
console.log("Received SIGINT. Shutting down gracefully...");
await fastify.close();
process.exit(0);
});
process.on("SIGTERM", async () => {
console.log("Received SIGTERM. Shutting down gracefully...");
await fastify.close();
process.exit(0);
});
// Function to start the server // Function to start the server
function startServer() { function startServer() {
if (!serverRunning) { if (!serverRunning) {