diff --git a/app/cron_options_flow.py b/app/cron_options_flow.py index d3c79eb..9742971 100755 --- a/app/cron_options_flow.py +++ b/app/cron_options_flow.py @@ -37,8 +37,7 @@ async def fetch_options_activity(page): try: data = await asyncio.to_thread(fin.options_activity, date_from=start_date, date_to=end_date, page=page, pagesize=1000) return orjson.loads(fin.output(data))['option_activity'] - except Exception as e: - print(f"Exception on page {page}: {e}") + except: return [] # Asynchronous function to fetch multiple pages @@ -79,7 +78,8 @@ def clean_and_filter_data(res_list): filtered_list.append({key: value for key, value in item.items() if key not in ['description_extended', 'updated']}) except Exception as e: print(f"Error processing item: {e}") - continue + pass + return filtered_list # Main execution flow diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index cfc15b9..70b2d0d 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -350,6 +350,7 @@ def run_threaded(job_func): # Schedule the job to run + schedule.every().day.at("01:00").do(run_threaded, run_db_schedule_job) schedule.every().day.at("22:30").do(run_threaded, run_options_jobs).tag('options_job') schedule.every().day.at("05:00").do(run_threaded, run_options_historical_flow).tag('options_historical_flow_job') diff --git a/fastify/app.js b/fastify/app.js index c28e225..e3def1b 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -178,78 +178,89 @@ fastify.register(async function (fastify) { }); + fastify.register(async function (fastify) { fastify.get("/options-flow-reader", { websocket: true }, (connection, req) => { let sendInterval; + let pingInterval; + let sendTimeout = null; let lastSentData = []; - - // Function to send data to the client + const sendData = async () => { - const filePath = path.join(__dirname, "../app/json/options-flow/feed/data.json"); - - try { - if (fs.existsSync(filePath)) { - const fileData = fs.readFileSync(filePath, "utf8").trim(); - - if (!fileData) { - console.error("File is empty:", filePath); - setTimeout(sendData, 2000); - return; + if (sendTimeout) { + clearTimeout(sendTimeout); + sendTimeout = null; } - let parsedData; + const filePath = path.join(__dirname, "../app/json/options-flow/feed/data.json"); + try { - parsedData = JSON.parse(fileData); - } catch (jsonErr) { - console.error("Invalid JSON format:", jsonErr); - setTimeout(sendData, 2000); - return; + if (fs.existsSync(filePath)) { + const fileData = fs.readFileSync(filePath, "utf8").trim(); + + if (!fileData) { + sendTimeout = setTimeout(sendData, 2000); + return; + } + + let parsedData; + try { + parsedData = JSON.parse(fileData); + } catch (jsonErr) { + sendTimeout = setTimeout(sendData, 2000); + return; + } + + if (parsedData.length > lastSentData.length && connection.socket.readyState === 1) { + connection.socket.send(JSON.stringify(parsedData)); + lastSentData = parsedData; + } + } else { + sendTimeout = setTimeout(sendData, 2000); + } + } catch (err) { + sendTimeout = setTimeout(sendData, 2000); } + }; - // Send data only if the length has increased since the last send - if (parsedData.length > lastSentData.length) { - connection.socket.send(JSON.stringify(parsedData)); - console.log("Options data sent: Length", parsedData.length); - lastSentData = parsedData; - } - } else { - console.error("File not found:", filePath); - setTimeout(sendData, 2000); - } - } catch (err) { - console.error("Error sending data to client:", err); - setTimeout(sendData, 2000); - } -}; - - - // Send data to the client initially + // Initial send and interval setup sendData(); - - // Start sending data periodically sendInterval = setInterval(sendData, 500); - - // Handle client disconnect - connection.socket.on("close", () => { - console.log("Client disconnected"); - clearInterval(sendInterval); - }); - // Handle server crash cleanup - const closeHandler = () => { - console.log("Server is closing. Cleaning up resources..."); + // Heartbeat mechanism + pingInterval = setInterval(() => { + if (connection.socket.readyState === 1) { + connection.socket.ping(); + } + }, 25000); + + // Cleanup function + const cleanup = () => { clearInterval(sendInterval); - if (connection.socket.readyState === connection.socket.OPEN) { + clearInterval(pingInterval); + if (sendTimeout) clearTimeout(sendTimeout); + [ + 'exit', 'SIGINT', 'SIGTERM', + 'uncaughtException', 'unhandledRejection' + ].forEach(event => process.off(event, cleanup)); + + if (connection.socket.readyState === 1) { connection.socket.close(); } }; - - // Add close handler to process events - process.on("exit", closeHandler); - process.on("SIGINT", closeHandler); - process.on("SIGTERM", closeHandler); - process.on("uncaughtException", closeHandler); - process.on("unhandledRejection", closeHandler); + + // Process event handlers + process.on('exit', cleanup); + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + process.on('uncaughtException', cleanup); + process.on('unhandledRejection', cleanup); + + // WebSocket close handler + connection.socket.on('close', () => { + console.log('Client disconnected'); + cleanup(); + }); }); });