import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestClassifier
import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tqdm import tqdm
from sklearn.feature_selection import SelectKBest, f_classif
from collections import defaultdict
import asyncio
import aiohttp
import pickle
import time
import sqlite3
import ujson

# Based on the paper: https://arxiv.org/pdf/1603.00751

# Metadata keys stripped from every financial-statement record before modelling.
_METADATA_KEYS = {
    "symbol", "reportedCurrency", "calendarYear", "fillingDate",
    "acceptedDate", "period", "cik", "link", "finalLink",
}

# Fundamental features the model is trained on (shared by train and test paths).
SELECTED_FEATURES = [
    'growthRevenue', 'ebitda', 'priceToBookRatio', 'eps', 'priceToSalesRatio',
    'growthOtherCurrentLiabilities', 'receivablesTurnover',
    'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity',
    'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings',
    'totalEquity', 'totalStockholdersEquity', 'totalNonCurrentAssets',
]


def _clean_statements(raw_json):
    """Parse one JSON statement column; keep records from year 2000 on, drop metadata keys."""
    return [
        {k: v for k, v in item.items() if k not in _METADATA_KEYS}
        for item in ujson.loads(raw_json)
        if int(item["date"][:4]) >= 2000
    ]


async def download_data(ticker, con, start_date, end_date):
    """Build a per-ticker fundamentals DataFrame with prices and a binary Target.

    Reads the seven statement JSON columns for ``ticker`` from the sqlite
    connection ``con``, merges them by report date, attaches the closing price
    on (or up to 10 calendar days before) each report date via yfinance, and
    labels Target = 1 when the price rises by the next reporting period.

    Returns the DataFrame, or None if anything fails (the error is printed).
    """
    try:
        query_template = """
            SELECT income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios
            FROM stocks
            WHERE symbol = ?
        """
        query_df = pd.read_sql_query(query_template, con, params=(ticker,))

        # Only consider companies with at least 10 years (40 quarters) of data.
        if len(ujson.loads(query_df['income'].iloc[0])) < 40:
            raise ValueError("Income data length is too small.")

        statement_columns = ('income', 'income_growth', 'balance', 'balance_growth',
                             'cashflow', 'cashflow_growth', 'ratios')
        statements = [_clean_statements(query_df[col].iloc[0]) for col in statement_columns]

        # Merge the seven statement lists positionally, keyed by report date;
        # the first statement to provide a key wins (duplicates are skipped).
        combined_data = defaultdict(dict)
        for entries in zip(*statements):
            for entry in entries:
                date = entry['date']
                for key, value in entry.items():
                    combined_data[date].setdefault(key, value)
        combined_data = list(combined_data.values())

        df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index()
        df = df.rename(columns={'Adj Close': 'close', 'Date': 'date'})
        df['date'] = df['date'].dt.strftime('%Y-%m-%d')

        # Build the membership set once: O(1) lookups instead of scanning the
        # whole date column on every fallback attempt.
        trading_dates = set(df['date'].values)
        for item in combined_data:
            # Use the close on the report date, or the nearest earlier trading
            # day within a 10-day window (markets are closed on weekends/holidays).
            target_date = item['date']
            for _ in range(10):
                if target_date in trading_dates:
                    break
                target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
            if target_date not in trading_dates:
                # No trading day found nearby: stop here; rows that never got a
                # 'price' key become NaN and are dropped below.
                break
            item['price'] = round(df[df['date'] == target_date]['close'].values[0], 2)

        combined_data = sorted(combined_data, key=lambda x: x['date'])
        df_income = pd.DataFrame(combined_data).dropna()
        # Binary label: 1 if the price rises by the next reporting period.
        df_income['Target'] = ((df_income['price'].shift(-1) - df_income['price'])
                               / df_income['price'] > 0).astype(int)
        return df_income.copy()
    except Exception as e:
        print(e)
        return None  # explicit: callers must check for None


