backend/fastify/app.js
2024-12-10 16:55:19 +01:00

457 lines
13 KiB
JavaScript
Executable File

// Tracks whether fastify.listen() has completed successfully; read and
// written by startServer / stopServer / restartServer.
let serverRunning = false;
const fastify = require("fastify")({});
const cors = require("@fastify/cors");
// Load API keys from the app's .env file.
// NOTE(review): the path is relative, so it presumably resolves against the
// process working directory rather than this file's folder — confirm the
// service is always launched from this directory.
require("dotenv").config({ path: "../app/.env" });
const fmpAPIKey = process.env.FMP_API_KEY;
//const mixpanelAPIKey = process.env.MIXPANEL_API_KEY;
//const Mixpanel = require('mixpanel');
//const UAParser = require('ua-parser-js');
const fs = require("fs");
const path = require("path");
//const pino = require('pino');
//const mixpanel = Mixpanel.init(mixpanelAPIKey, { debug: false });
//const PocketBase = require("pocketbase/cjs");
//const pb = new PocketBase("http://127.0.0.1:8090");
// Globally disable PocketBase auto-cancellation.
// See https://github.com/pocketbase/js-sdk#auto-cancellation
// Without this, get-post requests fail with an auto-cancellation error, so it is set to false.
//pb.autoCancellation(false);
const { serialize } = require("object-to-formdata");
// Register the CORS plugin with its default options (no options object is passed).
// Origin filtering (localhost dev ports and stocknear.com) is handled separately
// by the corsMiddleware onRequest hook.
fastify.register(cors);
/**
 * Fastify-style hook that enforces an origin allow-list.
 *
 * Requests without an Origin header, or whose Origin is on the list, get the
 * CORS response headers and continue via `done()`. Any other origin is
 * answered with 403 `{ error: "Forbidden" }` and `done()` is never called.
 *
 * @param {object} request - Incoming request; only `headers.origin` is read.
 * @param {object} reply - Reply object exposing `header`, `code` and `send`.
 * @param {Function} done - Continuation invoked when the request is allowed.
 * @returns {void}
 */
const corsMiddleware = (request, reply, done) => {
  const trustedOrigins = [
    "http://localhost:4173",
    "http://127.0.0.1:4173",
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "https://stocknear.com",
    "https://www.stocknear.com",
    "http://stocknear.com",
    "http://www.stocknear.com",
  ];
  const requestOrigin = request?.headers?.origin;

  // Reject only when an Origin was supplied and it is not on the list.
  const isRejected =
    Boolean(requestOrigin) && !trustedOrigins.includes(requestOrigin);
  if (isRejected) {
    reply.code(403).send({ error: "Forbidden" });
    return;
  }

  // Echo the caller's origin when present, otherwise fall back to "*".
  const corsHeaders = [
    ["Access-Control-Allow-Origin", requestOrigin || "*"],
    ["Access-Control-Allow-Methods", "GET,POST"],
    ["Access-Control-Allow-Headers", "Content-Type"],
  ];
  for (const [headerName, headerValue] of corsHeaders) {
    reply.header(headerName, headerValue);
  }
  done();
};
// Install corsMiddleware as an "onRequest" hook on the root fastify instance.
fastify.addHook("onRequest", corsMiddleware);
/**
 * Synchronously pause the current thread for roughly `ms` milliseconds.
 *
 * The thread sleeps inside Atomics.wait instead of spinning, so no CPU is
 * burned during the delay. The event loop is still blocked for the whole
 * duration (no timers, I/O callbacks or websocket frames run until this
 * returns), so prefer setTimeout inside request/connection handlers.
 *
 * @param {number} ms - Delay in milliseconds. Values that are not a positive
 *   number (0, negatives, NaN) return immediately.
 * @returns {void}
 */
