update websocket

This commit is contained in:
MuslemRahimi 2024-09-13 16:47:03 +02:00
parent 6e2096a473
commit 278b63f430
2 changed files with 33 additions and 64 deletions

View File

@ -564,7 +564,7 @@ def run_threaded(job_func):
job_thread.start()
# Schedule the job to run
'''
schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job')
schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job)
schedule.every().day.at("03:00").do(run_threaded, run_dark_pool)
@ -630,7 +630,7 @@ schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job')
'''
schedule.every(20).seconds.do(run_threaded, run_if_not_running(run_cron_options_flow, 'options_flow_job')).tag('options_flow_job')

View File

@ -311,8 +311,9 @@ fastify.register(async function (fastify) {
(connection, req) => {
let jsonData;
let sendInterval;
let clientIds = [];
// Function to send data to the client
// Function to send filtered data to the client
const sendData = async () => {
const filePath = path.join(
__dirname,
@ -322,7 +323,20 @@ fastify.register(async function (fastify) {
if (fs.existsSync(filePath)) {
const fileData = fs.readFileSync(filePath, "utf8");
jsonData = JSON.parse(fileData);
connection.socket.send(JSON.stringify(jsonData));
// Do nothing if clientIds is empty
if (clientIds.length === 0) {
console.log("Client IDs list is empty, doing nothing.");
return; // Exit function if clientIds is empty
}
// Filter out elements whose ids are not in clientIds
const filteredData = jsonData.filter(
(item) => !clientIds.includes(item.id)
);
// Send the filtered data back to the client
connection.socket.send(JSON.stringify(filteredData));
} else {
console.error("File not found:", filePath);
clearInterval(sendInterval);
@ -341,6 +355,21 @@ fastify.register(async function (fastify) {
// Start sending data periodically
sendInterval = setInterval(sendData, 5000);
// Handle incoming messages from the client to update the ids
connection.socket.on("message", (message) => {
try {
const parsedMessage = JSON.parse(message);
if (parsedMessage?.ids) {
//console.log("Received ids from client:", parsedMessage.ids);
clientIds = parsedMessage.ids; // Update the ids list from the client
} else {
//console.log("No ids received in the message");
}
} catch (error) {
console.error("Error parsing incoming message:", error);
}
});
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
@ -364,66 +393,6 @@ fastify.register(async function (fastify) {
);
});
fastify.register(async function (fastify) {
fastify.get(
"/options-zero-dte-reader",
{ websocket: true },
(connection, req) => {
let jsonData;
let sendInterval;
// Function to send data to the client
const sendData = async () => {
const filePath = path.join(
__dirname,
"../app/json/options-flow/zero-dte/data.json"
);
try {
if (fs.existsSync(filePath)) {
const fileData = fs.readFileSync(filePath, "utf8");
jsonData = JSON.parse(fileData);
connection.socket.send(JSON.stringify(jsonData));
} else {
console.error("File not found:", filePath);
clearInterval(sendInterval);
console.error("Connection closed");
throw new Error("This is an intentional uncaught exception!");
}
} catch (err) {
console.error("Error sending data to client:", err);
}
};
// Send data to the client initially
sendData();
// Start sending data periodically
sendInterval = setInterval(sendData, 5000);
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
connection?.socket?.close();
clearInterval(sendInterval);
});
// Handle server crash cleanup
const closeHandler = () => {
console.log("Server is closing. Cleaning up resources...");
clearInterval(sendInterval);
connection?.socket?.close();
};
// Add close handler to process event
process.on("exit", closeHandler);
process.on("SIGINT", closeHandler);
process.on("SIGTERM", closeHandler);
process.on("uncaughtException", closeHandler);
process.on("unhandledRejection", closeHandler);
}
);
});
// Function to start the server
function startServer() {
if (!serverRunning) {