reverting back

This commit is contained in:
MuslemRahimi 2024-09-16 22:22:37 +02:00
parent a33d0fd8b6
commit 13201259cf

View File

@ -1,28 +1,40 @@
const fastify = require("fastify")({ logger: true });
const cors = require("@fastify/cors");
const WebSocket = require("ws");
const path = require("path");
const fs = require("fs");
const PocketBase = require("pocketbase/cjs");
const { serialize } = require("object-to-formdata");
const got = require("got");
const cheerio = require("cheerio");
const sharp = require("sharp");
const axios = require("axios");
let serverRunning = false; let serverRunning = false;
const activeConnections = new Set();
const fastify = require("fastify")({});
const cors = require("@fastify/cors");
//Load API KEYS //Load API KEYS
require("dotenv").config({ path: "../app/.env" }); require("dotenv").config({ path: "../app/.env" });
const fmpAPIKey = process.env.FMP_API_KEY; const fmpAPIKey = process.env.FMP_API_KEY;
//const mixpanelAPIKey = process.env.MIXPANEL_API_KEY;
const twitchAPIKey = process.env.TWITCH_API_KEY; const twitchAPIKey = process.env.TWITCH_API_KEY;
const twitchSecretKey = process.env.TWITCH_SECRET_KEY; const twitchSecretKey = process.env.TWITCH_SECRET_KEY;
//const Mixpanel = require('mixpanel');
//const UAParser = require('ua-parser-js');
const got = require("got"); //Only version npm i got@11.8.3 works with ESM
const cheerio = require("cheerio");
const sharp = require("sharp");
const axios = require("axios");
const fs = require("fs");
const path = require("path");
//const pino = require('pino');
//const mixpanel = Mixpanel.init(mixpanelAPIKey, { debug: false });
const PocketBase = require("pocketbase/cjs");
const pb = new PocketBase("http://127.0.0.1:8090"); const pb = new PocketBase("http://127.0.0.1:8090");
// globally disable auto cancellation
//See https://github.com/pocketbase/js-sdk#auto-cancellation
//Bug happens that get-post gives an error of auto-cancellation. Hence set it to false;
pb.autoCancellation(false); pb.autoCancellation(false);
const { serialize } = require("object-to-formdata");
// Register the CORS plugin // Register the CORS plugin
//Add Cors so that only localhost and my stocknear.com can send acceptable requests
fastify.register(cors); fastify.register(cors);
const corsMiddleware = (request, reply, done) => { const corsMiddleware = (request, reply, done) => {
const allowedOrigins = [ const allowedOrigins = [
@ -48,7 +60,7 @@ const corsMiddleware = (request, reply, done) => {
}; };
fastify.addHook("onRequest", corsMiddleware); fastify.addHook("onRequest", corsMiddleware);
// Register routes //fastify.register(require('./mixpanel/server'), { mixpanel, UAParser });
fastify.register(require("./get-user-stats/server"), { pb }); fastify.register(require("./get-user-stats/server"), { pb });
fastify.register(require("./get-community-stats/server"), { pb }); fastify.register(require("./get-community-stats/server"), { pb });
fastify.register(require("./get-moderators/server"), { pb }); fastify.register(require("./get-moderators/server"), { pb });
@ -92,33 +104,39 @@ fastify.register(require("./downvote/server"), { pb });
fastify.register(require("./upvote-comment/server"), { pb }); fastify.register(require("./upvote-comment/server"), { pb });
fastify.register(require("./downvote-comment/server"), { pb }); fastify.register(require("./downvote-comment/server"), { pb });
//fastify.register(require('./create-comment/server'), { pb });
function wait(ms) {
var start = new Date().getTime();
var end = start;
while (end < start + ms) {
end = new Date().getTime();
}
}
fastify.register(require("@fastify/websocket")); fastify.register(require("@fastify/websocket"));
const WebSocket = require("ws");
let isSend = false;
let sendInterval;
fastify.register(async function (fastify) { fastify.register(async function (fastify) {
fastify.get("/realtime-data", { websocket: true }, handleWebSocket); fastify.get("/realtime-data", { websocket: true }, (connection, req) => {
fastify.get( // Send a welcome message to the client
"/realtime-crypto-data",
{ websocket: true }, //connection.socket.send('hi from server');
handleCryptoWebSocket
); // Listen for incoming messages from the client
fastify.get( connection.socket.on("message", (message) => {
"/options-flow-reader", symbol = message.toString("utf-8");
{ websocket: true }, console.log("Received message from client:", symbol);
handleOptionsFlowWebSocket
); // If you want to dynamically update the subscription based on client's message
updateSubscription();
}); });
function handleWebSocket(connection, req) { //======================
let symbol = "";
let ws;
const cleanup = () => {
activeConnections.delete(connection);
if (ws) ws.close();
};
activeConnections.add(connection);
const login = { const login = {
event: "login", event: "login",
data: { data: {
@ -129,82 +147,90 @@ function handleWebSocket(connection, req) {
const subscribe = { const subscribe = {
event: "subscribe", event: "subscribe",
data: { data: {
ticker: "", ticker: "", // Initial value; will be updated dynamically
}, },
}; };
function updateSubscription() { function updateSubscription() {
subscribe.data.ticker = symbol; subscribe.data.ticker = symbol;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(subscribe));
}
} }
function connectToFMP() { // Create a new WebSocket instance for your backend
ws = new WebSocket("wss://websockets.financialmodelingprep.com"); const ws = new WebSocket("wss://websockets.financialmodelingprep.com");
// Handle WebSocket connection open
ws.on("open", function open() { ws.on("open", function open() {
fastify.log.info("Connected to FMP WebSocket");
ws.send(JSON.stringify(login)); ws.send(JSON.stringify(login));
setTimeout(() => updateSubscription(), 2000); wait(2000); //2 seconds in milliseconds
ws.send(JSON.stringify(subscribe));
}); });
// Handle WebSocket errors
ws.on("error", function (error) { ws.on("error", function (error) {
fastify.log.error("FMP WebSocket error:", error); console.error("WebSocket error:", error);
// Handle the error gracefully, you might want to notify the client or log it.
// For now, let's close the connection if an error occurs
connection.socket.close();
}); });
ws.on("close", function () { ws.on("message", function (data, flags) {
fastify.log.info("FMP WebSocket closed. Attempting to reconnect...");
setTimeout(connectToFMP, 5000);
});
ws.on("message", function (data) {
const stringData = data.toString("utf-8"); const stringData = data.toString("utf-8");
try { try {
const jsonData = JSON.parse(stringData); const jsonData = JSON.parse(stringData);
const bpData = jsonData.bp; const bpData = jsonData.bp;
// Check if bpData is a number and not equal to zero
if (typeof bpData === "number" && bpData !== 0) { if (typeof bpData === "number" && bpData !== 0) {
if (connection.socket.readyState === WebSocket.OPEN) { if (connection.socket.readyState === WebSocket.OPEN && !isSend) {
connection.socket.send(JSON.stringify({ bp: bpData })); connection.socket.send(JSON.stringify({ bp: bpData }));
isSend = true;
setTimeout(() => {
isSend = false;
}, 800);
} }
} }
} catch (error) { } catch (error) {
fastify.log.error("Error parsing JSON:", error); console.error("Error parsing JSON:", error);
} }
}); });
//======================
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
connection?.socket?.close();
// Check if the WebSocket is open before trying to close it
if (ws.readyState === WebSocket.OPEN) {
try {
ws.close();
} catch (e) {
console.error("Error while closing WebSocket:", e);
} }
}
});
});
});
connectToFMP(); fastify.register(async function (fastify) {
fastify.get(
"/realtime-crypto-data",
{ websocket: true },
(connection, req) => {
// Send a welcome message to the client
// Listen for incoming messages from the client
connection.socket.on("message", (message) => { connection.socket.on("message", (message) => {
symbol = message.toString("utf-8"); symbol = message.toString("utf-8");
fastify.log.info("Received message from client:", symbol); console.log("Received message from client:", symbol);
// If you want to dynamically update the subscription based on client's message
updateSubscription(); updateSubscription();
}); });
connection.socket.on("close", () => { //======================
fastify.log.info("Client disconnected");
cleanup();
});
connection.socket.on("error", (error) => {
fastify.log.error("WebSocket error:", error);
cleanup();
});
}
function handleCryptoWebSocket(connection, req) {
let symbol = "";
let ws;
const cleanup = () => {
activeConnections.delete(connection);
if (ws) ws.close();
};
activeConnections.add(connection);
const login = { const login = {
event: "login", event: "login",
data: { data: {
@ -215,76 +241,79 @@ function handleCryptoWebSocket(connection, req) {
const subscribe = { const subscribe = {
event: "subscribe", event: "subscribe",
data: { data: {
ticker: "", ticker: "", // Initial value; will be updated dynamically
}, },
}; };
function updateSubscription() { function updateSubscription() {
subscribe.data.ticker = symbol; subscribe.data.ticker = symbol;
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(subscribe));
}
} }
function connectToCryptoFMP() { // Create a new WebSocket instance for your backend
ws = new WebSocket("wss://crypto.financialmodelingprep.com"); const ws = new WebSocket("wss://crypto.financialmodelingprep.com");
// Handle WebSocket connection open
ws.on("open", function open() { ws.on("open", function open() {
fastify.log.info("Connected to Crypto FMP WebSocket");
ws.send(JSON.stringify(login)); ws.send(JSON.stringify(login));
setTimeout(() => updateSubscription(), 2000); wait(2000); //2 seconds in milliseconds
ws.send(JSON.stringify(subscribe));
}); });
// Handle WebSocket errors
ws.on("error", function (error) { ws.on("error", function (error) {
fastify.log.error("Crypto FMP WebSocket error:", error); console.error("WebSocket error:", error);
// Handle the error gracefully, you might want to notify the client or log it.
// For now, let's close the connection if an error occurs
connection.socket.close();
}); });
ws.on("close", function () { // Handle WebSocket messages
fastify.log.info( ws.on("message", function (data, flags) {
"Crypto FMP WebSocket closed. Attempting to reconnect..."
);
setTimeout(connectToCryptoFMP, 5000);
});
ws.on("message", function (data) {
const stringData = data.toString("utf-8"); const stringData = data.toString("utf-8");
if (connection.socket.readyState === WebSocket.OPEN) {
if (connection.socket.readyState === WebSocket.OPEN && !isSend) {
connection.socket.send(stringData); connection.socket.send(stringData);
} //console.log(stringData)
}); isSend = true;
} setTimeout(() => {
isSend = false;
}, 800);
connectToCryptoFMP(); //wait(2000);
}
connection.socket.on("message", (message) => { //wait(2000);
symbol = message.toString("utf-8");
fastify.log.info("Received message from client:", symbol);
updateSubscription();
}); });
//======================
// Handle client disconnect
connection.socket.on("close", () => { connection.socket.on("close", () => {
fastify.log.info("Client disconnected"); console.log("Client disconnected");
cleanup(); connection?.socket?.close();
}); // Check if the WebSocket is open before trying to close it
if (ws.readyState === WebSocket.OPEN) {
connection.socket.on("error", (error) => { try {
fastify.log.error("WebSocket error:", error); ws.close();
cleanup(); } catch (e) {
console.error("Error while closing WebSocket:", e);
}
}
}); });
} }
);
});
function handleOptionsFlowWebSocket(connection, req) { fastify.register(async function (fastify) {
fastify.get(
"/options-flow-reader",
{ websocket: true },
(connection, req) => {
let jsonData; let jsonData;
let sendInterval; let sendInterval;
let clientIds = []; let clientIds = [];
const cleanup = () => { // Function to send filtered data to the client
activeConnections.delete(connection);
clearInterval(sendInterval);
};
activeConnections.add(connection);
const sendData = async () => { const sendData = async () => {
const filePath = path.join( const filePath = path.join(
__dirname, __dirname,
@ -295,125 +324,139 @@ function handleOptionsFlowWebSocket(connection, req) {
const fileData = fs.readFileSync(filePath, "utf8"); const fileData = fs.readFileSync(filePath, "utf8");
jsonData = JSON.parse(fileData); jsonData = JSON.parse(fileData);
// Do nothing if clientIds is empty
if (clientIds.length === 0) { if (clientIds.length === 0) {
fastify.log.info("Client IDs list is empty, doing nothing."); console.log("Client IDs list is empty, doing nothing.");
return; return; // Exit function if clientIds is empty
} }
// Filter out elements whose ids are not in clientIds
const filteredData = jsonData.filter( const filteredData = jsonData.filter(
(item) => !clientIds.includes(item.id) (item) => !clientIds.includes(item.id)
); );
if (connection.socket.readyState === WebSocket.OPEN) { // Send the filtered data back to the client
connection.socket.send(JSON.stringify(filteredData)); connection.socket.send(JSON.stringify(filteredData));
}
} else { } else {
fastify.log.error("File not found:", filePath); console.error("File not found:", filePath);
clearInterval(sendInterval); clearInterval(sendInterval);
connection?.socket?.close();
console.error("Connection closed");
throw new Error("This is an intentional uncaught exception!");
} }
} catch (err) { } catch (err) {
fastify.log.error("Error sending data to client:", err); console.error("Error sending data to client:", err);
} }
}; };
// Send data to the client initially
sendData();
// Start sending data periodically
sendInterval = setInterval(sendData, 5000); sendInterval = setInterval(sendData, 5000);
// Handle incoming messages from the client to update the ids
connection.socket.on("message", (message) => { connection.socket.on("message", (message) => {
try { try {
const parsedMessage = JSON.parse(message); const parsedMessage = JSON.parse(message);
if (parsedMessage?.ids) { if (parsedMessage?.ids) {
clientIds = parsedMessage.ids; //console.log("Received ids from client:", parsedMessage.ids);
clientIds = parsedMessage.ids; // Update the ids list from the client
} else {
//console.log("No ids received in the message");
} }
} catch (error) { } catch (error) {
fastify.log.error("Error parsing incoming message:", error); console.error("Error parsing incoming message:", error);
} }
}); });
// Handle client disconnect
connection.socket.on("close", () => { connection.socket.on("close", () => {
fastify.log.info("Client disconnected"); console.log("Client disconnected");
cleanup(); clearInterval(sendInterval);
}); });
connection.socket.on("error", (error) => { // Handle server crash cleanup
fastify.log.error("WebSocket error:", error); const closeHandler = () => {
cleanup(); console.log("Server is closing. Cleaning up resources...");
}); clearInterval(sendInterval);
sendData();
}
// Graceful shutdown
async function gracefulShutdown() {
fastify.log.info("Shutting down gracefully...");
for (const connection of activeConnections) {
connection.socket.close(); connection.socket.close();
} };
await fastify.close();
process.exit(0);
}
process.on("SIGINT", gracefulShutdown); // Add close handler to process event
process.on("SIGTERM", gracefulShutdown); process.on("exit", closeHandler);
process.on("SIGINT", closeHandler);
// Error handling process.on("SIGTERM", closeHandler);
fastify.setErrorHandler((error, request, reply) => { process.on("uncaughtException", closeHandler);
fastify.log.error(error); process.on("unhandledRejection", closeHandler);
reply.status(500).send({ error: "Internal Server Error" }); }
);
}); });
// Start the server // Function to start the server
async function startServer() { function startServer() {
try { if (!serverRunning) {
await fastify.listen({ port: 2000, host: "0.0.0.0" }); fastify.listen(2000, (err) => {
if (err) {
console.error("Error starting server:", err);
process.exit(1); // Exit the process if server start fails
}
serverRunning = true; serverRunning = true;
fastify.log.info("Server started successfully on port 2000!"); console.log("Server started successfully on port 2000!");
} catch (err) { });
fastify.log.error(err); } else {
process.exit(1); console.log("Server is already running.");
} }
} }
// Function to stop the server // Function to stop the server
async function stopServer() { function stopServer() {
if (serverRunning) { if (serverRunning) {
try { return new Promise((resolve, reject) => {
await fastify.close(); fastify.close((err) => {
serverRunning = false; if (err) {
fastify.log.info("Server closed successfully!"); console.error("Error closing server:", err);
} catch (err) { reject(err);
fastify.log.error("Error closing server:", err);
throw err;
}
} else { } else {
fastify.log.info("Server is not running."); serverRunning = false;
console.log("Server closed successfully!");
resolve();
}
});
});
} else {
console.log("Server is not running.");
return Promise.resolve();
} }
} }
// Function to gracefully close and restart the server // Function to gracefully close and restart the server
async function restartServer() { function restartServer() {
if (serverRunning) { if (serverRunning) {
try { stopServer()
await stopServer(); .then(() => {
fastify.log.info("Restarting server..."); console.log("Restarting server...");
await startServer(); startServer();
} catch (error) { })
fastify.log.error("Failed to restart server:", error); .catch((error) => {
process.exit(1); console.error("Failed to restart server:", error);
} process.exit(1); // Exit the process if server restart fails
});
} else { } else {
fastify.log.info("Server is not running. Starting server..."); console.log("Server is not running. Starting server...");
await startServer(); startServer();
} }
} }
// Global error handlers // Add a global error handler for uncaught exceptions
process.on("uncaughtException", (err) => { process.on("uncaughtException", (err) => {
fastify.log.error("Uncaught Exception:", err); console.error("Uncaught Exception:", err);
restartServer(); restartServer();
}); });
// Add a global error handler for unhandled promise rejections
process.on("unhandledRejection", (reason, promise) => { process.on("unhandledRejection", (reason, promise) => {
fastify.log.error("Unhandled Rejection at:", promise, "reason:", reason); console.error("Unhandled Rejection at:", promise, "reason:", reason);
restartServer(); restartServer();
}); });