update batch training
parent 282da7d2cf · commit 3e6ef8b540
@@ -248,64 +248,62 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading):
         pass


-async def chunked_gather(tickers, con, start_date, end_date, skip_downloading, chunk_size=10):
+async def chunked_gather(tickers, con, skip_downloading, chunk_size=10):
+    test_size = 0.2
+    start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+    df_train = pd.DataFrame()
+    df_test = pd.DataFrame()

     # Helper function to divide the tickers into chunks
     def chunks(lst, size):
         for i in range(0, len(lst), size):
             yield lst[i:i+size]

-    results = []
+    dfs = []

-    for chunk in tqdm(chunks(tickers, chunk_size)):
+    for num, chunk in enumerate(tqdm(chunks(tickers, chunk_size))):
         # Create tasks for each chunk
         tasks = [download_data(ticker, con, start_date, end_date, skip_downloading) for ticker in chunk]
         # Await the results for the current chunk
         chunk_results = await asyncio.gather(*tasks)
         # Accumulate the results
-        results.extend(chunk_results)
+        dfs.extend(chunk_results)

+        train_list = []
+        test_list = []
+
+        for df in dfs:
+            try:
+                split_size = int(len(df) * (1 - test_size))
+                train_data = df.iloc[:split_size]
+                test_data = df.iloc[split_size:]
+
+                # Append to the lists
+                train_list.append(train_data)
+                test_list.append(test_data)
+            except:
+                pass
+
+        # Concatenate all at once outside the loop
+        df_train = pd.concat(train_list, ignore_index=True)
+        df_test = pd.concat(test_list, ignore_index=True)
+        df_train = df_train.sample(frac=1).reset_index(drop=True)
+        df_test = df_test.sample(frac=1).reset_index(drop=True)
+
+        print('======Warm Start Train Set Datapoints======')
+        print(f'Batch Training: {num}')
+        print(len(df_train))
+
+        predictor = ScorePredictor()
+        selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200)
+        predictor.warm_start_training(df_train[selected_features], df_train['Target'])
+        predictor.evaluate_model(df_test[selected_features], df_test['Target'])

-    return results


 async def warm_start_training(tickers, con, skip_downloading):
-    start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
-    end_date = datetime.today().strftime("%Y-%m-%d")
-    df_train = pd.DataFrame()
-    df_test = pd.DataFrame()
-    test_size = 0.2
-
-    dfs = await chunked_gather(tickers, con, start_date, end_date, skip_downloading, chunk_size=10)
-
-    train_list = []
-    test_list = []
-
-    for df in dfs:
-        try:
-            split_size = int(len(df) * (1 - test_size))
-            train_data = df.iloc[:split_size]
-            test_data = df.iloc[split_size:]
-
-            # Append to the lists
-            train_list.append(train_data)
-            test_list.append(test_data)
-        except:
-            pass
-
-    # Concatenate all at once outside the loop
-    df_train = pd.concat(train_list, ignore_index=True)
-    df_test = pd.concat(test_list, ignore_index=True)
-    df_train = df_train.sample(frac=1).reset_index(drop=True)
-    df_test = df_test.sample(frac=1).reset_index(drop=True)
-
-    print('======Warm Start Train Set Datapoints======')
-    print(len(df_train))
-
-    predictor = ScorePredictor()
-    selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200)
-    #predictor.warm_start_training(df_train[selected_features], df_train['Target'])
-    predictor.batch_train_model(df_train[selected_features], df_train['Target'], batch_size=1000)
-    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
-
-    return predictor
+    dfs = await chunked_gather(tickers, con, skip_downloading, chunk_size=50)


 async def fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading):
     try:
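Note (illustration only, not part of the commit): inside the new chunked_gather loop each ticker's DataFrame is split chronologically, the first 80% of rows for training and the last 20% held out for evaluation. A minimal sketch of that split on a toy DataFrame:

```python
import pandas as pd

# Toy data standing in for one ticker's history (assumption: rows are in date order).
df = pd.DataFrame({"close": range(10)})

test_size = 0.2
split_size = int(len(df) * (1 - test_size))  # 8 of 10 rows

train_data = df.iloc[:split_size]  # rows 0-7 (older history)
test_data = df.iloc[split_size:]   # rows 8-9 (most recent history)
print(len(train_data), len(test_data))  # -> 8 2
```

Only the concatenated df_train and df_test are shuffled afterwards with sample(frac=1); the per-ticker split itself stays in time order.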
@@ -328,7 +326,7 @@ async def fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading):
         data = predictor.evaluate_model(test_data[selected_features], test_data['Target'])

         if len(data) != 0:
-            if data['precision'] >= 50 and data['accuracy'] >= 50 and data['accuracy'] < 100 and data['precision'] < 100 and data['f1_score'] > 50 and data['recall_score'] > 50 and data['roc_auc_score'] > 50:
+            if data['precision'] >= 50 and data['accuracy'] >= 50 and data['accuracy'] < 100 and data['precision'] < 100 and data['f1_score'] >= 50 and data['recall_score'] >= 50 and data['roc_auc_score'] >= 50:
                 await save_json(ticker, data)
                 print(f"Saved results for {ticker}")
         gc.collect()
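Note (hypothetical helper, not in the repo): switching the f1, recall, and ROC-AUC comparisons from strict > to >= means tickers whose score lands exactly on 50 now pass the filter as well. The full gate, restated:

```python
def passes_thresholds(data: dict) -> bool:
    """Mirror of the updated save condition; keys follow the evaluation dict."""
    return (
        50 <= data["precision"] < 100
        and 50 <= data["accuracy"] < 100
        and data["f1_score"] >= 50
        and data["recall_score"] >= 50
        and data["roc_auc_score"] >= 50
    )
```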
@@ -348,10 +346,10 @@ async def run():

     if train_mode:
         # Warm start training
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
         warm_start_symbols = [row[0] for row in cursor.fetchall()]
-        print(f'Warm Start Training: {len(warm_start_symbols)}')
-        predictor = await warm_start_training(warm_start_symbols, con, skip_downloading)
+        print(f'Warm Start Training: Total Tickers {len(warm_start_symbols)}')
+        await warm_start_training(warm_start_symbols, con, skip_downloading)
     else:
         # Fine-tuning and evaluation for all stocks
         cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'")
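Note: 500E6 is a $500M market-cap floor, so the warm-start universe is widened from large caps (>= $1B) down to the same cut-off the fine-tuning branch already uses. A parameterised form of the warm-start query (sketch only, not in the commit, assuming sqlite3-style ? placeholders):

```python
MIN_MARKET_CAP = 500e6  # $500M, now shared by both branches

cursor.execute(
    "SELECT DISTINCT symbol FROM stocks "
    "WHERE marketCap >= ? AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'",
    (MIN_MARKET_CAP,),
)
warm_start_symbols = [row[0] for row in cursor.fetchall()]
```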
Binary file not shown.
@@ -13,29 +13,29 @@ import asyncio
 import aiohttp
 import aiofiles
 import pickle
+import os
 import time



 class ScorePredictor:
     def __init__(self):
         self.scaler = MinMaxScaler()
         self.pca = PCA(n_components=0.95) # Retain components explaining 95% variance
         self.warm_start_model_path = 'ml_models/weights/ai-score/warm_start_weights.pkl'
         self.model = lgb.LGBMClassifier(
-            n_estimators=1000, # Number of boosting iterations - good balance between performance and training time
+            n_estimators=200, # Number of boosting iterations - good balance between performance and training time
             learning_rate=0.005, # Smaller learning rate for better generalization
             max_depth=8, # Controlled depth to prevent overfitting
-            num_leaves=31, # 2^5-1, prevents overfitting while maintaining model complexity
+            num_leaves=32, # 2^max_depth, prevents overfitting while maintaining model complexity
             colsample_bytree=0.8, # Use 80% of features per tree to reduce overfitting
             subsample=0.8, # Use 80% of data per tree to reduce overfitting
             min_child_samples=20, # Minimum samples per leaf to ensure reliable splits
             random_state=42, # For reproducibility
-            class_weight='balanced', # Important for potentially imbalanced stock data
             reg_alpha=0.1, # L1 regularization
             reg_lambda=0.1, # L2 regularization
-            n_jobs=-1, # Use all CPU cores
-            verbose=-1, # Reduce output noise
-            warm_start= True,
+            n_jobs=10, # Use N CPU cores
+            verbose=0, # Reduce output noise
         )
         '''
         XGBClassifier(
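Note: LightGBM limits tree complexity through num_leaves, and a depth-limited binary tree can hold at most 2**max_depth leaves. With max_depth=8 that bound is 256, so num_leaves=32 (i.e. 2**5) sits well under it despite what the inline comment suggests. A quick check of the relationship:

```python
max_depth = 8
num_leaves = 32

# num_leaves must not exceed the number of leaves a depth-limited tree can have.
assert num_leaves <= 2 ** max_depth  # 32 <= 256
```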
@@ -63,29 +63,13 @@ class ScorePredictor:

     def warm_start_training(self, X_train, y_train):
         X_train = self.preprocess_train_data(X_train)
+        if os.path.exists(self.warm_start_model_path):
+            with open(f'{self.warm_start_model_path}', 'rb') as f:
+                self.model = pickle.load(f)
         self.model.fit(X_train, y_train)
         pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb'))
         print("Warm start model saved.")

-    def batch_train_model(self, X_train, y_train, batch_size=1000):
-        """Train the model in batches to handle large datasets."""
-        num_samples = len(X_train)
-        for start_idx in range(0, num_samples, batch_size):
-            end_idx = min(start_idx + batch_size, num_samples)
-            X_batch = X_train[start_idx:end_idx]
-            y_batch = y_train[start_idx:end_idx]
-
-            # Preprocess each batch
-            X_batch = self.preprocess_train_data(X_batch)
-
-            # Fit model on each batch (incremental training with warm_start=True)
-            self.model.fit(X_batch, y_batch, eval_set=[(X_batch, y_batch)])
-
-            print(f"Trained on batch {start_idx} to {end_idx}")
-
-        # After batch training, save the model
-        pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb'))
-        print("Batch learning completed and model saved.")

     def fine_tune_model(self, X_train, y_train):
         X_train = self.preprocess_train_data(X_train)
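Note (sketch only, not part of the commit): with LightGBM's scikit-learn API, calling fit() again on a classifier restored from pickle normally retrains from scratch; to continue from the previously saved trees, fit() accepts an init_model argument. A minimal sketch of the same load/fit/save cycle using it, with MODEL_PATH taken from the diff and the data arguments as placeholders:

```python
import os
import pickle

import lightgbm as lgb

MODEL_PATH = "ml_models/weights/ai-score/warm_start_weights.pkl"  # path from the diff

def warm_start_fit(model: lgb.LGBMClassifier, X_train, y_train):
    """Continue boosting from a previously saved model when one exists."""
    init_model = None
    if os.path.exists(MODEL_PATH):
        with open(MODEL_PATH, "rb") as f:
            init_model = pickle.load(f).booster_  # resume from its existing trees
    model.fit(X_train, y_train, init_model=init_model)
    with open(MODEL_PATH, "wb") as f:
        pickle.dump(model, f)
    return model
```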