From a33d0fd8b68f3c606040088b3d0032ff0d1a7f88 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Mon, 16 Sep 2024 22:16:19 +0200 Subject: [PATCH] bugfixing --- fastify/app.js | 641 +++++++++++++++++++++++-------------------------- 1 file changed, 301 insertions(+), 340 deletions(-) diff --git a/fastify/app.js b/fastify/app.js index d207142..8776f20 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -1,40 +1,28 @@ -let serverRunning = false; - -const fastify = require("fastify")({}); +const fastify = require("fastify")({ logger: true }); const cors = require("@fastify/cors"); - -//Load API KEYS -require("dotenv").config({ path: "../app/.env" }); -const fmpAPIKey = process.env.FMP_API_KEY; -//const mixpanelAPIKey = process.env.MIXPANEL_API_KEY; -const twitchAPIKey = process.env.TWITCH_API_KEY; -const twitchSecretKey = process.env.TWITCH_SECRET_KEY; - -//const Mixpanel = require('mixpanel'); -//const UAParser = require('ua-parser-js'); - -const got = require("got"); //Only version npm i got@11.8.3 works with ESM +const WebSocket = require("ws"); +const path = require("path"); +const fs = require("fs"); +const PocketBase = require("pocketbase/cjs"); +const { serialize } = require("object-to-formdata"); +const got = require("got"); const cheerio = require("cheerio"); const sharp = require("sharp"); const axios = require("axios"); -const fs = require("fs"); -const path = require("path"); -//const pino = require('pino'); -//const mixpanel = Mixpanel.init(mixpanelAPIKey, { debug: false }); +let serverRunning = false; +const activeConnections = new Set(); + +// Load API KEYS +require("dotenv").config({ path: "../app/.env" }); +const fmpAPIKey = process.env.FMP_API_KEY; +const twitchAPIKey = process.env.TWITCH_API_KEY; +const twitchSecretKey = process.env.TWITCH_SECRET_KEY; -const PocketBase = require("pocketbase/cjs"); const pb = new PocketBase("http://127.0.0.1:8090"); - -// globally disable auto cancellation -//See https://github.com/pocketbase/js-sdk#auto-cancellation -//Bug happens that get-post gives an error of auto-cancellation. Hence set it to false; pb.autoCancellation(false); -const { serialize } = require("object-to-formdata"); - // Register the CORS plugin -//Add Cors so that only localhost and my stocknear.com can send acceptable requests fastify.register(cors); const corsMiddleware = (request, reply, done) => { const allowedOrigins = [ @@ -60,7 +48,7 @@ const corsMiddleware = (request, reply, done) => { }; fastify.addHook("onRequest", corsMiddleware); -//fastify.register(require('./mixpanel/server'), { mixpanel, UAParser }); +// Register routes fastify.register(require("./get-user-stats/server"), { pb }); fastify.register(require("./get-community-stats/server"), { pb }); fastify.register(require("./get-moderators/server"), { pb }); @@ -104,355 +92,328 @@ fastify.register(require("./downvote/server"), { pb }); fastify.register(require("./upvote-comment/server"), { pb }); fastify.register(require("./downvote-comment/server"), { pb }); -//fastify.register(require('./create-comment/server'), { pb }); - -function wait(ms) { - var start = new Date().getTime(); - var end = start; - while (end < start + ms) { - end = new Date().getTime(); - } -} - fastify.register(require("@fastify/websocket")); -const WebSocket = require("ws"); - -let isSend = false; -let sendInterval; - -fastify.register(async function (fastify) { - fastify.get("/realtime-data", { websocket: true }, (connection, req) => { - let symbol = ""; - let isConnectionActive = true; - let ws; - - const login = { - event: "login", - data: { - apiKey: fmpAPIKey, - }, - }; - - const subscribe = { - event: "subscribe", - data: { - ticker: "", - }, - }; - - function updateSubscription() { - subscribe.data.ticker = symbol; - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(subscribe)); - } - } - - function connectToFMP() { - ws = new WebSocket("wss://websockets.financialmodelingprep.com"); - - ws.on("open", function open() { - console.log("Connected to FMP WebSocket"); - ws.send(JSON.stringify(login)); - setTimeout(() => updateSubscription(), 2000); - }); - - ws.on("error", function (error) { - console.error("FMP WebSocket error:", error); - }); - - ws.on("close", function () { - console.log("FMP WebSocket closed. Attempting to reconnect..."); - setTimeout(connectToFMP, 5000); - }); - - ws.on("message", function (data) { - if (!isConnectionActive) return; - - const stringData = data.toString("utf-8"); - try { - const jsonData = JSON.parse(stringData); - const bpData = jsonData.bp; - - if (typeof bpData === "number" && bpData !== 0) { - if (connection.socket.readyState === WebSocket.OPEN) { - connection.socket.send(JSON.stringify({ bp: bpData })); - } - } - } catch (error) { - console.error("Error parsing JSON:", error); - } - }); - } - - connectToFMP(); - - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8"); - console.log("Received message from client:", symbol); - updateSubscription(); - }); - - connection.socket.on("close", () => { - console.log("Client disconnected"); - isConnectionActive = false; - if (ws) ws.close(); - }); - - connection.socket.on("error", (error) => { - console.error("WebSocket error:", error); - isConnectionActive = false; - if (ws) ws.close(); - }); - }); -}); - fastify.register(async function (fastify) { + fastify.get("/realtime-data", { websocket: true }, handleWebSocket); fastify.get( "/realtime-crypto-data", { websocket: true }, - (connection, req) => { - let symbol = ""; - let isConnectionActive = true; - let ws; - - const login = { - event: "login", - data: { - apiKey: fmpAPIKey, - }, - }; - - const subscribe = { - event: "subscribe", - data: { - ticker: "", - }, - }; - - function updateSubscription() { - subscribe.data.ticker = symbol; - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(subscribe)); - } - } - - function connectToCryptoFMP() { - ws = new WebSocket("wss://crypto.financialmodelingprep.com"); - - ws.on("open", function open() { - console.log("Connected to Crypto FMP WebSocket"); - ws.send(JSON.stringify(login)); - setTimeout(() => updateSubscription(), 2000); - }); - - ws.on("error", function (error) { - console.error("Crypto FMP WebSocket error:", error); - }); - - ws.on("close", function () { - console.log( - "Crypto FMP WebSocket closed. Attempting to reconnect..." - ); - setTimeout(connectToCryptoFMP, 5000); - }); - - ws.on("message", function (data) { - if (!isConnectionActive) return; - - const stringData = data.toString("utf-8"); - if (connection.socket.readyState === WebSocket.OPEN) { - connection.socket.send(stringData); - } - }); - } - - connectToCryptoFMP(); - - connection.socket.on("message", (message) => { - symbol = message.toString("utf-8"); - console.log("Received message from client:", symbol); - updateSubscription(); - }); - - connection.socket.on("close", () => { - console.log("Client disconnected"); - isConnectionActive = false; - if (ws) ws.close(); - }); - - connection.socket.on("error", (error) => { - console.error("WebSocket error:", error); - isConnectionActive = false; - if (ws) ws.close(); - }); - } + handleCryptoWebSocket ); -}); - -fastify.register(async function (fastify) { fastify.get( "/options-flow-reader", { websocket: true }, - (connection, req) => { - let jsonData; - let sendInterval; - let clientIds = []; - let isConnectionActive = true; - - // Function to send filtered data to the client - const sendData = async () => { - if (!isConnectionActive) return; - - const filePath = path.join( - __dirname, - "../app/json/options-flow/feed/data.json" - ); - try { - if (fs.existsSync(filePath)) { - const fileData = fs.readFileSync(filePath, "utf8"); - jsonData = JSON.parse(fileData); - - if (clientIds.length === 0) { - console.log("Client IDs list is empty, doing nothing."); - return; - } - - const filteredData = jsonData.filter( - (item) => !clientIds.includes(item.id) - ); - - if (connection.socket.readyState === WebSocket.OPEN) { - connection.socket.send(JSON.stringify(filteredData)); - } - } else { - console.error("File not found:", filePath); - clearInterval(sendInterval); - } - } catch (err) { - console.error("Error sending data to client:", err); - // Don't close the connection here, just log the error - } - }; - - // Start sending data periodically - sendInterval = setInterval(sendData, 5000); - - // Handle incoming messages from the client to update the ids - connection.socket.on("message", (message) => { - try { - const parsedMessage = JSON.parse(message); - if (parsedMessage?.ids) { - clientIds = parsedMessage.ids; - } - } catch (error) { - console.error("Error parsing incoming message:", error); - } - }); - - // Handle client disconnect - connection.socket.on("close", () => { - console.log("Client disconnected"); - isConnectionActive = false; - clearInterval(sendInterval); - }); - - // Handle WebSocket errors - connection.socket.on("error", (error) => { - console.error("WebSocket error:", error); - isConnectionActive = false; - clearInterval(sendInterval); - // Don't close the connection here, let the client handle reconnection - }); - - // Send initial data - sendData(); - } + handleOptionsFlowWebSocket ); }); +function handleWebSocket(connection, req) { + let symbol = ""; + let ws; + + const cleanup = () => { + activeConnections.delete(connection); + if (ws) ws.close(); + }; + + activeConnections.add(connection); + + const login = { + event: "login", + data: { + apiKey: fmpAPIKey, + }, + }; + + const subscribe = { + event: "subscribe", + data: { + ticker: "", + }, + }; + + function updateSubscription() { + subscribe.data.ticker = symbol; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(subscribe)); + } + } + + function connectToFMP() { + ws = new WebSocket("wss://websockets.financialmodelingprep.com"); + + ws.on("open", function open() { + fastify.log.info("Connected to FMP WebSocket"); + ws.send(JSON.stringify(login)); + setTimeout(() => updateSubscription(), 2000); + }); + + ws.on("error", function (error) { + fastify.log.error("FMP WebSocket error:", error); + }); + + ws.on("close", function () { + fastify.log.info("FMP WebSocket closed. Attempting to reconnect..."); + setTimeout(connectToFMP, 5000); + }); + + ws.on("message", function (data) { + const stringData = data.toString("utf-8"); + try { + const jsonData = JSON.parse(stringData); + const bpData = jsonData.bp; + + if (typeof bpData === "number" && bpData !== 0) { + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(JSON.stringify({ bp: bpData })); + } + } + } catch (error) { + fastify.log.error("Error parsing JSON:", error); + } + }); + } + + connectToFMP(); + + connection.socket.on("message", (message) => { + symbol = message.toString("utf-8"); + fastify.log.info("Received message from client:", symbol); + updateSubscription(); + }); + + connection.socket.on("close", () => { + fastify.log.info("Client disconnected"); + cleanup(); + }); + + connection.socket.on("error", (error) => { + fastify.log.error("WebSocket error:", error); + cleanup(); + }); +} + +function handleCryptoWebSocket(connection, req) { + let symbol = ""; + let ws; + + const cleanup = () => { + activeConnections.delete(connection); + if (ws) ws.close(); + }; + + activeConnections.add(connection); + + const login = { + event: "login", + data: { + apiKey: fmpAPIKey, + }, + }; + + const subscribe = { + event: "subscribe", + data: { + ticker: "", + }, + }; + + function updateSubscription() { + subscribe.data.ticker = symbol; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(subscribe)); + } + } + + function connectToCryptoFMP() { + ws = new WebSocket("wss://crypto.financialmodelingprep.com"); + + ws.on("open", function open() { + fastify.log.info("Connected to Crypto FMP WebSocket"); + ws.send(JSON.stringify(login)); + setTimeout(() => updateSubscription(), 2000); + }); + + ws.on("error", function (error) { + fastify.log.error("Crypto FMP WebSocket error:", error); + }); + + ws.on("close", function () { + fastify.log.info( + "Crypto FMP WebSocket closed. Attempting to reconnect..." + ); + setTimeout(connectToCryptoFMP, 5000); + }); + + ws.on("message", function (data) { + const stringData = data.toString("utf-8"); + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(stringData); + } + }); + } + + connectToCryptoFMP(); + + connection.socket.on("message", (message) => { + symbol = message.toString("utf-8"); + fastify.log.info("Received message from client:", symbol); + updateSubscription(); + }); + + connection.socket.on("close", () => { + fastify.log.info("Client disconnected"); + cleanup(); + }); + + connection.socket.on("error", (error) => { + fastify.log.error("WebSocket error:", error); + cleanup(); + }); +} + +function handleOptionsFlowWebSocket(connection, req) { + let jsonData; + let sendInterval; + let clientIds = []; + + const cleanup = () => { + activeConnections.delete(connection); + clearInterval(sendInterval); + }; + + activeConnections.add(connection); + + const sendData = async () => { + const filePath = path.join( + __dirname, + "../app/json/options-flow/feed/data.json" + ); + try { + if (fs.existsSync(filePath)) { + const fileData = fs.readFileSync(filePath, "utf8"); + jsonData = JSON.parse(fileData); + + if (clientIds.length === 0) { + fastify.log.info("Client IDs list is empty, doing nothing."); + return; + } + + const filteredData = jsonData.filter( + (item) => !clientIds.includes(item.id) + ); + + if (connection.socket.readyState === WebSocket.OPEN) { + connection.socket.send(JSON.stringify(filteredData)); + } + } else { + fastify.log.error("File not found:", filePath); + clearInterval(sendInterval); + } + } catch (err) { + fastify.log.error("Error sending data to client:", err); + } + }; + + sendInterval = setInterval(sendData, 5000); + + connection.socket.on("message", (message) => { + try { + const parsedMessage = JSON.parse(message); + if (parsedMessage?.ids) { + clientIds = parsedMessage.ids; + } + } catch (error) { + fastify.log.error("Error parsing incoming message:", error); + } + }); + + connection.socket.on("close", () => { + fastify.log.info("Client disconnected"); + cleanup(); + }); + + connection.socket.on("error", (error) => { + fastify.log.error("WebSocket error:", error); + cleanup(); + }); + + sendData(); +} + +// Graceful shutdown +async function gracefulShutdown() { + fastify.log.info("Shutting down gracefully..."); + for (const connection of activeConnections) { + connection.socket.close(); + } + await fastify.close(); + process.exit(0); +} + +process.on("SIGINT", gracefulShutdown); +process.on("SIGTERM", gracefulShutdown); + +// Error handling fastify.setErrorHandler((error, request, reply) => { - console.error("Server error:", error); + fastify.log.error(error); reply.status(500).send({ error: "Internal Server Error" }); }); -// Graceful shutdown -process.on("SIGINT", async () => { - console.log("Received SIGINT. Shutting down gracefully..."); - await fastify.close(); - process.exit(0); -}); - -process.on("SIGTERM", async () => { - console.log("Received SIGTERM. Shutting down gracefully..."); - await fastify.close(); - process.exit(0); -}); - -// Function to start the server -function startServer() { - if (!serverRunning) { - fastify.listen(2000, (err) => { - if (err) { - console.error("Error starting server:", err); - process.exit(1); // Exit the process if server start fails - } - serverRunning = true; - console.log("Server started successfully on port 2000!"); - }); - } else { - console.log("Server is already running."); +// Start the server +async function startServer() { + try { + await fastify.listen({ port: 2000, host: "0.0.0.0" }); + serverRunning = true; + fastify.log.info("Server started successfully on port 2000!"); + } catch (err) { + fastify.log.error(err); + process.exit(1); } } // Function to stop the server -function stopServer() { +async function stopServer() { if (serverRunning) { - return new Promise((resolve, reject) => { - fastify.close((err) => { - if (err) { - console.error("Error closing server:", err); - reject(err); - } else { - serverRunning = false; - console.log("Server closed successfully!"); - resolve(); - } - }); - }); + try { + await fastify.close(); + serverRunning = false; + fastify.log.info("Server closed successfully!"); + } catch (err) { + fastify.log.error("Error closing server:", err); + throw err; + } } else { - console.log("Server is not running."); - return Promise.resolve(); + fastify.log.info("Server is not running."); } } // Function to gracefully close and restart the server -function restartServer() { +async function restartServer() { if (serverRunning) { - stopServer() - .then(() => { - console.log("Restarting server..."); - startServer(); - }) - .catch((error) => { - console.error("Failed to restart server:", error); - process.exit(1); // Exit the process if server restart fails - }); + try { + await stopServer(); + fastify.log.info("Restarting server..."); + await startServer(); + } catch (error) { + fastify.log.error("Failed to restart server:", error); + process.exit(1); + } } else { - console.log("Server is not running. Starting server..."); - startServer(); + fastify.log.info("Server is not running. Starting server..."); + await startServer(); } } -// Add a global error handler for uncaught exceptions +// Global error handlers process.on("uncaughtException", (err) => { - console.error("Uncaught Exception:", err); + fastify.log.error("Uncaught Exception:", err); restartServer(); }); -// Add a global error handler for unhandled promise rejections process.on("unhandledRejection", (reason, promise) => { - console.error("Unhandled Rejection at:", promise, "reason:", reason); + fastify.log.error("Unhandled Rejection at:", promise, "reason:", reason); restartServer(); });