diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py index f66126a..43f8b43 100644 --- a/app/cron_ai_score.py +++ b/app/cron_ai_score.py @@ -248,64 +248,62 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading): pass -async def chunked_gather(tickers, con, start_date, end_date, skip_downloading, chunk_size=10): +async def chunked_gather(tickers, con, skip_downloading, chunk_size=10): + test_size = 0.2 + start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") + end_date = datetime.today().strftime("%Y-%m-%d") + df_train = pd.DataFrame() + df_test = pd.DataFrame() + # Helper function to divide the tickers into chunks def chunks(lst, size): for i in range(0, len(lst), size): yield lst[i:i+size] - results = [] + dfs = [] - for chunk in tqdm(chunks(tickers, chunk_size)): + for num, chunk in enumerate(tqdm(chunks(tickers, chunk_size))): # Create tasks for each chunk tasks = [download_data(ticker, con, start_date, end_date, skip_downloading) for ticker in chunk] # Await the results for the current chunk chunk_results = await asyncio.gather(*tasks) # Accumulate the results - results.extend(chunk_results) + dfs.extend(chunk_results) + + train_list = [] + test_list = [] + + for df in dfs: + try: + split_size = int(len(df) * (1 - test_size)) + train_data = df.iloc[:split_size] + test_data = df.iloc[split_size:] + + # Append to the lists + train_list.append(train_data) + test_list.append(test_data) + except: + pass + + # Concatenate all at once outside the loop + df_train = pd.concat(train_list, ignore_index=True) + df_test = pd.concat(test_list, ignore_index=True) + df_train = df_train.sample(frac=1).reset_index(drop=True) + df_test = df_test.sample(frac=1).reset_index(drop=True) + + print('======Warm Start Train Set Datapoints======') + print(f'Batch Training: {num}') + print(len(df_train)) + + predictor = ScorePredictor() + selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200) + predictor.warm_start_training(df_train[selected_features], df_train['Target']) + predictor.evaluate_model(df_test[selected_features], df_test['Target']) + - return results - async def warm_start_training(tickers, con, skip_downloading): - start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") - end_date = datetime.today().strftime("%Y-%m-%d") - df_train = pd.DataFrame() - df_test = pd.DataFrame() - test_size = 0.2 - - dfs = await chunked_gather(tickers, con, start_date, end_date, skip_downloading, chunk_size=10) - - train_list = [] - test_list = [] - - for df in dfs: - try: - split_size = int(len(df) * (1 - test_size)) - train_data = df.iloc[:split_size] - test_data = df.iloc[split_size:] - - # Append to the lists - train_list.append(train_data) - test_list.append(test_data) - except: - pass - - # Concatenate all at once outside the loop - df_train = pd.concat(train_list, ignore_index=True) - df_test = pd.concat(test_list, ignore_index=True) - df_train = df_train.sample(frac=1).reset_index(drop=True) - df_test = df_test.sample(frac=1).reset_index(drop=True) - - print('======Warm Start Train Set Datapoints======') - print(len(df_train)) - - predictor = ScorePredictor() - selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200) - #predictor.warm_start_training(df_train[selected_features], df_train['Target']) - predictor.batch_train_model(df_train[selected_features], df_train['Target'], batch_size=1000) - predictor.evaluate_model(df_test[selected_features], df_test['Target']) - - return predictor + + dfs = await chunked_gather(tickers, con, skip_downloading, chunk_size=50) async def fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading): try: @@ -328,7 +326,7 @@ async def fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloa data = predictor.evaluate_model(test_data[selected_features], test_data['Target']) if len(data) != 0: - if data['precision'] >= 50 and data['accuracy'] >= 50 and data['accuracy'] < 100 and data['precision'] < 100 and data['f1_score'] > 50 and data['recall_score'] > 50 and data['roc_auc_score'] > 50: + if data['precision'] >= 50 and data['accuracy'] >= 50 and data['accuracy'] < 100 and data['precision'] < 100 and data['f1_score'] >= 50 and data['recall_score'] >= 50 and data['roc_auc_score'] >= 50: await save_json(ticker, data) print(f"Saved results for {ticker}") gc.collect() @@ -348,10 +346,10 @@ async def run(): if train_mode: # Warm start training - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'") warm_start_symbols = [row[0] for row in cursor.fetchall()] - print(f'Warm Start Training: {len(warm_start_symbols)}') - predictor = await warm_start_training(warm_start_symbols, con, skip_downloading) + print(f'Warm Start Training: Total Tickers {len(warm_start_symbols)}') + await warm_start_training(warm_start_symbols, con, skip_downloading) else: # Fine-tuning and evaluation for all stocks cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'") diff --git a/app/ml_models/__pycache__/score_model.cpython-310.pyc b/app/ml_models/__pycache__/score_model.cpython-310.pyc index 30e3db8..129f98c 100644 Binary files a/app/ml_models/__pycache__/score_model.cpython-310.pyc and b/app/ml_models/__pycache__/score_model.cpython-310.pyc differ diff --git a/app/ml_models/score_model.py b/app/ml_models/score_model.py index db8d3a8..5b5b729 100644 --- a/app/ml_models/score_model.py +++ b/app/ml_models/score_model.py @@ -13,29 +13,29 @@ import asyncio import aiohttp import aiofiles import pickle +import os import time + class ScorePredictor: def __init__(self): self.scaler = MinMaxScaler() self.pca = PCA(n_components=0.95) # Retain components explaining 95% variance self.warm_start_model_path = 'ml_models/weights/ai-score/warm_start_weights.pkl' self.model = lgb.LGBMClassifier( - n_estimators=1000, # Number of boosting iterations - good balance between performance and training time + n_estimators=200, # Number of boosting iterations - good balance between performance and training time learning_rate=0.005, # Smaller learning rate for better generalization max_depth=8, # Controlled depth to prevent overfitting - num_leaves=31, # 2^5-1, prevents overfitting while maintaining model complexity + num_leaves=32, # 2^max_depth, prevents overfitting while maintaining model complexity colsample_bytree=0.8, # Use 80% of features per tree to reduce overfitting subsample=0.8, # Use 80% of data per tree to reduce overfitting min_child_samples=20, # Minimum samples per leaf to ensure reliable splits random_state=42, # For reproducibility - class_weight='balanced', # Important for potentially imbalanced stock data reg_alpha=0.1, # L1 regularization reg_lambda=0.1, # L2 regularization - n_jobs=-1, # Use all CPU cores - verbose=-1, # Reduce output noise - warm_start= True, + n_jobs=10, # Use N CPU cores + verbose=0, # Reduce output noise ) ''' XGBClassifier( @@ -63,29 +63,13 @@ class ScorePredictor: def warm_start_training(self, X_train, y_train): X_train = self.preprocess_train_data(X_train) + if os.path.exists(self.warm_start_model_path): + with open(f'{self.warm_start_model_path}', 'rb') as f: + self.model = pickle.load(f) self.model.fit(X_train, y_train) pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb')) print("Warm start model saved.") - def batch_train_model(self, X_train, y_train, batch_size=1000): - """Train the model in batches to handle large datasets.""" - num_samples = len(X_train) - for start_idx in range(0, num_samples, batch_size): - end_idx = min(start_idx + batch_size, num_samples) - X_batch = X_train[start_idx:end_idx] - y_batch = y_train[start_idx:end_idx] - - # Preprocess each batch - X_batch = self.preprocess_train_data(X_batch) - - # Fit model on each batch (incremental training with warm_start=True) - self.model.fit(X_batch, y_batch, eval_set=[(X_batch, y_batch)]) - - print(f"Trained on batch {start_idx} to {end_idx}") - - # After batch training, save the model - pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb')) - print("Batch learning completed and model saved.") def fine_tune_model(self, X_train, y_train): X_train = self.preprocess_train_data(X_train)