function wait(ms) {
  const delayMs = Number(ms);
  // Also guards NaN, which Atomics.wait would treat as an infinite timeout.
  if (!(delayMs > 0)) {
    return;
  }
  // Nothing ever notifies this private cell, so the wait always ends by timeout.
  const sleepCell = new Int32Array(new SharedArrayBuffer(4));
  Atomics.wait(sleepCell, 0, 0, delayMs);
}
// Register the websocket plugin; the routes in this file opt in with { websocket: true }.
fastify.register(require("@fastify/websocket"));
// `ws` client: used to open outbound sockets and for its readyState constants (WebSocket.OPEN).
const WebSocket = require("ws");
// Module-level flag: a single value shared by every handler and connection that touches it.
let isSend = false;
// Module-level timer handle. The websocket handlers visible in this file declare
// their own local `sendInterval`, which shadows this one.
let sendInterval;
// Built once: constructing an Intl.DateTimeFormat is comparatively expensive.
// hourCycle "h23" pins hours to 00-23; `hour12: false` with en-US can render
// midnight as "24:xx" on some engine/ICU versions.
const NEW_YORK_MINUTE_FORMATTER = new Intl.DateTimeFormat("en-US", {
  timeZone: "America/New_York",
  year: "numeric",
  month: "2-digit",
  day: "2-digit",
  hour: "2-digit",
  minute: "2-digit",
  hourCycle: "h23",
});

/**
 * Format a timestamp as "YYYY-MM-DD HH:mm" in America/New_York local time.
 *
 * @param {number} timestamp - Epoch time; it is divided by 1e6 before being
 *   handed to `Date`, i.e. the input is treated as nanoseconds.
 * @returns {string} Date and 24-hour time, e.g. "2024-12-10 10:55".
 * @throws {RangeError} If the timestamp does not yield a valid Date.
 */
