diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py
index 4c9ad29..41914f2 100644
--- a/app/cron_ai_score.py
+++ b/app/cron_ai_score.py
@@ -26,7 +26,28 @@ async def save_json(symbol, data):
         file.write(orjson.dumps(data))
 
-
+def trend_intensity(close, window=20):
+    # Average absolute z-score of price vs. its rolling mean
+    ma = close.rolling(window=window).mean()
+    std = close.rolling(window=window).std()
+    return ((close - ma) / std).abs().rolling(window=window).mean()
+
+def fisher_transform(high, low, window=10):
+    value = (high + low) / 2
+    norm_value = (2 * ((value - value.rolling(window=window).min()) /
+                  (value.rolling(window=window).max() - value.rolling(window=window).min())) - 1)
+    # Clip to keep the log argument finite when value sits exactly at the rolling extreme
+    norm_value = norm_value.clip(-0.999, 0.999)
+    return 0.5 * np.log((1 + norm_value) / (1 - norm_value))
+
+def calculate_fdi(high, low, close, window=30):
+    # Fractal Dimension Index over a rolling window
+    n1 = (np.log(high.rolling(window=window).max() - low.rolling(window=window).min()) -
+          np.log(close.rolling(window=window).max() - close.rolling(window=window).min())) / np.log(2)
+    return (2 - n1) * 100
+
+def hurst_exponent(ts, max_lag=100):
+    # Estimate the Hurst exponent from the slope of log(tau) vs. log(lag)
+    lags = range(2, max_lag)
+    tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]
+    poly = np.polyfit(np.log(lags), np.log(tau), 1)
+    return poly[0] * 2.0
 
 async def download_data(ticker, con, start_date, end_date):
     try:
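As a quick sanity check, the four new helpers can be exercised on synthetic data (a minimal sketch; the random-walk series and the pandas/numpy imports are illustrative only, not part of the patch):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
close = pd.Series(100 + rng.standard_normal(300).cumsum())
high, low = close + 1, close - 1

print(trend_intensity(close).iloc[-1])         # mean |z-score| of price vs. its 20-day MA
print(fisher_transform(high, low).iloc[-1])    # Gaussian-ized position within the 10-day range
print(calculate_fdi(high, low, close).iloc[-1])
print(hurst_exponent(close.values))            # ~0.5 for a pure random walk
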
@@ -109,7 +130,18 @@ async def download_data(ticker, con, start_date, end_date):
         df['sma_50'] = df['close'].rolling(window=50).mean()
         df['sma_200'] = df['close'].rolling(window=200).mean()
-        df['golden_cross'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int)
+        df['sma_crossover'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int)
+
+        df['ema_50'] = EMAIndicator(close=df['close'], window=50).ema_indicator()
+        df['ema_200'] = EMAIndicator(close=df['close'], window=200).ema_indicator()
+        df['ema_crossover'] = ((df['ema_50'] > df['ema_200']) & (df['ema_50'].shift(1) <= df['ema_200'].shift(1))).astype(int)
+
+        ichimoku = IchimokuIndicator(high=df['high'], low=df['low'])
+        df['ichimoku_a'] = ichimoku.ichimoku_a()
+        df['ichimoku_b'] = ichimoku.ichimoku_b()
+        df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close']).average_true_range()
+        bb = BollingerBands(close=df['close'])
+        df['bb_width'] = (bb.bollinger_hband() - bb.bollinger_lband()) / df['close']
 
         df['volatility'] = df['close'].rolling(window=30).std()
         df['daily_return'] = df['close'].pct_change()
@@ -137,8 +169,6 @@ async def download_data(ticker, con, start_date, end_date):
         df['rolling_rsi'] = df['rsi'].rolling(window=10).mean()
         df['stoch_rsi'] = stochrsi_k(df['close'], window=30, smooth1=3, smooth2=3)
         df['rolling_stoch_rsi'] = df['stoch_rsi'].rolling(window=10).mean()
-        df['bb_hband'] = bollinger_hband(df['close'], window=30)/df['close']
-        df['bb_lband'] = bollinger_lband(df['close'], window=30)/df['close']
 
         df['adi'] = acc_dist_index(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'])
         df['cmf'] = chaikin_money_flow(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'], window=20)
@@ -146,16 +176,27 @@ async def download_data(ticker, con, start_date, end_date):
         df['fi'] = force_index(close=df['close'], volume=df['volume'], window= 13)
         df['williams'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close']).williams_r()
-
+        df['kama'] = KAMAIndicator(close=df['close']).kama()
         df['stoch'] = stoch(df['high'], df['low'], df['close'], window=30)
+        df['rocr'] = df['close'] / df['close'].shift(30) - 1  # Rate of Change Ratio (ROCR)
+        df['ppo'] = (df['ema_50'] - df['ema_200']) / df['ema_50'] * 100
+        df['vwap'] = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum()
+        df['volatility_ratio'] = df['close'].rolling(window=30).std() / df['close'].rolling(window=60).std()
+
+        df['fdi'] = calculate_fdi(df['high'], df['low'], df['close'])
+        #df['hurst'] = df['close'].rolling(window=100).apply(hurst_exponent)
+        df['fisher'] = fisher_transform(df['high'], df['low'])
+        df['tii'] = trend_intensity(df['close'])
+
         ta_indicators = [
             'rsi', 'macd', 'macd_signal', 'macd_hist', 'adx', 'adx_pos', 'adx_neg',
-            'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi', 'bb_hband', 'bb_lband',
-            'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch','sma_50','sma_200','golden_cross',
+            'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi','bb_width',
+            'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch','sma_crossover',
             'volatility','daily_return','cumulative_return', 'roc','avg_volume_30d',
-            'rolling_rsi','rolling_stoch_rsi'
+            'rolling_rsi','rolling_stoch_rsi', 'ema_crossover','ichimoku_a','ichimoku_b',
+            'atr','kama','rocr','ppo','volatility_ratio','vwap','tii','fdi','fisher'
         ]
 
         # Match each combined data entry with the closest available stock price in df
@@ -178,10 +219,11 @@ async def download_data(ticker, con, start_date, end_date):
             item['price'] = close_price
 
             # Dynamically add all indicator values to the combined_data entry
+
             for indicator in ta_indicators:
                 indicator_value = df[df['date'] == target_date][indicator].values[0]
                 item[indicator] = indicator_value  # Add the indicator value to the combined_data entry
-
+
         # Sort the combined data by date
         combined_data = sorted(combined_data, key=lambda x: x['date'])
@@ -232,29 +274,45 @@ async def download_data(ticker, con, start_date, end_date):
             'inventory',
             'propertyPlantEquipmentNet',
             'ownersEarnings',
-
             'averagePPE'
-
         ]
 
         # Compute ratios for all combinations of key elements
+
         for num, denom in combinations(key_elements, 2):
+            # Compute ratio num/denom
             column_name = f'{num}_to_{denom}'
             try:
-                df_combined[column_name] = df_combined[num] / df_combined[denom]
-            except:
+                ratio = df_combined[num] / df_combined[denom]
+                # Check for valid ratio
+                df_combined[column_name] = np.where((ratio != 0) &
+                                                    (ratio != np.inf) &
+                                                    (ratio != -np.inf) &
+                                                    (~np.isnan(ratio)),
+                                                    ratio, 0)
+            except Exception as e:
+                print(f"Error calculating {column_name}: {e}")
                 df_combined[column_name] = 0
 
+            # Compute reverse ratio denom/num
             reverse_column_name = f'{denom}_to_{num}'
             try:
-                df_combined[reverse_column_name] = df_combined[denom] / df_combined[num]
-            except:
+                reverse_ratio = df_combined[denom] / df_combined[num]
+                # Check for valid reverse ratio
+                df_combined[reverse_column_name] = np.where((reverse_ratio != 0) &
+                                                            (reverse_ratio != np.inf) &
+                                                            (reverse_ratio != -np.inf) &
+                                                            (~np.isnan(reverse_ratio)),
+                                                            reverse_ratio, 0)
+            except Exception as e:
+                print(f"Error calculating {reverse_column_name}: {e}")
                 df_combined[reverse_column_name] = 0
-
+
         # Create 'Target' column based on price change
         df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
 
         # Return a copy of the combined DataFrame
+        df_combined = df_combined.dropna()
+        df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
        df_copy = df_combined.copy()
         #print(df_copy[['date','revenue','ownersEarnings','revenuePerShare']])
         return df_copy
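The pairwise-ratio loop above is easier to see in isolation. A minimal standalone sketch of the same pattern (the toy DataFrame and column names are illustrative only):

import numpy as np
import pandas as pd
from itertools import combinations

df = pd.DataFrame({'revenue': [10.0, 12.0, 0.0], 'inventory': [2.0, 0.0, 3.0]})
for num, denom in combinations(df.columns, 2):
    for a, b in ((num, denom), (denom, num)):
        ratio = df[a] / df[b]                        # division by zero yields inf / nan
        df[f'{a}_to_{b}'] = ratio.where(np.isfinite(ratio), 0)
print(df)

Note that the patch also maps exact-zero ratios to 0, which is a no-op, so a single finiteness check covers the same cases.
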
@@ -266,7 +324,7 @@ async def download_data(ticker, con, start_date, end_date):
 
 async def process_symbol(ticker, con, start_date, end_date):
     try:
-        test_size = 0.4
+        test_size = 0.2
         start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
         end_date = datetime.today().strftime("%Y-%m-%d")
         predictor = FundamentalPredictor()
@@ -317,18 +375,18 @@ async def train_process(tickers, con):
 
     best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
 
-    df_train = df_train.sample(frac=1).reset_index(drop=True)
+    df_train = df_train.sample(frac=1).reset_index(drop=True)  #df_train.reset_index(drop=True)
+    print(df_train)
     print('======Train Set Datapoints======')
     print(len(df_train))
-    print(df_train)
-    print(df_test)
-    #selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
-    #print(selected_features)
-    selected_features = [col for col in df_train if col not in ['price','date','Target']]
     predictor = FundamentalPredictor()
-    predictor.train_model(df_train[selected_features], df_train['Target'])
-    predictor.evaluate_model(df_test[selected_features], df_test['Target'])
+    #print(selected_features)
+    selected_features = [col for col in df_train if col not in ['price','date','Target']]
+    best_features = predictor.feature_selection(df_train[selected_features], df_train['Target'],k=100)
+    print(best_features)
+    predictor.train_model(df_train[best_features], df_train['Target'])
+    predictor.evaluate_model(df_test[best_features], df_test['Target'])
 
 async def test_process(con):
     test_size = 0.2
@@ -350,15 +408,15 @@ async def run():
     cursor = con.cursor()
     cursor.execute("PRAGMA journal_mode = wal")
-    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 300E9")
-    stock_symbols = ['AAPL'] #[row[0] for row in cursor.fetchall()]
+    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 10E9 AND symbol NOT LIKE '%.%'")
+    stock_symbols = [row[0] for row in cursor.fetchall()] #['AAPL','GME','LLY','NVDA']
 
     print('Number of Stocks')
     print(len(stock_symbols))
     await train_process(stock_symbols, con)
 
     #Prediction Steps for all stock symbols
-
+    '''
     cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9")
     stock_symbols = [row[0] for row in cursor.fetchall()]
@@ -376,7 +434,7 @@ async def run():
         tasks.append(process_symbol(ticker, con, start_date, end_date))
 
     await asyncio.gather(*tasks)
-
+    '''
     con.close()
 
     try:
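train_process now delegates feature pruning to FundamentalPredictor.feature_selection with k=100 (its diff appears further below); it wraps sklearn's SelectKBest. A minimal standalone sketch of that pattern on synthetic data (all names illustrative):

import numpy as np
import pandas as pd
from sklearn.feature_selection import SelectKBest, f_classif

X = pd.DataFrame(np.random.rand(200, 10), columns=[f'f{i}' for i in range(10)])
y = (X['f3'] > 0.5).astype(int)

selector = SelectKBest(score_func=f_classif, k=4)
selector.fit(X, y)
print(X.columns[selector.get_support()].tolist())  # 'f3' should rank among the top features
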
diff --git a/app/cron_export_price.py b/app/cron_export_price.py
index 4496f58..1b99b59 100644
--- a/app/cron_export_price.py
+++ b/app/cron_export_price.py
@@ -14,17 +14,9 @@ api_key = os.getenv('FMP_API_KEY')
 
 # Rate limiting
 MAX_REQUESTS_PER_MINUTE = 100
 request_semaphore = asyncio.Semaphore(MAX_REQUESTS_PER_MINUTE)
-last_request_time = datetime.min
 
 async def fetch_data(session, url):
-    global last_request_time
     async with request_semaphore:
-        # Ensure at least 60 seconds between batches of MAX_REQUESTS_PER_MINUTE
-        current_time = datetime.now()
-        if (current_time - last_request_time).total_seconds() < 60:
-            await asyncio.sleep(60 - (current_time - last_request_time).total_seconds())
-        last_request_time = datetime.now()
-
         try:
             async with session.get(url) as response:
                 if response.status == 200:
@@ -46,68 +38,96 @@ def get_existing_data(symbol, interval):
 async def get_data(session, symbol, time_period):
     existing_data = get_existing_data(symbol, time_period)
     if not existing_data:
+        # If no existing data, fetch all data
         return await fetch_all_data(session, symbol, time_period)
 
     last_date = datetime.strptime(existing_data[-1]['date'], "%Y-%m-%d %H:%M:%S")
     current_date = datetime.utcnow()
 
+    # If data is up to date, skip fetching
     if (current_date - last_date).days < 1:
-        return  # Data is up to date, skip to next symbol
+        return  # Data is recent, skip further fetch
 
-    # Fetch only missing data
+    # Fetch missing data only, from the last saved date to the current date
     start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
     end_date = current_date.strftime("%Y-%m-%d")
+    print(start_date, end_date)
     url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date}&to={end_date}&apikey={api_key}"
     new_data = await fetch_data(session, url)
 
     if new_data:
         existing_data.extend(new_data)
-        existing_data.sort(key=lambda x: x['date'])
+        existing_data.sort(key=lambda x: x['date'])  # Sort by date
         await save_json(symbol, existing_data, time_period)
 
 async def fetch_all_data(session, symbol, time_period):
     end_date = datetime.utcnow()
-    start_date = end_date - timedelta(days=180)
-    url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date.strftime('%Y-%m-%d')}&to={end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
+    start_date = end_date - timedelta(days=365*20)
 
-    data = await fetch_data(session, url)
-    if data:
-        data.sort(key=lambda x: x['date'])
-        await save_json(symbol, data, time_period)
+    step = timedelta(days=5)  # Step of 5 days
+    current_start_date = start_date
+
+    all_data = []  # To accumulate all the data
+
+    while current_start_date < end_date:
+        current_end_date = min(current_start_date + step, end_date)
+
+        url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={current_start_date.strftime('%Y-%m-%d')}&to={current_end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
+
+        data = await fetch_data(session, url)
+
+        if data:
+            all_data.extend(data)  # Accumulate the fetched data
+            print(f"Fetched {len(data)} records from {current_start_date.strftime('%Y-%m-%d')} to {current_end_date.strftime('%Y-%m-%d')}")
+
+        # Move the window forward by 5 days
+        current_start_date = current_end_date
+
+    if all_data:
+        # Sort the data by date before saving
+        all_data.sort(key=lambda x: x['date'])
+        await save_json(symbol, all_data, time_period)
 
 async def save_json(symbol, data, interval):
     os.makedirs(f"json/export/price/{interval}", exist_ok=True)
-    with open(f"json/export/price/{interval}/{symbol}.json", 'w') as file:
+    file_path = f"json/export/price/{interval}/{symbol}.json"
+    with open(file_path, 'w') as file:
         ujson.dump(data, file)
 
 async def process_symbol(session, symbol):
-    await get_data(session, symbol, '30min')
     await get_data(session, symbol, '1hour')
 
 async def run():
+    # Load symbols from the databases
     con = sqlite3.connect('stocks.db')
     cursor = con.cursor()
     cursor.execute("PRAGMA journal_mode = wal")
     cursor.execute("SELECT DISTINCT symbol FROM stocks")
     stock_symbols = [row[0] for row in cursor.fetchall()]
 
     etf_con = sqlite3.connect('etf.db')
     etf_cursor = etf_con.cursor()
     etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
     etf_symbols = [row[0] for row in etf_cursor.fetchall()]
 
     con.close()
     etf_con.close()
 
-    total_symbols = stock_symbols + etf_symbols
+    # List of total symbols to process
+    total_symbols = ['GOOGL']  # Use stock_symbols + etf_symbols if needed
 
+    # Setting up aiohttp connector with rate limiting
     connector = TCPConnector(limit=MAX_REQUESTS_PER_MINUTE)
     async with aiohttp.ClientSession(connector=connector) as session:
         tasks = [process_symbol(session, symbol) for symbol in total_symbols]
-        for i, _ in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
+
+        # Use tqdm to track progress of tasks
+        for i, task in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
+            await task  # Ensure all tasks are awaited properly
             if i % MAX_REQUESTS_PER_MINUTE == 0:
-                print(f'Processed {i} symbols')
-                await asyncio.sleep(60)  # Sleep for 60 seconds after every MAX_REQUESTS_PER_MINUTE symbols
+                print(f'Processed {i} symbols, sleeping to respect rate limits...')
+                await asyncio.sleep(60)  # Pause for 60 seconds to avoid hitting rate limits
 
 if __name__ == "__main__":
-    asyncio.run(run())
\ No newline at end of file
+    asyncio.run(run())
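The 5-day chunking in fetch_all_data is a generic date-windowing pattern; factored out it would look like this (a sketch; the helper name is mine, not in the patch):

from datetime import datetime, timedelta

def date_windows(start, end, days=5):
    # Yield consecutive (from, to) pairs covering [start, end)
    current = start
    while current < end:
        upper = min(current + timedelta(days=days), end)
        yield current, upper
        current = upper

for a, b in date_windows(datetime(2024, 1, 1), datetime(2024, 1, 18)):
    print(a.date(), '->', b.date())  # 2024-01-01 -> 2024-01-06, ..., -> 2024-01-18

One caveat: 20 years at 5-day steps is roughly 1,460 sequential requests per symbol, and the semaphore only caps concurrency, not request rate, so with the inter-batch sleep removed from fetch_data this may exceed the API's per-minute limit once more than one symbol is processed.
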
diff --git a/app/cron_sentiment_tracker.py b/app/cron_sentiment_tracker.py
index c835c58..de78229 100644
--- a/app/cron_sentiment_tracker.py
+++ b/app/cron_sentiment_tracker.py
@@ -33,8 +33,8 @@ async def get_data(session, total_symbols):
             res_list = []
             for item in data:
                 symbol = item['symbol']
-                item['sentiment'] = round(item['sentiment']*100, 2)
-                item['lastSentiment'] = round(item['lastSentiment']*100, 2)
+                item['sentiment'] = round(item['sentiment']*100)
+                item['lastSentiment'] = round(item['lastSentiment']*100)
                 if symbol in total_symbols:
                     try:
                         with open(f"json/quote/{symbol}.json", 'r') as file:
diff --git a/app/ml_models/__pycache__/fundamental_predictor.cpython-310.pyc b/app/ml_models/__pycache__/fundamental_predictor.cpython-310.pyc
index c1c0256..64451da 100644
Binary files a/app/ml_models/__pycache__/fundamental_predictor.cpython-310.pyc and b/app/ml_models/__pycache__/fundamental_predictor.cpython-310.pyc differ
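The committed __pycache__ bytecode above will churn on every Python run; compiled .pyc files are usually kept out of version control. If the repo does not already ignore them, the conventional .gitignore entries are:

__pycache__/
*.pyc
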
diff --git a/app/ml_models/fundamental_predictor.py b/app/ml_models/fundamental_predictor.py
index ffb89e4..671b394 100755
--- a/app/ml_models/fundamental_predictor.py
+++ b/app/ml_models/fundamental_predictor.py
@@ -8,13 +8,15 @@ from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
 from sklearn.model_selection import train_test_split
 from sklearn.preprocessing import MinMaxScaler, StandardScaler
 from keras.models import Sequential
-from keras.layers import LSTM, Dense, Conv1D, Dropout, BatchNormalization
+from keras.layers import LSTM, Dense, Conv1D, Dropout, BatchNormalization, MaxPooling1D, Bidirectional
 from keras.optimizers import Adam
-from keras.callbacks import EarlyStopping, ModelCheckpoint
+from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
 from keras.models import load_model
 from sklearn.feature_selection import SelectKBest, f_classif
 from tensorflow.keras.backend import clear_session
 from keras import regularizers
+from keras.layers import Layer
+
 from tqdm import tqdm
 from collections import defaultdict
@@ -26,6 +28,7 @@ import time
 
 # Based on the paper: https://arxiv.org/pdf/1603.00751
 
+
 class FundamentalPredictor:
     def __init__(self):
         self.model = self.build_model()
@@ -35,35 +38,34 @@ class FundamentalPredictor:
         clear_session()
 
         model = Sequential()
-        model.add(Conv1D(filters=64, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1)))
-        model.add(Conv1D(filters=32, kernel_size=3, padding='same', activation='relu'))
-
-        # First LSTM layer with dropout and batch normalization
-        model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))
-        model.add(Dropout(0.5))
-        model.add(BatchNormalization())
-
-        # Second LSTM layer with dropout and batch normalization
-        model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))
-        model.add(Dropout(0.5))
-        model.add(BatchNormalization())
-
-        # Third LSTM layer with dropout and batch normalization
-        model.add(LSTM(128, kernel_regularizer=regularizers.l2(0.01)))
-        model.add(Dropout(0.5))
-        model.add(BatchNormalization())
-
-        model.add(Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+        model.add(Dense(1000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
         model.add(Dropout(0.2))
         model.add(BatchNormalization())
-
-        # Dense layer with sigmoid activation for binary classification
+
+        model.add(Dense(2000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+        model.add(Dropout(0.2))
+        model.add(BatchNormalization())
+
+        model.add(Dense(3000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+        model.add(Dropout(0.2))
+        model.add(BatchNormalization())
+
+        model.add(Dense(2000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+        model.add(Dropout(0.2))
+        model.add(BatchNormalization())
+
+        model.add(Dense(1000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+        model.add(Dropout(0.2))
+        model.add(BatchNormalization())
+
+        model.add(Dense(500, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
+
+        # Output layer for binary classification
         model.add(Dense(1, activation='sigmoid'))
 
-        # Adam optimizer with a learning rate of 0.01
-        optimizer = Adam(learning_rate=0.01)
+        # Optimizer with a higher initial learning rate; ReduceLROnPlateau decays it during training
+        optimizer = Adam(learning_rate=0.1)
 
-        # Compile model with binary crossentropy loss and accuracy metric
+        # Compile the model
         model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
         return model
@@ -80,14 +82,16 @@ class FundamentalPredictor:
 
     def train_model(self, X_train, y_train):
         X_train = self.preprocess_data(X_train)
-        X_train = self.reshape_for_lstm(X_train)
+        #X_train = self.reshape_for_lstm(X_train)
 
         checkpoint = ModelCheckpoint('ml_models/weights/fundamental_weights/weights.keras',
-                                     save_best_only=True, monitor='val_loss', mode='min')
-        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
-
-        self.model.fit(X_train, y_train, epochs=250, batch_size=32,
-                       validation_split=0.2, callbacks=[checkpoint, early_stopping])
+                                     save_best_only=True, save_freq=1,
+                                     monitor='val_loss', mode='min')
+        early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
+        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=0.00001)
+
+        self.model.fit(X_train, y_train, epochs=100_000, batch_size=64,
+                       validation_split=0.1, callbacks=[checkpoint, early_stopping, reduce_lr])
         self.model.save('ml_models/weights/fundamental_weights/weights.keras')
 
     def evaluate_model(self, X_test, y_test):
@@ -113,7 +117,9 @@ class FundamentalPredictor:
                 'precision': round(test_precision * 100),
                 'sentiment': 'Bullish' if next_value_prediction == 1 else 'Bearish'}, test_predictions
 
-    def feature_selection(self, X_train, y_train, k=8):
+    def feature_selection(self, X_train, y_train, k=100):
+        print('feature selection:')
+        print(X_train.shape, y_train.shape)
         selector = SelectKBest(score_func=f_classif, k=k)
         selector.fit(X_train, y_train)
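The retuned callback stack reads more clearly outside the diff; a minimal sketch with the same settings (model and training arrays assumed to exist):

from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

callbacks = [
    ModelCheckpoint('weights.keras', save_best_only=True, monitor='val_loss', mode='min'),
    EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=1e-5),
]
# epochs is effectively "until EarlyStopping triggers"
model.fit(X_train, y_train, epochs=100_000, batch_size=64,
          validation_split=0.1, callbacks=callbacks)

The sketch deliberately drops save_freq=1 from the patch: an integer save_freq makes ModelCheckpoint fire every batch, but val_loss (the metric monitored with save_best_only=True) only exists at epoch end, so Keras warns and skips those saves; the default save_freq='epoch' avoids this.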