update websocket
This commit is contained in:
parent
b4e6217359
commit
3bf04eeb1a
94
app/cron_websocket.py
Normal file
94
app/cron_websocket.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
import asyncio
|
||||||
|
import websockets
|
||||||
|
import orjson
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict, Any
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s: %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class WebSocketStockTicker:
|
||||||
|
def __init__(self, api_key: str, uri: str = "wss://websockets.financialmodelingprep.com"):
|
||||||
|
self.api_key = api_key
|
||||||
|
self.uri = uri
|
||||||
|
self.output_dir = Path('json/websocket/companies')
|
||||||
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
self.login_payload = {
|
||||||
|
"event": "login",
|
||||||
|
"data": {"apiKey": self.api_key}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.subscribe_payload = {
|
||||||
|
"event": "subscribe",
|
||||||
|
"data": {"ticker": ["*"]}
|
||||||
|
}
|
||||||
|
|
||||||
|
async def _safe_write(self, file_path: Path, data: Dict[str, Any]) -> None:
|
||||||
|
"""Safely write data to file with error handling."""
|
||||||
|
try:
|
||||||
|
with open(file_path, 'wb') as f:
|
||||||
|
f.write(orjson.dumps(data))
|
||||||
|
except IOError as e:
|
||||||
|
logger.error(f"File write error for {file_path}: {e}")
|
||||||
|
|
||||||
|
async def _process_message(self, message: str) -> None:
|
||||||
|
"""Process and store individual WebSocket messages."""
|
||||||
|
try:
|
||||||
|
data = orjson.loads(message)
|
||||||
|
|
||||||
|
if 's' in data:
|
||||||
|
symbol = data['s'].upper()
|
||||||
|
safe_symbol = ''.join(c for c in symbol if c.isalnum() or c in ['-', '_'])
|
||||||
|
file_path = self.output_dir / f"{safe_symbol}.json"
|
||||||
|
|
||||||
|
await self._safe_write(file_path, data)
|
||||||
|
logger.info(f"Processed data for {safe_symbol}")
|
||||||
|
|
||||||
|
except orjson.JSONDecodeError:
|
||||||
|
logger.warning(f"Invalid JSON received: {message}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing message: {e}")
|
||||||
|
|
||||||
|
async def connect(self) -> None:
|
||||||
|
"""Establish WebSocket connection with auto-reconnect."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
async with websockets.connect(self.uri, ping_interval=30) as websocket:
|
||||||
|
# Login and subscribe
|
||||||
|
await websocket.send(orjson.dumps(self.login_payload))
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
await websocket.send(orjson.dumps(self.subscribe_payload))
|
||||||
|
|
||||||
|
# Handle incoming messages
|
||||||
|
async for message in websocket:
|
||||||
|
await self._process_message(message)
|
||||||
|
|
||||||
|
except (websockets.exceptions.ConnectionClosedError,
|
||||||
|
websockets.exceptions.WebSocketException) as e:
|
||||||
|
logger.warning(f"WebSocket error: {e}. Reconnecting in 5 seconds...")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected error: {e}. Reconnecting in 5 seconds...")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
load_dotenv()
|
||||||
|
api_key = os.getenv('FMP_API_KEY')
|
||||||
|
|
||||||
|
if not api_key:
|
||||||
|
logger.error("API Key not found. Please set FMP_API_KEY in .env file.")
|
||||||
|
return
|
||||||
|
|
||||||
|
ticker = WebSocketStockTicker(api_key)
|
||||||
|
await ticker.connect()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
152
fastify/app.js
152
fastify/app.js
@ -72,7 +72,6 @@ fastify.register(require("./create-post-image/server"), { sharp });
|
|||||||
fastify.register(require("./delete-comment/server"), { pb });
|
fastify.register(require("./delete-comment/server"), { pb });
|
||||||
fastify.register(require("./delete-post/server"), { pb });
|
fastify.register(require("./delete-post/server"), { pb });
|
||||||
fastify.register(require("./leaderboard/server"), { pb });
|
fastify.register(require("./leaderboard/server"), { pb });
|
||||||
fastify.register(require("./feedback/server"), { pb });
|
|
||||||
fastify.register(require("./edit-name-watchlist/server"), { pb });
|
fastify.register(require("./edit-name-watchlist/server"), { pb });
|
||||||
fastify.register(require("./create-strategy/server"), { pb });
|
fastify.register(require("./create-strategy/server"), { pb });
|
||||||
fastify.register(require("./delete-strategy/server"), { pb });
|
fastify.register(require("./delete-strategy/server"), { pb });
|
||||||
@ -127,110 +126,111 @@ function formatTimestampNewYork(timestamp) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fastify.register(async function (fastify) {
|
fastify.register(async function (fastify) {
|
||||||
fastify.get("/realtime-data", { websocket: true }, (connection, req) => {
|
fastify.get(
|
||||||
// Send a welcome message to the client
|
"/realtime-data",
|
||||||
|
{ websocket: true },
|
||||||
|
(connection, req) => {
|
||||||
|
|
||||||
//connection.socket.send('hi from server');
|
let jsonData;
|
||||||
|
let sendInterval;
|
||||||
|
let symbol;
|
||||||
|
let isSend = false;
|
||||||
|
|
||||||
// Listen for incoming messages from the client
|
|
||||||
connection.socket.on("message", (message) => {
|
|
||||||
symbol = message.toString("utf-8");
|
|
||||||
console.log("Received message from client:", symbol);
|
|
||||||
|
|
||||||
// If you want to dynamically update the subscription based on client's message
|
// Function to send data to the client
|
||||||
updateSubscription();
|
const sendData = async () => {
|
||||||
});
|
if (!symbol) return; // Check if symbol is defined
|
||||||
|
const filePath = path.join(__dirname, `../app/json/websocket/companies/${symbol}.json`);
|
||||||
//======================
|
|
||||||
const login = {
|
|
||||||
event: "login",
|
|
||||||
data: {
|
|
||||||
apiKey: fmpAPIKey,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const subscribe = {
|
|
||||||
event: "subscribe",
|
|
||||||
data: {
|
|
||||||
ticker: "", // Initial value; will be updated dynamically
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
function updateSubscription() {
|
|
||||||
subscribe.data.ticker = symbol;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new WebSocket instance for your backend
|
|
||||||
const ws = new WebSocket("wss://websockets.financialmodelingprep.com");
|
|
||||||
|
|
||||||
// Handle WebSocket connection open
|
|
||||||
|
|
||||||
ws.on("open", function open() {
|
|
||||||
ws.send(JSON.stringify(login));
|
|
||||||
wait(2000); //2 seconds in milliseconds
|
|
||||||
ws.send(JSON.stringify(subscribe));
|
|
||||||
});
|
|
||||||
|
|
||||||
// Handle WebSocket errors
|
|
||||||
ws.on("error", function (error) {
|
|
||||||
console.error("WebSocket error:", error);
|
|
||||||
// Handle the error gracefully, you might want to notify the client or log it.
|
|
||||||
// For now, let's close the connection if an error occurs
|
|
||||||
connection.socket.close();
|
|
||||||
});
|
|
||||||
|
|
||||||
ws.on("message", function (data, flags) {
|
|
||||||
const stringData = data.toString("utf-8");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const jsonData = JSON.parse(stringData);
|
if (fs.existsSync(filePath)) {
|
||||||
|
const fileData = fs.readFileSync(filePath, "utf8");
|
||||||
|
jsonData = JSON.parse(fileData);
|
||||||
|
|
||||||
// Check if bpData is a number, not equal to zero, and jsonData properties are not null/undefined
|
// Logic to send data if certain conditions are met
|
||||||
if (
|
if (
|
||||||
//jsonData?.bp != null &&
|
|
||||||
//jsonData?.ap != null &&
|
|
||||||
jsonData?.lp != null &&
|
jsonData?.lp != null &&
|
||||||
jsonData?.t != null &&
|
jsonData?.t != null &&
|
||||||
["Q", "T"]?.includes(jsonData?.type) &&
|
["Q", "T"].includes(jsonData?.type) &&
|
||||||
connection.socket.readyState === WebSocket.OPEN &&
|
connection.socket.readyState === WebSocket.OPEN &&
|
||||||
!isSend
|
!isSend
|
||||||
) {
|
) {
|
||||||
connection.socket.send(
|
connection.socket.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
bp: jsonData.bp,
|
bp: jsonData?.bp,
|
||||||
ap: jsonData.ap,
|
ap: jsonData?.ap,
|
||||||
lp: jsonData.lp?.toFixed(2),
|
lp: jsonData?.lp?.toFixed(2),
|
||||||
type: jsonData.type,
|
type: jsonData?.type,
|
||||||
time: formatTimestampNewYork(jsonData?.t),
|
time: formatTimestampNewYork(jsonData?.t),
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
isSend = true;
|
isSend = true;
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
isSend = false;
|
isSend = false;
|
||||||
}, 500);
|
}, 500); // Reset isSend after 500ms
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} else {
|
||||||
console.error("Error parsing JSON:", error);
|
console.error("File not found:", filePath);
|
||||||
|
clearInterval(sendInterval);
|
||||||
|
connection.socket.close();
|
||||||
|
console.error("Connection closed");
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error("Error sending data to client:", err);
|
||||||
|
clearInterval(sendInterval);
|
||||||
|
connection.socket.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start receiving messages from the client
|
||||||
|
connection.socket.on("message", (message) => {
|
||||||
|
symbol = message.toString("utf-8")?.toUpperCase();
|
||||||
|
console.log("Received message from client:", symbol);
|
||||||
|
|
||||||
|
// Send data immediately upon receiving a symbol
|
||||||
|
sendData();
|
||||||
|
|
||||||
|
// Start periodic data sending if not already started
|
||||||
|
if (!sendInterval) {
|
||||||
|
sendInterval = setInterval(sendData, 5000);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
//======================
|
|
||||||
|
|
||||||
// Handle client disconnect
|
// Handle client disconnect
|
||||||
connection.socket.on("close", () => {
|
connection.socket.on("close", () => {
|
||||||
console.log("Client disconnected");
|
console.log("Client disconnected");
|
||||||
connection?.socket?.close();
|
clearInterval(sendInterval);
|
||||||
// Check if the WebSocket is open before trying to close it
|
removeProcessListeners();
|
||||||
if (ws.readyState === WebSocket.OPEN) {
|
});
|
||||||
try {
|
|
||||||
ws.close();
|
// Handle server crash cleanup
|
||||||
} catch (e) {
|
const closeHandler = () => {
|
||||||
console.error("Error while closing WebSocket:", e);
|
console.log("Server is closing. Cleaning up resources...");
|
||||||
}
|
clearInterval(sendInterval);
|
||||||
|
connection.socket.close();
|
||||||
|
removeProcessListeners();
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add close handler to process events
|
||||||
|
process.on("exit", closeHandler);
|
||||||
|
process.on("SIGINT", closeHandler);
|
||||||
|
process.on("SIGTERM", closeHandler);
|
||||||
|
process.on("uncaughtException", closeHandler);
|
||||||
|
process.on("unhandledRejection", closeHandler);
|
||||||
|
|
||||||
|
// Function to remove process listeners to avoid memory leaks
|
||||||
|
const removeProcessListeners = () => {
|
||||||
|
process.off("exit", closeHandler);
|
||||||
|
process.off("SIGINT", closeHandler);
|
||||||
|
process.off("SIGTERM", closeHandler);
|
||||||
|
process.off("uncaughtException", closeHandler);
|
||||||
|
process.off("unhandledRejection", closeHandler);
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
fastify.register(async function (fastify) {
|
fastify.register(async function (fastify) {
|
||||||
fastify.get(
|
fastify.get(
|
||||||
|
|||||||
@ -1,24 +0,0 @@
|
|||||||
// Declare a route
|
|
||||||
module.exports = function (fastify, opts, done) {
|
|
||||||
|
|
||||||
const pb = opts.pb;
|
|
||||||
|
|
||||||
fastify.post('/feedback', async (request, reply) => {
|
|
||||||
const data = request.body;
|
|
||||||
|
|
||||||
try {
|
|
||||||
await pb.collection("feedback").create(data)
|
|
||||||
output = 'success';
|
|
||||||
}
|
|
||||||
catch(e) {
|
|
||||||
//console.log(e)
|
|
||||||
output = 'failure';
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
reply.send({ items: output })
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
done();
|
|
||||||
};
|
|
||||||
Loading…
x
Reference in New Issue
Block a user