update fundamental ml model

This commit is contained in:
MuslemRahimi 2024-09-28 14:50:38 +02:00
parent ed11719628
commit ac81bd613a
4 changed files with 142 additions and 231 deletions

View File

@@ -1,6 +1,7 @@
-import ujson
+import orjson
 import asyncio
 import aiohttp
+import aiofiles
 import sqlite3
 from datetime import datetime
 from ml_models.fundamental_predictor import FundamentalPredictor
@@ -10,103 +11,114 @@ import pandas as pd
 from tqdm import tqdm
 import concurrent.futures
 import re
+import subprocess

 async def save_json(symbol, data):
     with open(f"json/fundamental-predictor-analysis/{symbol}.json", 'w') as file:
-        ujson.dump(data, file)
+        orjson.dump(data, file)
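Note on the new line above: unlike ujson, orjson does not provide a dump() that writes to a file handle; it exposes only dumps(), which returns bytes, and loads(). As written, save_json will raise AttributeError at runtime. A minimal corrected sketch, keeping the same path and signature:

    import orjson

    async def save_json(symbol, data):
        # orjson.dumps() returns bytes, so the file must be opened in binary mode
        with open(f"json/fundamental-predictor-analysis/{symbol}.json", "wb") as file:
            file.write(orjson.dumps(data))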
 async def download_data(ticker, con, start_date, end_date):
     try:
-        query_template = """
-            SELECT
-                income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios
-            FROM
-                stocks
-            WHERE
-                symbol = ?
-        """
-        query_df = pd.read_sql_query(query_template, con, params=(ticker,))
-        income = ujson.loads(query_df['income'].iloc[0])
-        #Only consider company with at least 10 year worth of data
-        if len(income) < 40:
-            raise ValueError("Income data length is too small.")
-        income = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income if int(item["date"][:4]) >= 2000]
-        income_growth = ujson.loads(query_df['income_growth'].iloc[0])
-        income_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income_growth if int(item["date"][:4]) >= 2000]
-        balance = ujson.loads(query_df['balance'].iloc[0])
-        balance = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance if int(item["date"][:4]) >= 2000]
-        balance_growth = ujson.loads(query_df['balance_growth'].iloc[0])
-        balance_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance_growth if int(item["date"][:4]) >= 2000]
-        cashflow = ujson.loads(query_df['cashflow'].iloc[0])
-        cashflow = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow if int(item["date"][:4]) >= 2000]
-        cashflow_growth = ujson.loads(query_df['cashflow_growth'].iloc[0])
-        cashflow_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow_growth if int(item["date"][:4]) >= 2000]
-        ratios = ujson.loads(query_df['ratios'].iloc[0])
-        ratios = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in ratios if int(item["date"][:4]) >= 2000]
+        # Define paths to the statement files
+        statements = [
+            f"json/financial-statements/ratios/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json"
+        ]
+
+        # Helper function to load JSON data asynchronously
+        async def load_json_from_file(path):
+            async with aiofiles.open(path, 'r') as f:
+                content = await f.read()
+            return orjson.loads(content)
+
+        # Helper function to filter data based on keys and year
+        async def filter_data(data, ignore_keys, year_threshold=2000):
+            return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold]
+
+        # Define keys to ignore
+        ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink"]
+
+        # Load and filter data for each statement type
+        income = await load_json_from_file(statements[2])
+        income = await filter_data(income, ignore_keys)
+        income_growth = await load_json_from_file(statements[4])
+        income_growth = await filter_data(income_growth, ignore_keys)
+        balance = await load_json_from_file(statements[3])
+        balance = await filter_data(balance, ignore_keys)
+        balance_growth = await load_json_from_file(statements[5])
+        balance_growth = await filter_data(balance_growth, ignore_keys)
+        cashflow = await load_json_from_file(statements[1])
+        cashflow = await filter_data(cashflow, ignore_keys)
+        cashflow_growth = await load_json_from_file(statements[6])
+        cashflow_growth = await filter_data(cashflow_growth, ignore_keys)
+        ratios = await load_json_from_file(statements[0])
+        ratios = await filter_data(ratios, ignore_keys)
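The seven statement files are addressed by position here (statements[2] for income, statements[0] for ratios, and so on), which is easy to misread. A hypothetical alternative, reusing the same load and filter helpers but keying paths by statement type:

    # Hypothetical sketch: name the paths instead of indexing into a list
    statement_paths = {
        "ratios": f"json/financial-statements/ratios/quarter/{ticker}.json",
        "income": f"json/financial-statements/income-statement/quarter/{ticker}.json",
        "balance": f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json",
        # ...remaining statement types follow the same pattern
    }
    income = await filter_data(await load_json_from_file(statement_paths["income"]), ignore_keys)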
+        # Combine all the data
         combined_data = defaultdict(dict)

-        # Iterate over all lists simultaneously
+        # Merge the data based on 'date'
         for entries in zip(income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios):
-            # Iterate over each entry in the current set of entries
             for entry in entries:
                 date = entry['date']
+                # Merge entry data into combined_data, skipping duplicate keys
                 for key, value in entry.items():
                     if key not in combined_data[date]:
                         combined_data[date][key] = value

         combined_data = list(combined_data.values())
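For clarity on the merge semantics above: zip() pairs the seven statement lists by position and stops at the shortest list, and for a given date the first-seen value of each key wins. A small self-contained example:

    from collections import defaultdict

    income = [{"date": "2024-06-30", "revenue": 10}, {"date": "2024-03-31", "revenue": 9}]
    ratios = [{"date": "2024-06-30", "debtRatio": 0.4}]  # shorter list: only one pair is zipped

    combined = defaultdict(dict)
    for entries in zip(income, ratios):
        for entry in entries:
            for key, value in entry.items():
                if key not in combined[entry["date"]]:  # first-seen value wins
                    combined[entry["date"]][key] = value

    print(list(combined.values()))
    # [{'date': '2024-06-30', 'revenue': 10, 'debtRatio': 0.4}] -- the 2024-03-31 row is dropped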
+        # Download historical stock data using yfinance
         df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index()
         df = df.rename(columns={'Adj Close': 'close', 'Date': 'date'})
-        #print(df[['date','close']])
         df['date'] = df['date'].dt.strftime('%Y-%m-%d')

+        # Match each combined data entry with the closest available stock price in df
         for item in combined_data:
-            # Find close price for '2023-09-30' or the closest available date prior to it
             target_date = item['date']
             counter = 0
             max_attempts = 10

+            # Look for the closest matching date in the stock data
             while target_date not in df['date'].values and counter < max_attempts:
-                # If the target date doesn't exist, move one day back
                 target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
                 counter += 1
-                if counter == max_attempts:
-                    break

-            # Get the close price for the found or closest date
-            close_price = round(df[df['date'] == target_date]['close'].values[0],2)
+            # If max attempts are reached and no matching date is found, skip the entry
+            if counter == max_attempts:
+                continue

+            # Find the close price for the matching date
+            close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
             item['price'] = close_price
-            #print(f"Close price for {target_date}: {close_price}")

+        # Sort the combined data by date
         combined_data = sorted(combined_data, key=lambda x: x['date'])

-        df_income = pd.DataFrame(combined_data).dropna()
-        df_income['Target'] = ((df_income['price'].shift(-1) - df_income['price']) / df_income['price'] > 0).astype(int)
-        df_copy = df_income.copy()
-        #print(df_copy)
+        # Convert combined data into a DataFrame
+        df_combined = pd.DataFrame(combined_data).dropna()
+
+        # Create 'Target' column based on price change
+        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
+
+        # Return a copy of the combined DataFrame
+        df_copy = df_combined.copy()
         return df_copy
-    except Exception as e:
-        print(e)
+    except:
+        pass
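On the Target label created above: a row gets 1 when the next report's matched price is higher, and the final row, whose shift(-1) is NaN, falls out as 0. A quick worked example:

    import pandas as pd

    df_combined = pd.DataFrame({"price": [10.0, 12.0, 11.0]})
    target = ((df_combined["price"].shift(-1) - df_combined["price"]) / df_combined["price"] > 0).astype(int)
    print(target.tolist())  # [1, 0, 0] -- NaN > 0 is False for the last row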
 async def process_symbol(ticker, con, start_date, end_date):
@@ -114,7 +126,7 @@ async def process_symbol(ticker, con, start_date, end_date):
         test_size = 0.4
         start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
         end_date = datetime.today().strftime("%Y-%m-%d")
-        predictor = FundamentalPredictor(path="ml_models/weights")
+        predictor = FundamentalPredictor()
         df = await download_data(ticker, con, start_date, end_date)
         split_size = int(len(df) * (1-test_size))
         test_data = df.iloc[split_size:]
@@ -135,11 +147,75 @@ async def process_symbol(ticker, con, start_date, end_date):
     except Exception as e:
         print(e)

+#Train mode
+async def train_process(tickers, con):
+    tickers = list(set(tickers))
+    df_train = pd.DataFrame()
+    df_test = pd.DataFrame()
+    test_size = 0.4
+    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+    predictor = FundamentalPredictor()
+    df_train = pd.DataFrame()
+    df_test = pd.DataFrame()
+
+    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
+    dfs = await asyncio.gather(*tasks)
+    for df in dfs:
+        try:
+            split_size = int(len(df) * (1-test_size))
+            train_data = df.iloc[:split_size]
+            test_data = df.iloc[split_size:]
+            df_train = pd.concat([df_train, train_data], ignore_index=True)
+            df_test = pd.concat([df_test, test_data], ignore_index=True)
+        except:
+            pass
+
+    best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
+    df_train = df_train.sample(frac=1).reset_index(drop=True)
+    print('======Train Set Datapoints======')
+    print(len(df_train))
+    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
+    #print(selected_features)
+    #selected_features = [col for col in df_train if col not in ['price','date','Target']]
+    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
+    predictor.train_model(df_train[selected_features], df_train['Target'])
+    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
+
+async def test_process(con):
+    test_size = 0.4
+    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+    predictor = FundamentalPredictor()
+    df = await download_data('GME', con, start_date, end_date)
+    split_size = int(len(df) * (1-test_size))
+    test_data = df.iloc[split_size:]
+    #selected_features = [col for col in test_data if col not in ['price','date','Target']]
+    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
+    predictor.evaluate_model(test_data[selected_features], test_data['Target'])
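Both helpers split each ticker's history chronologically with test_size = 0.4: the first 60% of rows train, the last 40% test, and only the pooled training set is shuffled afterwards. The split arithmetic on a 10-row frame:

    import pandas as pd

    df = pd.DataFrame({"x": range(10)})
    split_size = int(len(df) * (1 - 0.4))   # 6
    train_data, test_data = df.iloc[:split_size], df.iloc[split_size:]  # rows 0-5 train, 6-9 test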
 async def run():
+    #Train first model
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
     cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 300E9")
+    stock_symbols = [row[0] for row in cursor.fetchall()]
+    print('Number of Stocks')
+    print(len(stock_symbols))
+    await train_process(stock_symbols, con)
+
+    #Prediction Steps for all stock symbols
     cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9")
     stock_symbols = [row[0] for row in cursor.fetchall()]
@@ -160,6 +236,7 @@ async def run():
         await asyncio.gather(*tasks)
     con.close()

 try:
     asyncio.run(run())
 except Exception as e:

View File

@@ -20,114 +20,16 @@ from tqdm import tqdm
 from collections import defaultdict
 import asyncio
 import aiohttp
+import aiofiles
 import pickle
 import time
-import sqlite3
-import ujson

 #Based on the paper: https://arxiv.org/pdf/1603.00751
-async def download_data(ticker, con, start_date, end_date):
-    try:
-        query_template = """
-            SELECT
-                income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios
-            FROM
-                stocks
-            WHERE
-                symbol = ?
-        """
-        query_df = pd.read_sql_query(query_template, con, params=(ticker,))
-        income = ujson.loads(query_df['income'].iloc[0])
-        #Only consider company with at least 10 year worth of data
-        '''
-        if len(income) < 40:
-            raise ValueError("Income data length is too small.")
-        '''
-        income = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income if int(item["date"][:4]) >= 2000]
-        income_growth = ujson.loads(query_df['income_growth'].iloc[0])
-        income_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income_growth if int(item["date"][:4]) >= 2000]
-        balance = ujson.loads(query_df['balance'].iloc[0])
-        balance = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance if int(item["date"][:4]) >= 2000]
-        balance_growth = ujson.loads(query_df['balance_growth'].iloc[0])
-        balance_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance_growth if int(item["date"][:4]) >= 2000]
-        cashflow = ujson.loads(query_df['cashflow'].iloc[0])
-        cashflow = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow if int(item["date"][:4]) >= 2000]
-        cashflow_growth = ujson.loads(query_df['cashflow_growth'].iloc[0])
-        cashflow_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow_growth if int(item["date"][:4]) >= 2000]
-        ratios = ujson.loads(query_df['ratios'].iloc[0])
-        ratios = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in ratios if int(item["date"][:4]) >= 2000]
-
-        combined_data = defaultdict(dict)
-        # Iterate over all lists simultaneously
-        for entries in zip(income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios):
-            # Iterate over each entry in the current set of entries
-            for entry in entries:
-                date = entry['date']
-                # Merge entry data into combined_data, skipping duplicate keys
-                for key, value in entry.items():
-                    if key not in combined_data[date]:
-                        combined_data[date][key] = value
-
-        combined_data = list(combined_data.values())
-
-        df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index()
-        df = df.rename(columns={'Adj Close': 'close', 'Date': 'date'})
-        #print(df[['date','close']])
-        df['date'] = df['date'].dt.strftime('%Y-%m-%d')
-
-        for item in combined_data:
-            # Find close price for '2023-09-30' or the closest available date prior to it
-            target_date = item['date']
-            counter = 0
-            max_attempts = 10
-            while target_date not in df['date'].values and counter < max_attempts:
-                # If the target date doesn't exist, move one day back
-                target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
-                counter += 1
-                if counter == max_attempts:
-                    break
-
-            # Get the close price for the found or closest date
-            close_price = round(df[df['date'] == target_date]['close'].values[0],2)
-            item['price'] = close_price
-            #print(f"Close price for {target_date}: {close_price}")
-
-        combined_data = sorted(combined_data, key=lambda x: x['date'])
-        df_combined = pd.DataFrame(combined_data).dropna()
-        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
-        df_copy = df_combined.copy()
-        return df_copy
-    except Exception as e:
-        print(e)
 class FundamentalPredictor:
-    def __init__(self, path='weights'):
+    def __init__(self):
         self.model = self.build_model() #RandomForestClassifier(n_estimators=1000, max_depth = 20, min_samples_split=10, random_state=42, n_jobs=10)
         self.scaler = MinMaxScaler()
-        self.path = path

     def build_model(self):
         clear_session()
@@ -183,18 +85,18 @@ class FundamentalPredictor:
         X_train = self.preprocess_data(X_train)
         X_train = self.reshape_for_lstm(X_train)

-        checkpoint = ModelCheckpoint(f'{self.path}/fundamental_weights/weights.keras', save_best_only=True, monitor='val_loss', mode='min')
+        checkpoint = ModelCheckpoint(f'ml_models/fundamental_weights/weights.keras', save_best_only=True, monitor='val_loss', mode='min')
         early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
         self.model.fit(X_train, y_train, epochs=250, batch_size=32, validation_split=0.2, callbacks=[checkpoint, early_stopping])
-        self.model.save(f'{self.path}/fundamental_weights/weights.keras')
+        self.model.save(f'ml_models/fundamental_weights/weights.keras')

     def evaluate_model(self, X_test, y_test):
         X_test = self.preprocess_data(X_test)
         X_test = self.reshape_for_lstm(X_test)
         self.model = self.build_model()
-        self.model = load_model(f'{self.path}/fundamental_weights/weights.keras')
+        self.model = load_model(f'ml_models/fundamental_weights/weights.keras')
         test_predictions = self.model.predict(X_test).flatten()
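For context on the save/restore cycle in train_model and evaluate_model: ModelCheckpoint keeps the best val_loss weights on disk, EarlyStopping(restore_best_weights=True) puts those weights back in memory when training stops, and the final model.save() then persists the restored-best model that evaluate_model reloads. A condensed sketch of the pattern, assuming a compiled Keras model and preprocessed X_train, y_train:

    from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
    from tensorflow.keras.models import load_model

    path = 'ml_models/fundamental_weights/weights.keras'
    checkpoint = ModelCheckpoint(path, save_best_only=True, monitor='val_loss', mode='min')
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    model.fit(X_train, y_train, epochs=250, batch_size=32,
              validation_split=0.2, callbacks=[checkpoint, early_stopping])
    model.save(path)               # persists the restored-best weights
    best_model = load_model(path)  # what evaluate_model later reloads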
@@ -232,71 +134,3 @@ class FundamentalPredictor:
         sorted_features = sorted(variances, key=variances.get, reverse=True)[:k]
         return sorted_features
     '''
-
-#Train mode
-async def train_process(tickers, con):
-    tickers = list(set(tickers))
-    df_train = pd.DataFrame()
-    df_test = pd.DataFrame()
-    test_size = 0.4
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
-    end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = FundamentalPredictor()
-    df_train = pd.DataFrame()
-    df_test = pd.DataFrame()
-
-    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
-    dfs = await asyncio.gather(*tasks)
-    for df in dfs:
-        try:
-            split_size = int(len(df) * (1-test_size))
-            train_data = df.iloc[:split_size]
-            test_data = df.iloc[split_size:]
-            df_train = pd.concat([df_train, train_data], ignore_index=True)
-            df_test = pd.concat([df_test, test_data], ignore_index=True)
-        except:
-            pass
-
-    best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
-    df_train = df_train.sample(frac=1).reset_index(drop=True)
-    print('======Train Set Datapoints======')
-    print(len(df_train))
-    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
-    #print(selected_features)
-    #selected_features = [col for col in df_train if col not in ['price','date','Target']]
-    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
-    predictor.train_model(df_train[selected_features], df_train['Target'])
-    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
-
-async def test_process(con):
-    test_size = 0.4
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
-    end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = FundamentalPredictor()
-    df = await download_data('GME', con, start_date, end_date)
-    split_size = int(len(df) * (1-test_size))
-    test_data = df.iloc[split_size:]
-    #selected_features = [col for col in test_data if col not in ['price','date','Target']]
-    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
-    predictor.evaluate_model(test_data[selected_features], test_data['Target'])
-
-async def main():
-    con = sqlite3.connect('../stocks.db')
-    cursor = con.cursor()
-    cursor.execute("PRAGMA journal_mode = wal")
-    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E9")
-    stock_symbols = [row[0] for row in cursor.fetchall()]
-    print('Number of Stocks')
-    print(len(stock_symbols))
-    await train_process(stock_symbols, con)
-    await test_process(con)
-    con.close()
-
-# Run the main function
-#asyncio.run(main())

Binary file not shown.