From 441f154fbc4debc85b302b1199947830d1d93e44 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Mon, 25 Nov 2024 23:02:37 +0100 Subject: [PATCH] bugfixing --- app/cron_dashboard.py | 193 ++++++++++++++++++++++++------------------ fastify/app.js | 68 +++++++-------- 2 files changed, 145 insertions(+), 116 deletions(-) diff --git a/app/cron_dashboard.py b/app/cron_dashboard.py index 56e07c2..f538532 100644 --- a/app/cron_dashboard.py +++ b/app/cron_dashboard.py @@ -352,102 +352,129 @@ async def get_analyst_report(): return {} async def run(): - async with aiohttp.ClientSession() as session: - recent_earnings = await get_recent_earnings(session) + async with aiohttp.ClientSession() as session: + recent_earnings = await get_recent_earnings(session) - upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=False) - # If results are less than 5, try without the time filter. - if len(upcoming_earnings) < 5: - upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=True) + upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=False) - # If still less than 5 results, try fetching for tomorrow. - if len(upcoming_earnings) < 5: - upcoming_earnings = await get_upcoming_earnings(session, tomorrow, filter_today=True) + upcoming_earnings = [ + item for item in upcoming_earnings + if item['symbol'] not in [earning['symbol'] for earning in recent_earnings] + ] - - #recent_dividends = await get_recent_dividends(session) - recent_analyst_report = await get_analyst_report() + if len(upcoming_earnings) < 5: + upcoming_earnings = await get_upcoming_earnings(session, today, filter_today=True) - #Avoid clashing of recent and upcoming earnings - upcoming_earnings = [item for item in upcoming_earnings if item['symbol'] not in [earning['symbol'] for earning in recent_earnings]] + if len(upcoming_earnings) < 5: + upcoming_earnings = await get_upcoming_earnings(session, tomorrow, filter_today=True) - try: - with open(f"json/retail-volume/data.json", 'r') as file: - retail_tracker = ujson.load(file)[0:5] - except: - retail_tracker = [] - try: - with open(f"json/options-flow/feed/data.json", 'r') as file: - options_flow = ujson.load(file) - - # Filter the options_flow to include only items with ticker in total_symbol - options_flow = [item for item in options_flow if item['ticker'] in stock_symbols] - - highest_volume = sorted(options_flow, key=lambda x: int(x['volume']), reverse=True) - highest_volume = [{key: item[key] for key in ['cost_basis', 'ticker','underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']} for item in highest_volume[0:4]] + recent_analyst_report = await get_analyst_report() - highest_premium = sorted(options_flow, key=lambda x: int(x['cost_basis']), reverse=True) - highest_premium = [{key: item[key] for key in ['cost_basis', 'ticker','underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']} for item in highest_premium[0:4]] - - highest_open_interest = sorted(options_flow, key=lambda x: int(x['open_interest']), reverse=True) - highest_open_interest = [{key: item[key] for key in ['cost_basis', 'ticker','underlying_type', 'date_expiration', 'put_call', 'open_interest', 'strike_price']} for item in highest_open_interest[0:4]] - - options_flow = {'premium': highest_premium, 'volume': highest_volume, 'openInterest':highest_open_interest} - except Exception as e: - print(e) - options_flow = {} + upcoming_earnings = [ + item for item in upcoming_earnings + if item['symbol'] not in [earning['symbol'] for earning in recent_earnings] + ] - market_status = check_market_hours() - if market_status == 0: - try: - with open(f"json/market-movers/markethours/gainers.json", 'r') as file: - gainers = ujson.load(file) - with open(f"json/market-movers/markethours/losers.json", 'r') as file: - losers = ujson.load(file) - market_movers = {'gainers': gainers['1D'][:5], 'losers': losers['1D'][:5]} - except: - market_movers = {} - elif market_status == 1: - try: - with open(f"json/market-movers/premarket/gainers.json", 'r') as file: - data = ujson.load(file) - gainers = [{ 'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} for item in data[:5]] + + try: + with open("json/options-flow/feed/data.json", 'r') as file: + options_flow = ujson.load(file) + + # Filter the options_flow to include only items with ticker in total_symbol + options_flow = [item for item in options_flow if item['ticker'] in stock_symbols] + + highest_volume = sorted(options_flow, key=lambda x: int(x['volume']), reverse=True) + highest_volume = [ + {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']} + for item in highest_volume[0:4] + ] - with open(f"json/market-movers/premarket/losers.json", 'r') as file: - data = ujson.load(file) - losers = [{ 'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} for item in data[:5]] - - market_movers={'gainers': gainers, 'losers': losers} - except: - market_movers = {} - elif market_status == 2: - try: - with open(f"json/market-movers/afterhours/gainers.json", 'r') as file: - data = ujson.load(file) - gainers = [{ 'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} for item in data[:5]] + highest_premium = sorted(options_flow, key=lambda x: int(x['cost_basis']), reverse=True) + highest_premium = [ + {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'volume', 'strike_price']} + for item in highest_premium[0:4] + ] - with open(f"json/market-movers/afterhours/losers.json", 'r') as file: - data = ujson.load(file) - losers = [{ 'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} for item in data[:5]] - - market_movers={'gainers': gainers, 'losers': losers} + highest_open_interest = sorted(options_flow, key=lambda x: int(x['open_interest']), reverse=True) + highest_open_interest = [ + {key: item[key] for key in ['cost_basis', 'ticker', 'underlying_type', 'date_expiration', 'put_call', 'open_interest', 'strike_price']} + for item in highest_open_interest[0:4] + ] - except: - market_movers = {} + options_flow = { + 'premium': highest_premium, + 'volume': highest_volume, + 'openInterest': highest_open_interest + } + except Exception as e: + print(e) + options_flow = {} - data = { - 'marketMovers': market_movers, - 'marketStatus': market_status, - 'optionsFlow': options_flow, - 'recentEarnings': recent_earnings, - 'upcomingEarnings': upcoming_earnings, - 'analystReport': recent_analyst_report, - } + market_status = check_market_hours() + if market_status == 0: + try: + with open("json/market-movers/markethours/gainers.json", 'r') as file: + gainers = ujson.load(file) + with open("json/market-movers/markethours/losers.json", 'r') as file: + losers = ujson.load(file) + market_movers = {'gainers': gainers['1D'][:5], 'losers': losers['1D'][:5]} + except: + market_movers = {} + elif market_status == 1: + try: + with open("json/market-movers/premarket/gainers.json", 'r') as file: + data = ujson.load(file) + gainers = [ + {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], + 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} + for item in data[:5] + ] - - if len(data) > 0: - await save_json(data) + with open("json/market-movers/premarket/losers.json", 'r') as file: + data = ujson.load(file) + losers = [ + {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], + 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} + for item in data[:5] + ] + + market_movers = {'gainers': gainers, 'losers': losers} + except: + market_movers = {} + elif market_status == 2: + try: + with open("json/market-movers/afterhours/gainers.json", 'r') as file: + data = ujson.load(file) + gainers = [ + {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], + 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} + for item in data[:5] + ] + + with open("json/market-movers/afterhours/losers.json", 'r') as file: + data = ujson.load(file) + losers = [ + {'symbol': item['symbol'], 'name': item['name'], 'price': item['price'], + 'changesPercentage': item['changesPercentage'], 'marketCap': item['marketCap']} + for item in data[:5] + ] + + market_movers = {'gainers': gainers, 'losers': losers} + except: + market_movers = {} + + data = { + 'marketMovers': market_movers, + 'marketStatus': market_status, + 'optionsFlow': options_flow, + 'recentEarnings': recent_earnings, + 'upcomingEarnings': upcoming_earnings, + 'analystReport': recent_analyst_report, + } + + if len(data) > 0: + await save_json(data) try: diff --git a/fastify/app.js b/fastify/app.js index a2ed1e6..05d5d28 100755 --- a/fastify/app.js +++ b/fastify/app.js @@ -344,6 +344,7 @@ fastify.register(async function (fastify) { }); + fastify.register(async function (fastify) { fastify.get( "/multiple-realtime-data", @@ -351,71 +352,73 @@ fastify.register(async function (fastify) { (connection, req) => { let tickers = []; let sendInterval; - // Mapping for each ticker's `isSend` status to avoid duplicate sends - const tickerStatus = {}; - + // Store the last sent data for each ticker + const lastSentData = {}; + // Function to send data for all tickers as a list const sendData = async () => { const dataToSend = []; - + // Iterate over tickers and collect data for (const symbol of tickers) { const filePath = path.join( __dirname, `../app/json/websocket/companies/${symbol}.json` ); - + try { if (fs.existsSync(filePath)) { const fileData = fs.readFileSync(filePath, "utf8"); const jsonData = JSON.parse(fileData); - - // Only send data if conditions are met + + // Only send data if conditions are met and data has changed if ( jsonData?.ap != null && jsonData?.t != null && ["Q", "T"].includes(jsonData?.type) && - connection.socket.readyState === WebSocket.OPEN && - !tickerStatus[symbol] + connection.socket.readyState === WebSocket.OPEN ) { - // Collect data to send later - dataToSend.push({ - symbol, // Include the ticker symbol in the sent data - ap: jsonData?.ap, - }); - - // Set ticker as "sent" and reset after 500ms - tickerStatus[symbol] = true; - setTimeout(() => { - tickerStatus[symbol] = false; - }, 500); + // Check if the current data is different from the last sent data + const currentDataSignature = `${jsonData.ap}`; + const lastSentSignature = lastSentData[symbol]; + + if (currentDataSignature !== lastSentSignature) { + // Collect data to send + dataToSend.push({ + symbol, // Include the ticker symbol in the sent data + ap: jsonData.ap, + }); + + // Update the last sent data for this ticker + lastSentData[symbol] = currentDataSignature; + } } } else { console.error("File not found for ticker:", symbol); } } catch (err) { - console.error("Error sending data for ticker:", symbol, err); + console.error("Error processing data for ticker:", symbol, err); } } - + // Send all collected data as a single message if (dataToSend.length > 0 && connection.socket.readyState === WebSocket.OPEN) { connection.socket.send(JSON.stringify(dataToSend)); } }; - + // Start receiving messages from the client connection.socket.on("message", (message) => { try { // Parse message as JSON to get tickers array tickers = JSON.parse(message.toString("utf-8")); console.log("Received tickers from client:", tickers); - - // Initialize ticker status for each symbol - tickers.forEach((ticker) => { - tickerStatus[ticker] = false; + + // Reset last sent data for new tickers + tickers?.forEach((ticker) => { + lastSentData[ticker] = null; }); - + // Start periodic data sending if not already started if (!sendInterval) { sendInterval = setInterval(sendData, 5000); @@ -424,14 +427,14 @@ fastify.register(async function (fastify) { console.error("Failed to parse tickers from client message:", err); } }); - + // Handle client disconnect connection.socket.on("close", () => { console.log("Client disconnected"); clearInterval(sendInterval); removeProcessListeners(); }); - + // Handle server crash cleanup const closeHandler = () => { console.log("Server is closing. Cleaning up resources..."); @@ -439,14 +442,14 @@ fastify.register(async function (fastify) { connection.socket.close(); removeProcessListeners(); }; - + // Add close handler to process events process.on("exit", closeHandler); process.on("SIGINT", closeHandler); process.on("SIGTERM", closeHandler); process.on("uncaughtException", closeHandler); process.on("unhandledRejection", closeHandler); - + // Function to remove process listeners to avoid memory leaks const removeProcessListeners = () => { process.off("exit", closeHandler); @@ -461,7 +464,6 @@ fastify.register(async function (fastify) { - // Function to start the server function startServer() { if (!serverRunning) {