diff --git a/app/cron_analyst_db.py b/app/cron_analyst_db.py
index 2b841fe..2cc3953 100755
--- a/app/cron_analyst_db.py
+++ b/app/cron_analyst_db.py
@@ -18,6 +18,16 @@
 api_key = os.getenv('BENZINGA_API_KEY')
 
 headers = {"accept": "application/json"}
+query_template = """
+    SELECT date, close
+    FROM "{ticker}"
+    WHERE date BETWEEN ? AND ?
+"""
+buy_ratings = ['Outperform', 'Overweight', 'Market Outperform', 'Buy', 'Positive', 'Sector Outperform']
+
+sell_ratings = ['Negative', 'Underperform', 'Underweight', 'Reduce', 'Sell']
+
+ticker_price_cache = {} # Dictionary to store cached price data for tickers
 
 # Define a function to remove duplicates based on a key
 def remove_duplicates(data, key):
@@ -99,6 +109,12 @@ def calculate_rating(data):
     if difference > 30:
         normalized_rating = min(normalized_rating, 4.5)
 
+    if overall_success_rate < 50:
+        normalized_rating = min(normalized_rating, 4.6)
+
+    if overall_average_return < 30:
+        normalized_rating = min(normalized_rating, 4.6)
+
     return round(normalized_rating, 2)
 
 def get_top_stocks():
@@ -179,7 +195,15 @@ async def get_analyst_ratings(analyst_id, session):
             #print(f"Error fetching page {page} for analyst {analyst_id}: {e}")
             break
 
-    return res_list
+
+    # Date filter: only include items with 'date' >= '2015-01-01'
+    filtered_data = [
+        {key: value for key, value in item.items() if key not in {'url_news', 'url', 'url_calendar', 'updated', 'time', 'currency'}}
+        for item in res_list
+        if datetime.strptime(item['date'], '%Y-%m-%d') >= datetime(2015, 1, 1)
+    ]
+
+    return filtered_data
 
 async def get_all_analyst_stats():
     url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
@@ -202,7 +226,7 @@ async def get_all_analyst_stats():
             res_list += data
         except Exception as e:
             pass
-    print(len(res_list))
+
     # Remove duplicates of analysts and filter based on ratings accuracy
     res_list = remove_duplicates(res_list, 'id')
     res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
@@ -220,90 +244,181 @@ async def get_all_analyst_stats():
     return final_list
 
-async def process_analyst(item, session):
+async def process_analyst(item, con, session, start_date, end_date):
+    # Fetch analyst ratings
     data = await get_analyst_ratings(item['analystId'], session)
     item['ratingsList'] = data
-    item['totalRatings'] = len(data) # True total ratings
+    item['totalRatings'] = len(data)
     item['lastRating'] = data[0]['date'] if data else None
     item['numOfStocks'] = len({d['ticker'] for d in data})
 
