diff --git a/app/cron_analyst_db.py b/app/cron_analyst_db.py index faf3bb0..86c7d1c 100755 --- a/app/cron_analyst_db.py +++ b/app/cron_analyst_db.py @@ -261,9 +261,9 @@ def get_top_stocks(): result = [ { 'symbol': ticker, - 'upside': round((info['median'] / info.get('price') - 1) * 100, 2) if info.get('price') else None, - 'priceTarget': info['median'], - 'analystCounter': len(info['analyst_ids']), + 'topAnalystUpside': round((info['median'] / info.get('price') - 1) * 100, 2) if info.get('price') else None, + 'topAnalystPriceTarget': info['median'], + 'topAnalystCounter': len(info['analyst_ids']), 'analystRating': "Strong Buy", 'marketCap': info['marketCap'], 'name': info['name'] @@ -272,10 +272,10 @@ def get_top_stocks(): ] # Filter outliers with upside between 20% and 250% - result = [item for item in result if item['upside'] is not None and 10 <= item['upside'] <= 250] + result = [item for item in result if item['topAnalystUpside'] is not None and 10 <= item['topAnalystUpside'] <= 250] # Sort results by the number of unique analysts (analystCounter) in descending order - result_sorted = sorted(result, key=lambda x: x['analystCounter'] if x['analystCounter'] is not None else float('-inf'), reverse=True) + result_sorted = sorted(result, key=lambda x: x['topAnalystCounter'] if x['topAnalystCounter'] is not None else float('-inf'), reverse=True) # Top 50 stocks result_sorted = result_sorted[:50] diff --git a/app/restart_json.py b/app/restart_json.py index a129f2a..fae363d 100755 --- a/app/restart_json.py +++ b/app/restart_json.py @@ -179,6 +179,114 @@ def count_consecutive_growth_years(financial_data, key_element): return consecutive_years +def filter_latest_analyst_unique_rating(data): + latest_entries = {} + + for entry in data: + try: + # Create a unique key by combining 'analyst' and 'name' + key = f"{entry.get('analyst')}-{entry.get('name')}" + + # Convert date and time to a datetime object + date_time_str = f"{entry.get('date')}" + date_time = datetime.strptime(date_time_str, "%Y-%m-%d") + + # Check if this entry is the latest for the given key + if key not in latest_entries or date_time > latest_entries[key]['dateTime']: + latest_entries[key] = {'dateTime': date_time, 'entry': entry} + except Exception as e: + print(f"Error processing entry: {e}") + + # Extract and return the latest entries + return [value['entry'] for value in latest_entries.values()] + +def process_top_analyst_data(data, current_price): + data = [item for item in data if item.get('analystScore', 0) >= 4] if data else [] + if symbol == 'AMD': + print(data) + data = filter_latest_analyst_unique_rating(data) + + one_year_ago = datetime.now() - timedelta(days=365) + + # Filter recent data from the last 12 months + recent_data = [ + item for item in data + if 'date' in item and datetime.strptime(item['date'], "%Y-%m-%d") >= one_year_ago + ][:30] # Consider only the last 30 ratings + + # Count filtered analysts + if len(recent_data) > 0: + filtered_analyst_count = len(recent_data) + + # Extract and filter price targets + price_targets = [ + float(item['adjusted_pt_current']) for item in recent_data + if 'adjusted_pt_current' in item and item['adjusted_pt_current'] and not math.isnan(float(item['adjusted_pt_current'])) + ] + + # Calculate median price target + median_price_target = None + if price_targets: + price_targets.sort() + median_index = len(price_targets) // 2 + median_price_target = ( + price_targets[median_index] + if len(price_targets) % 2 != 0 else + (price_targets[median_index - 1] + price_targets[median_index]) / 2 + ) + if median_price_target <= 0: + median_price_target = None + # Calculate changes percentage + upside = None + if median_price_target != None and current_price is not None: + upside = round(((median_price_target / current_price - 1) * 100), 2) + + # Define rating scores + rating_scores = { + "Strong Buy": 5, + "Buy": 4, + "Hold": 3, + "Sell": 2, + "Strong Sell": 1, + } + + # Calculate total rating score + total_rating_score = sum( + rating_scores.get(item.get('rating_current'), 0) for item in recent_data + ) + + # Calculate average rating score + average_rating_score = ( + total_rating_score / filtered_analyst_count + if filtered_analyst_count > 0 else 0 + ) + + # Determine consensus rating + if average_rating_score >= 4.5: + consensus_rating = "Strong Buy" + elif average_rating_score >= 3.5: + consensus_rating = "Buy" + elif average_rating_score >= 2.5: + consensus_rating = "Hold" + elif average_rating_score >= 1.5: + consensus_rating = "Sell" + else: + consensus_rating = "Strong Sell" + + return { + "topAnalystCounter": filtered_analyst_count, + "topAnalystPriceTarget": median_price_target, + "topAnalystUpside": upside, + "topAnalystRating": consensus_rating, + } + else: + return { + "topAnalystCounter": None, + "topAnalystPriceTarget": None, + "topAnalystUpside": None, + "topAnalystRating": None, + } + def process_financial_data(file_path, key_list): """ @@ -662,6 +770,24 @@ async def get_stock_screener(con): item['priceTarget'] = None item['upside'] = None + #top analyst rating + try: + with open(f"json/analyst/history/{symbol}.json") as file: + data = orjson.loads(file.read()) + res_dict = process_top_analyst_data(data, item['price']) + + item['topAnalystCounter'] = res_dict['topAnalystCounter'] + item['topAnalystPriceTarget'] = res_dict['topAnalystPriceTarget'] + item['topAnalystUpside'] = res_dict['topAnalystUpside'] + item['topAnalystRating'] = res_dict['topAnalystRating'] + except: + item['topAnalystCounter'] = None + item['topAnalystPriceTarget'] = None + item['topAnalystUpside'] = None + item['topAnalystRating'] = None + + + try: with open(f"json/fail-to-deliver/companies/{symbol}.json", 'r') as file: res = orjson.loads(file.read())[-1]