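"""Stock price regression on daily Close data.

Downloads OHLCV history with yfinance, min-max scales the Close column,
builds a sliding-window dataset, fits one of several regressors
(LinearRegression, XGBoost, SVR, RandomForestRegressor, KNeighborsRegressor,
or a bidirectional LSTM), scores it with the Backtesting helper, and predicts
the price nth_day steps ahead.

Note: `Backtesting` is assumed here to be a project-local helper module
(imported from `backtesting` below), not a third-party package.
"""
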
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor

from keras.models import Sequential, load_model
from keras.layers import LSTM, Dense, Dropout, BatchNormalization, Bidirectional
from keras.regularizers import l2
import time
from datetime import datetime, timedelta
from xgboost import XGBRegressor
from backtesting import Backtesting
import yfinance as yf


class regression_model:
    """Sliding-window regression on a price series for a single ticker."""

    def __init__(self, model_name, data, test_size, time_step, nth_day):
        self.model_name = model_name   # which regressor to build in run()
        self.data = data               # DataFrame with 'Date' and 'Close' columns
        self.test_size = test_size     # fraction of rows held out for testing
        self.time_step = time_step     # length of the input window
        self.nth_day = nth_day         # horizon: predict nth_day steps ahead

    def correct_weekday(self, select_date):
        # Monday is 0 and Sunday is 6; shift weekend dates back to the previous Friday
        if select_date.weekday() > 4:
            select_date = select_date - timedelta(select_date.weekday() - 4)
        return select_date

    def run(self):
        dates = self.data['Date']
        df = self.data['Close']

        # Scale Close prices to [0, 1] for training; predictions are inverse-transformed later
        scaler = MinMaxScaler(feature_range=(0, 1))
        df = scaler.fit_transform(np.array(df).reshape(-1, 1))

        # Chronological train/test split (no shuffling)
        test_split_idx = int(df.shape[0] * (1 - self.test_size))
        train_data = df[:test_split_idx].copy()
        test_data = df[test_split_idx:].copy()

        # Convert an array of values into a dataset matrix: each sample is a window
        # of `time_step` values, and the target lies `nth_day` steps past the window.
        def create_dataset(dataset):
            dataX, dataY = [], []
            for i in range(len(dataset) - self.time_step - 1 - self.nth_day):
                a = dataset[i:(i + self.time_step), 0]
                dataX.append(a)
                dataY.append(dataset[i + self.time_step + self.nth_day, 0])
            return np.array(dataX), np.array(dataY)

        # Build the matching date for each sample: the last date of its input window.
        def create_date_dataset(dataset):
            dataX = []
            for i in range(len(dataset) - self.time_step - 1 - self.nth_day):
                # positional indexing so this also works when the Series index is not reset
                a = dataset.iloc[i:(i + self.time_step)].iloc[-1]
                dataX.append(a)
            return pd.DataFrame(dataX)

        X_train, y_train = create_dataset(train_data)
        X_test, y_test = create_dataset(test_data)

        if self.model_name == 'LSTM':
            # Keras LSTM layers expect 3D input: (samples, time_steps, features)
            X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
            X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

        def fit_model(model, X_train, y_train):
            if self.model_name == 'LSTM':
                model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.1)
            else:
                model.fit(X_train, y_train)

        # Build the requested model; unknown names fall back to LinearRegression.
        if self.model_name == 'LinearRegression':
            model = LinearRegression(n_jobs=-1)
        elif self.model_name == "XGBoost":
            model = XGBRegressor(max_depth=10)
        elif self.model_name == "SVR":
            model = SVR()
        elif self.model_name == 'RandomForestRegressor':
            model = RandomForestRegressor()
        elif self.model_name == 'KNeighborsRegressor':
            model = KNeighborsRegressor()
        elif self.model_name == 'LSTM':
            # Stacked bidirectional LSTM with L2 regularization, batch norm and dropout
            model = Sequential()
            model.add(Bidirectional(LSTM(units=100, return_sequences=True, kernel_regularizer=l2(0.01)),
                                    input_shape=(self.time_step, 1)))
            model.add(BatchNormalization())
            model.add(Dropout(0.5))
            model.add(Bidirectional(LSTM(units=50, return_sequences=True, kernel_regularizer=l2(0.01))))
            model.add(BatchNormalization())
            model.add(Dropout(0.25))
            model.add(Bidirectional(LSTM(units=10, kernel_regularizer=l2(0.01))))
            model.add(BatchNormalization())
            model.add(Dropout(0.2))
            model.add(Dense(units=1))
            model.compile(optimizer='sgd', loss='mean_squared_error')
        else:
            model = LinearRegression()

        fit_model(model, X_train, y_train)

        train_predict = model.predict(X_train)
        train_predict = train_predict.reshape(-1, 1)
        test_predict = model.predict(X_test)
        test_predict = test_predict.reshape(-1, 1)

        # Map scaled predictions and targets back to price units
        train_predict = scaler.inverse_transform(train_predict)
        test_predict = scaler.inverse_transform(test_predict)
        original_ytrain = scaler.inverse_transform(y_train.reshape(-1, 1))
        original_ytest = scaler.inverse_transform(y_test.reshape(-1, 1))

        performance = Backtesting(original_ytrain, train_predict, original_ytest, test_predict).run()

        train_dates = dates[:test_split_idx].copy()
        test_dates = dates[test_split_idx:].copy()

        train_dates = create_date_dataset(train_dates)
        test_dates = create_date_dataset(test_dates)

        # Assemble results by position (lengths match because create_dataset and
        # create_date_dataset iterate over the same range)
        train_res = pd.DataFrame()
        train_res['Date'] = train_dates[0].values
        train_res['train'] = train_predict.ravel()

        test_res = pd.DataFrame()
        test_res['Date'] = test_dates[0].values
        test_res['test'] = test_predict.ravel()

        # Predict nth_day ahead from the last `time_step` scaled values in the test set
        x_input = test_data[len(test_data) - self.time_step:].reshape(1, -1)
        if self.model_name == 'LSTM':
            x_input = x_input.reshape(1, self.time_step, 1)
        yhat = model.predict(x_input)
        new_pred_df = pd.DataFrame(scaler.inverse_transform(yhat.reshape(-1, 1)).reshape(1, -1).tolist()[0])

        pred_res = pd.DataFrame()
        pred_res['yhat'] = new_pred_df[0].values
        print(performance)
        print(pred_res)

        return performance, train_res, test_res, pred_res


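# Illustrative, hypothetical helper (defined but never called): a minimal sketch
# of how the class above could be used to compare several of the supported models
# on the same data, assuming the Backtesting result is printable as in run().
def compare_models(data, test_size, time_step, nth_day):
    results = {}
    for name in ['LinearRegression', 'XGBoost', 'SVR',
                 'RandomForestRegressor', 'KNeighborsRegressor']:
        metric, _, _, _ = regression_model(name, data, test_size=test_size,
                                           time_step=time_step, nth_day=nth_day).run()
        results[name] = metric
    return results

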
ticker = 'AMD'
start_date = datetime(2000, 1, 1)
end_date = datetime(2024, 2, 1)  # datetime.today()
df = yf.download(ticker, start=start_date, end=end_date, interval="1d")
df = df.reset_index()

model_name = 'LinearRegression'
test_size = 0.2
time_step = 1
nth_day = 20  # Change this value to the desired nth_day
metric, train_df, test_df, pred_df = regression_model(model_name, df, test_size=test_size,
                                                      time_step=time_step, nth_day=nth_day).run()