bugfixing websocket

This commit is contained in:
MuslemRahimi 2025-03-18 16:15:29 +01:00
parent fbf7507b17
commit 18daa749cb
3 changed files with 70 additions and 58 deletions

View File

@ -37,8 +37,7 @@ async def fetch_options_activity(page):
try:
data = await asyncio.to_thread(fin.options_activity, date_from=start_date, date_to=end_date, page=page, pagesize=1000)
return orjson.loads(fin.output(data))['option_activity']
except Exception as e:
print(f"Exception on page {page}: {e}")
except:
return []
# Asynchronous function to fetch multiple pages
@ -79,7 +78,8 @@ def clean_and_filter_data(res_list):
filtered_list.append({key: value for key, value in item.items() if key not in ['description_extended', 'updated']})
except Exception as e:
print(f"Error processing item: {e}")
continue
pass
return filtered_list
# Main execution flow

View File

@ -350,6 +350,7 @@ def run_threaded(job_func):
# Schedule the job to run
schedule.every().day.at("01:00").do(run_threaded, run_db_schedule_job)
schedule.every().day.at("22:30").do(run_threaded, run_options_jobs).tag('options_job')
schedule.every().day.at("05:00").do(run_threaded, run_options_historical_flow).tag('options_historical_flow_job')

View File

@ -178,13 +178,20 @@ fastify.register(async function (fastify) {
});
fastify.register(async function (fastify) {
fastify.get("/options-flow-reader", { websocket: true }, (connection, req) => {
let sendInterval;
let pingInterval;
let sendTimeout = null;
let lastSentData = [];
// Function to send data to the client
const sendData = async () => {
if (sendTimeout) {
clearTimeout(sendTimeout);
sendTimeout = null;
}
const filePath = path.join(__dirname, "../app/json/options-flow/feed/data.json");
try {
@ -192,8 +199,7 @@ fastify.register(async function (fastify) {
const fileData = fs.readFileSync(filePath, "utf8").trim();
if (!fileData) {
console.error("File is empty:", filePath);
setTimeout(sendData, 2000);
sendTimeout = setTimeout(sendData, 2000);
return;
}
@ -201,55 +207,60 @@ fastify.register(async function (fastify) {
try {
parsedData = JSON.parse(fileData);
} catch (jsonErr) {
console.error("Invalid JSON format:", jsonErr);
setTimeout(sendData, 2000);
sendTimeout = setTimeout(sendData, 2000);
return;
}
// Send data only if the length has increased since the last send
if (parsedData.length > lastSentData.length) {
if (parsedData.length > lastSentData.length && connection.socket.readyState === 1) {
connection.socket.send(JSON.stringify(parsedData));
console.log("Options data sent: Length", parsedData.length);
lastSentData = parsedData;
}
} else {
console.error("File not found:", filePath);
setTimeout(sendData, 2000);
sendTimeout = setTimeout(sendData, 2000);
}
} catch (err) {
console.error("Error sending data to client:", err);
setTimeout(sendData, 2000);
sendTimeout = setTimeout(sendData, 2000);
}
};
};
// Send data to the client initially
// Initial send and interval setup
sendData();
// Start sending data periodically
sendInterval = setInterval(sendData, 500);
// Handle client disconnect
connection.socket.on("close", () => {
console.log("Client disconnected");
clearInterval(sendInterval);
});
// Heartbeat mechanism
pingInterval = setInterval(() => {
if (connection.socket.readyState === 1) {
connection.socket.ping();
}
}, 25000);
// Handle server crash cleanup
const closeHandler = () => {
console.log("Server is closing. Cleaning up resources...");
// Cleanup function
const cleanup = () => {
clearInterval(sendInterval);
if (connection.socket.readyState === connection.socket.OPEN) {
clearInterval(pingInterval);
if (sendTimeout) clearTimeout(sendTimeout);
[
'exit', 'SIGINT', 'SIGTERM',
'uncaughtException', 'unhandledRejection'
].forEach(event => process.off(event, cleanup));
if (connection.socket.readyState === 1) {
connection.socket.close();
}
};
// Add close handler to process events
process.on("exit", closeHandler);
process.on("SIGINT", closeHandler);
process.on("SIGTERM", closeHandler);
process.on("uncaughtException", closeHandler);
process.on("unhandledRejection", closeHandler);
// Process event handlers
process.on('exit', cleanup);
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);
process.on('uncaughtException', cleanup);
process.on('unhandledRejection', cleanup);
// WebSocket close handler
connection.socket.on('close', () => {
console.log('Client disconnected');
cleanup();
});
});
});