diff --git a/app/cron_fundamental_predictor.py b/app/cron_fundamental_predictor.py
index e192e05..490edaf 100755
--- a/app/cron_fundamental_predictor.py
+++ b/app/cron_fundamental_predictor.py
@@ -1,6 +1,7 @@
-import ujson
+import orjson
 import asyncio
 import aiohttp
+import aiofiles
 import sqlite3
 from datetime import datetime
 from ml_models.fundamental_predictor import FundamentalPredictor
@@ -10,103 +11,114 @@ import pandas as pd
 from tqdm import tqdm
 import concurrent.futures
 import re
+import subprocess
 
 async def save_json(symbol, data):
-    with open(f"json/fundamental-predictor-analysis/{symbol}.json", 'w') as file:
-        ujson.dump(data, file)
+    # orjson has no dump(); dumps() returns bytes, so open the file in binary mode
+    with open(f"json/fundamental-predictor-analysis/{symbol}.json", 'wb') as file:
+        file.write(orjson.dumps(data))
+
 
 async def download_data(ticker, con, start_date, end_date):
     try:
-        query_template = """
-            SELECT
-                income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios
-            FROM
-                stocks
-            WHERE
-                symbol = ?
-        """
+        # Define paths to the statement files
+        statements = [
+            f"json/financial-statements/ratios/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json"
+        ]
 
-        query_df = pd.read_sql_query(query_template, con, params=(ticker,))
+        # Helper function to load JSON data asynchronously
+        async def load_json_from_file(path):
+            async with aiofiles.open(path, 'r') as f:
+                content = await f.read()
+            return orjson.loads(content)
 
-        income = ujson.loads(query_df['income'].iloc[0])
-        #Only consider company with at least 10 year worth of data
-        if len(income) < 40:
-            raise ValueError("Income data length is too small.")
+        # Helper function to filter data based on keys and year
+        async def filter_data(data, ignore_keys, year_threshold=2000):
+            return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold]
 
-        income = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income if int(item["date"][:4]) >= 2000]
-        income_growth = ujson.loads(query_df['income_growth'].iloc[0])
-        income_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income_growth if int(item["date"][:4]) >= 2000]
-
-        balance = ujson.loads(query_df['balance'].iloc[0])
-        balance = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance if int(item["date"][:4]) >= 2000]
-        balance_growth = ujson.loads(query_df['balance_growth'].iloc[0])
-        balance_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance_growth if int(item["date"][:4]) >= 2000]
+        # Define keys to ignore
+        ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink"]
 
-        cashflow = ujson.loads(query_df['cashflow'].iloc[0])
in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow if int(item["date"][:4]) >= 2000] - cashflow_growth = ujson.loads(query_df['cashflow_growth'].iloc[0]) - cashflow_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow_growth if int(item["date"][:4]) >= 2000] + # Load and filter data for each statement type + income = await load_json_from_file(statements[2]) + income = await filter_data(income, ignore_keys) + income_growth = await load_json_from_file(statements[4]) + income_growth = await filter_data(income_growth, ignore_keys) - ratios = ujson.loads(query_df['ratios'].iloc[0]) - ratios = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in ratios if int(item["date"][:4]) >= 2000] + balance = await load_json_from_file(statements[3]) + balance = await filter_data(balance, ignore_keys) + balance_growth = await load_json_from_file(statements[5]) + balance_growth = await filter_data(balance_growth, ignore_keys) + + cashflow = await load_json_from_file(statements[1]) + cashflow = await filter_data(cashflow, ignore_keys) + + cashflow_growth = await load_json_from_file(statements[6]) + cashflow_growth = await filter_data(cashflow_growth, ignore_keys) + + ratios = await load_json_from_file(statements[0]) + ratios = await filter_data(ratios, ignore_keys) + + # Combine all the data combined_data = defaultdict(dict) - # Iterate over all lists simultaneously + + # Merge the data based on 'date' for entries in zip(income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios): - # Iterate over each entry in the current set of entries for entry in entries: date = entry['date'] - # Merge entry data into combined_data, skipping duplicate keys for key, value in entry.items(): if key not in combined_data[date]: combined_data[date][key] = value - + combined_data = list(combined_data.values()) + # Download historical stock data using yfinance df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index() df = df.rename(columns={'Adj Close': 'close', 'Date': 'date'}) - #print(df[['date','close']]) df['date'] = df['date'].dt.strftime('%Y-%m-%d') - + # Match each combined data entry with the closest available stock price in df for item in combined_data: - # Find close price for '2023-09-30' or the closest available date prior to it target_date = item['date'] counter = 0 max_attempts = 10 - - while target_date not in df['date'].values and counter < max_attempts: - # If the target date doesn't exist, move one day back - target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d') - - counter += 1 - if counter == max_attempts: - break - # Get the close price for the found or closest date - close_price = round(df[df['date'] == target_date]['close'].values[0],2) + # Look for the closest matching date in the stock data + while target_date not in df['date'].values and counter < max_attempts: + target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d') + counter += 1 + + # If max attempts are reached and no matching date is found, skip the entry + if counter == max_attempts: + continue + + # Find the close price for the matching date + close_price = round(df[df['date'] == 
+            close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
             item['price'] = close_price
-            #print(f"Close price for {target_date}: {close_price}")
-
-
-
+        # Sort the combined data by date
         combined_data = sorted(combined_data, key=lambda x: x['date'])
-
-        df_income = pd.DataFrame(combined_data).dropna()
+        # Convert combined data into a DataFrame
+        df_combined = pd.DataFrame(combined_data).dropna()
 
-        df_income['Target'] = ((df_income['price'].shift(-1) - df_income['price']) / df_income['price'] > 0).astype(int)
+        # Create 'Target' column based on price change
+        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
 
-        df_copy = df_income.copy()
-        #print(df_copy)
-
+        # Return a copy of the combined DataFrame
+        df_copy = df_combined.copy()
         return df_copy
 
-    except Exception as e:
-        print(e)
+    except Exception:
+        pass
 
 
 async def process_symbol(ticker, con, start_date, end_date):
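A note on the date-matching loop in the hunk above: the manual ten-day walk-back can be expressed declaratively in pandas. Not part of this patch, but for reference, `pd.merge_asof` matches each statement date to the most recent trading day at or before it. A minimal sketch with toy data; the column names mirror the frames built in `download_data`:

```python
import pandas as pd

# Toy stand-ins for combined_data's dates and the yfinance download.
fundamentals = pd.DataFrame({"date": pd.to_datetime(["2023-09-30", "2023-12-31"])})
prices = pd.DataFrame({"date": pd.to_datetime(["2023-09-29", "2023-12-29"]),
                       "close": [100.25, 104.75]})

# Match each statement date to the last trading day at or before it,
# giving up after 10 calendar days -- the same cutoff as max_attempts.
matched = pd.merge_asof(fundamentals.sort_values("date"),
                        prices.sort_values("date"),
                        on="date", direction="backward",
                        tolerance=pd.Timedelta(days=10))
print(matched)
```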
@@ -114,7 +126,7 @@ async def process_symbol(ticker, con, start_date, end_date):
     test_size = 0.4
     start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
     end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = FundamentalPredictor(path="ml_models/weights")
+    predictor = FundamentalPredictor()
     df = await download_data(ticker, con, start_date, end_date)
     split_size = int(len(df) * (1-test_size))
     test_data = df.iloc[split_size:]
@@ -135,11 +147,75 @@ async def process_symbol(ticker, con, start_date, end_date):
     except Exception as e:
         print(e)
 
+
+# Train mode
+async def train_process(tickers, con):
+    tickers = list(set(tickers))
+    test_size = 0.4
+    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+    predictor = FundamentalPredictor()
+    df_train = pd.DataFrame()
+    df_test = pd.DataFrame()
+
+    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
+    dfs = await asyncio.gather(*tasks)
+    for df in dfs:
+        try:
+            split_size = int(len(df) * (1-test_size))
+            train_data = df.iloc[:split_size]
+            test_data = df.iloc[split_size:]
+            df_train = pd.concat([df_train, train_data], ignore_index=True)
+            df_test = pd.concat([df_test, test_data], ignore_index=True)
+        except Exception:
+            pass
+
+    best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
+
+    df_train = df_train.sample(frac=1).reset_index(drop=True)
+    print('======Train Set Datapoints======')
+    print(len(df_train))
+    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
+    #print(selected_features)
+    #selected_features = [col for col in df_train if col not in ['price','date','Target']]
+    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
+
+    predictor.train_model(df_train[selected_features], df_train['Target'])
+    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
+
+
+async def test_process(con):
+    test_size = 0.4
+    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+    predictor = FundamentalPredictor()
+    df = await download_data('GME', con, start_date, end_date)
+    split_size = int(len(df) * (1-test_size))
+    test_data = df.iloc[split_size:]
+    #selected_features = [col for col in test_data if col not in ['price','date','Target']]
+    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
+    predictor.evaluate_model(test_data[selected_features], test_data['Target'])
+
+
 async def run():
+
+    # Train the first model
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
     cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 300E9")
+    stock_symbols = [row[0] for row in cursor.fetchall()]
+    print('Number of Stocks')
+    print(len(stock_symbols))
+    await train_process(stock_symbols, con)
+
+    # Prediction step for all stock symbols
     cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9")
     stock_symbols = [row[0] for row in cursor.fetchall()]
@@ -160,6 +236,7 @@ async def run():
     await asyncio.gather(*tasks)
     con.close()
+
 
 try:
     asyncio.run(run())
 except Exception as e:
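One loose end in the file above: `aiofiles` is added to the imports of `cron_fundamental_predictor.py`, yet `save_json` still performs blocking file I/O inside an `async def`. A minimal non-blocking variant (a sketch, not part of the patch; note that `orjson` exposes only `dumps()`/`loads()`, and `dumps()` returns bytes):

```python
import aiofiles
import orjson

async def save_json(symbol, data):
    path = f"json/fundamental-predictor-analysis/{symbol}.json"
    # orjson.dumps() returns bytes, so the file is opened in binary mode.
    async with aiofiles.open(path, 'wb') as file:
        await file.write(orjson.dumps(data))
```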
- """ - - query_df = pd.read_sql_query(query_template, con, params=(ticker,)) - - income = ujson.loads(query_df['income'].iloc[0]) - - #Only consider company with at least 10 year worth of data - ''' - if len(income) < 40: - raise ValueError("Income data length is too small.") - ''' - - income = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income if int(item["date"][:4]) >= 2000] - income_growth = ujson.loads(query_df['income_growth'].iloc[0]) - income_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in income_growth if int(item["date"][:4]) >= 2000] - - balance = ujson.loads(query_df['balance'].iloc[0]) - balance = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance if int(item["date"][:4]) >= 2000] - balance_growth = ujson.loads(query_df['balance_growth'].iloc[0]) - balance_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in balance_growth if int(item["date"][:4]) >= 2000] - - cashflow = ujson.loads(query_df['cashflow'].iloc[0]) - cashflow = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow if int(item["date"][:4]) >= 2000] - cashflow_growth = ujson.loads(query_df['cashflow_growth'].iloc[0]) - cashflow_growth = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in cashflow_growth if int(item["date"][:4]) >= 2000] - - - ratios = ujson.loads(query_df['ratios'].iloc[0]) - ratios = [{k: v for k, v in item.items() if k not in ["symbol","reportedCurrency","calendarYear","fillingDate","acceptedDate","period","cik","link", "finalLink"]} for item in ratios if int(item["date"][:4]) >= 2000] - - combined_data = defaultdict(dict) - # Iterate over all lists simultaneously - for entries in zip(income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios): - # Iterate over each entry in the current set of entries - for entry in entries: - date = entry['date'] - # Merge entry data into combined_data, skipping duplicate keys - for key, value in entry.items(): - if key not in combined_data[date]: - combined_data[date][key] = value - - combined_data = list(combined_data.values()) - - df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index() - df = df.rename(columns={'Adj Close': 'close', 'Date': 'date'}) - #print(df[['date','close']]) - df['date'] = df['date'].dt.strftime('%Y-%m-%d') - - - for item in combined_data: - # Find close price for '2023-09-30' or the closest available date prior to it - target_date = item['date'] - counter = 0 - max_attempts = 10 - - while target_date not in df['date'].values and counter < max_attempts: - # If the target date doesn't exist, move one day back - target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d') - counter += 1 - if counter == max_attempts: - break - - - # Get the close price for the found or closest date - close_price = round(df[df['date'] == 
-            close_price = round(df[df['date'] == target_date]['close'].values[0],2)
-            item['price'] = close_price
-            #print(f"Close price for {target_date}: {close_price}")
-
-
-
-        combined_data = sorted(combined_data, key=lambda x: x['date'])
-
-
-        df_combined = pd.DataFrame(combined_data).dropna()
-
-        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
-
-        df_copy = df_combined.copy()
-
-        return df_copy
-
-    except Exception as e:
-        print(e)
-
-
 class FundamentalPredictor:
-    def __init__(self, path='weights'):
+    def __init__(self):
         self.model = self.build_model() #RandomForestClassifier(n_estimators=1000, max_depth = 20, min_samples_split=10, random_state=42, n_jobs=10)
         self.scaler = MinMaxScaler()
-        self.path = path
 
     def build_model(self):
         clear_session()
@@ -183,18 +85,18 @@ class FundamentalPredictor:
         X_train = self.preprocess_data(X_train)
         X_train = self.reshape_for_lstm(X_train)
 
-        checkpoint = ModelCheckpoint(f'{self.path}/fundamental_weights/weights.keras', save_best_only=True, monitor='val_loss', mode='min')
+        checkpoint = ModelCheckpoint(f'ml_models/fundamental_weights/weights.keras', save_best_only=True, monitor='val_loss', mode='min')
         early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
         self.model.fit(X_train, y_train, epochs=250, batch_size=32, validation_split=0.2, callbacks=[checkpoint, early_stopping])
-        self.model.save(f'{self.path}/fundamental_weights/weights.keras')
+        self.model.save(f'ml_models/fundamental_weights/weights.keras')
 
     def evaluate_model(self, X_test, y_test):
         X_test = self.preprocess_data(X_test)
         X_test = self.reshape_for_lstm(X_test)
         self.model = self.build_model()
-        self.model = load_model(f'{self.path}/fundamental_weights/weights.keras')
+        self.model = load_model(f'ml_models/fundamental_weights/weights.keras')
 
         test_predictions = self.model.predict(X_test).flatten()
 
@@ -231,72 +133,4 @@ class FundamentalPredictor:
         # Sort features by variance and select top k features
         sorted_features = sorted(variances, key=variances.get, reverse=True)[:k]
         return sorted_features
-    '''
-
-#Train mode
-async def train_process(tickers, con):
-    tickers = list(set(tickers))
-    df_train = pd.DataFrame()
-    df_test = pd.DataFrame()
-    test_size = 0.4
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
-    end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = FundamentalPredictor()
-    df_train = pd.DataFrame()
-    df_test = pd.DataFrame()
-
-
-    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
-    dfs = await asyncio.gather(*tasks)
-    for df in dfs:
-        try:
-            split_size = int(len(df) * (1-test_size))
-            train_data = df.iloc[:split_size]
-            test_data = df.iloc[split_size:]
-            df_train = pd.concat([df_train, train_data], ignore_index=True)
-            df_test = pd.concat([df_test, test_data], ignore_index=True)
-        except:
-            pass
-
-
-    best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
-
-    df_train = df_train.sample(frac=1).reset_index(drop=True)
-    print('======Train Set Datapoints======')
-    print(len(df_train))
-    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
-    #print(selected_features)
-    #selected_features = [col for col in df_train if col not in ['price','date','Target']]
-    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
-
-    predictor.train_model(df_train[selected_features], df_train['Target'])
-    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
-
-
-async def test_process(con):
-    test_size = 0.4
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
-    end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = FundamentalPredictor()
-    df = await download_data('GME', con, start_date, end_date)
-    split_size = int(len(df) * (1-test_size))
-    test_data = df.iloc[split_size:]
-    #selected_features = [col for col in test_data if col not in ['price','date','Target']]
-    selected_features = ['shortTermCoverageRatios','netProfitMargin','debtRepayment','totalDebt','interestIncome','researchAndDevelopmentExpenses','priceEarningsToGrowthRatio','priceCashFlowRatio','cashPerShare','debtRatio','growthRevenue','revenue','growthNetIncome','ebitda','priceEarningsRatio','priceToBookRatio','epsdiluted','priceToSalesRatio','growthOtherCurrentLiabilities', 'receivablesTurnover', 'totalLiabilitiesAndStockholdersEquity', 'totalLiabilitiesAndTotalEquity', 'totalAssets', 'growthOtherCurrentAssets', 'retainedEarnings', 'totalEquity']
-    predictor.evaluate_model(test_data[selected_features], test_data['Target'])
-
-async def main():
-    con = sqlite3.connect('../stocks.db')
-    cursor = con.cursor()
-    cursor.execute("PRAGMA journal_mode = wal")
-    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E9")
-    stock_symbols = [row[0] for row in cursor.fetchall()]
-    print('Number of Stocks')
-    print(len(stock_symbols))
-    await train_process(stock_symbols, con)
-    await test_process(con)
-
-    con.close()
-
-# Run the main function
-#asyncio.run(main())
\ No newline at end of file
+    '''
\ No newline at end of file
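With the `path` argument removed, the weights live at the fixed relative path `ml_models/fundamental_weights/weights.keras`, so they resolve against the working directory the cron job runs from. For reference, a minimal sketch of the checkpoint/reload round-trip the class performs, limited to the Keras calls visible in this hunk:

```python
import os
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import load_model

WEIGHTS = 'ml_models/fundamental_weights/weights.keras'

# train_model(): keep only the epoch with the best validation loss.
checkpoint = ModelCheckpoint(WEIGHTS, save_best_only=True, monitor='val_loss', mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# model.fit(X_train, y_train, epochs=250, batch_size=32,
#           validation_split=0.2, callbacks=[checkpoint, early_stopping])

# evaluate_model(): rebuild the architecture, then reload the best checkpoint.
if os.path.exists(WEIGHTS):
    model = load_model(WEIGHTS)
```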
diff --git a/app/ml_models/fundamental_weights/weights.keras b/app/ml_models/fundamental_weights/weights.keras
new file mode 100644
index 0000000..dda64a3
Binary files /dev/null and b/app/ml_models/fundamental_weights/weights.keras differ
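For reference, the labeling rule both files share: `Target` is 1 when the next period's matched close is higher than the current one, so each quarterly row is labeled with the direction of the following quarter. A tiny worked example of the same expression:

```python
import pandas as pd

df = pd.DataFrame({'date': ['2023-03-31', '2023-06-30', '2023-09-30'],
                   'price': [100.0, 110.0, 105.0]})
# shift(-1) aligns each row with the next period's price; the final row has
# no successor, so its comparison is NaN > 0 -> False -> 0.
df['Target'] = ((df['price'].shift(-1) - df['price']) / df['price'] > 0).astype(int)
print(df['Target'].tolist())  # [1, 0, 0]
```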