diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py
index 5d541ba..698f8c2 100644
--- a/app/cron_ai_score.py
+++ b/app/cron_ai_score.py
@@ -1,31 +1,124 @@
+import orjson
+import asyncio
+import aiohttp
+import aiofiles
+import sqlite3
+from datetime import datetime
+from ml_models.fundamental_predictor import FundamentalPredictor
+import yfinance as yf
+from collections import defaultdict
+import pandas as pd
+from tqdm import tqdm
+import concurrent.futures
+import re
+from itertools import combinations
+
 from ta.momentum import *
 from ta.trend import *
 from ta.volatility import *
 from ta.volume import *
+import gc
+# Enable automatic garbage collection
+gc.enable()
 
-import yfinance as yf
-import numpy as np
-import pandas as pd
-from sklearn.ensemble import RandomForestClassifier
-from sklearn.metrics import accuracy_score, classification_report
-from sklearn.preprocessing import MinMaxScaler
-from sklearn.metrics import mean_squared_error, r2_score
-from sklearn.model_selection import train_test_split
-import pickle
-from datetime import datetime
-import asyncio
-import time
 
+async def save_json(symbol, data):
+    # orjson.dumps returns bytes, so the file must be opened in binary mode
+    with open(f"json/fundamental-predictor-analysis/{symbol}.json", 'wb') as file:
+        file.write(orjson.dumps(data))
 
-class TrendPredictor:
-    def __init__(self, nth_day, path="ml_models/weights/ai_score"):
-        self.model = RandomForestClassifier(n_estimators=1000, max_depth=500, min_samples_split=500, random_state=42, n_jobs=-1)
-        self.scaler = MinMaxScaler()
-        self.nth_day = nth_day
-        self.path = path
-        self.model_loaded = False
 
-    def generate_features(self, df):
-        new_predictors = []
+
+async def download_data(ticker, con, start_date, end_date):
+    try:
+        # Define paths to the statement files
+        statements = [
+            f"json/financial-statements/ratios/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement/quarter/{ticker}.json",
+            f"json/financial-statements/income-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/balance-sheet-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/cash-flow-statement-growth/quarter/{ticker}.json",
+            f"json/financial-statements/key-metrics/quarter/{ticker}.json",
+            f"json/financial-statements/owner-earnings/quarter/{ticker}.json",
+        ]
+
+        # Helper function to load JSON data asynchronously
+        async def load_json_from_file(path):
+            async with aiofiles.open(path, 'r') as f:
+                content = await f.read()
+            return orjson.loads(content)
+
+        # Helper function to filter data based on keys and year
+        async def filter_data(data, ignore_keys, year_threshold=2000):
+            return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold]
+
+        # Define keys to ignore
+        ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink", "pbRatio", "ptbRatio"]
+
+        # Load and filter data for each statement type
+        ratios = await load_json_from_file(statements[0])
+        ratios = await filter_data(ratios, ignore_keys)
+
+        cashflow = await load_json_from_file(statements[1])
+        cashflow = await filter_data(cashflow, ignore_keys)
+
+        income = await load_json_from_file(statements[2])
+        income = await filter_data(income, ignore_keys)
+
+        balance = await load_json_from_file(statements[3])
+        balance = await filter_data(balance, ignore_keys)
+
+        income_growth = await load_json_from_file(statements[4])
+        income_growth = await filter_data(income_growth, ignore_keys)
+
+        balance_growth = await load_json_from_file(statements[5])
+        balance_growth = await filter_data(balance_growth, ignore_keys)
+
+        cashflow_growth = await load_json_from_file(statements[6])
+        cashflow_growth = await filter_data(cashflow_growth, ignore_keys)
+
+        key_metrics = await load_json_from_file(statements[7])
+        key_metrics = await filter_data(key_metrics, ignore_keys)
+
+        owner_earnings = await load_json_from_file(statements[8])
+        owner_earnings = await filter_data(owner_earnings, ignore_keys)
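Review note: the nine load-and-filter pairs above differ only in the statements index, so they could be collapsed into a single loop. A minimal sketch reusing the helpers defined in this function (the unpacking order matches the statements list):

    # Hypothetical condensation of the nine load/filter pairs above
    filtered = []
    for path in statements:
        raw = await load_json_from_file(path)
        filtered.append(await filter_data(raw, ignore_keys))
    (ratios, cashflow, income, balance, income_growth,
     balance_growth, cashflow_growth, key_metrics, owner_earnings) = filtered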
+
+
+        # Combine all the data
+        combined_data = defaultdict(dict)
+
+        # Merge the data based on 'date'
+        for entries in zip(income, income_growth, balance, balance_growth, cashflow, cashflow_growth, ratios, key_metrics, owner_earnings):
+            for entry in entries:
+                date = entry['date']
+                for key, value in entry.items():
+                    if key not in combined_data[date]:
+                        combined_data[date][key] = value
+
+        combined_data = list(combined_data.values())
+        # Generate more features
+        #combined_data = calculate_combinations(combined_data)
+
+        # Download historical stock data using yfinance
+        df = yf.download(ticker, start=start_date, end=end_date, interval="1d").reset_index()
+        df = df.rename(columns={'Adj Close': 'close', 'Date': 'date', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Volume': 'volume'})
+        df['date'] = df['date'].dt.strftime('%Y-%m-%d')
+
+        df['sma_50'] = df['close'].rolling(window=50).mean()
+        df['sma_200'] = df['close'].rolling(window=200).mean()
+        df['golden_cross'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int)
+
+        df['volatility'] = df['close'].rolling(window=30).std()
+        df['daily_return'] = df['close'].pct_change()
+        df['cumulative_return'] = (1 + df['daily_return']).cumprod() - 1
+        df['volume_change'] = df['volume'].pct_change()
+        df['roc'] = df['close'].pct_change(periods=30) * 100  # 30-day ROC
+        df['avg_volume_30d'] = df['volume'].rolling(window=30).mean()
+        df['drawdown'] = df['close'] / df['close'].rolling(window=252).max() - 1
+
         df['macd'] = macd(df['close'])
         df['macd_signal'] = macd_signal(df['close'])
@@ -39,11 +132,13 @@ class TrendPredictor:
         df['nvi'] = NegativeVolumeIndexIndicator(close=df['close'], volume=df['volume']).negative_volume_index()
         df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume']).on_balance_volume()
         df['vpt'] = VolumePriceTrendIndicator(close=df['close'], volume=df['volume']).volume_price_trend()
-
-        df['rsi'] = rsi(df["close"], window=14)
-        df['stoch_rsi'] = stochrsi_k(df['close'], window=14, smooth1=3, smooth2=3)
-        df['bb_hband'] = bollinger_hband(df['close'], window=14)/df['close']
-        df['bb_lband'] = bollinger_lband(df['close'], window=14)/df['close']
+
+        df['rsi'] = rsi(df["close"], window=30)
+        df['rolling_rsi'] = df['rsi'].rolling(window=10).mean()
+        df['stoch_rsi'] = stochrsi_k(df['close'], window=30, smooth1=3, smooth2=3)
+        df['rolling_stoch_rsi'] = df['stoch_rsi'].rolling(window=10).mean()
+        df['bb_hband'] = bollinger_hband(df['close'], window=30)/df['close']
+        df['bb_lband'] = bollinger_lband(df['close'], window=30)/df['close']
 
         df['adi'] = acc_dist_index(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'])
         df['cmf'] = chaikin_money_flow(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'], window=20)
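Review note: the hunk above widens the oscillator windows from 14 to 30 days and layers a 10-day rolling mean on top of RSI and stochastic RSI. A toy illustration of that smoothing pattern using the same ta call (synthetic prices, not data from this repo):

    import pandas as pd
    from ta.momentum import rsi

    close = pd.Series([float(x) for x in range(1, 101)])  # synthetic price series
    slow_rsi = rsi(close, window=30)                      # new 30-day window
    rolling_rsi = slow_rsi.rolling(window=10).mean()      # smoothed variant added above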
@@ -53,119 +148,164 @@ class TrendPredictor:
 
         df['williams'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close']).williams_r()
-        df['stoch'] = stoch(df['high'], df['low'], df['close'], window=14)
+        df['stoch'] = stoch(df['high'], df['low'], df['close'], window=30)
 
-        new_predictors+=['williams','fi','emv','cmf','adi','bb_hband','bb_lband','vpt','stoch','stoch_rsi','rsi','nvi','obv','macd','macd_signal','macd_hist','adx','adx_pos','adx_neg','cci','mfi']
-        return new_predictors
+        ta_indicators = [
+            'rsi', 'macd', 'macd_signal', 'macd_hist', 'adx', 'adx_pos', 'adx_neg',
+            'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi', 'bb_hband', 'bb_lband',
+            'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch', 'sma_50', 'sma_200', 'golden_cross',
+            'volatility', 'daily_return', 'cumulative_return', 'roc', 'avg_volume_30d',
+            'rolling_rsi', 'rolling_stoch_rsi'
+        ]
 
-    def train_model(self, X_train, y_train):
-        X_train = np.where(np.isinf(X_train), np.nan, X_train)
-        X_train = np.nan_to_num(X_train)
-        X_train = self.scaler.fit_transform(X_train)
+        # Match each combined data entry with the closest available stock price in df
+        for item in combined_data:
+            target_date = item['date']
+            counter = 0
+            max_attempts = 10
+
+            # Look for the closest matching date in the stock data
+            while target_date not in df['date'].values and counter < max_attempts:
+                target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
+                counter += 1
+
+            # If max attempts are reached and no matching date is found, skip the entry
+            if counter == max_attempts:
+                continue
+
+            # Find the close price for the matching date
+            close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
+            item['price'] = close_price
+
+            # Dynamically add all indicator values to the combined_data entry
+            for indicator in ta_indicators:
+                indicator_value = df[df['date'] == target_date][indicator].values[0]
+                item[indicator] = indicator_value  # Add the indicator value to the combined_data entry
+
+        # Sort the combined data by date
+        combined_data = sorted(combined_data, key=lambda x: x['date'])
+        # Convert combined data into a DataFrame
+        df_combined = pd.DataFrame(combined_data).dropna()
+
+        key_elements = [
+            'revenue',
+            'costOfRevenue',
+            'grossProfit',
+            'netIncome',
+            'operatingIncome',
+            'operatingExpenses',
+            'researchAndDevelopmentExpenses',
+            'ebitda',
+            'freeCashFlow',
+            'incomeBeforeTax',
+            'incomeTaxExpense',
+            'epsdiluted',
+            'debtRepayment',
+            'dividendsPaid',
+            'depreciationAndAmortization',
+            'netCashUsedProvidedByFinancingActivities',
+            'changeInWorkingCapital',
+            'stockBasedCompensation',
+            'deferredIncomeTax',
+            'commonStockRepurchased',
+            'operatingCashFlow',
+            'capitalExpenditure',
+            'accountsReceivables',
+            'purchasesOfInvestments',
+            'cashAndCashEquivalents',
+            'shortTermInvestments',
+            'cashAndShortTermInvestments',
+            'longTermInvestments',
+            'otherCurrentLiabilities',
+            'totalCurrentLiabilities',
+            'longTermDebt',
+            'totalDebt',
+            'netDebt',
+            'commonStock',
+            'totalEquity',
+            'totalLiabilitiesAndStockholdersEquity',
+            'totalStockholdersEquity',
+            'totalInvestments',
+            'taxAssets',
+            'totalAssets',
+            'inventory',
+            'propertyPlantEquipmentNet',
+            'ownersEarnings',
+            'averagePPE'
+        ]
+
+        # Compute ratios for all combinations of key elements
+        for num, denom in combinations(key_elements, 2):
+            # Compute ratio num/denom
+            column_name = f'{num}_to_{denom}'
+            try:
+                df_combined[column_name] = df_combined[num] / df_combined[denom]
+            except:
+                df_combined[column_name] = 0
+            # Compute reverse ratio denom/num
+            reverse_column_name = f'{denom}_to_{num}'
+            try:
+                df_combined[reverse_column_name] = df_combined[denom] / df_combined[num]
+            except:
+                df_combined[reverse_column_name] = 0
+
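Review note: the combinations loop above emits two ratio columns per unordered pair, i.e. n*(n-1) columns for n base elements; with the 44 entries in key_elements that is 1,892 engineered features. A self-contained toy version of the same technique:

    import pandas as pd
    from itertools import combinations

    toy = pd.DataFrame({'revenue': [10.0, 12.0],
                        'netIncome': [2.0, 3.0],
                        'totalAssets': [50.0, 55.0]})
    for num, denom in combinations(list(toy.columns), 2):
        toy[f'{num}_to_{denom}'] = toy[num] / toy[denom]  # forward ratio
        toy[f'{denom}_to_{num}'] = toy[denom] / toy[num]  # reverse ratio
    # 3 base columns -> 3*2 = 6 ratio columns; 44 -> 1,892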
+        # Create 'Target' column based on price change
+        df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
+
+        # Return a copy of the combined DataFrame
+        df_copy = df_combined.copy()
+        #print(df_copy[['date','revenue','ownersEarnings','revenuePerShare']])
+        return df_copy
+
+    except Exception as e:
+        print(e)
+        pass
+
+
+async def process_symbol(ticker, con, start_date, end_date):
+    try:
+        test_size = 0.4
+        start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
+        end_date = datetime.today().strftime("%Y-%m-%d")
+        predictor = FundamentalPredictor()
+        df = await download_data(ticker, con, start_date, end_date)
+        split_size = int(len(df) * (1-test_size))
+        test_data = df.iloc[split_size:]
+        best_features = [col for col in df.columns if col not in ['date','price','Target']]
+        data, prediction_list = predictor.evaluate_model(test_data[best_features], test_data['Target'])
-        # Train model
-        self.model.fit(X_train, y_train)
-        with open(f'{self.path}/weights.pkl', 'wb') as f:
-            pickle.dump(self.model, f, protocol=pickle.HIGHEST_PROTOCOL)
 
-    def load_model(self):
-        if not self.model_loaded:
-            with open(f'{self.path}/weights.pkl', 'rb') as f:
-                self.model = pickle.load(f)
-            self.model_loaded = True
+        '''
+        output_list = [{'date': date, 'price': price, 'prediction': prediction, 'target': target}
+                       for (date, price, target), prediction in zip(test_data[['date', 'price', 'Target']].iloc[-6:].values, prediction_list[-6:])]
+        '''
+        #print(output_list)
 
-    def alpha_to_score(self, alpha):
-        # Convert alpha (Target) to AI Score
-        if alpha <= -20:
-            return 1  # Very Low Alpha
-        elif -20 < alpha <= -10:
-            return 2  # Low Alpha
-        elif -10 < alpha <= -5:
-            return 3  # Low Alpha
-        elif -5 < alpha <= 0:
-            return 4  # Medium Alpha
-        elif 0 < alpha <= 2:
-            return 5  # Medium Alpha
-        elif 2 < alpha <= 4:
-            return 6  # High Alpha
-        elif 4 < alpha <= 6:
-            return 7  # High Alpha
-        elif 6 < alpha <= 8:
-            return 8  # High Alpha
-        elif 8 < alpha <= 10:
-            return 9  # High Alpha
-        elif 10 < alpha:
-            return 10  # Very High Alpha
-        else:
-            return None
-
-    def predict_and_score(self, df):
-        self.load_model()  # Ensure model is loaded once
-
-        latest_data = df.iloc[-1].values.reshape(1, -1)
-        latest_data = self.scaler.fit_transform(latest_data)
-
-        # Predict the class (AI score)
-        prediction = self.model.predict(latest_data)[0]
-
-        # Return structured result with ticker information and score
-        print(f"Predicted AI Score: {prediction}")
-        return prediction
-
-    def evaluate_model(self, X_test, y_test):
-        self.load_model()
-        X_test = np.where(np.isinf(X_test), np.nan, X_test)
-        X_test = np.nan_to_num(X_test)
-        X_test = self.scaler.transform(X_test)
-
-        predictions = self.model.predict(X_test)
-        accuracy = accuracy_score(y_test, predictions)
-        print(f"Accuracy: {accuracy}")
-        print("Classification Report:")
-        print(classification_report(y_test, predictions))
-
-        return accuracy
-
-async def download_data(ticker, start_date, end_date, spy_df, nth_day):
-    df = yf.download(ticker, start=start_date, end=end_date, interval="1d")
-    df = df.rename(columns={'Adj Close': 'close', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Volume': 'volume'})
-
-    df = df.reindex(spy_df.index)
-    df['spy_close'] = spy_df['spy_close']
-    df['stock_return'] = df['close'].pct_change()
-    df['spy_return'] = df['spy_close'].pct_change()
-    df['excess_return'] = df['stock_return'] - df['spy_return']
-
-    df["Target"] = df['excess_return'].rolling(window=nth_day).sum().shift(-nth_day)*100
-    # Convert the continuous Target (alpha) to a score (class)
-    df["Target"] = df["Target"].apply(lambda x: TrendPredictor.alpha_to_score(self=None, alpha=x))
-    return df
+        if len(data) != 0:
+            if data['precision'] >= 50 and data['accuracy'] >= 50:
+                await save_json(ticker, data)
+
+    except Exception as e:
+        print(e)
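Review note: process_symbol persists a symbol only when the evaluation clears 50% precision and 50% accuracy on the most recent 40% of rows. The dict shape of data comes from ml_models/fundamental_predictor.py, which is not part of this diff; assuming percentage-scaled metrics, the gate corresponds to:

    # Hypothetical reconstruction of the metrics the gate checks
    from sklearn.metrics import accuracy_score, precision_score

    y_true = [1, 0, 1, 1, 0]   # hypothetical targets
    y_pred = [1, 0, 0, 1, 1]   # hypothetical predictions
    data = {'accuracy': round(accuracy_score(y_true, y_pred) * 100),    # 60
            'precision': round(precision_score(y_true, y_pred) * 100)}  # 67
    keep = data['precision'] >= 50 and data['accuracy'] >= 50  # True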
-
-async def train_process(nth_day):
-    tickers = ['KO','WMT','BA','PLD','AZN','LLY','INFN','GRMN','VVX','EPD','PII','WY','BLMN','AAP','ON','TGT','SMG','EL','EOG','ULTA','DV','PLNT','GLOB','LKQ','CWH','PSX','SO','TGT','GD','MU','NKE','AMGN','BX','CAT','PEP','LIN','ABBV','COST','MRK','HD','JNJ','PG','SPCB','CVX','SHEL','MS','GS','MA','V','JPM','XLF','DPZ','CMG','MCD','ALTM','PDD','MNST','SBUX','AMAT','ZS','IBM','SMCI','ORCL','XLK','VUG','VTI','VOO','IWM','IEFA','PEP','WMT','XOM','V','AVGO','BIDU','GOOGL','SNAP','DASH','SPOT','NVO','META','MSFT','ADBE','DIA','PFE','BAC','RIVN','NIO','CISS','INTC','AAPL','BYND','MSFT','HOOD','MARA','SHOP','CRM','PYPL','UBER','SAVE','QQQ','IVV','SPY','EVOK','GME','F','NVDA','AMD','AMZN','TSM','TSLA']
+# Train mode
+async def train_process(tickers, con):
     tickers = list(set(tickers))
-    #print(len(tickers))
-    df_train = pd.DataFrame()
     df_test = pd.DataFrame()
-    best_features = ['close','williams','fi','emv','adi','cmf','bb_hband','bb_lband','vpt','stoch','stoch_rsi','rsi','nvi','macd','mfi','cci','obv','adx','adx_pos','adx_neg']
-    test_size = 0.1
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    test_size = 0.2
+    start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
     end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = TrendPredictor(nth_day=nth_day)
-
-    spy_df = yf.download("SPY", start=start_date, end=end_date, interval="1d")
-    spy_df = spy_df.rename(columns={'Adj Close': 'spy_close'})
+    df_train = pd.DataFrame()
+    df_test = pd.DataFrame()
 
-    tasks = [download_data(ticker, start_date, end_date, spy_df, nth_day) for ticker in tickers]
+    tasks = [download_data(ticker, con, start_date, end_date) for ticker in tickers]
     dfs = await asyncio.gather(*tasks)
 
     for df in dfs:
         try:
-            predictors = predictor.generate_features(df)
-            predictors = [pred for pred in predictors if pred in df.columns]
-            df = df.dropna(subset=df.columns[df.columns != "nth_day"])
             split_size = int(len(df) * (1-test_size))
             train_data = df.iloc[:split_size]
             test_data = df.iloc[split_size:]
@@ -174,42 +314,73 @@ async def train_process(nth_day):
         except:
             pass
 
+
+    best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
     df_train = df_train.sample(frac=1).reset_index(drop=True)
+    print('======Train Set Datapoints======')
+    print(len(df_train))
+    print(df_train)
+    print(df_test)
+    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'], k=10)
+    #print(selected_features)
+    selected_features = [col for col in df_train if col not in ['price','date','Target']]
 
-    predictor.train_model(df_train[best_features], df_train['Target'])
-    predictor.evaluate_model(df_test[best_features], df_test['Target'])
+    predictor = FundamentalPredictor()
+    predictor.train_model(df_train[selected_features], df_train['Target'])
+    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
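Review note: each ticker's frame is split chronologically (first 80% train, last 20% test) before the pooled training rows are shuffled with sample(frac=1); shuffling only after the time-ordered split keeps future quarters out of the training set. The pattern in isolation:

    import pandas as pd

    df = pd.DataFrame({'date': pd.date_range('2020-01-01', periods=10),
                       'Target': [0, 1, 0, 1, 1, 0, 1, 0, 1, 1]})
    split = int(len(df) * (1 - 0.2))
    train, test = df.iloc[:split], df.iloc[split:]        # time-ordered split
    train = train.sample(frac=1).reset_index(drop=True)   # shuffle training rows only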
 
-async def test_process(nth_day):
-    best_features = ['close','williams','fi','emv','adi','cmf','bb_hband','bb_lband','vpt','stoch','stoch_rsi','rsi','nvi','macd','mfi','cci','obv','adx','adx_pos','adx_neg']
-    test_size = 0.1
-    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+async def test_process(con):
+    test_size = 0.2
+    start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
     end_date = datetime.today().strftime("%Y-%m-%d")
-    predictor = TrendPredictor(nth_day=nth_day)
-
-    spy_df = yf.download("SPY", start=start_date, end=end_date, interval="1d")
-    spy_df = spy_df.rename(columns={'Adj Close': 'spy_close'})
-
-    df = await download_data('AAPL', start_date, end_date, spy_df, nth_day)
-    predictors = predictor.generate_features(df)
-
-    #save it to get the latest date with the latest row otherwise it drops it since of NaN for Target
-    df_copy = df.copy()
-
-    df = df.dropna(subset=df.columns[df.columns != "nth_day"])
+    predictor = FundamentalPredictor()
+    df = await download_data('GME', con, start_date, end_date)
     split_size = int(len(df) * (1-test_size))
     test_data = df.iloc[split_size:]
-    predictor.evaluate_model(test_data[best_features], test_data['Target'])
+    selected_features = [col for col in test_data if col not in ['price','date','Target']]
+    predictor.evaluate_model(test_data[selected_features], test_data['Target'])
 
-    #Evaluate based on non-nan results of target but predict the latest date
-    predictor.predict_and_score(df_copy[best_features])
-    print(df_copy)
 
-async def main():
-    nth_day = 60  # 60 days forward prediction
+async def run():
-    await train_process(nth_day = 60)
-    #await test_process(nth_day = 60)
+
+    # Train first model
+    con = sqlite3.connect('stocks.db')
+
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 300E9")
+    stock_symbols = ['AAPL'] #[row[0] for row in cursor.fetchall()]
+    print('Number of Stocks')
+    print(len(stock_symbols))
+    await train_process(stock_symbols, con)
+
+    # Prediction steps for all stock symbols
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9")
+    stock_symbols = [row[0] for row in cursor.fetchall()]
+
+    total_symbols = ['GME'] #stock_symbols
+
+    print(f"Total tickers: {len(total_symbols)}")
+    start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
+    end_date = datetime.today().strftime("%Y-%m-%d")
+
+    chunk_size = len(total_symbols)  # // 100  # Divide the list into N chunks
+    chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)]
+    for chunk in chunks:
+        tasks = []
+        for ticker in tqdm(chunk):
+            tasks.append(process_symbol(ticker, con, start_date, end_date))
+
+        await asyncio.gather(*tasks)
+
+    con.close()
+
+try:
+    asyncio.run(run())
+except Exception as e:
+    print(e)
-
-if __name__ == "__main__":
-    asyncio.run(main())
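Review note: the companion change below (cron_financial_statements.py) adds 'key-metrics' to the fetched statement types and pulls the v4 owner-earnings endpoint, reusing the file's counter-based throttle: a shared one-element list is incremented per request and the task sleeps 60 s after every 500 calls. The throttle pattern in isolation (fetch stands in for the file's fetch_data):

    import asyncio

    async def throttled(urls, fetch, request_counter, limit=500, pause=60):
        results = []
        for url in urls:
            results.append(await fetch(url))
            request_counter[0] += 1          # shared mutable counter, as in the diff
            if request_counter[0] >= limit:
                await asyncio.sleep(pause)   # back off before continuing
                request_counter[0] = 0
        return results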
diff --git a/app/cron_financial_statements.py b/app/cron_financial_statements.py
index 7aca481..6adc34a 100644
--- a/app/cron_financial_statements.py
+++ b/app/cron_financial_statements.py
@@ -34,7 +34,7 @@ async def save_json(symbol, period, data_type, data):
 async def get_financial_statements(session, symbol, semaphore, request_counter):
     base_url = "https://financialmodelingprep.com/api/v3"
     periods = ['quarter', 'annual']
-    financial_data_types = ['income-statement', 'balance-sheet-statement', 'cash-flow-statement', 'ratios']
+    financial_data_types = ['key-metrics', 'income-statement', 'balance-sheet-statement', 'cash-flow-statement', 'ratios']
     growth_data_types = ['income-statement-growth', 'balance-sheet-statement-growth', 'cash-flow-statement-growth']
 
     async with semaphore:
@@ -63,6 +63,19 @@ async def get_financial_statements(session, symbol, semaphore, request_counter):
                 await asyncio.sleep(60)  # Pause for 60 seconds
                 request_counter[0] = 0  # Reset the request counter after the pause
 
+        # Fetch owner earnings data
+        owner_earnings_url = f"https://financialmodelingprep.com/api/v4/owner_earnings?symbol={symbol}&apikey={api_key}"
+        owner_earnings_data = await fetch_data(session, owner_earnings_url, symbol)
+        if owner_earnings_data:
+            await save_json(symbol, 'quarter', 'owner-earnings', owner_earnings_data)
+
+        request_counter[0] += 1  # Increment the request counter
+        if request_counter[0] >= 500:
+            await asyncio.sleep(60)  # Pause for 60 seconds
+            request_counter[0] = 0  # Reset the request counter after the pause
+
+
 async def run():
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
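Review note: the owner-earnings payload is saved under json/financial-statements/owner-earnings/quarter/{symbol}.json, which is exactly the ninth path that download_data in cron_ai_score.py reads, so this fetch must run before the AI-score cron. A quick offline sanity check of one symbol's file (orjson, as used elsewhere in the repo; ticker hypothetical):

    import orjson

    with open("json/financial-statements/owner-earnings/quarter/AAPL.json", "rb") as f:
        rows = orjson.loads(f.read())
    # download_data expects a list of dated quarterly entries
    print(len(rows), rows[0]['date'] if rows else 'empty file')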