-    # Stats dictionary for calculating score
+    total_return = 0
+    valid_ratings_count = 0
+    success_count = 0 # To track successful ratings for success rate calculation
+
+    for stock in data:
+        try:
+            ticker = stock['ticker']
+            rating_date = stock['date']
+            rating_current = stock['rating_current']
+
+            # Skip neutral or undefined ratings
+            if rating_current not in buy_ratings and rating_current not in sell_ratings:
+                continue
+
+            # Check if the ticker data is already cached
+            if ticker not in ticker_price_cache:
+                # If not cached, query the stock data and cache it
+                query = query_template.format(ticker=ticker)
+                df = pd.read_sql_query(query, con, params=(start_date, end_date))
+                ticker_price_cache[ticker] = df
+            else:
+                # Use cached data
+                df = ticker_price_cache[ticker]
+
+            # Ensure we have data for the rating date
+            rating_date_data = df[df['date'] == rating_date]
+            if rating_date_data.empty:
+                # Try finding the closest date within a few days if exact date is missing
+                for days_offset in range(1, 5):
+                    closest_date = (pd.to_datetime(rating_date) - pd.Timedelta(days=days_offset)).strftime('%Y-%m-%d')
+                    rating_date_data = df[df['date'] == closest_date]
+                    if not rating_date_data.empty:
+                        break
+
+            if rating_date_data.empty:
+                continue # Skip if no close price data found
+
+            # Get close price on rating date
+            close_price_on_rating = rating_date_data['close'].values[0]
+
+            # Calculate the date 12 months later
+            future_date = (pd.to_datetime(rating_date) + pd.DateOffset(months=12)).strftime('%Y-%m-%d')
+
+            # Try to find the close price 12 months later
+            future_date_data = df[df['date'] == future_date]
+            if future_date_data.empty:
+                # If 12 months price isn't available, use the latest available price
+                future_date_data = df.iloc[-1] # Use the last available price
+                if future_date_data.empty:
+                    continue # If no future data, skip this rating
+
+            close_price_in_future = future_date_data['close'] if isinstance(future_date_data, pd.Series) else future_date_data['close'].values[0]
+
+            # Calculate return
+            stock_return = (close_price_in_future - close_price_on_rating) / close_price_on_rating
+            total_return += stock_return
+            valid_ratings_count += 1
+
+            # Determine if the rating was successful
+            if rating_current in buy_ratings:
+                if close_price_in_future > close_price_on_rating:
+                    success_count += 1 # Success for buy ratings
+            elif rating_current in sell_ratings:
+                if close_price_in_future < close_price_on_rating:
+                    success_count += 1 # Success for sell ratings
+        except:
+            pass
+
+    # Calculate average return if there are valid ratings
+    if valid_ratings_count > 0:
+        item['avgReturn'] = round(total_return / valid_ratings_count * 100, 2) # Percentage format
+    else:
+        item['avgReturn'] = 0
+
+    # Calculate success rate
+    if valid_ratings_count > 0:
+        item['successRate'] = round(success_count / valid_ratings_count * 100, 2) # Success rate in percentage
+    else:
+        item['successRate'] = 0
+
+    # Populate other stats and score
     stats_dict = {
         'avgReturn': item.get('avgReturn', 0),
         'successRate': item.get('successRate', 0),
         'totalRatings': item['totalRatings'],
         'lastRating': item['lastRating'],
     }
+
     item['analystScore'] = calculate_rating(stats_dict)
 
-async def get_single_analyst_data(analyst_list):
+async def get_single_analyst_data(analyst_list, con):
+    start_date = '2015-01-01'
+    end_date = datetime.today().strftime("%Y-%m-%d")
+
     async with aiohttp.ClientSession() as session:
-        tasks = [process_analyst(item, session) for item in analyst_list]
+        tasks = [process_analyst(item, con, session, start_date, end_date) for item in analyst_list]
         for task in tqdm(asyncio.as_completed(tasks), total=len(analyst_list)):
             await task
 
 async def run():
-    #Step1 get all analyst id's and stats
-    analyst_list = await get_all_analyst_stats()
-    print('Number of analyst:', len(analyst_list))
-    #Step2 get rating history for each individual analyst and score the analyst
-    await get_single_analyst_data(analyst_list)
+    # Step1: Get all analyst id's and stats
+    con = sqlite3.connect('stocks.db')
+    analyst_list = await get_all_analyst_stats()
+    print('Number of analysts:', len(analyst_list))
+
+    #Test Mode
+    #analyst_list = [ item for item in analyst_list if item['analystId'] =='597f5b95c1f5580001ef542a']
 
-    try:
-        con = sqlite3.connect('stocks.db')
-        print('Start extracting main sectors')
-        for item in tqdm(analyst_list):
-            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
-            sector_list = []
-            for ticker in ticker_list:
-                sector = extract_sector(ticker, con)
-                sector_list.append(sector)
+    # Step2: Get rating history for each individual analyst and score the analyst
+    await get_single_analyst_data(analyst_list, con)
+    try:
+        print('Start extracting main sectors')
+        for item in tqdm(analyst_list):
+            ticker_list = [entry['ticker'] for entry in item['ratingsList']]
+            sector_list = []
+            for ticker in ticker_list:
+                sector = extract_sector(ticker, con)
+                sector_list.append(sector)
 
