From 0bdc818d6b1955abcec75d356179ab3d7f55f207 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 9 Oct 2024 12:45:23 +0200 Subject: [PATCH] revert back to old model && bugfixing pre-post quote --- app/cron_ai_score.py | 233 ++++++++++++++--------------------- app/ml_models/score_model.py | 129 ++++++++++++++----- app/primary_cron_job.py | 2 + 3 files changed, 196 insertions(+), 168 deletions(-) diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py index d8007b6..d361766 100644 --- a/app/cron_ai_score.py +++ b/app/cron_ai_score.py @@ -11,18 +11,15 @@ import pandas as pd from tqdm import tqdm import concurrent.futures import re -import random from itertools import combinations - from dotenv import load_dotenv import os -import gc from utils.feature_engineering import * + +import gc #Enable automatic garbage collection gc.enable() - - load_dotenv() api_key = os.getenv('FMP_API_KEY') @@ -49,37 +46,6 @@ async def fetch_historical_price(ticker): raise Exception(f"Error fetching data: {response.status} {response.reason}") -def top_uncorrelated_features(df, target_col='Target', top_n=10, threshold=0.75): - # Drop the columns to exclude from the DataFrame - df_filtered = df.drop(columns=['date','price']) - - # Compute the correlation matrix - correlation_matrix = df_filtered.corr() - - # Get the correlations with the target column, sorted by absolute value - correlations_with_target = correlation_matrix[target_col].drop(target_col).abs().sort_values(ascending=False) - - # Initialize the list of selected features - selected_features = [] - - # Iteratively select the most correlated features while minimizing correlation with each other - for feature in correlations_with_target.index: - # If we already have enough features, break - if len(selected_features) >= top_n: - break - - # Check correlation of this feature with already selected features - is_uncorrelated = True - for selected in selected_features: - if abs(correlation_matrix.loc[feature, selected]) > threshold: - is_uncorrelated = False - break - - # If it's uncorrelated with the selected features, add it to the list - if is_uncorrelated: - selected_features.append(feature) - return selected_features - async def download_data(ticker, con, start_date, end_date, skip_downloading, save_data): file_path = f"ml_models/training_data/ai-score/{ticker}.json" @@ -97,13 +63,13 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav statements = [ f"json/financial-statements/ratios/quarter/{ticker}.json", f"json/financial-statements/key-metrics/quarter/{ticker}.json", - f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json", - f"json/financial-statements/income-statement/quarter/{ticker}.json", - f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json", + #f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json", + #f"json/financial-statements/income-statement/quarter/{ticker}.json", + #f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json", f"json/financial-statements/income-statement-growth/quarter/{ticker}.json", f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json", f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json", - f"json/financial-statements/owner-earnings/quarter/{ticker}.json", + #f"json/financial-statements/owner-earnings/quarter/{ticker}.json", ] # Async loading and filtering @@ -115,7 +81,7 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav # Load all files concurrently data = await asyncio.gather(*(load_and_filter_json(s) for s in statements)) - ratios, key_metrics, cashflow, income, balance, income_growth, balance_growth, cashflow_growth, owner_earnings = data + ratios, key_metrics, income_growth, balance_growth, cashflow_growth = data #Threshold of enough datapoints needed! if len(ratios) < 50: @@ -127,7 +93,7 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav combined_data = defaultdict(dict) # Merge the data based on 'date' - for entries in zip(ratios,key_metrics,income, balance, cashflow, owner_earnings, income_growth, balance_growth, cashflow_growth): + for entries in zip(ratios,key_metrics, income_growth, balance_growth, cashflow_growth): for entry in entries: try: date = entry['date'] @@ -141,7 +107,6 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav # Download historical stock data using yfinance df = await fetch_historical_price(ticker) - # Get the list of columns in df df_columns = df.columns df_stats = generate_statistical_features(df) @@ -224,7 +189,7 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav new_columns = {} # Compute combinations for each group of columns - compute_column_ratios(fundamental_columns, df_combined, new_columns) + #compute_column_ratios(fundamental_columns, df_combined, new_columns) compute_column_ratios(stats_columns, df_combined, new_columns) compute_column_ratios(ta_columns, df_combined, new_columns) @@ -252,108 +217,101 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading, sav pass -async def chunked_gather(tickers, con, skip_downloading, save_data, chunk_size): - test_size = 0.2 - start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") - end_date = datetime.today().strftime("%Y-%m-%d") - df_train = pd.DataFrame() - df_test_dict = {} # Store test data for each ticker - all_test_data = [] # Store all test data for overall evaluation - +async def chunked_gather(tickers, con, start_date, end_date, skip_downloading, save_data, chunk_size=10): # Helper function to divide the tickers into chunks def chunks(lst, size): for i in range(0, len(lst), size): - yield lst[i:i + size] - - for chunk in tqdm(chunks(tickers, chunk_size)): + yield lst[i:i+size] + + results = [] + + for chunk in chunks(tickers, chunk_size): # Create tasks for each chunk - print(f"chunk size: {len(chunk)}") tasks = [download_data(ticker, con, start_date, end_date, skip_downloading, save_data) for ticker in chunk] # Await the results for the current chunk chunk_results = await asyncio.gather(*tasks) - - train_list = [] - test_list = [] - - for ticker, df in zip(chunk, chunk_results): - try: - # Split the data into training and testing sets - split_size = int(len(df) * (1 - test_size)) - train_data = df.iloc[:split_size] - test_data = df.iloc[split_size:] - - # Append train data for combined training - train_list.append(train_data) - test_list.append(test_data) - except: - pass - - # Concatenate all train data together - df_train = pd.concat(train_list, ignore_index=True) - df_test = pd.concat(test_list, ignore_index=True) - - # Shuffle the combined training data - df_train = df_train.sample(frac=1, random_state=42).reset_index(drop=True) - df_test = df_test.sample(frac=1, random_state=42).reset_index(drop=True) - - print('====== Start Training Model on Combined Data ======') - predictor = ScorePredictor() - selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] - - # Train the model on the combined training data - predictor.warm_start_training(df_train[selected_features], df_train['Target']) - print(f'Training complete on {len(df_train)} samples.') - - # Evaluate the model on the overall test dataset - print('====== Evaluating on Overall Test Dataset ======') - data = predictor.evaluate_model(df_test[selected_features], df_test['Target']) - print(f'Overall Evaluation Metrics: {data}') - - -async def warm_start_training(tickers, con, skip_downloading, save_data): + # Accumulate the results + results.extend(chunk_results) - dfs = await chunked_gather(tickers, con, skip_downloading, save_data, chunk_size=100) + return results -async def fine_tune_and_evaluate(ticker, con, start_date, end_date, test_size, skip_downloading, save_data): + +async def warm_start_training(tickers, con, skip_downloading, save_data): + start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") + end_date = datetime.today().strftime("%Y-%m-%d") + test_size = 0.2 + + dfs = await chunked_gather(tickers, con, start_date, end_date, skip_downloading, save_data, chunk_size=300) + + train_list = [] + test_list = [] + + for df in dfs: + try: + # Split the data into training and testing sets + split_size = int(len(df) * (1 - test_size)) + train_data = df.iloc[:split_size] + test_data = df.iloc[split_size:] + + # Append train data for combined training + train_list.append(train_data) + test_list.append(test_data) + except: + pass + + # Concatenate all train data together + df_train = pd.concat(train_list, ignore_index=True) + df_test = pd.concat(test_list, ignore_index=True) + + # Shuffle the combined training data + df_train = df_train.sample(frac=1, random_state=42).reset_index(drop=True) + df_test = df_test.sample(frac=1, random_state=42).reset_index(drop=True) + + print('======Warm Start Train Set Datapoints======') + print(len(df_train)) + + predictor = ScorePredictor() + selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] + predictor.warm_start_training(df_train[selected_features], df_train['Target']) + predictor.evaluate_model(df_test[selected_features], df_test['Target']) + return predictor + +async def fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading, save_data): try: - df_train = pd.DataFrame() - df_test_dict = {} # Store test data for each ticker - all_test_data = [] # Store all test data for overall evaluation - df = await download_data(ticker, con, start_date, end_date, skip_downloading, save_data) - split_size = int(len(df) * (1 - test_size)) - df_train = df.iloc[:split_size] - df_test = df.iloc[split_size:] - - # Shuffle the combined training data - df_train = df_train.sample(frac=1, random_state=42).reset_index(drop=True) - - print('====== Start Fine-tuning Model ======') - predictor = ScorePredictor() - selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] + if df is None or len(df) == 0: + print(f"No data available for {ticker}") + return + + test_size = 0.2 + split_size = int(len(df) * (1-test_size)) + train_data = df.iloc[:split_size] + test_data = df.iloc[split_size:] + + selected_features = [col for col in df.columns if col not in ['date','price','Target']] + # Fine-tune the model + predictor = ScorePredictor() + #predictor.fine_tune_model(train_data[selected_features], train_data['Target']) + + print(f"Evaluating fine-tuned model for {ticker}") + data = predictor.evaluate_model(test_data[selected_features], test_data['Target']) - # Train the model on the combined training data - predictor.fine_tune_model(df_train[selected_features], df_train['Target']) - print(f'Training complete on {len(df_train)} samples.') - print(f"Evaluating model for {ticker}") - data = predictor.evaluate_model(df_test[selected_features], df_test['Target']) - print(f'Overall Evaluation Metrics: {data}') if (data['precision'] >= 50 and data['accuracy'] >= 50 and - data['accuracy'] < 100 and data['precision'] < 100 and - data['f1_score'] >= 50 and data['recall_score'] >= 50 and - data['roc_auc_score'] >= 50): - # Save the evaluation data to a JSON file + data['accuracy'] < 100 and data['precision'] < 100 and + data['f1_score'] >= 50 and data['recall_score'] >= 50 and + data['roc_auc_score'] >= 50): await save_json(ticker, data) print(f"Saved results for {ticker}") + except Exception as e: - print(e) - pass + print(f"Error processing {ticker}: {e}") async def run(): train_mode = False # Set this to False for fine-tuning and evaluation skip_downloading = False - save_data = train_mode + save_data = True + con = sqlite3.connect('stocks.db') cursor = con.cursor() cursor.execute("PRAGMA journal_mode = wal") @@ -361,29 +319,24 @@ async def run(): if train_mode: # Warm start training warm_start_symbols = list(set(['CB','LOW','PFE','RTX','DIS','MS','BHP','BAC','PG','BABA','ACN','TMO','LLY','XOM','JPM','UNH','COST','HD','ASML','BRK-A','BRK-B','CAT','TT','SAP','APH','CVS','NOG','DVN','COP','OXY','MRO','MU','AVGO','INTC','LRCX','PLD','AMT','JNJ','ACN','TSM','V','ORCL','MA','BAC','BA','NFLX','ADBE','IBM','GME','NKE','ANGO','PNW','SHEL','XOM','WMT','BUD','AMZN','PEP','AMD','NVDA','AWR','TM','AAPL','GOOGL','META','MSFT','LMT','TSLA','DOV','PG','KO'])) - - print(f'Warm Start Training: Total Tickers {len(warm_start_symbols)}') - await warm_start_training(warm_start_symbols, con, skip_downloading, save_data) + print('Warm Start Training for:', warm_start_symbols) + predictor = await warm_start_training(warm_start_symbols, con, skip_downloading, save_data) else: + # Fine-tuning and evaluation for all stocks + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'") + stock_symbols = [row[0] for row in cursor.fetchall()] + + print(f"Total tickers for fine-tuning: {len(stock_symbols)}") start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d") end_date = datetime.today().strftime("%Y-%m-%d") - test_size = 0.2 - cursor.execute(""" - SELECT DISTINCT symbol - FROM stocks - WHERE marketCap >= 500E6 - AND symbol NOT LIKE '%.%' - AND symbol NOT LIKE '%-%' - """) - stock_symbols = [row[0] for row in cursor.fetchall()] for ticker in tqdm(stock_symbols): - await fine_tune_and_evaluate(ticker, con, start_date, end_date, test_size, skip_downloading, save_data) + await fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading, save_data) + - con.close() if __name__ == "__main__": try: asyncio.run(run()) except Exception as e: - print(f"Main execution error: {e}") + print(f"Main execution error: {e}") \ No newline at end of file diff --git a/app/ml_models/score_model.py b/app/ml_models/score_model.py index 50731ce..6f7cfa7 100644 --- a/app/ml_models/score_model.py +++ b/app/ml_models/score_model.py @@ -1,10 +1,25 @@ +import yfinance as yf import pandas as pd from datetime import datetime, timedelta +from sklearn.ensemble import RandomForestClassifier import numpy as np +from xgboost import XGBClassifier from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score -from sklearn.preprocessing import MinMaxScaler +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import MinMaxScaler, StandardScaler +from keras.models import Sequential, Model +from keras.layers import Input, Multiply, Reshape, LSTM, Dense, Conv1D, Dropout, BatchNormalization, GlobalAveragePooling1D, MaxPooling1D, Bidirectional +from keras.optimizers import AdamW +from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau +from tensorflow.keras.activations import gelu +from keras.models import load_model +from sklearn.feature_selection import SelectKBest, f_classif +from tensorflow.keras.backend import clear_session +from keras import regularizers +from keras.layers import Layer +from tensorflow.keras import backend as K +import tensorflow as tf from sklearn.decomposition import PCA -import lightgbm as lgb from tqdm import tqdm from collections import defaultdict @@ -13,25 +28,62 @@ import aiohttp import aiofiles import pickle import time -import os + +class SelfAttention(Layer): + def __init__(self, **kwargs): + super(SelfAttention, self).__init__(**kwargs) + + def build(self, input_shape): + self.W = self.add_weight(name='attention_weight', shape=(input_shape[-1], 1), + initializer='random_normal', trainable=True) + super(SelfAttention, self).build(input_shape) + + def call(self, x): + # Alignment scores. Pass them through tanh function + e = K.tanh(K.dot(x, self.W)) + # Remove dimension of size 1 + e = K.squeeze(e, axis=-1) + # Compute the weights + alpha = K.softmax(e) + # Reshape to tensor of same shape as x for multiplication + alpha = K.expand_dims(alpha, axis=-1) + # Compute the context vector + context = x * alpha + context = K.sum(context, axis=1) + return context, alpha + + def compute_output_shape(self, input_shape): + return (input_shape[0], input_shape[-1]), (input_shape[0], input_shape[1]) class ScorePredictor: def __init__(self): self.scaler = MinMaxScaler() - self.pca = PCA(n_components=0.95) + self.model = None + self.warm_start_model_path = 'ml_models/weights/ai-score/warm_start_weights.keras' + self.pca = PCA(n_components=3) + def build_model(self): + clear_session() - # Define base models - self.lgb_model = lgb.LGBMClassifier( - n_estimators=2000, - learning_rate=0.001, - max_depth=5, - num_leaves=2**5-1, - n_jobs=10 - ) - + inputs = Input(shape=(3,)) + x = Dense(512, activation=gelu)(inputs) # Using GELU activation + x = Dropout(0.5)(x) + x = BatchNormalization()(x) - self.warm_start_model_path = 'ml_models/weights/ai-score/stacking_weights.pkl' + for units in [64, 32]: + x = Dense(units, activation=gelu)(x) # Using GELU activation + x = Dropout(0.2)(x) + x = BatchNormalization()(x) + + x = Reshape((32, 1))(x) + x, _ = SelfAttention()(x) + outputs = Dense(2, activation='softmax')(x) + + model = Model(inputs=inputs, outputs=outputs) + optimizer = AdamW(learning_rate=0.001, weight_decay=0.01, clipnorm=1.0) + model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy']) + + return model def preprocess_train_data(self, X): X = np.where(np.isinf(X), np.nan, X) @@ -42,31 +94,42 @@ class ScorePredictor: def preprocess_test_data(self, X): X = np.where(np.isinf(X), np.nan, X) X = np.nan_to_num(X) - X = self.scaler.transform(X) - return self.pca.transform(X) + X = self.scaler.fit_transform(X) + return self.pca.fit_transform(X) def warm_start_training(self, X_train, y_train): X_train = self.preprocess_train_data(X_train) - if os.path.exists(self.warm_start_model_path): - os.remove(self.warm_start_model_path) + self.model = self.build_model() - self.model.fit(X_train, y_train) - pickle.dump(self.model, open(self.warm_start_model_path, 'wb')) + checkpoint = ModelCheckpoint(self.warm_start_model_path, save_best_only=True, save_freq=1, monitor='val_loss', mode='min') + early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True) + reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=0.001) + + self.model.fit(X_train, y_train, epochs=100_000, batch_size=256, validation_split=0.1, callbacks=[checkpoint, early_stopping, reduce_lr]) + self.model.save(self.warm_start_model_path) print("Warm start model saved.") def fine_tune_model(self, X_train, y_train): X_train = self.preprocess_train_data(X_train) - with open(self.warm_start_model_path, 'rb') as f: - self.model = pickle.load(f) + + if self.model is None: + self.model = load_model(self.warm_start_model_path, custom_objects={'SelfAttention': SelfAttention}) + + early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True) + reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, min_lr=0.0001) - self.model.fit(X_train, y_train) - print("Model fine-tuned") + self.model.fit(X_train, y_train, epochs=100, batch_size=128, validation_split=0.1, callbacks=[early_stopping, reduce_lr]) + print("Model fine-tuned (not saved).") def evaluate_model(self, X_test, y_test): X_test = self.preprocess_test_data(X_test) - test_predictions = self.model.predict_proba(X_test) - class_1_probabilities = test_predictions[:, 1] - binary_predictions = (class_1_probabilities >= 0.5).astype(int) + + with tf.device('/CPU:0'): + # Load model and make predictions + self.model = load_model(self.warm_start_model_path, custom_objects={'SelfAttention': SelfAttention}) + test_predictions = self.model.predict(X_test) + class_1_probabilities = test_predictions[:, 1] + binary_predictions = (class_1_probabilities >= 0.5).astype(int) # Calculate and print metrics test_precision = precision_score(y_test, binary_predictions) @@ -101,4 +164,14 @@ class ScorePredictor: 'recall_score': round(test_recall_score * 100), 'roc_auc_score': round(test_roc_auc_score * 100), 'score': score - } \ No newline at end of file + } + def feature_selection(self, X_train, y_train, k=100): + print('Feature selection:') + print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}") + selector = SelectKBest(score_func=f_classif, k=k) + selector.fit(X_train, y_train) + + selector.transform(X_train) + selected_features = [col for i, col in enumerate(X_train.columns) if selector.get_support()[i]] + + return selected_features \ No newline at end of file diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 697a294..ef3c074 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -206,6 +206,8 @@ def run_cron_quote(): run_command(["python3", "cron_quote.py"]) command = ["sudo", "rsync", "-avz", "-e", "ssh", "/root/backend/app/json/quote", f"root@{useast_ip_address}:/root/backend/app/json"] run_command(command) + command = ["sudo", "rsync", "-avz", "-e", "ssh", "/root/backend/app/json/pre-post-quote", f"root@{useast_ip_address}:/root/backend/app/json"] + run_command(command) def run_cron_options_flow():