From 278b63f4306210f246a3e481744674af639e563d Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 13 Sep 2024 16:47:03 +0200 Subject: [PATCH] update websocket --- app/primary_cron_job.py | 4 +- fastify/app.js | 93 ++++++++++++++--------------------------- 2 files changed, 33 insertions(+), 64 deletions(-) diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 01c25ae..6d59398 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -564,7 +564,7 @@ def run_threaded(job_func): job_thread.start() # Schedule the job to run -''' + schedule.every().day.at("01:00").do(run_threaded, run_options_bubble_ticker).tag('options_ticker_job') schedule.every().day.at("02:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("03:00").do(run_threaded, run_dark_pool) @@ -630,7 +630,7 @@ schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job') schedule.every(2).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') -''' + schedule.every(20).seconds.do(run_threaded, run_if_not_running(run_cron_options_flow, 'options_flow_job')).tag('options_flow_job') diff --git a/fastify/app.js b/fastify/app.js index 6003dda..eec50cb 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -311,8 +311,9 @@ fastify.register(async function (fastify) { (connection, req) => { let jsonData; let sendInterval; + let clientIds = []; - // Function to send data to the client + // Function to send filtered data to the client const sendData = async () => { const filePath = path.join( __dirname, @@ -322,7 +323,20 @@ fastify.register(async function (fastify) { if (fs.existsSync(filePath)) { const fileData = fs.readFileSync(filePath, "utf8"); jsonData = JSON.parse(fileData); - connection.socket.send(JSON.stringify(jsonData)); + + // Do nothing if clientIds is empty + if (clientIds.length === 0) { + console.log("Client IDs list is empty, doing nothing."); + return; // Exit function if clientIds is empty + } + + // Filter out elements whose ids are not in clientIds + const filteredData = jsonData.filter( + (item) => !clientIds.includes(item.id) + ); + + // Send the filtered data back to the client + connection.socket.send(JSON.stringify(filteredData)); } else { console.error("File not found:", filePath); clearInterval(sendInterval); @@ -341,6 +355,21 @@ fastify.register(async function (fastify) { // Start sending data periodically sendInterval = setInterval(sendData, 5000); + // Handle incoming messages from the client to update the ids + connection.socket.on("message", (message) => { + try { + const parsedMessage = JSON.parse(message); + if (parsedMessage?.ids) { + //console.log("Received ids from client:", parsedMessage.ids); + clientIds = parsedMessage.ids; // Update the ids list from the client + } else { + //console.log("No ids received in the message"); + } + } catch (error) { + console.error("Error parsing incoming message:", error); + } + }); + // Handle client disconnect connection.socket.on("close", () => { console.log("Client disconnected"); @@ -364,66 +393,6 @@ fastify.register(async function (fastify) { ); }); -fastify.register(async function (fastify) { - fastify.get( - "/options-zero-dte-reader", - { websocket: true }, - (connection, req) => { - let jsonData; - let sendInterval; - - // Function to send data to the client - const sendData = async () => { - const filePath = path.join( - __dirname, - "../app/json/options-flow/zero-dte/data.json" - ); - try { - if (fs.existsSync(filePath)) { - const fileData = fs.readFileSync(filePath, "utf8"); - jsonData = JSON.parse(fileData); - connection.socket.send(JSON.stringify(jsonData)); - } else { - console.error("File not found:", filePath); - clearInterval(sendInterval); - console.error("Connection closed"); - throw new Error("This is an intentional uncaught exception!"); - } - } catch (err) { - console.error("Error sending data to client:", err); - } - }; - - // Send data to the client initially - sendData(); - - // Start sending data periodically - sendInterval = setInterval(sendData, 5000); - - // Handle client disconnect - connection.socket.on("close", () => { - console.log("Client disconnected"); - connection?.socket?.close(); - clearInterval(sendInterval); - }); - - // Handle server crash cleanup - const closeHandler = () => { - console.log("Server is closing. Cleaning up resources..."); - clearInterval(sendInterval); - connection?.socket?.close(); - }; - - // Add close handler to process event - process.on("exit", closeHandler); - process.on("SIGINT", closeHandler); - process.on("SIGTERM", closeHandler); - process.on("uncaughtException", closeHandler); - process.on("unhandledRejection", closeHandler); - } - ); -}); - // Function to start the server function startServer() { if (!serverRunning) {