class FundamentalPredictor:
    """XGBoost classifier over fundamental features, scaled with StandardScaler.

    Model weights are persisted to ``{path}/fundamental_weights/weights.pkl``.
    """

    def __init__(self, path='weights'):
        self.model = XGBClassifier()
        self.scaler = StandardScaler()
        self.path = path  # base directory for the pickled weights

    def feature_selection(self, X_train, y_train, k=8):
        """Rank features by mean within-class variance and return the top ``k`` names."""
        variances = {
            col: X_train.groupby(y_train)[col].var().mean()
            for col in X_train.columns
        }
        return sorted(variances, key=variances.get, reverse=True)[:k]

    def _preprocess(self, X):
        """Replace zeros with ones (as suggested in the paper), clean inf/nan, scale.

        NOTE(review): this (re)fits the scaler on whatever data it receives —
        including the test set in evaluate_model, which leaks test statistics.
        Kept as-is because evaluate_model may run in a fresh process where the
        scaler was never fitted; persisting the scaler alongside the model is
        the proper fix.
        """
        X = X.applymap(lambda x: 1 if x == 0 else x)
        X = np.where(np.isinf(X), np.nan, X)
        X = np.nan_to_num(X)
        return self.scaler.fit_transform(X)

    def train_model(self, X_train, y_train):
        """Fit the classifier on preprocessed features and pickle it to disk."""
        X_train = self._preprocess(X_train)
        self.model.fit(X_train, y_train)
        # 'with' closes the handle even if pickling fails (original leaked it).
        with open(f'{self.path}/fundamental_weights/weights.pkl', 'wb') as f:
            pickle.dump(self.model, f)

    def evaluate_model(self, X_test, y_test):
        """Load the pickled model, score X_test, and print precision/accuracy.

        Returns a tuple of ({'accuracy', 'precision', 'sentiment'}, predictions);
        'sentiment' reflects the prediction for the most recent (last) row.
        """
        X_test = self._preprocess(X_test)
        with open(f'{self.path}/fundamental_weights/weights.pkl', 'rb') as f:
            self.model = pickle.load(f)

        # Positive-class probabilities, thresholded in place at 0.5.
        test_predictions = self.model.predict_proba(X_test)[:, 1]
        test_predictions[test_predictions >= .5] = 1
        test_predictions[test_predictions < .5] = 0

        test_precision = precision_score(y_test, test_predictions)
        test_accuracy = accuracy_score(y_test, test_predictions)
        print("Test Set Metrics:")
        print(f"Precision: {round(test_precision * 100)}%")
        print(f"Accuracy: {round(test_accuracy * 100)}%")

        next_value_prediction = 1 if test_predictions[-1] >= 0.5 else 0
        return ({'accuracy': round(test_accuracy * 100),
                 'precision': round(test_precision * 100),
                 'sentiment': 'Bullish' if next_value_prediction == 1 else 'Bearish'},
                test_predictions)


# Train mode
async def train_process(tickers, con):
    """Download fundamentals for every ticker, train on the earliest 60% of each
    ticker's history, and evaluate on the remaining 40%."""
    tickers = list(set(tickers))
    test_size = 0.4
    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
    end_date = datetime.today().strftime("%Y-%m-%d")
    predictor = FundamentalPredictor()

    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
    dfs = await asyncio.gather(*tasks)

    df_train = pd.DataFrame()
    df_test = pd.DataFrame()
    for df in dfs:
        # download_data returns None on failure — skip explicitly instead of
        # relying on a bare except to swallow the resulting TypeError.
        if df is None or df.empty:
            continue
        split_size = int(len(df) * (1 - test_size))
        df_train = pd.concat([df_train, df.iloc[:split_size]], ignore_index=True)
        df_test = pd.concat([df_test, df.iloc[split_size:]], ignore_index=True)

    # Shuffle training rows so samples from one ticker are not contiguous.
    df_train = df_train.sample(frac=1).reset_index(drop=True)
    print('======Train Set Datapoints======')
    print(len(df_train))

    predictor.train_model(df_train[SELECTED_FEATURES], df_train['Target'])
    predictor.evaluate_model(df_test[SELECTED_FEATURES], df_test['Target'])


async def test_process(con):
    """Evaluate the persisted model on the most recent 40% of GME's history."""
    test_size = 0.4
    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
    end_date = datetime.today().strftime("%Y-%m-%d")
    predictor = FundamentalPredictor()

    df = await download_data('GME', con, start_date, end_date)
    if df is None or df.empty:
        # Original crashed with TypeError on a failed download; bail out instead.
        print("No data available for test ticker")
        return
    split_size = int(len(df) * (1 - test_size))
    test_data = df.iloc[split_size:]
    predictor.evaluate_model(test_data[SELECTED_FEATURES], test_data['Target'])


async def main():
    """Train on mega-caps (market cap >= $500B) and spot-check on a single ticker."""
    con = sqlite3.connect('../stocks.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E9")
        stock_symbols = [row[0] for row in cursor.fetchall()]
        print(len(stock_symbols))
        await train_process(stock_symbols, con)
        await test_process(con)
    finally:
        # Close the connection even when training/evaluation raises.
        con.close()


# Run the main function only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())