From 144a79f00a437b66a56112796eee59dd41d11539 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Fri, 4 Oct 2024 20:01:23 +0200
Subject: [PATCH] update analyst cron job

---
 app/cron_ai_score.py                          | 532 +++++++++---------
 app/cron_analyst_db.py                        | 198 ++++---
 app/cron_analyst_ticker.py                    |   8 +-
 .../__pycache__/score_model.cpython-310.pyc   | Bin 3174 -> 3028 bytes
 app/ml_models/score_model.py                  |  11 +-
 .../feature_engineering.cpython-310.pyc       | Bin 0 -> 6816 bytes
 app/utils/feature_engineering.py              | 204 +++++++
 7 files changed, 570 insertions(+), 383 deletions(-)
 create mode 100644 app/utils/__pycache__/feature_engineering.cpython-310.pyc
 create mode 100644 app/utils/feature_engineering.py

diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py
index 950b422..81f3d1d 100644
--- a/app/cron_ai_score.py
+++ b/app/cron_ai_score.py
@@ -12,12 +12,9 @@ from tqdm import tqdm
 import concurrent.futures
 import re
 from itertools import combinations
-
-from ta.momentum import *
-from ta.trend import *
-from ta.volatility import *
-from ta.volume import *
+import os
 import gc
+from utils.feature_engineering import *
 
 #Enable automatic garbage collection
 gc.enable()
@@ -25,288 +22,262 @@ async def save_json(symbol, data):
     with open(f"json/ai-score/companies/{symbol}.json", 'wb') as file:
         file.write(orjson.dumps(data))
 
-
-def trend_intensity(close, window=20):
-    ma = close.rolling(window=window).mean()
-    std = close.rolling(window=window).std()
-    return ((close - ma) / std).abs().rolling(window=window).mean()
-
-
-def calculate_fdi(high, low, close, window=30):
-    n1 = (np.log(high.rolling(window=window).max() - low.rolling(window=window).min()) -
-          np.log(close.rolling(window=window).max() - close.rolling(window=window).min())) / np.log(2)
-    return (2 - n1) * 100
-
-
-
+def top_uncorrelated_features(df, target_col='Target', top_n=10, threshold=0.75):
+    # Drop the columns to exclude from the DataFrame
+    df_filtered = df.drop(columns=['date','price'])
+
+    # Compute the correlation matrix
+    correlation_matrix = df_filtered.corr()
+
+    # Get the correlations with the target column, sorted by absolute value
+    correlations_with_target = correlation_matrix[target_col].drop(target_col).abs().sort_values(ascending=False)
+
+    # Initialize the list of selected features
+    selected_features = []
+
+    # Iteratively select the most correlated features while minimizing correlation with each other
+    for feature in correlations_with_target.index:
+        # If we already have enough features, break
+        if len(selected_features) >= top_n:
+            break
+
+        # Check correlation of this feature with already selected features
+        is_uncorrelated = True
+        for selected in selected_features:
+            if abs(correlation_matrix.loc[feature, selected]) > threshold:
+                is_uncorrelated = False
+                break
+
+        # If it's uncorrelated with the selected features, add it to the list
+        if is_uncorrelated:
+            selected_features.append(feature)
+    return selected_features
 
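[review note] `top_uncorrelated_features` walks the candidates in order of absolute correlation with `Target` and keeps one representative per correlated cluster. A quick sanity check of the selection behaviour, as a minimal sketch with toy, hypothetical column names and values:

```python
import numpy as np
import pandas as pd

# Toy frame: 'b' is a near-duplicate of 'a', so it should be skipped.
rng = np.random.default_rng(0)
a = rng.normal(size=500)
df = pd.DataFrame({
    'date': pd.date_range('2020-01-01', periods=500).astype(str),
    'price': rng.normal(size=500),
    'a': a,
    'b': a + rng.normal(scale=0.01, size=500),  # |corr(a, b)| ~ 1 > threshold
    'c': rng.normal(size=500),
    'Target': (a + rng.normal(size=500) > 0).astype(int),
})

print(top_uncorrelated_features(df, top_n=2))  # expected: ['a', 'c'], not ['a', 'b']
```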
f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json", - f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json", - f"json/financial-statements/owner-earnings/quarter/{ticker}.json", - ] - # Helper function to load JSON data asynchronously - async def load_json_from_file(path): - async with aiofiles.open(path, 'r') as f: - content = await f.read() - return orjson.loads(content) + file_path = f"ml_models/training_data/ai-score/{ticker}.json" - # Helper function to filter data based on keys and year - async def filter_data(data, ignore_keys, year_threshold=2000): - return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold] + if os.path.exists(file_path): + with open(file_path, 'rb') as file: + return pd.DataFrame(orjson.loads(file.read())) + else: - # Define keys to ignore - ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink","pbRatio","ptbRatio"] + try: + # Define paths to the statement files + statements = [ + f"json/financial-statements/ratios/quarter/{ticker}.json", + f"json/financial-statements/key-metrics/quarter/{ticker}.json", + f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json", + f"json/financial-statements/income-statement/quarter/{ticker}.json", + f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json", + f"json/financial-statements/income-statement-growth/quarter/{ticker}.json", + f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json", + f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json", + f"json/financial-statements/owner-earnings/quarter/{ticker}.json", + ] - # Load and filter data for each statement type + # Helper function to load JSON data asynchronously + async def load_json_from_file(path): + async with aiofiles.open(path, 'r') as f: + content = await f.read() + return orjson.loads(content) - ratios = await load_json_from_file(statements[0]) - ratios = await filter_data(ratios, ignore_keys) + # Helper function to filter data based on keys and year + async def filter_data(data, ignore_keys, year_threshold=2000): + return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold] - #Threshold of enough datapoints needed! - if len(ratios) < 50: - return + # Define keys to ignore + ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink","pbRatio","ptbRatio"] - key_metrics = await load_json_from_file(statements[1]) - key_metrics = await filter_data(key_metrics, ignore_keys) - - - cashflow = await load_json_from_file(statements[2]) - cashflow = await filter_data(cashflow, ignore_keys) + # Load and filter data for each statement type - income = await load_json_from_file(statements[3]) - income = await filter_data(income, ignore_keys) + ratios = await load_json_from_file(statements[0]) + ratios = await filter_data(ratios, ignore_keys) - balance = await load_json_from_file(statements[4]) - balance = await filter_data(balance, ignore_keys) - - income_growth = await load_json_from_file(statements[5]) - income_growth = await filter_data(income_growth, ignore_keys) + #Threshold of enough datapoints needed! 
+ if len(ratios) < 50: + return - balance_growth = await load_json_from_file(statements[6]) - balance_growth = await filter_data(balance_growth, ignore_keys) - - - cashflow_growth = await load_json_from_file(statements[7]) - cashflow_growth = await filter_data(cashflow_growth, ignore_keys) - - owner_earnings = await load_json_from_file(statements[8]) - owner_earnings = await filter_data(owner_earnings, ignore_keys) - - - # Combine all the data - combined_data = defaultdict(dict) - - # Merge the data based on 'date' - for entries in zip(ratios,key_metrics,income, balance, cashflow, owner_earnings, income_growth, balance_growth, cashflow_growth): - for entry in entries: - date = entry['date'] - for key, value in entry.items(): - if key not in combined_data[date]: - combined_data[date][key] = value - - combined_data = list(combined_data.values()) - - # Download historical stock data using yfinance - df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index() - df = df.rename(columns={'Adj Close': 'close', 'Date': 'date', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Volume': 'volume'}) - df['date'] = df['date'].dt.strftime('%Y-%m-%d') - - df['sma_50'] = df['close'].rolling(window=50).mean() - df['sma_200'] = df['close'].rolling(window=200).mean() - df['sma_crossover'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int) - - df['ema_50'] = EMAIndicator(close=df['close'], window=50).ema_indicator() - df['ema_200'] = EMAIndicator(close=df['close'], window=200).ema_indicator() - df['ema_crossover'] = ((df['ema_50'] > df['ema_200']) & (df['ema_50'].shift(1) <= df['ema_200'].shift(1))).astype(int) - - ichimoku = IchimokuIndicator(high=df['high'], low=df['low']) - df['ichimoku_a'] = ichimoku.ichimoku_a() - df['ichimoku_b'] = ichimoku.ichimoku_b() - df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close']).average_true_range() - bb = BollingerBands(close=df['close']) - df['bb_width'] = (bb.bollinger_hband() - bb.bollinger_lband()) / df['close'] - - df['volatility'] = df['close'].rolling(window=30).std() - df['daily_return'] = df['close'].pct_change() - df['cumulative_return'] = (1 + df['daily_return']).cumprod() - 1 - df['volume_change'] = df['volume'].pct_change() - df['roc'] = df['close'].pct_change(periods=60) - df['avg_volume'] = df['volume'].rolling(window=60).mean() - df['drawdown'] = df['close'] / df['close'].rolling(window=252).max() - 1 - - - df['macd'] = macd(df['close']) - df['macd_signal'] = macd_signal(df['close']) - df['macd_hist'] = 2*macd_diff(df['close']) - df['adx'] = adx(df['high'],df['low'],df['close']) - df["adx_pos"] = adx_pos(df['high'],df['low'],df['close']) - df["adx_neg"] = adx_neg(df['high'],df['low'],df['close']) - df['cci'] = CCIIndicator(high=df['high'], low=df['low'], close=df['close']).cci() - df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume']).money_flow_index() - - df['nvi'] = NegativeVolumeIndexIndicator(close=df['close'], volume=df['volume']).negative_volume_index() - df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume']).on_balance_volume() - df['vpt'] = VolumePriceTrendIndicator(close=df['close'], volume=df['volume']).volume_price_trend() - - df['rsi'] = rsi(df["close"], window=60) - df['rolling_rsi'] = df['rsi'].rolling(window=10).mean() - df['stoch_rsi'] = stochrsi_k(df['close'], window=60, smooth1=3, smooth2=3) - df['rolling_stoch_rsi'] = df['stoch_rsi'].rolling(window=10).mean() - - df['adi'] = 
acc_dist_index(high=df['high'],low=df['low'],close=df['close'],volume=df['volume']) - df['cmf'] = chaikin_money_flow(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'], window=20) - df['emv'] = ease_of_movement(high=df['high'],low=df['low'],volume=df['volume'], window=20) - df['fi'] = force_index(close=df['close'], volume=df['volume'], window= 13) - - df['williams'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close']).williams_r() - df['kama'] = KAMAIndicator(close=df['close']).kama() - - df['stoch'] = stoch(df['high'], df['low'], df['close'], window=30) - df['rocr'] = df['close'] / df['close'].shift(30) - 1 # Rate of Change Ratio (ROCR) - df['ppo'] = (df['ema_50'] - df['ema_200']) / df['ema_50'] * 100 - df['vwap'] = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum() - df['volatility_ratio'] = df['close'].rolling(window=30).std() / df['close'].rolling(window=60).std() - - df['fdi'] = calculate_fdi(df['high'], df['low'], df['close']) - df['tii'] = trend_intensity(df['close']) - - - ta_indicators = [ - 'rsi', 'macd', 'macd_signal', 'macd_hist', 'adx', 'adx_pos', 'adx_neg', - 'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi','bb_width', - 'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch','sma_crossover', - 'volatility','daily_return','cumulative_return', 'roc','avg_volume', - 'rolling_rsi','rolling_stoch_rsi', 'ema_crossover','ichimoku_a','ichimoku_b', - 'atr','kama','rocr','ppo','volatility_ratio','vwap','tii','fdi','drawdown', - 'volume_change' - ] - - # Match each combined data entry with the closest available stock price in df - for item in combined_data: - target_date = item['date'] - counter = 0 - max_attempts = 10 - - # Look for the closest matching date in the stock data - while target_date not in df['date'].values and counter < max_attempts: - target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d') - counter += 1 - - # If max attempts are reached and no matching date is found, skip the entry - if counter == max_attempts: - continue - - # Find the close price for the matching date - close_price = round(df[df['date'] == target_date]['close'].values[0], 2) - item['price'] = close_price - - # Dynamically add all indicator values to the combined_data entry + key_metrics = await load_json_from_file(statements[1]) + key_metrics = await filter_data(key_metrics, ignore_keys) - for indicator in ta_indicators: - indicator_value = df[df['date'] == target_date][indicator].values[0] - item[indicator] = indicator_value # Add the indicator value to the combined_data entry + + cashflow = await load_json_from_file(statements[2]) + cashflow = await filter_data(cashflow, ignore_keys) + + income = await load_json_from_file(statements[3]) + income = await filter_data(income, ignore_keys) + + balance = await load_json_from_file(statements[4]) + balance = await filter_data(balance, ignore_keys) + + income_growth = await load_json_from_file(statements[5]) + income_growth = await filter_data(income_growth, ignore_keys) + + balance_growth = await load_json_from_file(statements[6]) + balance_growth = await filter_data(balance_growth, ignore_keys) + + + cashflow_growth = await load_json_from_file(statements[7]) + cashflow_growth = await filter_data(cashflow_growth, ignore_keys) + + owner_earnings = await load_json_from_file(statements[8]) + owner_earnings = await filter_data(owner_earnings, ignore_keys) + + + # Combine all the data + combined_data = defaultdict(dict) + + # Merge the data based on 'date' + for 
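[review note] The merge above is first-write-wins per (date, key), and `zip(...)` silently stops at the shortest statement list, so quarters beyond that length are dropped for every statement. A condensed, self-contained equivalent of the inner loop (toy records, hypothetical field names):

```python
from collections import defaultdict

# First-write-wins merge, keyed by quarter-end date.
rows_a = [{'date': '2024-06-30', 'peRatio': 28.1}]
rows_b = [{'date': '2024-06-30', 'revenue': 1_000_000, 'peRatio': 99.9}]

combined = defaultdict(dict)
for entry in rows_a + rows_b:
    for key, value in entry.items():
        combined[entry['date']].setdefault(key, value)

print(list(combined.values()))
# [{'date': '2024-06-30', 'peRatio': 28.1, 'revenue': 1000000}]
# The duplicate peRatio from rows_b is ignored because the key was already set.
```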
+            # Download historical stock data using yfinance
+            df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index()
+            df = df.rename(columns={'Adj Close': 'close', 'Date': 'date', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Volume': 'volume'})
+            df['date'] = df['date'].dt.strftime('%Y-%m-%d')
+
+
+            # Get the list of columns in df
+            df_columns = df.columns
+            df_stats = generate_statistical_features(df)
+            df_ta = generate_ta_features(df)
+
+            # Filter columns in df_stats and df_ta that are not in df
+            df_stats_filtered = df_stats.drop(columns=df_columns.intersection(df_stats.columns), errors='ignore')
+            df_ta_filtered = df_ta.drop(columns=df_columns.intersection(df_ta.columns), errors='ignore')
+            ta_columns = df_ta_filtered.columns.tolist()
+            stats_columns = df_stats_filtered.columns.tolist()
+
+            # Concatenate df with the filtered df_stats and df_ta
+            df = pd.concat([df, df_ta_filtered, df_stats_filtered], axis=1)
+
+
+            # Match each combined data entry with the closest available stock price in df
+            for item in combined_data:
+                target_date = item['date']
+                counter = 0
+                max_attempts = 10
+
+                # Look for the closest matching date in the stock data
+                while target_date not in df['date'].values and counter < max_attempts:
+                    target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
+                    counter += 1
+
+                # If max attempts are reached and no matching date is found, skip the entry
+                if counter == max_attempts:
+                    continue
+
+                # Find the close price for the matching date
+                close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
+                item['price'] = close_price
+
+                # Dynamically add all indicator values to the combined_data entry
+
+                for column in ta_columns:
+                    column_value = df[df['date'] == target_date][column].values[0]
+                    item[column] = column_value  # Add the column value to the combined_data entry
+                for column in stats_columns:
+                    column_value = df[df['date'] == target_date][column].values[0]
+                    item[column] = column_value  # Add the column value to the combined_data entry
+
+
+            # Sort the combined data by date
+            combined_data = sorted(combined_data, key=lambda x: x['date'])
+            # Convert combined data into a DataFrame
+            df_combined = pd.DataFrame(combined_data).dropna()
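[review note] The walk-back that aligns a report date with the most recent prior trading day is worth a standalone look. A minimal sketch under the same rules (helper name is mine, not in the patch):

```python
import pandas as pd

# Walk a report date back to the most recent prior trading day (at most 10 tries).
def nearest_trading_day(target_date: str, trading_days: set[str], max_attempts: int = 10) -> str | None:
    for _ in range(max_attempts):
        if target_date in trading_days:
            return target_date
        target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
    return None  # caller skips the entry, mirroring the `continue` above

print(nearest_trading_day('2024-01-01', {'2023-12-29'}))  # '2023-12-29' (holiday and weekend skipped)
```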
+            '''
+            fundamental_columns = [
+                'revenue',
+                'costOfRevenue',
+                'grossProfit',
+                'netIncome',
+                'operatingIncome',
+                'operatingExpenses',
+                'researchAndDevelopmentExpenses',
+                'ebitda',
+                'freeCashFlow',
+                'incomeBeforeTax',
+                'incomeTaxExpense',
+                'debtRepayment',
+                'dividendsPaid',
+                'depreciationAndAmortization',
+                'netCashUsedProvidedByFinancingActivities',
+                'changeInWorkingCapital',
+                'stockBasedCompensation',
+                'deferredIncomeTax',
+                'commonStockRepurchased',
+                'operatingCashFlow',
+                'capitalExpenditure',
+                'accountsReceivables',
+                'purchasesOfInvestments',
+                'cashAndCashEquivalents',
+                'shortTermInvestments',
+                'cashAndShortTermInvestments',
+                'longTermInvestments',
+                'otherCurrentLiabilities',
+                'totalCurrentLiabilities',
+                'longTermDebt',
+                'totalDebt',
+                'netDebt',
+                'commonStock',
+                'totalEquity',
+                'totalLiabilitiesAndStockholdersEquity',
+                'totalStockholdersEquity',
+                'totalInvestments',
+                'taxAssets',
+                'totalAssets',
+                'inventory',
+                'propertyPlantEquipmentNet',
+                'ownersEarnings',
+            ]
+
+            # Compute ratios for all combinations of key elements
+            new_columns = {}
+            # Loop over combinations of column pairs
+            for columns in [fundamental_columns]:
+                for num, denom in combinations(columns, 2):
+                    # Compute ratio and reverse ratio
+                    ratio = df_combined[num] / df_combined[denom]
+                    reverse_ratio = round(df_combined[denom] / df_combined[num],2)
+
+                    # Define column names for both ratios
+                    column_name = f'{num}_to_{denom}'
+                    reverse_column_name = f'{denom}_to_{num}'
+
+                    # Store the new columns in the dictionary, replacing invalid values with 0
+                    new_columns[column_name] = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0)
+                    new_columns[reverse_column_name] = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0)
+
+            # Add all new columns to the original DataFrame at once
+            df_combined = pd.concat([df_combined, pd.DataFrame(new_columns)], axis=1)
+            '''
+            # To defragment the DataFrame, make a copy
+            df_combined = df_combined.copy()
+            df_combined = df_combined.dropna()
+            df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
 
-        # Sort the combined data by date
-        combined_data = sorted(combined_data, key=lambda x: x['date'])
-        # Convert combined data into a DataFrame
-        df_combined = pd.DataFrame(combined_data).dropna()
+            df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
 
-
-        key_elements = [
-            'revenue',
-            'costOfRevenue',
-            'grossProfit',
-            'netIncome',
-            'operatingIncome',
-            'operatingExpenses',
-            'researchAndDevelopmentExpenses',
-            'ebitda',
-            'freeCashFlow',
-            'incomeBeforeTax',
-            'incomeTaxExpense',
-            'debtRepayment',
-            'dividendsPaid',
-            'depreciationAndAmortization',
-            'netCashUsedProvidedByFinancingActivities',
-            'changeInWorkingCapital',
-            'stockBasedCompensation',
-            'deferredIncomeTax',
-            'commonStockRepurchased',
-            'operatingCashFlow',
-            'capitalExpenditure',
-            'accountsReceivables',
-            'purchasesOfInvestments',
-            'cashAndCashEquivalents',
-            'shortTermInvestments',
-            'cashAndShortTermInvestments',
-            'longTermInvestments',
-            'otherCurrentLiabilities',
-            'totalCurrentLiabilities',
-            'longTermDebt',
-            'totalDebt',
-            'netDebt',
-            'commonStock',
-            'totalEquity',
-            'totalLiabilitiesAndStockholdersEquity',
-            'totalStockholdersEquity',
-            'totalInvestments',
-            'taxAssets',
-            'totalAssets',
-            'inventory',
-            'propertyPlantEquipmentNet',
-            'ownersEarnings',
-        ]
-        # Compute ratios for all combinations of key elements
+            df_copy = df_combined.copy()
+            df_copy = df_copy.map(lambda x: round(x, 2) if isinstance(x, float) else x)
 
-        new_columns = {}
+            if df_copy.shape[0] > 0:
+                with open(file_path, 'wb') as file:
+                    file.write(orjson.dumps(df_copy.to_dict(orient='records')))
 
-        # Loop over combinations of column pairs
-        for num, denom in combinations(key_elements, 2):
-            # Compute ratio and reverse ratio
-            ratio = df_combined[num] / df_combined[denom]
-            reverse_ratio = df_combined[denom] / df_combined[num]
+            return df_copy
 
-            # Define column names for both ratios
-            column_name = f'{num}_to_{denom}'
-            reverse_column_name = f'{denom}_to_{num}'
-
-            # Store the new columns in the dictionary, replacing invalid values with 0
-            new_columns[column_name] = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0)
-            new_columns[reverse_column_name] = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0)
-
-        # Add all new columns to the original DataFrame at once
-        df_combined = pd.concat([df_combined, pd.DataFrame(new_columns)], axis=1)
-
-
-        # To defragment the DataFrame, make a copy
-        df_combined = df_combined.copy()
-
-
-        # Create 'Target' column based on price change
-        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
-
-        # Return a copy of the combined DataFrame
-        df_combined = df_combined.dropna()
-        df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
-        df_copy = df_combined.copy()
-
-        return df_copy
-
-    except Exception as e:
-        print(e)
-        pass
+        except Exception as e:
+            print(e)
+            pass
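[review note] The binary label marks whether the next report-to-report close rises. Worked through on toy prices:

```python
import pandas as pd

price = pd.Series([10.0, 12.0, 11.0, 11.5])
target = ((price.shift(-1) - price) / price > 0).astype(int)
print(target.tolist())  # [1, 0, 1, 0]
```

Note the last row labels as 0 only because its next price is missing (`NaN > 0` is False), so the final quarter of every ticker carries a slightly noisy label.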
 
 
 async def chunked_gather(tickers, con, start_date, end_date, chunk_size=10):
@@ -327,8 +298,6 @@ async def chunked_gather(tickers, con, start_date, end_date, chunk_size=10):
 
     return results
 
-
-
 async def warm_start_training(tickers, con):
     start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
     end_date = datetime.today().strftime("%Y-%m-%d")
@@ -336,7 +305,7 @@ async def warm_start_training(tickers, con):
     df_test = pd.DataFrame()
     test_size = 0.2
 
-    dfs = await chunked_gather(tickers, con, start_date, end_date, chunk_size=10)
+    dfs = await chunked_gather(tickers, con, start_date, end_date, chunk_size=1)
 
     train_list = []
     test_list = []
@@ -356,13 +325,14 @@ async def warm_start_training(tickers, con):
     # Concatenate all at once outside the loop
     df_train = pd.concat(train_list, ignore_index=True)
     df_test = pd.concat(test_list, ignore_index=True)
-
+    df_train = df_train.sample(frac=1).reset_index(drop=True)
+    df_test = df_test.sample(frac=1).reset_index(drop=True)
+
     print('======Warm Start Train Set Datapoints======')
-    df_train = df_train.sample(frac=1).reset_index(drop=True) #df_train.reset_index(drop=True)
     print(len(df_train))
-
+
     predictor = ScorePredictor()
-    selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']]
+    selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200)
     predictor.warm_start_training(df_train[selected_features], df_train['Target'])
     predictor.evaluate_model(df_test[selected_features], df_test['Target'])
 
@@ -370,7 +340,7 @@ async def fine_tune_and_evaluate(ticker, con, start_date, end_date):
     try:
-        df = await download_data(ticker, con, start_date, end_date)
+        df = await download_data(ticker,con, start_date, end_date)
         if df is None or len(df) == 0:
             print(f"No data available for {ticker}")
             return
@@ -380,7 +350,7 @@ async def fine_tune_and_evaluate(ticker, con, start_date, end_date):
         train_data = df.iloc[:split_size]
         test_data = df.iloc[split_size:]
 
-        selected_features = [col for col in df.columns if col not in ['date','price','Target']]
+        selected_features = top_uncorrelated_features(train_data,top_n=50) #[col for col in train_data if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(train_data,top_n=20)
         # Fine-tune the model
         predictor = ScorePredictor()
         predictor.fine_tune_model(train_data[selected_features], train_data['Target'])
@@ -402,25 +372,27 @@ async def fine_tune_and_evaluate(ticker, con, start_date, end_date):
         del predictor  # Explicitly delete the predictor to aid garbage collection
 
 async def run():
-    train_mode = False # Set this to False for fine-tuning and evaluation
+    train_mode = True # Set this to False for fine-tuning and evaluation
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
     cursor.execute("PRAGMA journal_mode = wal")
 
     if train_mode:
         # Warm start training
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 10E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
         warm_start_symbols = [row[0] for row in cursor.fetchall()]
         print('Warm Start Training for:', warm_start_symbols)
         predictor = await warm_start_training(warm_start_symbols, con)
     else:
         # Fine-tuning and evaluation for all stocks
         cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
-        stock_symbols = [row[0] for row in cursor.fetchall()]
+        stock_symbols = ['AWR'] #[row[0] for row in cursor.fetchall()]
         print(f"Total tickers for fine-tuning: {len(stock_symbols)}")
 
         start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
         end_date = datetime.today().strftime("%Y-%m-%d")
+
+        tasks = []
 
         for ticker in tqdm(stock_symbols):
             await fine_tune_and_evaluate(ticker, con, start_date, end_date)
 
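[review note] Three things to double-check before this lands: `train_mode` is now hard-coded to `True`, so the cron job will only ever warm-start; `stock_symbols = ['AWR']` pins fine-tuning to a single ticker; and the new `tasks = []` is never used. On the plus side, fine-tuning keeps rows in time order and fits the feature selection on the training slice only, which avoids leaking hold-out correlations:

```python
# Chronological 80/20 split; feature selection sees only the past.
# (split_size is computed just above in fine_tune_and_evaluate; 0.8 assumed here.)
split_size = int(len(df) * 0.8)
train_data, test_data = df.iloc[:split_size], df.iloc[split_size:]
selected_features = top_uncorrelated_features(train_data, top_n=50)
```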
'%-%'") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") warm_start_symbols = [row[0] for row in cursor.fetchall()] print('Warm Start Training for:', warm_start_symbols) predictor = await warm_start_training(warm_start_symbols, con) else: # Fine-tuning and evaluation for all stocks cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'") - stock_symbols = [row[0] for row in cursor.fetchall()] + stock_symbols = ['AWR'] #[row[0] for row in cursor.fetchall()] print(f"Total tickers for fine-tuning: {len(stock_symbols)}") start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") end_date = datetime.today().strftime("%Y-%m-%d") + + tasks = [] for ticker in tqdm(stock_symbols): await fine_tune_and_evaluate(ticker, con, start_date, end_date) diff --git a/app/cron_analyst_db.py b/app/cron_analyst_db.py index c20f56c..2b841fe 100755 --- a/app/cron_analyst_db.py +++ b/app/cron_analyst_db.py @@ -1,4 +1,3 @@ -from benzinga import financial_data import requests from datetime import datetime import numpy as np @@ -11,12 +10,12 @@ from dotenv import load_dotenv from tqdm import tqdm import pandas as pd from collections import Counter +import aiohttp +import asyncio load_dotenv() api_key = os.getenv('BENZINGA_API_KEY') -fin = financial_data.Benzinga(api_key) - headers = {"accept": "application/json"} @@ -58,7 +57,7 @@ def calculate_rating(data): last_rating_date = datetime.strptime(last_rating, "%Y-%m-%d") difference = (datetime.now() - last_rating_date).days except: - difference = 1000 # In case of None + difference = 1000 # In case of None or invalid date if total_ratings == 0 or difference >= 600: return 0 @@ -80,6 +79,7 @@ def calculate_rating(data): max_rating = 5 normalized_rating = min(max(weighted_sum / (weight_return + weight_success_rate + weight_total_ratings + weight_difference), min_rating), max_rating) + # Apply additional conditions based on total ratings and average return if normalized_rating >= 4: if total_ratings < 10: normalized_rating -= 2.4 @@ -89,78 +89,18 @@ def calculate_rating(data): normalized_rating -= 0.75 elif total_ratings < 30: normalized_rating -= 1 - elif overall_average_return <=10: - normalized_rating -=1.1 - ''' - if overall_average_return <= 0 and overall_average_return >= -5: - normalized_rating = min(normalized_rating - 2, 0) - elif overall_average_return < -5 and overall_average_return >= -10: - normalized_rating = min(normalized_rating - 3, 0) - else: - normalized_rating = min(normalized_rating - 4, 0) - ''' - if overall_average_return <= 0: - normalized_rating = min(normalized_rating - 2, 0) + elif overall_average_return <= 10: + normalized_rating -= 1.1 - normalized_rating = max(normalized_rating, 0) + if overall_average_return <= 0: + normalized_rating = max(normalized_rating - 2, 0) + + # Cap the rating if the last rating is older than 30 days + if difference > 30: + normalized_rating = min(normalized_rating, 4.5) return round(normalized_rating, 2) -def get_analyst_ratings(analyst_id): - - url = "https://api.benzinga.com/api/v2.1/calendar/ratings" - res_list = [] - - for page in range(0,5): - try: - querystring = {"token":api_key,"parameters[analyst_id]": analyst_id, "page": str(page), "pagesize":"1000"} - response = requests.request("GET", url, headers=headers, params=querystring) - data = ujson.loads(response.text)['ratings'] - res_list +=data - time.sleep(2) - except: - break - - return res_list - -def get_all_analyst_stats(): - url = 
"https://api.benzinga.com/api/v2.1/calendar/ratings/analysts" - res_list = [] - for _ in range(0,20): #Run the api N times because not all analyst are counted Bug from benzinga - for page in range(0,100): - try: - querystring = {"token":api_key,"page": f"{page}", 'pagesize': "1000"} - response = requests.request("GET", url, headers=headers, params=querystring) - - data = ujson.loads(response.text)['analyst_ratings_analyst'] - res_list+=data - except: - break - time.sleep(5) - - res_list = remove_duplicates(res_list, 'id') # remove duplicates of analyst - res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0] - - final_list = [] - for item in res_list: - analyst_dict = { - 'analystName': item['name_full'], - 'companyName': item['firm_name'], - 'analystId': item['id'], - 'firmId': item['firm_id'] - } - - stats_dict = { - 'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0), - 'successRate': item['ratings_accuracy'].get('overall_success_rate', 0), - 'totalRatings': item['ratings_accuracy'].get('total_ratings', 0), - } - - final_list.append({**analyst_dict,**stats_dict}) - - - return final_list - def get_top_stocks(): with open(f"json/analyst/all-analyst-data.json", 'r') as file: analyst_stats_list = ujson.load(file) @@ -217,24 +157,97 @@ def get_top_stocks(): ujson.dump(result_sorted, file) -if __name__ == "__main__": +async def get_analyst_ratings(analyst_id, session): + url = "https://api.benzinga.com/api/v2.1/calendar/ratings" + res_list = [] + + for page in range(5): + try: + querystring = { + "token": api_key, + "parameters[analyst_id]": analyst_id, + "page": str(page), + "pagesize": "1000" + } + async with session.get(url, headers=headers, params=querystring) as response: + data = await response.json() + ratings = data.get('ratings', []) + if not ratings: + break # Stop fetching if no more ratings + res_list += ratings + except Exception as e: + #print(f"Error fetching page {page} for analyst {analyst_id}: {e}") + break + + return res_list + +async def get_all_analyst_stats(): + url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts" + res_list = [] + + async with aiohttp.ClientSession() as session: + tasks = [ + session.get(url, headers=headers, params={"token": api_key, "page": str(page), 'pagesize': "1000"}) + for page in range(100) + ] + + # Gather responses concurrently + responses = await asyncio.gather(*tasks) + + # Process each response + for response in responses: + if response.status == 200: # Check for successful response + try: + data = ujson.loads(await response.text())['analyst_ratings_analyst'] + res_list += data + except Exception as e: + pass + print(len(res_list)) + # Remove duplicates of analysts and filter based on ratings accuracy + res_list = remove_duplicates(res_list, 'id') + res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0] + + # Construct the final result list + final_list = [{ + 'analystName': item['name_full'], + 'companyName': item['firm_name'], + 'analystId': item['id'], + 'firmId': item['firm_id'], + 'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0), + 'successRate': item['ratings_accuracy'].get('overall_success_rate', 0), + 'totalRatings': item['ratings_accuracy'].get('total_ratings', 0), + } for item in res_list] + + return final_list + +async def process_analyst(item, session): + data = await get_analyst_ratings(item['analystId'], session) + item['ratingsList'] = data + item['totalRatings'] = len(data) 
 
-def get_analyst_ratings(analyst_id):
-
-    url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
-    res_list = []
-
-    for page in range(0,5):
-        try:
-            querystring = {"token":api_key,"parameters[analyst_id]": analyst_id, "page": str(page), "pagesize":"1000"}
-            response = requests.request("GET", url, headers=headers, params=querystring)
-            data = ujson.loads(response.text)['ratings']
-            res_list +=data
-            time.sleep(2)
-        except:
-            break
-
-    return res_list
-
-def get_all_analyst_stats():
-    url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
-    res_list = []
-    for _ in range(0,20): #Run the api N times because not all analyst are counted Bug from benzinga
-        for page in range(0,100):
-            try:
-                querystring = {"token":api_key,"page": f"{page}", 'pagesize': "1000"}
-                response = requests.request("GET", url, headers=headers, params=querystring)
-
-                data = ujson.loads(response.text)['analyst_ratings_analyst']
-                res_list+=data
-            except:
-                break
-        time.sleep(5)
-
-    res_list = remove_duplicates(res_list, 'id') # remove duplicates of analyst
-    res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
-
-    final_list = []
-    for item in res_list:
-        analyst_dict = {
-            'analystName': item['name_full'],
-            'companyName': item['firm_name'],
-            'analystId': item['id'],
-            'firmId': item['firm_id']
-        }
-
-        stats_dict = {
-            'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
-            'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
-            'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
-        }
-
-        final_list.append({**analyst_dict,**stats_dict})
-
-
-    return final_list
-
 def get_top_stocks():
     with open(f"json/analyst/all-analyst-data.json", 'r') as file:
         analyst_stats_list = ujson.load(file)
@@ -217,24 +157,97 @@ def get_top_stocks():
         ujson.dump(result_sorted, file)
 
-if __name__ == "__main__":
+async def get_analyst_ratings(analyst_id, session):
+    url = "https://api.benzinga.com/api/v2.1/calendar/ratings"
+    res_list = []
+
+    for page in range(5):
+        try:
+            querystring = {
+                "token": api_key,
+                "parameters[analyst_id]": analyst_id,
+                "page": str(page),
+                "pagesize": "1000"
+            }
+            async with session.get(url, headers=headers, params=querystring) as response:
+                data = await response.json()
+                ratings = data.get('ratings', [])
+                if not ratings:
+                    break  # Stop fetching if no more ratings
+                res_list += ratings
+        except Exception as e:
+            #print(f"Error fetching page {page} for analyst {analyst_id}: {e}")
+            break
+
+    return res_list
+
+async def get_all_analyst_stats():
+    url = "https://api.benzinga.com/api/v2.1/calendar/ratings/analysts"
+    res_list = []
+
+    async with aiohttp.ClientSession() as session:
+        tasks = [
+            session.get(url, headers=headers, params={"token": api_key, "page": str(page), 'pagesize': "1000"})
+            for page in range(100)
+        ]
+
+        # Gather responses concurrently
+        responses = await asyncio.gather(*tasks)
+
+        # Process each response
+        for response in responses:
+            if response.status == 200:  # Check for successful response
+                try:
+                    data = ujson.loads(await response.text())['analyst_ratings_analyst']
+                    res_list += data
+                except Exception as e:
+                    pass
+    print(len(res_list))
+    # Remove duplicates of analysts and filter based on ratings accuracy
+    res_list = remove_duplicates(res_list, 'id')
+    res_list = [item for item in res_list if item.get('ratings_accuracy', {}).get('total_ratings', 0) != 0]
+
+    # Construct the final result list
+    final_list = [{
+        'analystName': item['name_full'],
+        'companyName': item['firm_name'],
+        'analystId': item['id'],
+        'firmId': item['firm_id'],
+        'avgReturn': item['ratings_accuracy'].get('overall_average_return', 0),
+        'successRate': item['ratings_accuracy'].get('overall_success_rate', 0),
+        'totalRatings': item['ratings_accuracy'].get('total_ratings', 0),
+    } for item in res_list]
+
+    return final_list
+
+async def process_analyst(item, session):
+    data = await get_analyst_ratings(item['analystId'], session)
+    item['ratingsList'] = data
+    item['totalRatings'] = len(data)  # True total ratings
+    item['lastRating'] = data[0]['date'] if data else None
+    item['numOfStocks'] = len({d['ticker'] for d in data})
+
+    # Stats dictionary for calculating score
+    stats_dict = {
+        'avgReturn': item.get('avgReturn', 0),
+        'successRate': item.get('successRate', 0),
+        'totalRatings': item['totalRatings'],
+        'lastRating': item['lastRating'],
+    }
+    item['analystScore'] = calculate_rating(stats_dict)
+
+async def get_single_analyst_data(analyst_list):
+    async with aiohttp.ClientSession() as session:
+        tasks = [process_analyst(item, session) for item in analyst_list]
+        for task in tqdm(asyncio.as_completed(tasks), total=len(analyst_list)):
+            await task
+
+async def run():
     #Step1 get all analyst id's and stats
-    analyst_list = get_all_analyst_stats()
+    analyst_list = await get_all_analyst_stats()
     print('Number of analyst:', len(analyst_list))
 
     #Step2 get rating history for each individual analyst and score the analyst
-    for item in tqdm(analyst_list):
-        data = get_analyst_ratings(item['analystId'])
-        item['ratingsList'] = data
-        item['totalRatings'] = len(data) #true total ratings, which is important for the score
-        item['lastRating'] = data[0]['date'] if len(data) > 0 else None
-        item['numOfStocks'] = len({item['ticker'] for item in data})
-        stats_dict = {
-            'avgReturn': item.get('avgReturn', 0),
-            'successRate': item.get('successRate', 0),
-            'totalRatings': item.get('totalRatings', 0),
-            'lastRating': item.get('lastRating', None),
-        }
-        item['analystScore'] = calculate_rating(stats_dict)
+    await get_single_analyst_data(analyst_list)
 
     try:
         con = sqlite3.connect('stocks.db')
@@ -279,9 +292,8 @@
                 'successRate': item['successRate'],
                 'avgReturn': item['avgReturn'],
                 'totalRatings': item['totalRatings'],
-                'lastRating': item['lastRating'],
-                'mainSectors': item['mainSectors']
-                })
+                'lastRating': item['lastRating']
+            })
 
     with open(f"json/analyst/top-analysts.json", 'w') as file:
         ujson.dump(top_analysts_list, file)
 
     with open(f"json/analyst/all-analyst-data.json", 'w') as file:
         ujson.dump(analyst_list, file)
 
     #Save top stocks with strong buys from 5 star analysts
-    get_top_stocks()
\ No newline at end of file
+    get_top_stocks()
+
+
+if __name__ == "__main__":
+    asyncio.run(run())
\ No newline at end of file
diff --git a/app/cron_analyst_ticker.py b/app/cron_analyst_ticker.py
index 052c353..22395fd 100755
--- a/app/cron_analyst_ticker.py
+++ b/app/cron_analyst_ticker.py
@@ -220,12 +220,12 @@ def run(chunk,analyst_list):
 
 try:
-    stock_con = sqlite3.connect('stocks.db')
-    stock_cursor = stock_con.cursor()
-    stock_cursor.execute("SELECT DISTINCT symbol FROM stocks")
+    con = sqlite3.connect('stocks.db')
+    stock_cursor = con.cursor()
+    stock_cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
     stock_symbols = [row[0] for row in stock_cursor.fetchall()]
-    stock_con.close()
+    con.close()
 
     #Save all analyst data in raw form for the next step
     with open(f"json/analyst/all-analyst-data.json", 'r') as file:
diff --git a/app/ml_models/__pycache__/score_model.cpython-310.pyc b/app/ml_models/__pycache__/score_model.cpython-310.pyc
index 2d30ef18b901dbc280b01b7d264eea092cbe927f..6b754b6d96b669d46787cfcd7dc46adb7a8c825d 100644
GIT binary patch
[binary delta omitted]

diff --git a/app/ml_models/score_model.py b/app/ml_models/score_model.py
[hunk text lost in the corrupted binary section; not recoverable]

diff --git a/app/utils/__pycache__/feature_engineering.cpython-310.pyc b/app/utils/__pycache__/feature_engineering.cpython-310.pyc
new file mode 100644
GIT binary patch
[binary literal omitted]

diff --git a/app/utils/feature_engineering.py b/app/utils/feature_engineering.py
new file mode 100644
--- /dev/null
+++ b/app/utils/feature_engineering.py
@@ -0,0 +1,204 @@
+import numpy as np
+import pandas as pd
+from ta.momentum import *
+from ta.trend import *
+from ta.volatility import *
+from ta.volume import *
+
+def trend_intensity(close, window=20):
+    ma = close.rolling(window=window).mean()
+    std = close.rolling(window=window).std()
+    return ((close - ma) / std).abs().rolling(window=window).mean()
+
+def calculate_fdi(high, low, close, window=30):
+    n1 = (np.log(high.rolling(window=window).max() - low.rolling(window=window).min()) -
+          np.log(close.rolling(window=window).max() - close.rolling(window=window).min())) / np.log(2)
+    return (2 - n1) * 100
+
+def generate_ta_features(df):
+    df_features = df.copy()
+
+    df_features['sma_50'] = df['close'].rolling(window=50).mean()
+    df_features['sma_200'] = df['close'].rolling(window=200).mean()
+    df_features['sma_crossover'] = ((df_features['sma_50'] > df_features['sma_200']) & (df_features['sma_50'].shift(1) <= df_features['sma_200'].shift(1))).astype(int)
+
+    df_features['ema_50'] = EMAIndicator(close=df['close'], window=50).ema_indicator()
+    df_features['ema_200'] = EMAIndicator(close=df['close'], window=200).ema_indicator()
+    df_features['ema_crossover'] = ((df_features['ema_50'] > df_features['ema_200']) & (df_features['ema_50'].shift(1) <= df_features['ema_200'].shift(1))).astype(int)
+
+    df_features['wma'] = WMAIndicator(df['close'], window=30).wma()
+
+    ichimoku = IchimokuIndicator(high=df['high'], low=df['low'])
+    df_features['ichimoku_a'] = ichimoku.ichimoku_a()
+    df_features['ichimoku_b'] = ichimoku.ichimoku_b()
+    df_features['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close']).average_true_range()
+    bb = BollingerBands(close=df['close'])
+    df_features['bb_width'] = (bb.bollinger_hband() - bb.bollinger_lband()) / df['close']
+
+    df_features['macd'] = macd(df['close'])
+    df_features['macd_signal'] = macd_signal(df['close'])
+    df_features['macd_hist'] = 2*macd_diff(df['close'])
+    df_features['adx'] = adx(df['high'],df['low'],df['close'])
+    df_features["adx_pos"] = adx_pos(df['high'],df['low'],df['close'])
+    df_features["adx_neg"] = adx_neg(df['high'],df['low'],df['close'])
+    df_features['cci'] = CCIIndicator(high=df['high'], low=df['low'], close=df['close']).cci()
+    df_features['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume']).money_flow_index()
+
+    df_features['nvi'] = NegativeVolumeIndexIndicator(close=df['close'], volume=df['volume']).negative_volume_index()
+    df_features['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume']).on_balance_volume()
+    df_features['vpt'] = VolumePriceTrendIndicator(close=df['close'], volume=df['volume']).volume_price_trend()
+
+    df_features['rsi'] = rsi(df["close"], window=60)
+    df_features['rolling_rsi'] = df_features['rsi'].rolling(window=10).mean()
+    df_features['stoch_rsi'] = stochrsi_k(df['close'], window=60, smooth1=3, smooth2=3)
+    df_features['rolling_stoch_rsi'] = df_features['stoch_rsi'].rolling(window=10).mean()
+
+    df_features['adi'] = acc_dist_index(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'])
+    df_features['cmf'] = chaikin_money_flow(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'], window=20)
+    df_features['emv'] = ease_of_movement(high=df['high'],low=df['low'],volume=df['volume'], window=20)
+    df_features['fi'] = force_index(close=df['close'], volume=df['volume'], window=13)
+
+    df_features['williams'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close']).williams_r()
+    df_features['kama'] = KAMAIndicator(close=df['close']).kama()
+
+    stoch = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=60, smooth_window=3)
+    df_features['stoch_k'] = stoch.stoch()
+    df_features['stoch_d'] = stoch.stoch_signal()
+
+    df_features['rocr'] = df['close'] / df['close'].shift(30) - 1 # Rate of Change Ratio (ROCR)
+    df_features['ppo'] = (df_features['ema_50'] - df_features['ema_200']) / df_features['ema_50'] * 100
+    df_features['vwap'] = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum()
+    df_features['volatility_ratio'] = df['close'].rolling(window=30).std() / df['close'].rolling(window=60).std()
+
+    df_features['fdi'] = calculate_fdi(df['high'], df['low'], df['close'])
+    df_features['tii'] = trend_intensity(df['close'])
+
+    df_features['fft'] = np.abs(np.fft.fft(df['close']))
+    don_channel = DonchianChannel(high=df['high'], low=df['low'],close=df['close'], window=60)
+    df_features['don_hband'] = don_channel.donchian_channel_hband()
+    df_features['don_lband'] = don_channel.donchian_channel_lband()
+    df_features['don_mband'] = don_channel.donchian_channel_mband()
+    df_features['don_pband'] = don_channel.donchian_channel_pband()
+    df_features['don_wband'] = don_channel.donchian_channel_wband()
+
+    aroon = AroonIndicator(high=df['high'], low=df['low'], window=60)
+    df_features['aroon_down'] = aroon.aroon_down()
+    df_features['aroon_indicator'] = aroon.aroon_indicator()
+    df_features['aroon_up'] = aroon.aroon_up()
+
+    df_features['ultimate_oscillator'] = UltimateOscillator(high=df['high'], low=df['low'], close=df['close']).ultimate_oscillator()
+    df_features['choppiness'] = 100 * np.log10((df['high'].rolling(window=60).max() - df['low'].rolling(window=30).min()) / df_features['atr']) / np.log10(14)
+    df_features['ulcer'] = UlcerIndex(df['close'],window=60).ulcer_index()
+    df_features['keltner_hband'] = keltner_channel_hband_indicator(high=df['high'],low=df['low'],close=df['close'],window=60)
+    df_features['keltner_lband'] = keltner_channel_lband_indicator(high=df['high'],low=df['low'],close=df['close'],window=60)
+
+    df_features = df_features.dropna()
+    return df_features
+
+def generate_statistical_features(df, windows=[20, 50], price_col='close',
+                                  high_col='high', low_col='low', volume_col='volume'):
+    """
+    Generate comprehensive statistical features for financial time series data.
+
+    Parameters:
+    -----------
+    df : pandas.DataFrame
+        DataFrame containing the price and volume data
+    windows : list
+        List of rolling window sizes to use for feature generation
+    price_col : str
+        Name of the closing price column
+    high_col : str
+        Name of the high price column
+    low_col : str
+        Name of the low price column
+    volume_col : str
+        Name of the volume column
+
+    Returns:
+    --------
+    pandas.DataFrame
+        DataFrame with additional statistical features
+    """
+
+    # Create a copy of the dataframe to avoid modifying the original
+    df_features = df.copy()
+
+    # Calculate features for each window size
+    for window in windows:
+        # Returns
+        df_features[f'returns_{window}'] = df[price_col].pct_change(periods=window)
+
+        # Log returns and statistics
+        log_returns = np.log(df[price_col]/df[price_col].shift(1))
+        df_features[f'log_returns_{window}'] = log_returns.rolling(window=window).mean()
+        df_features[f'log_returns_std_{window}'] = log_returns.rolling(window=window).std()
+
+        # Statistical moments
+        df_features[f'std_{window}'] = df[price_col].rolling(window=window).std()
+        df_features[f'var_{window}'] = df[price_col].rolling(window=window).var()
+        df_features[f'skew_{window}'] = df[price_col].rolling(window=window).skew()
+        df_features[f'kurt_{window}'] = df[price_col].rolling(window=window).kurt()
+
+        # Volatility measures
+        df_features[f'realized_vol_{window}'] = (
+            df_features[f'returns_{window}'].rolling(window=window).std() * np.sqrt(252))
+        df_features[f'range_vol_{window}'] = (
+            (df[high_col].rolling(window=window).max() -
+             df[low_col].rolling(window=window).min()) / df[price_col])
+
+        # Z-scores and normalized prices
+        df_features[f'zscore_{window}'] = (
+            (df[price_col] - df[price_col].rolling(window=window).mean()) /
+            df[price_col].rolling(window=window).std())
+        df_features[f'norm_price_{window}'] = (
+            df[price_col] / df[price_col].rolling(window=window).mean() - 1)
+
+        # Correlation features
+        if volume_col in df.columns:
+            df_features[f'volume_price_corr_{window}'] = (
+                df[price_col].rolling(window=window).corr(df[volume_col]))
+        df_features[f'high_low_corr_{window}'] = (
+            df[high_col].rolling(window=window).corr(df[low_col]))
+
+        # Quantile features
+        for q in [0.25, 0.75]:
+            df_features[f'price_q{int(q*100)}_{window}'] = (
+                df[price_col].rolling(window=window).quantile(q))
+
+    # Price dynamics
+    df_features['price_acceleration'] = df[price_col].diff().diff()
+    df_features['momentum_change'] = df[price_col].pct_change().diff()
+
+    # Advanced volatility
+    df_features['parkinson_vol'] = np.sqrt(
+        1/(4*np.log(2)) * (np.log(df[high_col]/df[low_col])**2))
+
+    # Efficiency ratio
+    df_features['price_efficiency'] = (
+        abs(df[price_col] - df[price_col].shift(20)) /
+        (df[high_col].rolling(20).max() - df[low_col].rolling(20).min())
+    )
+
+    # Deviation metrics
+    df_features['deviation_from_vwap'] = (
+        (df[price_col] - df[price_col].rolling(window=20).mean()) /
+        df[price_col].rolling(window=20).mean()
+    )
+
+    df_features['stock_return'] = df['close'].pct_change()
+
+    df_features = df_features.dropna()
+    return df_features
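[review note] A quick smoke test for the new module, as a sketch with synthetic OHLCV values (assumes the `ta` package is installed and `from utils.feature_engineering import *`):

```python
import numpy as np
import pandas as pd

# Synthetic OHLCV frame, long enough for the 200-bar windows (values are made up).
n = 300
rng = np.random.default_rng(1)
close = pd.Series(100 + rng.normal(size=n).cumsum())
df = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=n).strftime('%Y-%m-%d'),
    'open': close.shift(1).fillna(100), 'high': close + 1, 'low': close - 1,
    'close': close, 'volume': rng.integers(1_000, 10_000, size=n).astype(float),
})

df_stats = generate_statistical_features(df)  # rolling/statistical columns
df_ta = generate_ta_features(df)              # indicator columns
print(df_stats.shape, df_ta.shape)
```

Both generators end with `dropna()`, so each frame comes back shorter than the input and shorter than the other; the `drop`/`concat` step in `download_data` relies on pandas index alignment to stitch them back onto `df`, leaving NaN in the early rows that the longest windows consume.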