function formatTimestampNewYork(timestamp) {
  const instant = new Date(timestamp / 1e6);
  // Read the fields by name instead of regex-rewriting the localized string,
  // so the result does not depend on the locale's separators or field order.
  const fields = {};
  for (const { type, value } of NEW_YORK_MINUTE_FORMATTER.formatToParts(instant)) {
    fields[type] = value;
  }
  return `${fields.year}-${fields.month}-${fields.day} ${fields.hour}:${fields.minute}`;
}
// Websocket route "/realtime-crypto-data": relays messages from the upstream
// crypto feed to the connected client, at most one message per throttle window.
fastify.register(async function (fastify) {
  fastify.get(
    "/realtime-crypto-data",
    { websocket: true },
    (connection, req) => {
      // Delay between the upstream login and the subscribe message.
      const SUBSCRIBE_DELAY_MS = 2000;
      // Minimum gap between two messages forwarded to this client.
      const FORWARD_THROTTLE_MS = 2000;

      // All state below is per connection, so clients cannot overwrite each
      // other's symbol or starve each other through a shared throttle flag.
      let symbol = "";
      let isThrottled = false;
      let subscribeTimer = null;
      let throttleTimer = null;

      const login = {
        event: "login",
        data: {
          apiKey: fmpAPIKey,
        },
      };
      const subscribe = {
        event: "subscribe",
        data: {
          ticker: "", // Initial value; updated from the client's messages
        },
      };

      // Copy the latest client-supplied symbol into the subscribe payload.
      // NOTE(review): a symbol received after the subscribe message has gone
      // out is stored but not re-sent upstream — confirm whether a
      // re-subscribe is expected here.
      function updateSubscription() {
        subscribe.data.ticker = symbol;
      }

      // Each client message is interpreted as the symbol to subscribe to.
      connection.socket.on("message", (message) => {
        symbol = message.toString("utf-8");
        console.log("Received message from client:", symbol);
        updateSubscription();
      });

      // Dedicated upstream socket for this client.
      const ws = new WebSocket("wss://crypto.financialmodelingprep.com");

      ws.on("open", function open() {
        ws.send(JSON.stringify(login));
        // Use a timer rather than a blocking wait so the event loop (and every
        // other connection) keeps running during the delay.
        subscribeTimer = setTimeout(() => {
          subscribeTimer = null;
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify(subscribe));
          }
        }, SUBSCRIBE_DELAY_MS);
      });

      // On an upstream error, log it and drop the client connection.
      ws.on("error", function (error) {
        console.error("WebSocket error:", error);
        connection.socket.close();
      });

      // Forward upstream messages; anything arriving inside the throttle
      // window is dropped.
      ws.on("message", function (data, flags) {
        if (isThrottled || connection.socket.readyState !== WebSocket.OPEN) {
          return;
        }
        connection.socket.send(data.toString("utf-8"));
        isThrottled = true;
        throttleTimer = setTimeout(() => {
          throttleTimer = null;
          isThrottled = false;
        }, FORWARD_THROTTLE_MS);
      });

      // Client went away: cancel pending timers and release the upstream socket.
      connection.socket.on("close", () => {
        console.log("Client disconnected");
        clearTimeout(subscribeTimer);
        clearTimeout(throttleTimer);
        // Also close a socket that is still connecting, otherwise it would
        // finish the handshake and stay open with nobody listening.
        const isUpstreamActive =
          ws.readyState === WebSocket.OPEN ||
          ws.readyState === WebSocket.CONNECTING;
        if (isUpstreamActive) {
          try {
            ws.close();
          } catch (e) {
            console.error("Error while closing WebSocket:", e);
          }
        }
      });
    }
  );
});
// Websocket route "/options-flow-reader": pushes the options-flow feed file to
// the client once on connect and then every 5 seconds.
fastify.register(async function (fastify) {
  fastify.get(
    "/options-flow-reader",
    { websocket: true },
    (connection, req) => {
      const PROCESS_EVENTS = [
        "exit",
        "SIGINT",
        "SIGTERM",
        "uncaughtException",
        "unhandledRejection",
      ];
      const filePath = path.join(
        __dirname,
        "../app/json/options-flow/feed/data.json"
      );
      let sendInterval;

      // Read the feed file and send its JSON content to the client.
      // If the file is missing, stop the timer and close the connection.
      const sendData = () => {
        try {
          if (!fs.existsSync(filePath)) {
            console.error("File not found:", filePath);
            clearInterval(sendInterval);
            connection?.socket?.close();
            console.error("Connection closed");
            return;
          }
          // Skip the read entirely when there is nobody to send to.
          if (connection.socket.readyState !== WebSocket.OPEN) {
            return;
          }
          const fileData = fs.readFileSync(filePath, "utf8");
          // Parse and re-serialize so only valid JSON reaches the client.
          const jsonData = JSON.parse(fileData);
          connection.socket.send(JSON.stringify(jsonData));
        } catch (err) {
          console.error("Error sending data to client:", err);
        }
      };

      // Detach this connection's process-level listeners. Without this every
      // connection would leave five listeners behind for the life of the process.
      const removeProcessListeners = () => {
        for (const eventName of PROCESS_EVENTS) {
          process.off(eventName, closeHandler);
        }
      };

      // Process-level cleanup: stop the timer and close this client's socket.
      const closeHandler = () => {
        console.log("Server is closing. Cleaning up resources...");
        clearInterval(sendInterval);
        connection.socket.close();
        removeProcessListeners();
      };

      // Send data to the client initially, then periodically.
      sendData();
      sendInterval = setInterval(sendData, 5000);

      // Handle client disconnect
      connection.socket.on("close", () => {
        console.log("Client disconnected");
        clearInterval(sendInterval);
        removeProcessListeners();
      });

      for (const eventName of PROCESS_EVENTS) {
        process.on(eventName, closeHandler);
      }
    }
  );
});
// Websocket route "/price-data": the client sends a JSON array of tickers and
// receives, once per second, a list of quotes whose bid price changed since
// the last message.
fastify.register(async function (fastify) {
  fastify.get(
    "/price-data",
    { websocket: true },
    (connection, req) => {
      // Tickers come from the client and end up in a file path, so only a
      // conservative alphabet without path separators is accepted.
      // NOTE(review): assumes tickers never need other characters — confirm
      // against the files actually written to the companies directory.
      const TICKER_PATTERN = /^[A-Za-z0-9.^=_-]{1,20}$/;
      const PROCESS_EVENTS = [
        "exit",
        "SIGINT",
        "SIGTERM",
        "uncaughtException",
        "unhandledRejection",
      ];
      const companiesDir = path.join(
        __dirname,
        "../app/json/websocket/companies"
      );

      let tickers = [];
      let sendInterval;
      // Last sent signature per ticker. A Map keeps client-chosen keys away
      // from the object prototype.
      const lastSentData = new Map();

      const isValidTicker = (value) =>
        typeof value === "string" && TICKER_PATTERN.test(value);

      // Build the payload for one ticker, or return null when there is
      // nothing new to send (missing/empty/invalid file, incomplete quote,
      // or unchanged bid price).
      const buildUpdate = (symbol) => {
        const filePath = path.join(
          companiesDir,
          `${symbol.toUpperCase()}.json`
        );
        if (!fs.existsSync(filePath)) {
          console.error("File not found for ticker:", symbol);
          return null;
        }
        const fileData = fs.readFileSync(filePath, "utf8");
        if (!fileData) {
          console.error(`File is empty for ticker: ${symbol}`);
          return null;
        }
        let jsonData;
        try {
          jsonData = JSON.parse(fileData);
        } catch (parseError) {
          console.error(`Invalid JSON format for ticker: ${symbol}`, parseError);
          console.error(`File content: ${fileData}`);
          return null;
        }

        const isCompleteQuote =
          jsonData?.lp != null &&
          jsonData?.ap != null &&
          jsonData?.bp != null &&
          jsonData?.t != null &&
          ["Q", "T"].includes(jsonData?.type);
        if (!isCompleteQuote) {
          return null;
        }

        const avgPrice =
          (parseFloat(jsonData.ap) +
            parseFloat(jsonData.bp) +
            parseFloat(jsonData.lp)) /
          3;
        // Fall back to the bid when the average strays more than 5% from it.
        const finalPrice =
          Math.abs(avgPrice - jsonData.bp) / jsonData.bp > 0.05
            ? jsonData.bp
            : avgPrice;

        // Change detection is based on the bid price only.
        const currentDataSignature = `${jsonData.bp}`;
        if (currentDataSignature === lastSentData.get(symbol)) {
          return null;
        }

        const update = {
          symbol,
          ap: jsonData.ap,
          bp: jsonData.bp,
          lp: jsonData.lp,
          avgPrice: finalPrice,
          type: jsonData.type,
          time: formatTimestampNewYork(jsonData.t),
        };
        // Record the signature only after the payload was built successfully.
        lastSentData.set(symbol, currentDataSignature);
        return update;
      };

      // Collect updates for all tickers and send them as a single message.
      // Never throws: an exception escaping the timer callback would trigger
      // the global handlers and restart the whole server.
      const sendData = () => {
        if (connection.socket.readyState !== WebSocket.OPEN) {
          return;
        }
        const dataToSend = [];
        for (const symbol of tickers) {
          try {
            const update = buildUpdate(symbol);
            if (update) {
              dataToSend.push(update);
            }
          } catch (err) {
            console.error("Error processing data for ticker:", symbol, err);
          }
        }
        if (dataToSend.length === 0) {
          return;
        }
        try {
          connection.socket.send(JSON.stringify(dataToSend));
        } catch (err) {
          console.error("Error sending price data to client:", err);
        }
      };

      // Detach this connection's process-level listeners to avoid leaks.
      const removeProcessListeners = () => {
        for (const eventName of PROCESS_EVENTS) {
          process.off(eventName, closeHandler);
        }
      };

      // Process-level cleanup: stop the timer and close this client's socket.
      const closeHandler = () => {
        console.log("Server is closing. Cleaning up resources...");
        clearInterval(sendInterval);
        connection.socket.close();
        removeProcessListeners();
      };

      // Each client message replaces the watched ticker list.
      connection.socket.on("message", (message) => {
        let parsed;
        try {
          parsed = JSON.parse(message.toString("utf-8"));
        } catch (err) {
          console.error("Failed to parse tickers from client message:", err);
          return;
        }
        // Anything other than an array would break iteration in sendData;
        // keep the previous list instead.
        if (!Array.isArray(parsed)) {
          console.error(
            "Failed to parse tickers from client message:",
            new TypeError("Expected a JSON array of ticker strings")
          );
          return;
        }
        const accepted = [...new Set(parsed.filter(isValidTicker))];
        const rejectedCount = parsed.length - parsed.filter(isValidTicker).length;
        if (rejectedCount > 0) {
          console.error(`Ignored ${rejectedCount} invalid ticker value(s) from client`);
        }

        tickers = accepted;
        console.log("Received tickers from client:", tickers);
        // Reset last sent data so the new tickers are sent on the next tick.
        for (const ticker of tickers) {
          lastSentData.delete(ticker);
        }
        // Start periodic data sending if not already started.
        if (!sendInterval) {
          sendInterval = setInterval(sendData, 1000);
        }
      });

      // Handle client disconnect
      connection.socket.on("close", () => {
        console.log("Client disconnected");
        clearInterval(sendInterval);
        removeProcessListeners();
      });

      for (const eventName of PROCESS_EVENTS) {
        process.on(eventName, closeHandler);
      }
    }
  );
});
/**
 * Start listening on port 2000 unless the server is already flagged as running.
 *
 * On success `serverRunning` becomes true. If listening fails, the error is
 * logged and the process exits with code 1.
 *
 * @returns {void}
 */