-            sector_counts = Counter(sector_list)
-            main_sectors = sector_counts.most_common(3)
-            main_sectors = [item[0] for item in main_sectors if item[0] is not None]
-            item['mainSectors'] = main_sectors
-        con.close()
-    except Exception as e:
-        print(e)
+            sector_counts = Counter(sector_list)
+            main_sectors = sector_counts.most_common(3)
+            main_sectors = [item[0] for item in main_sectors if item[0] is not None]
+            item['mainSectors'] = main_sectors
 
-    analyst_list = sorted(analyst_list, key=lambda x: float(x['analystScore']), reverse=True)
-    number_of_all_analysts = len(analyst_list)
+    except Exception as e:
+        print(e)
 
-    for rank, item in enumerate(analyst_list):
-        item['rank'] = rank+1
-        item['numOfAnalysts'] = number_of_all_analysts
-        item['avgReturn'] = round(float(item['avgReturn']),2)
-        item['successRate'] = round(float(item['successRate']),2)
-        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
-            ujson.dump(item, file)
+    # Sort analysts by score
+    analyst_list = sorted(analyst_list, key=lambda x: (float(x['analystScore']), float(x['avgReturn']), float(x['successRate'])), reverse=True)
+    number_of_all_analysts = len(analyst_list)
+    # Assign rank and other metrics to analysts
+    for rank, item in enumerate(analyst_list):
+        item['rank'] = rank + 1
+        item['numOfAnalysts'] = number_of_all_analysts
+        item['avgReturn'] = round(float(item['avgReturn']), 2)
+        item['successRate'] = round(float(item['successRate']), 2)
+        with open(f"json/analyst/analyst-db/{item['analystId']}.json", 'w') as file:
+            ujson.dump(item, file)
 
-    #Save top 100 analysts
-    top_analysts_list = []
-    #Drop the element ratingsList for the top 100 analysts list
-    for item in analyst_list[0:100]:
-        top_analysts_list.append({
-            'analystName': item['analystName'],
-            'analystId': item['analystId'],
-            'rank': item['rank'],
-            'analystScore': item['analystScore'],
-            'companyName': item['companyName'],
-            'successRate': item['successRate'],
-            'avgReturn': item['avgReturn'],
-            'totalRatings': item['totalRatings'],
-            'lastRating': item['lastRating']
-        })
+    # Save top 100 analysts
+    top_analysts_list = []
+    for item in analyst_list[0:100]:
+        top_analysts_list.append({
+            'analystName': item['analystName'],
+            'analystId': item['analystId'],
+            'rank': item['rank'],
+            'analystScore': item['analystScore'],
+            'companyName': item['companyName'],
+            'successRate': item['successRate'],
+            'avgReturn': item['avgReturn'],
+            'totalRatings': item['totalRatings'],
+            'lastRating': item['lastRating']
+        })
 
-    with open(f"json/analyst/top-analysts.json", 'w') as file:
-        ujson.dump(top_analysts_list, file)
+    with open(f"json/analyst/top-analysts.json", 'w') as file:
+        ujson.dump(top_analysts_list, file)
 
-    #Save all analyst data in raw form for the next step
-    with open(f"json/analyst/all-analyst-data.json", 'w') as file:
-        ujson.dump(analyst_list, file)
+    # Save all analyst data in raw form for the next step
+    with open(f"json/analyst/all-analyst-data.json", 'w') as file:
+        ujson.dump(analyst_list, file)
 
-    #Save top stocks with strong buys from 5 star analysts
-    get_top_stocks()
+    # Save top stocks with strong buys from 5-star analysts
+    get_top_stocks()
+
+    # Close the connection
+    con.close()
 
 if __name__ == "__main__":