add batch learning

MuslemRahimi 2024-10-05 17:43:33 +02:00
parent 0a6a7b9b3b
commit 282da7d2cf
3 changed files with 116 additions and 140 deletions

View File

@@ -100,54 +100,21 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading):
         f"json/financial-statements/owner-earnings/quarter/{ticker}.json",
     ]
-    # Helper function to load JSON data asynchronously
-    async def load_json_from_file(path):
-        async with aiofiles.open(path, 'r') as f:
-            content = await f.read()
-            return orjson.loads(content)
-    # Helper function to filter data based on keys and year
-    async def filter_data(data, ignore_keys, year_threshold=2000):
-        return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold]
-    # Define keys to ignore
+    # Async loading and filtering
     ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink","pbRatio","ptbRatio"]
+    async def load_and_filter_json(path):
+        async with aiofiles.open(path, 'r') as f:
+            data = orjson.loads(await f.read())
+        return [{k: v for k, v in item.items() if k not in ignore_keys and int(item["date"][:4]) >= 2000} for item in data]
-    # Load and filter data for each statement type
-    ratios = await load_json_from_file(statements[0])
-    ratios = await filter_data(ratios, ignore_keys)
+    # Load all files concurrently
+    data = await asyncio.gather(*(load_and_filter_json(s) for s in statements))
+    ratios, key_metrics, cashflow, income, balance, income_growth, balance_growth, cashflow_growth, owner_earnings = data
     # Threshold of enough datapoints needed!
     if len(ratios) < 50:
         return
-    key_metrics = await load_json_from_file(statements[1])
-    key_metrics = await filter_data(key_metrics, ignore_keys)
-    cashflow = await load_json_from_file(statements[2])
-    cashflow = await filter_data(cashflow, ignore_keys)
-    income = await load_json_from_file(statements[3])
-    income = await filter_data(income, ignore_keys)
-    balance = await load_json_from_file(statements[4])
-    balance = await filter_data(balance, ignore_keys)
-    income_growth = await load_json_from_file(statements[5])
-    income_growth = await filter_data(income_growth, ignore_keys)
-    balance_growth = await load_json_from_file(statements[6])
-    balance_growth = await filter_data(balance_growth, ignore_keys)
-    cashflow_growth = await load_json_from_file(statements[7])
-    cashflow_growth = await filter_data(cashflow_growth, ignore_keys)
-    owner_earnings = await load_json_from_file(statements[8])
-    owner_earnings = await filter_data(owner_earnings, ignore_keys)
     # Combine all the data
     combined_data = defaultdict(dict)
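Note on the refactor above: asyncio.gather returns results in the same order as the awaitables it is given, which is what makes the nine-way unpacking into ratios, key_metrics, and so on safe. One detail worth flagging is that the new comprehension applies the year check inside the per-key filter, so pre-2000 entries survive as empty dicts instead of being dropped as the old filter_data did. A minimal standalone sketch (hypothetical file names, abbreviated ignore_keys) that keeps the year check at the list level:

import asyncio

import aiofiles
import orjson

ignore_keys = {"symbol", "cik", "link"}  # abbreviated for illustration

async def load_and_filter_json(path):
    async with aiofiles.open(path, "r") as f:
        data = orjson.loads(await f.read())
    # Year check at the list level: pre-2000 items are excluded entirely
    return [
        {k: v for k, v in item.items() if k not in ignore_keys}
        for item in data
        if int(item["date"][:4]) >= 2000
    ]

async def main():
    statements = ["a.json", "b.json", "c.json"]  # hypothetical files
    # gather preserves argument order, so results line up with `statements`
    a, b, c = await asyncio.gather(*(load_and_filter_json(s) for s in statements))
    print(len(a), len(b), len(c))

# asyncio.run(main())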
@@ -171,126 +138,106 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading):
     df_ta = generate_ta_features(df)
     # Filter columns in df_stats and df_ta that are not in df
+    # Drop unnecessary columns from df_stats and df_ta
     df_stats_filtered = df_stats.drop(columns=df_columns.intersection(df_stats.columns), errors='ignore')
     df_ta_filtered = df_ta.drop(columns=df_columns.intersection(df_ta.columns), errors='ignore')
-    # Extract the column names for indicators
     ta_columns = df_ta_filtered.columns.tolist()
     stats_columns = df_stats_filtered.columns.tolist()
     # Concatenate df with the filtered df_stats and df_ta
     df = pd.concat([df, df_ta_filtered, df_stats_filtered], axis=1)
+    # Set up a dictionary for faster lookup of close prices and columns by date
+    df_dict = df.set_index('date').to_dict(orient='index')
-    # Match each combined data entry with the closest available stock price in df
-    for item in combined_data:
-        target_date = item['date']
+    # Helper function to find closest date within max_attempts
+    def find_closest_date(target_date, max_attempts=10):
         counter = 0
-        max_attempts = 10
-        # Look for the closest matching date in the stock data
-        while target_date not in df['date'].values and counter < max_attempts:
+        while target_date not in df_dict and counter < max_attempts:
             target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
             counter += 1
+        return target_date if target_date in df_dict else None
-        # If max attempts are reached and no matching date is found, skip the entry
-        if counter == max_attempts:
+    # Match combined data entries with stock data
+    for item in combined_data:
+        target_date = item['date']
+        closest_date = find_closest_date(target_date)
+        # Skip if no matching date is found
+        if not closest_date:
             continue
-        # Find the close price for the matching date
-        close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
-        item['price'] = close_price
-        # Dynamically add all indicator values to the combined_data entry
-        for column in ta_columns:
-            column_value = df[df['date'] == target_date][column].values[0]
-            item[column] = column_value # Add the column value to the combined_data entry
-        for column in stats_columns:
-            column_value = df[df['date'] == target_date][column].values[0]
-            item[column] = column_value # Add the column value to the combined_data entry
+        # Fetch data from the dictionary for the closest matching date
+        data = df_dict[closest_date]
+        # Add close price to the item
+        item['price'] = round(data['close'], 2)
+        # Dynamically add indicator values from ta_columns and stats_columns
+        for column in ta_columns + stats_columns:
+            item[column] = data.get(column, None)
     # Sort the combined data by date
     combined_data = sorted(combined_data, key=lambda x: x['date'])
-    # Convert combined data into a DataFrame
+    # Convert combined data to a DataFrame and drop rows with NaN values
     df_combined = pd.DataFrame(combined_data).dropna()
     fundamental_columns = [
-        'revenue',
-        'costOfRevenue',
-        'grossProfit',
-        'netIncome',
-        'operatingIncome',
-        'operatingExpenses',
-        'researchAndDevelopmentExpenses',
-        'ebitda',
-        'freeCashFlow',
-        'incomeBeforeTax',
-        'incomeTaxExpense',
-        'debtRepayment',
-        'dividendsPaid',
-        'depreciationAndAmortization',
-        'netCashUsedProvidedByFinancingActivities',
-        'changeInWorkingCapital',
-        'stockBasedCompensation',
-        'deferredIncomeTax',
-        'commonStockRepurchased',
-        'operatingCashFlow',
-        'capitalExpenditure',
-        'accountsReceivables',
-        'purchasesOfInvestments',
-        'cashAndCashEquivalents',
-        'shortTermInvestments',
-        'cashAndShortTermInvestments',
-        'longTermInvestments',
-        'otherCurrentLiabilities',
-        'totalCurrentLiabilities',
-        'longTermDebt',
-        'totalDebt',
-        'netDebt',
-        'commonStock',
-        'totalEquity',
-        'totalLiabilitiesAndStockholdersEquity',
-        'totalStockholdersEquity',
-        'totalInvestments',
-        'taxAssets',
-        'totalAssets',
-        'inventory',
-        'propertyPlantEquipmentNet',
-        'ownersEarnings',
+        'revenue', 'costOfRevenue', 'grossProfit', 'netIncome', 'operatingIncome', 'operatingExpenses',
+        'researchAndDevelopmentExpenses', 'ebitda', 'freeCashFlow', 'incomeBeforeTax', 'incomeTaxExpense',
+        'debtRepayment', 'dividendsPaid', 'depreciationAndAmortization', 'netCashUsedProvidedByFinancingActivities',
+        'changeInWorkingCapital', 'stockBasedCompensation', 'deferredIncomeTax', 'commonStockRepurchased',
+        'operatingCashFlow', 'capitalExpenditure', 'accountsReceivables', 'purchasesOfInvestments',
+        'cashAndCashEquivalents', 'shortTermInvestments', 'cashAndShortTermInvestments', 'longTermInvestments',
+        'otherCurrentLiabilities', 'totalCurrentLiabilities', 'longTermDebt', 'totalDebt', 'netDebt', 'commonStock',
+        'totalEquity', 'totalLiabilitiesAndStockholdersEquity', 'totalStockholdersEquity', 'totalInvestments',
+        'taxAssets', 'totalAssets', 'inventory', 'propertyPlantEquipmentNet', 'ownersEarnings',
     ]
-    # Compute ratios for all combinations of key elements
-    new_columns = {}
-    # Loop over combinations of column pairs
-    for columns in [fundamental_columns, stats_columns, ta_columns]:
-        for num, denom in combinations(columns, 2):
-            # Compute ratio and reverse ratio
-            ratio = df_combined[num] / df_combined[denom]
-            reverse_ratio = round(df_combined[denom] / df_combined[num],2)
+    # Function to compute combinations within a group
+    def compute_column_ratios(columns, df, new_columns):
+        column_combinations = list(combinations(columns, 2))
+        for num, denom in column_combinations:
+            with np.errstate(divide='ignore', invalid='ignore'):
+                # Compute ratio and reverse ratio safely
+                ratio = df[num] / df[denom]
+                reverse_ratio = df[denom] / df[num]
             # Define column names for both ratios
             column_name = f'{num}_to_{denom}'
             reverse_column_name = f'{denom}_to_{num}'
-            # Store the new columns in the dictionary, replacing invalid values with 0
+            # Assign values to new columns, handling invalid values
            new_columns[column_name] = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0)
            new_columns[reverse_column_name] = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0)
-    # Add all new columns to the original DataFrame at once
-    df_combined = pd.concat([df_combined, pd.DataFrame(new_columns)], axis=1)
-    # To defragment the DataFrame, make a copy
-    df_combined = df_combined.copy()
-    df_combined = df_combined.dropna()
-    df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
+    # Create an empty dictionary for the new columns
+    new_columns = {}
+    # Compute combinations for each group of columns
+    compute_column_ratios(fundamental_columns, df_combined, new_columns)
+    compute_column_ratios(stats_columns, df_combined, new_columns)
+    compute_column_ratios(ta_columns, df_combined, new_columns)
+    # Concatenate the new ratio columns with the original DataFrame
+    df_combined = pd.concat([df_combined, pd.DataFrame(new_columns, index=df_combined.index)], axis=1)
+    # Clean up and replace invalid values
+    df_combined = df_combined.replace([np.inf, -np.inf], 0).dropna()
+    # Create 'Target' column to indicate if the next price is higher than the current one
     df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
-    df_copy = df_combined.copy()
-    df_copy = df_copy.map(lambda x: round(x, 2) if isinstance(x, float) else x)
-    if df_copy.shape[0] > 0:
+    # Copy DataFrame and round float values
+    df_copy = df_combined.copy().map(lambda x: round(x, 2) if isinstance(x, float) else x)
+    # Save to a file if there are rows in the DataFrame
+    if not df_copy.empty:
         with open(file_path, 'wb') as file:
             file.write(orjson.dumps(df_copy.to_dict(orient='records')))
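The important pattern in the ratio rewrite above is the cleanup pair: np.errstate suppresses divide-by-zero and invalid-value warnings while the divisions run, and np.nan_to_num then maps any inf/-inf/NaN results to 0 so the generated feature columns stay finite. A tiny self-contained illustration of that behaviour:

import numpy as np
import pandas as pd

df = pd.DataFrame({"revenue": [10.0, 0.0, 5.0], "netIncome": [2.0, 1.0, 0.0]})

with np.errstate(divide="ignore", invalid="ignore"):
    ratio = df["revenue"] / df["netIncome"]          # 5.0, 0.0, inf
    reverse_ratio = df["netIncome"] / df["revenue"]  # 0.2, inf, 0.0

clean = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0)
clean_rev = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0)
print(clean, clean_rev)  # the inf entries come back as 0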
@@ -354,7 +301,8 @@ async def warm_start_training(tickers, con, skip_downloading):
     predictor = ScorePredictor()
     selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200)
-    predictor.warm_start_training(df_train[selected_features], df_train['Target'])
+    #predictor.warm_start_training(df_train[selected_features], df_train['Target'])
+    predictor.batch_train_model(df_train[selected_features], df_train['Target'], batch_size=1000)
     predictor.evaluate_model(df_test[selected_features], df_test['Target'])
     return predictor
@@ -400,20 +348,19 @@ async def run():
     if train_mode:
         # Warm start training
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
         warm_start_symbols = [row[0] for row in cursor.fetchall()]
-        print('Warm Start Training')
+        print(f'Warm Start Training: {len(warm_start_symbols)}')
         predictor = await warm_start_training(warm_start_symbols, con, skip_downloading)
     else:
         # Fine-tuning and evaluation for all stocks
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'")
         stock_symbols = [row[0] for row in cursor.fetchall()]
         print(f"Total tickers for fine-tuning: {len(stock_symbols)}")
         start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
         end_date = datetime.today().strftime("%Y-%m-%d")
         tasks = []
         for ticker in tqdm(stock_symbols):
             await fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading)
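Incidentally, the fine-tuning loop in this hunk awaits each ticker sequentially and the tasks list is never used. If concurrent fine-tuning were ever wanted, a bounded gather is one option; this is only a sketch and assumes fine_tune_and_evaluate and the shared database connection tolerate concurrent use:

import asyncio

async def run_all(stock_symbols, con, start_date, end_date, skip_downloading, limit=4):
    # Cap concurrency so no more than `limit` tickers are processed at once
    sem = asyncio.Semaphore(limit)

    async def worker(ticker):
        async with sem:
            await fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading)

    await asyncio.gather(*(worker(t) for t in stock_symbols))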

View File

@@ -1,11 +1,10 @@
-import yfinance as yf
 import pandas as pd
 from datetime import datetime, timedelta
 import numpy as np
 from xgboost import XGBClassifier
 from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score
 from sklearn.preprocessing import MinMaxScaler
-from sklearn.decomposition import PCA # Import PCA
+from sklearn.decomposition import PCA
 import lightgbm as lgb
 from tqdm import tqdm
@@ -16,17 +15,27 @@ import aiofiles
 import pickle
 import time
 class ScorePredictor:
     def __init__(self):
         self.scaler = MinMaxScaler()
-        self.pca = PCA(n_components=5)
+        self.pca = PCA(n_components=0.95)  # Retain components explaining 95% variance
         self.warm_start_model_path = 'ml_models/weights/ai-score/warm_start_weights.pkl'
         self.model = lgb.LGBMClassifier(
-            n_estimators=20_000,  # If you want to use a larger model we've found 20_000 trees to be better
-            learning_rate=0.01,   # and a learning rate of 0.001
-            max_depth=20,         # and max_depth=6
-            num_leaves=2**6-1,    # and num_leaves of 2**6-1
-            colsample_bytree=0.1
+            n_estimators=1000,        # Number of boosting iterations - good balance between performance and training time
+            learning_rate=0.005,      # Smaller learning rate for better generalization
+            max_depth=8,              # Controlled depth to prevent overfitting
+            num_leaves=31,            # 2^5-1, prevents overfitting while maintaining model complexity
+            colsample_bytree=0.8,     # Use 80% of features per tree to reduce overfitting
+            subsample=0.8,            # Use 80% of data per tree to reduce overfitting
+            min_child_samples=20,     # Minimum samples per leaf to ensure reliable splits
+            random_state=42,          # For reproducibility
+            class_weight='balanced',  # Important for potentially imbalanced stock data
+            reg_alpha=0.1,            # L1 regularization
+            reg_lambda=0.1,           # L2 regularization
+            n_jobs=-1,                # Use all CPU cores
+            verbose=-1,               # Reduce output noise
+            warm_start=True,
         )
         '''
         XGBClassifier(
@@ -43,14 +52,14 @@ class ScorePredictor:
         X = np.where(np.isinf(X), np.nan, X)
         X = np.nan_to_num(X)
         X = self.scaler.fit_transform(X)  # Transform using the fitted scaler
-        return X  #self.pca.fit_transform(X)  # Fit PCA and transform
+        return self.pca.fit_transform(X)  # Fit PCA and transform
     def preprocess_test_data(self, X):
         """Preprocess test data by scaling and applying PCA."""
         X = np.where(np.isinf(X), np.nan, X)
         X = np.nan_to_num(X)
         X = self.scaler.transform(X)  # Transform using the fitted scaler
-        return X  #self.pca.transform(X)  # Transform using the fitted PCA
+        return self.pca.transform(X)  # Transform using the fitted PCA
     def warm_start_training(self, X_train, y_train):
         X_train = self.preprocess_train_data(X_train)
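On the PCA change: when n_components is a float in (0, 1), scikit-learn keeps the smallest number of components whose cumulative explained variance ratio reaches that fraction, so n_components=0.95 adapts to however many features the ratio expansion produces. A quick sketch with synthetic data (the 500×50 matrix is only a stand-in for the real feature table):

import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler

rng = np.random.default_rng(42)
X = rng.normal(size=(500, 50))           # synthetic stand-in for the feature matrix
X = MinMaxScaler().fit_transform(X)      # mirror the scaler step in preprocess_train_data

pca = PCA(n_components=0.95)
X_reduced = pca.fit_transform(X)
# Number of components kept and the variance they explain (>= 0.95)
print(X_reduced.shape[1], pca.explained_variance_ratio_.sum())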
@@ -58,6 +67,26 @@ class ScorePredictor:
         pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb'))
         print("Warm start model saved.")
+    def batch_train_model(self, X_train, y_train, batch_size=1000):
+        """Train the model in batches to handle large datasets."""
+        num_samples = len(X_train)
+        for start_idx in range(0, num_samples, batch_size):
+            end_idx = min(start_idx + batch_size, num_samples)
+            X_batch = X_train[start_idx:end_idx]
+            y_batch = y_train[start_idx:end_idx]
+            # Preprocess each batch
+            X_batch = self.preprocess_train_data(X_batch)
+            # Fit model on each batch (incremental training with warm_start=True)
+            self.model.fit(X_batch, y_batch, eval_set=[(X_batch, y_batch)])
+            print(f"Trained on batch {start_idx} to {end_idx}")
+        # After batch training, save the model
+        pickle.dump(self.model, open(f'{self.warm_start_model_path}', 'wb'))
+        print("Batch learning completed and model saved.")
     def fine_tune_model(self, X_train, y_train):
         X_train = self.preprocess_train_data(X_train)
         with open(f'{self.warm_start_model_path}', 'rb') as f:
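A caveat on the new batch_train_model: LightGBM's scikit-learn wrapper does not implement scikit-learn's warm_start protocol, so each fit() call above effectively starts a fresh booster on the current batch (and preprocess_train_data also re-fits the scaler and PCA per batch). If true incremental training is the goal, the usual route is to pass the previously trained model to fit via init_model. A hedged sketch of that variant, reusing the names above and assuming each batch contains both classes and is already preprocessed consistently:

import lightgbm as lgb

def batch_train_incremental(model: lgb.LGBMClassifier, X_train, y_train, batch_size=1000):
    booster = None
    for start in range(0, len(X_train), batch_size):
        X_batch = X_train[start:start + batch_size]
        y_batch = y_train[start:start + batch_size]
        # Resume boosting from the previous booster (None on the first batch)
        model.fit(X_batch, y_batch, init_model=booster)
        booster = model.booster_
    return model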