function startServer() {
  if (serverRunning) {
    console.log("Server is already running.");
    return;
  }

  const onListening = (listenError) => {
    if (listenError) {
      console.error("Error starting server:", listenError);
      process.exit(1); // Nothing to recover: bail out when the port cannot be bound
    }
    serverRunning = true;
    console.log("Server started successfully on port 2000!");
  };

  fastify.listen(2000, onListening);
}
/**
 * Close the fastify instance if the server is flagged as running.
 *
 * @returns {Promise<void>} Resolves once the server is closed (or right away
 *   when it was not running); rejects with the error reported by
 *   `fastify.close`, in which case `serverRunning` is left unchanged.
 */
function stopServer() {
  if (!serverRunning) {
    console.log("Server is not running.");
    return Promise.resolve();
  }

  return new Promise((resolve, reject) => {
    fastify.close((closeError) => {
      if (!closeError) {
        serverRunning = false;
        console.log("Server closed successfully!");
        resolve();
        return;
      }
      console.error("Error closing server:", closeError);
      reject(closeError);
    });
  });
}
/**
 * Stop the server (when running) and start it again.
 *
 * If the server is not running it is simply started. If stopping fails, the
 * error is logged and the process exits with code 1.
 *
 * NOTE(review): this reuses the same fastify instance after close(); a closed
 * Fastify instance presumably cannot listen again, in which case the restart
 * ends in startServer's failure path — confirm against the Fastify version in use.
 *
 * @returns {void}
 */
function restartServer() {
  if (!serverRunning) {
    console.log("Server is not running. Starting server...");
    startServer();
    return;
  }

  const relaunch = () => {
    console.log("Restarting server...");
    startServer();
  };
  const giveUp = (error) => {
    console.error("Failed to restart server:", error);
    process.exit(1); // Exit the process if server restart fails
  };

  stopServer().then(relaunch).catch(giveUp);
}
// Last-resort handler for exceptions that nothing else caught:
// log the error, then attempt a server restart via restartServer().
process.on("uncaughtException", (err) => {
console.error("Uncaught Exception:", err);
restartServer();
});
// Same policy for promise rejections that have no handler attached:
// log the promise and its reason, then attempt a restart.
process.on("unhandledRejection", (reason, promise) => {
console.error("Unhandled Rejection at:", promise, "reason:", reason);
restartServer();
});
// Start the server when this module is loaded.
startServer();