backend/app/ml_models/score_model.py
2024-09-30 11:52:07 +02:00

138 lines
5.3 KiB
Python

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestClassifier
import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from keras.models import Sequential, Model
from keras.layers import Input, Multiply, Reshape, LSTM, Dense, Conv1D, Dropout, BatchNormalization, GlobalAveragePooling1D, MaxPooling1D, Bidirectional
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.models import load_model
from sklearn.feature_selection import SelectKBest, f_classif
from tensorflow.keras.backend import clear_session
from keras import regularizers
from keras.layers import Layer
from tqdm import tqdm
from collections import defaultdict
import asyncio
import aiohttp
import aiofiles
import pickle
import time
# Based on the paper: https://arxiv.org/pdf/1603.00751
class ScorePredictor:
def __init__(self):
self.scaler = MinMaxScaler()
self.model = self.build_model()
def build_model(self):
clear_session()
# Input layer
inputs = Input(shape=(2139,))
# First dense layer
x = Dense(1024, activation='relu', kernel_regularizer=regularizers.l2(0.01))(inputs)
x = Dropout(0.3)(x)
x = BatchNormalization()(x)
# Additional dense layers
for units in [512,256, 256]:
x = Dense(units, activation='relu', kernel_regularizer=regularizers.l2(0.01))(x)
x = Dropout(0.2)(x)
x = BatchNormalization()(x)
# Reshape for attention mechanism
x = Reshape((256, 1))(x)
# Attention mechanism
attention = Dense(256, activation='relu')(x)
attention = Dense(1, activation='softmax')(attention)
# Apply attention
x = Multiply()([x, attention])
# Global average pooling
x = GlobalAveragePooling1D()(x)
# Output layer
outputs = Dense(1, activation='sigmoid')(x)
# Create the model
model = Model(inputs=inputs, outputs=outputs)
# Optimizer with a lower learning rate
optimizer = Adam(learning_rate=0.1, clipnorm = 1.0)
# Compile the model
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
return model
def preprocess_data(self, X):
# X = X.applymap(lambda x: 9999 if x == 0 else x) # Replace 0 with 9999 as suggested in the paper
X = np.where(np.isinf(X), np.nan, X)
X = np.nan_to_num(X)
X = self.scaler.fit_transform(X)
return X
def reshape_for_lstm(self, X):
return X.reshape((X.shape[0], X.shape[1], 1))
def train_model(self, X_train, y_train):
X_train = self.preprocess_data(X_train)
#X_train = self.reshape_for_lstm(X_train)
checkpoint = ModelCheckpoint('ml_models/weights/fundamental_weights/weights.keras',
save_best_only=True, save_freq = 1,
monitor='val_loss', mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=70, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=60, min_lr=0.00001)
self.model.fit(X_train, y_train, epochs=100_000, batch_size=32,
validation_split=0.1, callbacks=[checkpoint, early_stopping, reduce_lr])
self.model.save('ml_models/weights/fundamental_weights/weights.keras')
def evaluate_model(self, X_test, y_test):
X_test = self.preprocess_data(X_test)
X_test = self.reshape_for_lstm(X_test)
self.model = load_model('ml_models/weights/fundamental_weights/weights.keras')
test_predictions = self.model.predict(X_test).flatten()
test_predictions[test_predictions >= 0.5] = 1
test_predictions[test_predictions < 0.5] = 0
test_precision = precision_score(y_test, test_predictions)
test_accuracy = accuracy_score(y_test, test_predictions)
print("Test Set Metrics:")
print(f"Precision: {round(test_precision * 100)}%")
print(f"Accuracy: {round(test_accuracy * 100)}%")
next_value_prediction = 1 if test_predictions[-1] >= 0.5 else 0
return {'accuracy': round(test_accuracy * 100),
'precision': round(test_precision * 100),
'sentiment': 'Bullish' if next_value_prediction == 1 else 'Bearish'}, test_predictions
def feature_selection(self, X_train, y_train, k=100):
print('feature selection:')
print(X_train.shape, y_train.shape)
selector = SelectKBest(score_func=f_classif, k=k)
selector.fit(X_train, y_train)
selector.transform(X_train)
selected_features = [col for i, col in enumerate(X_train.columns) if selector.get_support()[i]]
return selected_features