diff --git a/app/cron_options_bubble.py b/app/cron_options_bubble.py index 3e5ff88..037ccb6 100755 --- a/app/cron_options_bubble.py +++ b/app/cron_options_bubble.py @@ -97,7 +97,7 @@ def options_bubble_data(chunk): df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce') df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days - df_30d = df[(df['days_to_expiration'] >= 40) & (df['days_to_expiration'] <= 80)] + df_30d = df[(df['days_to_expiration'] >= 0) & (df['days_to_expiration'] <= 1000)] # Calculate implied volatility for options in the 30-day range iv_data = [] for _, option in df_30d.iterrows(): diff --git a/app/main.py b/app/main.py index c79e13d..6b9eec5 100755 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,7 @@ from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from functools import partial +from datetime import datetime # DB constants & context manager @@ -1256,8 +1257,6 @@ async def get_indicator(data: IndicatorListData, api_key: str = Security(get_api -from datetime import datetime - async def process_watchlist_ticker(ticker, rule_of_list, quote_keys_to_include, screener_dict, etf_symbols, crypto_symbols): """Process a single ticker concurrently.""" ticker = ticker.upper() @@ -1382,48 +1381,103 @@ async def get_watchlist(data: GetWatchList, api_key: str = Security(get_api_key) @app.post("/get-price-alert") -async def get_price_alert(data: UserId, api_key: str = Security(get_api_key)): - user_id = data.dict()['userId'] +async def get_price_alert(data: dict, api_key: str = Security(get_api_key)): + user_id = data.get('userId') + if not user_id: + raise HTTPException(status_code=400, detail="User ID is required") # Fetch all alerts for the user in a single database call - result = pb.collection("priceAlert").get_full_list(query_params={"filter": f"user='{user_id}' && triggered=false"}) + try: + result = pb.collection("priceAlert").get_full_list( + query_params={"filter": f"user='{user_id}' && triggered=false"} + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Database query failed: {str(e)}") + + # Extract unique tickers + unique_tickers = {item.symbol for item in result if hasattr(item, 'symbol')} + + async def fetch_ticker_data(ticker): + try: + news_task = load_json_async(f"json/market-news/companies/{ticker}.json") + earnings_task = load_json_async(f"json/earnings/next/{ticker}.json") + + news_dict, earnings_dict = await asyncio.gather(news_task, earnings_task) + + # Process news + news = [] + if news_dict: + news = [ + {key: value for key, value in item.items() if key not in ['image', 'text']} + for item in news_dict[:5] + ] + + # Process earnings + earnings = None + if earnings_dict: + earnings = {**earnings_dict, 'symbol': ticker} + + return news, earnings + except Exception as e: + print(f"Error fetching data for {ticker}: {e}") + return [], None - # Function to read JSON file asynchronously async def fetch_quote_data(item): try: async with aiofiles.open(f"json/quote/{item.symbol}.json", mode='r') as file: quote_data = orjson.loads(await file.read()) - return { - 'symbol': item.symbol, - 'name': item.name, - 'id': item.id, - 'assetType': item.asset_type, - 'targetPrice': item.target_price, - 'priceWhenCreated': item.price_when_created, - 'price': quote_data.get("price"), - 'changesPercentage': quote_data.get("changesPercentage"), - 'volume': quote_data.get("volume"), - } + + return { + 'symbol': item.symbol, + 'name': getattr(item, 'name', ''), + 'id': item.id, + 'assetType': getattr(item, 'asset_type', ''), + 'targetPrice': getattr(item, 'target_price', None), + 'condition': getattr(item, 'condition', '').capitalize(), + 'priceWhenCreated': getattr(item, 'price_when_created', None), + 'price': quote_data.get("price"), + 'changesPercentage': quote_data.get("changesPercentage"), + 'volume': quote_data.get("volume"), + } + except FileNotFoundError: + print(f"Quote file not found for {item.symbol}") + return None except Exception as e: print(f"Error processing {item.symbol}: {e}") return None - - # Run all fetch_quote_data tasks concurrently - tasks = [fetch_quote_data(item) for item in result] - res_list = [res for res in await asyncio.gather(*tasks) if res] - - # Serialize and compress the response data - res = orjson.dumps(res_list) - compressed_data = gzip.compress(res) - return StreamingResponse( - io.BytesIO(compressed_data), - media_type="application/json", - headers={"Content-Encoding": "gzip"} - ) + try: + # Run all tasks concurrently + ticker_tasks = [fetch_ticker_data(ticker) for ticker in unique_tickers] + quote_tasks = [fetch_quote_data(item) for item in result] + + ticker_results = await asyncio.gather(*ticker_tasks) + quote_results = await asyncio.gather(*quote_tasks) + + # Process results + combined_results = [res for res in quote_results if res] + combined_news = [news_item for news, _ in ticker_results for news_item in news] + combined_earnings = [earnings for _, earnings in ticker_results if earnings] + + # Final response structure + res = { + 'data': combined_results, + 'news': combined_news, + 'earnings': combined_earnings, + } + + # Serialize and compress the response data + res_serialized = orjson.dumps(res) + compressed_data = gzip.compress(res_serialized) + print(combined_earnings) + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) - - + except Exception as e: + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") def process_option_activity(item): item['put_call'] = 'Calls' if item['put_call'] == 'CALL' else 'Puts' @@ -2504,6 +2558,25 @@ async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_ke redis_client.expire(cache_key, 60) # Set cache expiration time to 1 day return res +@app.post("/get-quote") +async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_key)): + ticker = data.ticker.upper() + + cache_key = f"get-quote-{ticker}" + cached_result = redis_client.get(cache_key) + if cached_result: + return orjson.loads(cached_result) + + try: + with open(f"json/quote/{ticker}.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = {} + + redis_client.set(cache_key, orjson.dumps(res)) + redis_client.expire(cache_key, 60) # Set cache expiration time to 1 day + return res + @app.post("/bull-bear-say") async def get_bull_bear_say(data:TickerData, api_key: str = Security(get_api_key)): ticker = data.ticker.upper()