diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py index fb747ef..f66126a 100644 --- a/app/cron_ai_score.py +++ b/app/cron_ai_score.py @@ -100,54 +100,21 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading): f"json/financial-statements/owner-earnings/quarter/{ticker}.json", ] - # Helper function to load JSON data asynchronously - async def load_json_from_file(path): - async with aiofiles.open(path, 'r') as f: - content = await f.read() - return orjson.loads(content) - - # Helper function to filter data based on keys and year - async def filter_data(data, ignore_keys, year_threshold=2000): - return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold] - - # Define keys to ignore + # Async loading and filtering ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink","pbRatio","ptbRatio"] + async def load_and_filter_json(path): + async with aiofiles.open(path, 'r') as f: + data = orjson.loads(await f.read()) + return [{k: v for k, v in item.items() if k not in ignore_keys and int(item["date"][:4]) >= 2000} for item in data] - # Load and filter data for each statement type - - ratios = await load_json_from_file(statements[0]) - ratios = await filter_data(ratios, ignore_keys) + # Load all files concurrently + data = await asyncio.gather(*(load_and_filter_json(s) for s in statements)) + ratios, key_metrics, cashflow, income, balance, income_growth, balance_growth, cashflow_growth, owner_earnings = data #Threshold of enough datapoints needed! if len(ratios) < 50: return - key_metrics = await load_json_from_file(statements[1]) - key_metrics = await filter_data(key_metrics, ignore_keys) - - - cashflow = await load_json_from_file(statements[2]) - cashflow = await filter_data(cashflow, ignore_keys) - - income = await load_json_from_file(statements[3]) - income = await filter_data(income, ignore_keys) - - balance = await load_json_from_file(statements[4]) - balance = await filter_data(balance, ignore_keys) - - income_growth = await load_json_from_file(statements[5]) - income_growth = await filter_data(income_growth, ignore_keys) - - balance_growth = await load_json_from_file(statements[6]) - balance_growth = await filter_data(balance_growth, ignore_keys) - - - cashflow_growth = await load_json_from_file(statements[7]) - cashflow_growth = await filter_data(cashflow_growth, ignore_keys) - - owner_earnings = await load_json_from_file(statements[8]) - owner_earnings = await filter_data(owner_earnings, ignore_keys) - # Combine all the data combined_data = defaultdict(dict) @@ -171,126 +138,106 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading): df_ta = generate_ta_features(df) # Filter columns in df_stats and df_ta that are not in df + # Drop unnecessary columns from df_stats and df_ta df_stats_filtered = df_stats.drop(columns=df_columns.intersection(df_stats.columns), errors='ignore') df_ta_filtered = df_ta.drop(columns=df_columns.intersection(df_ta.columns), errors='ignore') + + # Extract the column names for indicators ta_columns = df_ta_filtered.columns.tolist() stats_columns = df_stats_filtered.columns.tolist() # Concatenate df with the filtered df_stats and df_ta df = pd.concat([df, df_ta_filtered, df_stats_filtered], axis=1) + # Set up a dictionary for faster lookup of close prices and columns by date + df_dict = df.set_index('date').to_dict(orient='index') - # Match each combined data entry with the closest available stock price in df - for item in combined_data: - target_date = item['date'] + # Helper function to find closest date within max_attempts + def find_closest_date(target_date, max_attempts=10): counter = 0 - max_attempts = 10 - - # Look for the closest matching date in the stock data - while target_date not in df['date'].values and counter < max_attempts: + while target_date not in df_dict and counter < max_attempts: target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d') counter += 1 + return target_date if target_date in df_dict else None - # If max attempts are reached and no matching date is found, skip the entry - if counter == max_attempts: + # Match combined data entries with stock data + for item in combined_data: + target_date = item['date'] + closest_date = find_closest_date(target_date) + + # Skip if no matching date is found + if not closest_date: continue - # Find the close price for the matching date - close_price = round(df[df['date'] == target_date]['close'].values[0], 2) - item['price'] = close_price + # Fetch data from the dictionary for the closest matching date + data = df_dict[closest_date] - # Dynamically add all indicator values to the combined_data entry - - for column in ta_columns: - column_value = df[df['date'] == target_date][column].values[0] - item[column] = column_value # Add the column value to the combined_data entry - for column in stats_columns: - column_value = df[df['date'] == target_date][column].values[0] - item[column] = column_value # Add the column value to the combined_data entry - + # Add close price to the item + item['price'] = round(data['close'], 2) + + # Dynamically add indicator values from ta_columns and stats_columns + for column in ta_columns + stats_columns: + item[column] = data.get(column, None) # Sort the combined data by date combined_data = sorted(combined_data, key=lambda x: x['date']) - # Convert combined data into a DataFrame + + # Convert combined data to a DataFrame and drop rows with NaN values df_combined = pd.DataFrame(combined_data).dropna() + fundamental_columns = [ - 'revenue', - 'costOfRevenue', - 'grossProfit', - 'netIncome', - 'operatingIncome', - 'operatingExpenses', - 'researchAndDevelopmentExpenses', - 'ebitda', - 'freeCashFlow', - 'incomeBeforeTax', - 'incomeTaxExpense', - 'debtRepayment', - 'dividendsPaid', - 'depreciationAndAmortization', - 'netCashUsedProvidedByFinancingActivities', - 'changeInWorkingCapital', - 'stockBasedCompensation', - 'deferredIncomeTax', - 'commonStockRepurchased', - 'operatingCashFlow', - 'capitalExpenditure', - 'accountsReceivables', - 'purchasesOfInvestments', - 'cashAndCashEquivalents', - 'shortTermInvestments', - 'cashAndShortTermInvestments', - 'longTermInvestments', - 'otherCurrentLiabilities', - 'totalCurrentLiabilities', - 'longTermDebt', - 'totalDebt', - 'netDebt', - 'commonStock', - 'totalEquity', - 'totalLiabilitiesAndStockholdersEquity', - 'totalStockholdersEquity', - 'totalInvestments', - 'taxAssets', - 'totalAssets', - 'inventory', - 'propertyPlantEquipmentNet', - 'ownersEarnings', + 'revenue', 'costOfRevenue', 'grossProfit', 'netIncome', 'operatingIncome', 'operatingExpenses', + 'researchAndDevelopmentExpenses', 'ebitda', 'freeCashFlow', 'incomeBeforeTax', 'incomeTaxExpense', + 'debtRepayment', 'dividendsPaid', 'depreciationAndAmortization', 'netCashUsedProvidedByFinancingActivities', + 'changeInWorkingCapital', 'stockBasedCompensation', 'deferredIncomeTax', 'commonStockRepurchased', + 'operatingCashFlow', 'capitalExpenditure', 'accountsReceivables', 'purchasesOfInvestments', + 'cashAndCashEquivalents', 'shortTermInvestments', 'cashAndShortTermInvestments', 'longTermInvestments', + 'otherCurrentLiabilities', 'totalCurrentLiabilities', 'longTermDebt', 'totalDebt', 'netDebt', 'commonStock', + 'totalEquity', 'totalLiabilitiesAndStockholdersEquity', 'totalStockholdersEquity', 'totalInvestments', + 'taxAssets', 'totalAssets', 'inventory', 'propertyPlantEquipmentNet', 'ownersEarnings', ] - # Compute ratios for all combinations of key elements - new_columns = {} - # Loop over combinations of column pairs - for columns in [fundamental_columns, stats_columns, ta_columns]: - for num, denom in combinations(columns, 2): - # Compute ratio and reverse ratio - ratio = df_combined[num] / df_combined[denom] - reverse_ratio = round(df_combined[denom] / df_combined[num],2) + # Function to compute combinations within a group + def compute_column_ratios(columns, df, new_columns): + column_combinations = list(combinations(columns, 2)) + + for num, denom in column_combinations: + with np.errstate(divide='ignore', invalid='ignore'): + # Compute ratio and reverse ratio safely + ratio = df[num] / df[denom] + reverse_ratio = df[denom] / df[num] # Define column names for both ratios column_name = f'{num}_to_{denom}' reverse_column_name = f'{denom}_to_{num}' - # Store the new columns in the dictionary, replacing invalid values with 0 + # Assign values to new columns, handling invalid values new_columns[column_name] = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0) new_columns[reverse_column_name] = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0) - # Add all new columns to the original DataFrame at once - df_combined = pd.concat([df_combined, pd.DataFrame(new_columns)], axis=1) - - # To defragment the DataFrame, make a copy - df_combined = df_combined.copy() - df_combined = df_combined.dropna() - df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0) - + # Create an empty dictionary for the new columns + new_columns = {} + # Compute combinations for each group of columns + compute_column_ratios(fundamental_columns, df_combined, new_columns) + compute_column_ratios(stats_columns, df_combined, new_columns) + compute_column_ratios(ta_columns, df_combined, new_columns) + + # Concatenate the new ratio columns with the original DataFrame + df_combined = pd.concat([df_combined, pd.DataFrame(new_columns, index=df_combined.index)], axis=1) + + # Clean up and replace invalid values + df_combined = df_combined.replace([np.inf, -np.inf], 0).dropna() + + # Create 'Target' column to indicate if the next price is higher than the current one df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int) - df_copy = df_combined.copy() - df_copy = df_copy.map(lambda x: round(x, 2) if isinstance(x, float) else x) + # Copy DataFrame and round float values + df_copy = df_combined.copy().map(lambda x: round(x, 2) if isinstance(x, float) else x) - if df_copy.shape[0] > 0: + # Save to a file if there are rows in the DataFrame + if not df_copy.empty: with open(file_path, 'wb') as file: file.write(orjson.dumps(df_copy.to_dict(orient='records'))) @@ -354,7 +301,8 @@ async def warm_start_training(tickers, con, skip_downloading): predictor = ScorePredictor() selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200) - predictor.warm_start_training(df_train[selected_features], df_train['Target']) + #predictor.warm_start_training(df_train[selected_features], df_train['Target']) + predictor.batch_train_model(df_train[selected_features], df_train['Target'], batch_size=1000) predictor.evaluate_model(df_test[selected_features], df_test['Target']) return predictor @@ -400,20 +348,19 @@ async def run(): if train_mode: # Warm start training - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") warm_start_symbols = [row[0] for row in cursor.fetchall()] - print('Warm Start Training') + print(f'Warm Start Training: {len(warm_start_symbols)}') predictor = await warm_start_training(warm_start_symbols, con, skip_downloading) else: # Fine-tuning and evaluation for all stocks - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'") stock_symbols = [row[0] for row in cursor.fetchall()] print(f"Total tickers for fine-tuning: {len(stock_symbols)}") start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") end_date = datetime.today().strftime("%Y-%m-%d") - tasks = [] for ticker in tqdm(stock_symbols): await fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading) diff --git a/app/ml_models/__pycache__/score_model.cpython-310.pyc b/app/ml_models/__pycache__/score_model.cpython-310.pyc index 0974412..30e3db8 100644 Binary files a/app/ml_models/__pycache__/score_model.cpython-310.pyc and b/app/ml_models/__pycache__/score_model.cpython-310.pyc differ diff --git a/app/ml_models/score_model.py b/app/ml_models/score_model.py index f3ef114..db8d3a8 100644 --- a/app/ml_models/score_model.py +++ b/app/ml_models/score_model.py @@ -1,11 +1,10 @@ -import yfinance as yf import pandas as pd from datetime import datetime, timedelta import numpy as np from xgboost import XGBClassifier from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score from sklearn.preprocessing import MinMaxScaler -from sklearn.decomposition import PCA # Import PCA +from sklearn.decomposition import PCA import lightgbm as lgb from tqdm import tqdm @@ -16,17 +15,27 @@ import aiofiles import pickle import time + class ScorePredictor: def __init__(self): self.scaler = MinMaxScaler() - self.pca = PCA(n_components=5) + self.pca = PCA(n_components=0.95) # Retain components explaining 95% variance self.warm_start_model_path = 'ml_models/weights/ai-score/warm_start_weights.pkl' self.model = lgb.LGBMClassifier( - n_estimators=20_000, # If you want to use a larger model we've found 20_000 trees to be better - learning_rate=0.01, # and a learning rate of 0.001 - max_depth=20, # and max_depth=6 - num_leaves=2**6-1, # and num_leaves of 2**6-1 - colsample_bytree=0.1 + n_estimators=1000, # Number of boosting iterations - good balance between performance and training time + learning_rate=0.005, # Smaller learning rate for better generalization + max_depth=8, # Controlled depth to prevent overfitting + num_leaves=31, # 2^5-1, prevents overfitting while maintaining model complexity + colsample_bytree=0.8, # Use 80% of features per tree to reduce overfitting + subsample=0.8, # Use 80% of data per tree to reduce overfitting + min_child_samples=20, # Minimum samples per leaf to ensure reliable splits + random_state=42, # For reproducibility + class_weight='balanced', # Important for potentially imbalanced stock data + reg_alpha=0.1, # L1 regularization + reg_lambda=0.1, # L2 regularization + n_jobs=-1, # Use all CPU cores + verbose=-1, # Reduce output noise + warm_start= True, ) ''' XGBClassifier( @@ -43,14 +52,14 @@ class ScorePredictor: X = np.where(np.isinf(X), np.nan, X) X = np.nan_to_num(X) X = self.scaler.fit_transform(X) # Transform using the fitted scaler - return X#self.pca.fit_transform(X) # Fit PCA and transform + return self.pca.fit_transform(X) # Fit PCA and transform def preprocess_test_data(self, X): """Preprocess test data by scaling and applying PCA.""" X = np.where(np.isinf(X), np.nan, X) X = np.nan_to_num(X) X = self.scaler.transform(X) # Transform using the fitted scaler - return X#self.pca.transform(X) # Transform using the fitted PCA + return self.pca.transform(X) # Transform using the fitted PCA def warm_start_training(self, X_train, y_train): X_train = self.preprocess_train_data(X_train) @@ -58,6 +67,26 @@ class ScorePredictor: pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb')) print("Warm start model saved.") + def batch_train_model(self, X_train, y_train, batch_size=1000): + """Train the model in batches to handle large datasets.""" + num_samples = len(X_train) + for start_idx in range(0, num_samples, batch_size): + end_idx = min(start_idx + batch_size, num_samples) + X_batch = X_train[start_idx:end_idx] + y_batch = y_train[start_idx:end_idx] + + # Preprocess each batch + X_batch = self.preprocess_train_data(X_batch) + + # Fit model on each batch (incremental training with warm_start=True) + self.model.fit(X_batch, y_batch, eval_set=[(X_batch, y_batch)]) + + print(f"Trained on batch {start_idx} to {end_idx}") + + # After batch training, save the model + pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb')) + print("Batch learning completed and model saved.") + def fine_tune_model(self, X_train, y_train): X_train = self.preprocess_train_data(X_train) with open(f'{self.warm_start_model_path}', 'rb') as f: