update cron jobs

MuslemRahimi 2024-09-30 02:03:37 +02:00
parent 14ad89640e
commit 96c96254fc
5 changed files with 174 additions and 90 deletions

View File

@@ -26,7 +26,28 @@ async def save_json(symbol, data):
file.write(orjson.dumps(data))
def trend_intensity(close, window=20):
ma = close.rolling(window=window).mean()
std = close.rolling(window=window).std()
return ((close - ma) / std).abs().rolling(window=window).mean()
def fisher_transform(high, low, window=10):
value = (high + low) / 2
norm_value = (2 * ((value - value.rolling(window=window).min()) /
(value.rolling(window=window).max() - value.rolling(window=window).min())) - 1)
# Clip to keep norm_value strictly inside (-1, 1); otherwise the log blows up
# whenever value sits exactly on the rolling max or min
norm_value = norm_value.clip(-0.999, 0.999)
return 0.5 * np.log((1 + norm_value) / (1 - norm_value))
def calculate_fdi(high, low, close, window=30):
n1 = (np.log(high.rolling(window=window).max() - low.rolling(window=window).min()) -
np.log(close.rolling(window=window).max() - close.rolling(window=window).min())) / np.log(2)
return (2 - n1) * 100
def hurst_exponent(ts, max_lag=100):
lags = range(2, max_lag)
tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]
poly = np.polyfit(np.log(lags), np.log(tau), 1)
return poly[0] * 2.0
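# Illustrative usage of the helpers above (a sketch, assuming a pandas DataFrame
# df with 'high', 'low' and 'close' columns sorted by date):
#   df['tii'] = trend_intensity(df['close'])
#   df['fisher'] = fisher_transform(df['high'], df['low'])
#   df['fdi'] = calculate_fdi(df['high'], df['low'], df['close'])
#   df['hurst'] = df['close'].rolling(window=100).apply(hurst_exponent)
# As a rough reading, a Hurst exponent below 0.5 suggests mean reversion and above
# 0.5 a trending series.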
async def download_data(ticker, con, start_date, end_date):
try:
@@ -109,7 +130,18 @@ async def download_data(ticker, con, start_date, end_date):
df['sma_50'] = df['close'].rolling(window=50).mean()
df['sma_200'] = df['close'].rolling(window=200).mean()
df['golden_cross'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int)
df['sma_crossover'] = ((df['sma_50'] > df['sma_200']) & (df['sma_50'].shift(1) <= df['sma_200'].shift(1))).astype(int)
df['ema_50'] = EMAIndicator(close=df['close'], window=50).ema_indicator()
df['ema_200'] = EMAIndicator(close=df['close'], window=200).ema_indicator()
df['ema_crossover'] = ((df['ema_50'] > df['ema_200']) & (df['ema_50'].shift(1) <= df['ema_200'].shift(1))).astype(int)
ichimoku = IchimokuIndicator(high=df['high'], low=df['low'])
df['ichimoku_a'] = ichimoku.ichimoku_a()
df['ichimoku_b'] = ichimoku.ichimoku_b()
df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close']).average_true_range()
bb = BollingerBands(close=df['close'])
df['bb_width'] = (bb.bollinger_hband() - bb.bollinger_lband()) / df['close']
df['volatility'] = df['close'].rolling(window=30).std()
df['daily_return'] = df['close'].pct_change()
@@ -137,8 +169,6 @@ async def download_data(ticker, con, start_date, end_date):
df['rolling_rsi'] = df['rsi'].rolling(window=10).mean()
df['stoch_rsi'] = stochrsi_k(df['close'], window=30, smooth1=3, smooth2=3)
df['rolling_stoch_rsi'] = df['stoch_rsi'].rolling(window=10).mean()
df['bb_hband'] = bollinger_hband(df['close'], window=30)/df['close']
df['bb_lband'] = bollinger_lband(df['close'], window=30)/df['close']
df['adi'] = acc_dist_index(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'])
df['cmf'] = chaikin_money_flow(high=df['high'],low=df['low'],close=df['close'],volume=df['volume'], window=20)
@@ -146,16 +176,27 @@ async def download_data(ticker, con, start_date, end_date):
df['fi'] = force_index(close=df['close'], volume=df['volume'], window=13)
df['williams'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close']).williams_r()
df['kama'] = KAMAIndicator(close=df['close']).kama()
df['stoch'] = stoch(df['high'], df['low'], df['close'], window=30)
df['rocr'] = df['close'] / df['close'].shift(30) - 1 # 30-day rate of change (price ratio minus 1)
df['ppo'] = (df['ema_50'] - df['ema_200']) / df['ema_50'] * 100
df['vwap'] = (df['volume'] * (df['high'] + df['low'] + df['close']) / 3).cumsum() / df['volume'].cumsum()
df['volatility_ratio'] = df['close'].rolling(window=30).std() / df['close'].rolling(window=60).std()
df['fdi'] = calculate_fdi(df['high'], df['low'], df['close'])
#df['hurst'] = df['close'].rolling(window=100).apply(hurst_exponent)
df['fisher'] = fisher_transform(df['high'], df['low'])
df['tii'] = trend_intensity(df['close'])
ta_indicators = [
'rsi', 'macd', 'macd_signal', 'macd_hist', 'adx', 'adx_pos', 'adx_neg',
'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi', 'bb_hband', 'bb_lband',
'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch','sma_50','sma_200','golden_cross',
'cci', 'mfi', 'nvi', 'obv', 'vpt', 'stoch_rsi','bb_width',
'adi', 'cmf', 'emv', 'fi', 'williams', 'stoch','sma_crossover',
'volatility','daily_return','cumulative_return', 'roc','avg_volume_30d',
'rolling_rsi','rolling_stoch_rsi'
'rolling_rsi','rolling_stoch_rsi', 'ema_crossover','ichimoku_a','ichimoku_b',
'atr','kama','rocr','ppo','volatility_ratio','vwap','tii','fdi','fisher'
]
# Match each combined data entry with the closest available stock price in df
@@ -178,10 +219,11 @@ async def download_data(ticker, con, start_date, end_date):
item['price'] = close_price
# Dynamically add all indicator values to the combined_data entry
for indicator in ta_indicators:
indicator_value = df[df['date'] == target_date][indicator].values[0]
item[indicator] = indicator_value # Add the indicator value to the combined_data entry
# Sort the combined data by date
combined_data = sorted(combined_data, key=lambda x: x['date'])
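# The nearest-date alignment could also be written with pandas.merge_asof (a sketch
# under the assumption that both sides carry a sorted datetime 'date' column; not
# the exact lookup used above):
#   combined_df = pd.merge_asof(
#       pd.DataFrame(combined_data).sort_values('date'),
#       df[['date', 'close'] + ta_indicators].sort_values('date'),
#       on='date', direction='nearest')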
@@ -232,29 +274,45 @@ async def download_data(ticker, con, start_date, end_date):
'inventory',
'propertyPlantEquipmentNet',
'ownersEarnings',
'averagePPE'
]
# Compute ratios for all combinations of key elements
for num, denom in combinations(key_elements, 2):
# Compute ratio num/denom
column_name = f'{num}_to_{denom}'
try:
df_combined[column_name] = df_combined[num] / df_combined[denom]
except:
ratio = df_combined[num] / df_combined[denom]
# Check for valid ratio
df_combined[column_name] = np.where((ratio != 0) &
(ratio != np.inf) &
(ratio != -np.inf) &
(~np.isnan(ratio)),
ratio, 0)
except Exception as e:
print(f"Error calculating {column_name}: {e}")
df_combined[column_name] = 0
# Compute reverse ratio denom/num
reverse_column_name = f'{denom}_to_{num}'
try:
df_combined[reverse_column_name] = df_combined[denom] / df_combined[num]
except:
reverse_ratio = df_combined[denom] / df_combined[num]
# Check for valid reverse ratio
df_combined[reverse_column_name] = np.where((reverse_ratio != 0) &
(reverse_ratio != np.inf) &
(reverse_ratio != -np.inf) &
(~np.isnan(reverse_ratio)),
reverse_ratio, 0)
except Exception as e:
print(f"Error calculating {reverse_column_name}: {e}")
df_combined[reverse_column_name] = 0
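# For reference, itertools.combinations yields each unordered pair exactly once,
# which is why the reverse ratio has to be built explicitly. A minimal example:
#   from itertools import combinations
#   list(combinations(['revenue', 'inventory', 'ownersEarnings'], 2))
#   # -> [('revenue', 'inventory'), ('revenue', 'ownersEarnings'), ('inventory', 'ownersEarnings')]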
# Create 'Target' column based on price change
df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
# Return a copy of the combined DataFrame
df_combined = df_combined.dropna()
df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
df_copy = df_combined.copy()
#print(df_copy[['date','revenue','ownersEarnings','revenuePerShare']])
return df_copy
@@ -266,7 +324,7 @@ async def download_data(ticker, con, start_date, end_date):
async def process_symbol(ticker, con, start_date, end_date):
try:
test_size = 0.4
test_size = 0.2
start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
end_date = datetime.today().strftime("%Y-%m-%d")
predictor = FundamentalPredictor()
@@ -317,18 +375,18 @@ async def train_process(tickers, con):
best_features = [col for col in df_train.columns if col not in ['date','price','Target']]
df_train = df_train.sample(frac=1).reset_index(drop=True)
df_train = df_train.sample(frac=1).reset_index(drop=True) #df_train.reset_index(drop=True)
print(df_train)
print('======Train Set Datapoints======')
print(len(df_train))
print(df_train)
print(df_test)
#selected_features = predictor.feature_selection(df_train[best_features], df_train['Target'],k=10)
#print(selected_features)
selected_features = [col for col in df_train if col not in ['price','date','Target']]
predictor = FundamentalPredictor()
predictor.train_model(df_train[selected_features], df_train['Target'])
predictor.evaluate_model(df_test[selected_features], df_test['Target'])
#print(selected_features)
selected_features = [col for col in df_train if col not in ['price','date','Target']]
best_features = predictor.feature_selection(df_train[selected_features], df_train['Target'],k=100)
print(best_features)
predictor.train_model(df_train[best_features], df_train['Target'])
predictor.evaluate_model(df_test[best_features], df_test['Target'])
async def test_process(con):
test_size = 0.2
@@ -350,15 +408,15 @@ async def run():
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 300E9")
stock_symbols = ['AAPL'] #[row[0] for row in cursor.fetchall()]
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 10E9 AND symbol NOT LIKE '%.%'")
stock_symbols = [row[0] for row in cursor.fetchall()] #['AAPL','GME','LLY','NVDA'] #
print('Number of Stocks')
print(len(stock_symbols))
await train_process(stock_symbols, con)
#Prediction Steps for all stock symbols
'''
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9")
stock_symbols = [row[0] for row in cursor.fetchall()]
@@ -376,7 +434,7 @@ async def run():
tasks.append(process_symbol(ticker, con, start_date, end_date))
await asyncio.gather(*tasks)
'''
con.close()
try:

View File

@@ -14,17 +14,9 @@ api_key = os.getenv('FMP_API_KEY')
# Rate limiting
MAX_REQUESTS_PER_MINUTE = 100
request_semaphore = asyncio.Semaphore(MAX_REQUESTS_PER_MINUTE)
last_request_time = datetime.min
async def fetch_data(session, url):
global last_request_time
async with request_semaphore:
# Ensure at least 60 seconds between batches of MAX_REQUESTS_PER_MINUTE
current_time = datetime.now()
if (current_time - last_request_time).total_seconds() < 60:
await asyncio.sleep(60 - (current_time - last_request_time).total_seconds())
last_request_time = datetime.now()
try:
async with session.get(url) as response:
if response.status == 200:
@@ -46,68 +38,96 @@ def get_existing_data(symbol, interval):
async def get_data(session, symbol, time_period):
existing_data = get_existing_data(symbol, time_period)
if not existing_data:
# If no existing data, fetch all data
return await fetch_all_data(session, symbol, time_period)
last_date = datetime.strptime(existing_data[-1]['date'], "%Y-%m-%d %H:%M:%S")
current_date = datetime.utcnow()
# If data is up to date, skip fetching
if (current_date - last_date).days < 1:
return # Data is up to date, skip to next symbol
return # Data is recent, skip further fetch
# Fetch only missing data
# Fetch missing data only from the last saved date to the current date
start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
end_date = current_date.strftime("%Y-%m-%d")
print(start_date, end_date)
url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date}&to={end_date}&apikey={api_key}"
new_data = await fetch_data(session, url)
if new_data:
existing_data.extend(new_data)
existing_data.sort(key=lambda x: x['date'])
existing_data.sort(key=lambda x: x['date']) # Sort by date
await save_json(symbol, existing_data, time_period)
async def fetch_all_data(session, symbol, time_period):
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=180)
url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date.strftime('%Y-%m-%d')}&to={end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
start_date = end_date - timedelta(days=365*20)
data = await fetch_data(session, url)
if data:
data.sort(key=lambda x: x['date'])
await save_json(symbol, data, time_period)
step = timedelta(days=5) # Step of 5 days
current_start_date = start_date
all_data = [] # To accumulate all the data
while current_start_date < end_date:
current_end_date = min(current_start_date + step, end_date)
url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={current_start_date.strftime('%Y-%m-%d')}&to={current_end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
data = await fetch_data(session, url)
if data:
all_data.extend(data) # Accumulate the fetched data
print(f"Fetched {len(data)} records from {current_start_date.strftime('%Y-%m-%d')} to {current_end_date.strftime('%Y-%m-%d')}")
# Move the window forward by 5 days
current_start_date = current_end_date
if all_data:
# Sort the data by date before saving
all_data.sort(key=lambda x: x['date'])
await save_json(symbol, all_data, time_period)
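# Rough volume check: a 20-year backfill in 5-day windows is about 365 * 20 / 5 = 1,460
# requests per symbol and interval, so this windowed backfill dominates request volume
# compared with the incremental updates done in get_data.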
async def save_json(symbol, data, interval):
os.makedirs(f"json/export/price/{interval}", exist_ok=True)
with open(f"json/export/price/{interval}/{symbol}.json", 'w') as file:
file_path = f"json/export/price/{interval}/{symbol}.json"
with open(file_path, 'w') as file:
ujson.dump(data, file)
async def process_symbol(session, symbol):
await get_data(session, symbol, '30min')
await get_data(session, symbol, '1hour')
async def run():
# Load symbols from databases
con = sqlite3.connect('stocks.db')
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks")
stock_symbols = [row[0] for row in cursor.fetchall()]
etf_con = sqlite3.connect('etf.db')
etf_cursor = etf_con.cursor()
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]
con.close()
etf_con.close()
total_symbols = stock_symbols + etf_symbols
# List of total symbols to process
total_symbols = ['GOOGL'] # Use stock_symbols + etf_symbols if needed
# Setting up aiohttp connector with rate limiting
connector = TCPConnector(limit=MAX_REQUESTS_PER_MINUTE)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [process_symbol(session, symbol) for symbol in total_symbols]
for i, _ in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
# Use tqdm to track progress of tasks
for i, task in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
await task # Ensure all tasks are awaited properly
if i % MAX_REQUESTS_PER_MINUTE == 0:
print(f'Processed {i} symbols')
await asyncio.sleep(60) # Sleep for 60 seconds after every MAX_REQUESTS_PER_MINUTE symbols
print(f'Processed {i} symbols, sleeping to respect rate limits...')
await asyncio.sleep(60) # Pause for 60 seconds to avoid hitting rate limits
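# Note: asyncio.Semaphore(MAX_REQUESTS_PER_MINUTE) and TCPConnector(limit=...) bound
# how many requests run concurrently (100 at a time), not how many run per minute;
# the per-minute pacing comes from the explicit sleep above.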
if __name__ == "__main__":
asyncio.run(run())
asyncio.run(run())

View File

@@ -33,8 +33,8 @@ async def get_data(session, total_symbols):
res_list = []
for item in data:
symbol = item['symbol']
item['sentiment'] = round(item['sentiment']*100, 2)
item['lastSentiment'] = round(item['lastSentiment']*100, 2)
item['sentiment'] = round(item['sentiment']*100)
item['lastSentiment'] = round(item['lastSentiment']*100)
if symbol in total_symbols:
try:
with open(f"json/quote/{symbol}.json", 'r') as file:

View File

@@ -8,13 +8,15 @@ from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_sco
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense, Conv1D, Dropout, BatchNormalization
from keras.layers import LSTM, Dense, Conv1D, Dropout, BatchNormalization, MaxPooling1D, Bidirectional
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.models import load_model
from sklearn.feature_selection import SelectKBest, f_classif
from tensorflow.keras.backend import clear_session
from keras import regularizers
from keras.layers import Layer
from tqdm import tqdm
from collections import defaultdict
@@ -26,6 +28,7 @@ import time
# Based on the paper: https://arxiv.org/pdf/1603.00751
class FundamentalPredictor:
def __init__(self):
self.model = self.build_model()
@@ -35,35 +38,34 @@ class FundamentalPredictor:
clear_session()
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1)))
model.add(Conv1D(filters=32, kernel_size=3, padding='same', activation='relu'))
# First LSTM layer with dropout and batch normalization
model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.5))
model.add(BatchNormalization())
# Second LSTM layer with dropout and batch normalization
model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.5))
model.add(BatchNormalization())
# Third LSTM layer with dropout and batch normalization
model.add(LSTM(128, kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.5))
model.add(BatchNormalization())
model.add(Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dense(1000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(BatchNormalization())
# Dense layer with sigmoid activation for binary classification
model.add(Dense(2000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Dense(3000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Dense(2000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Dense(1000, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Dense(500, activation='relu', kernel_regularizer=regularizers.l2(0.01)))
# Output layer for binary classification
model.add(Dense(1, activation='sigmoid'))
# Adam optimizer with a learning rate of 0.01
optimizer = Adam(learning_rate=0.01)
# Adam optimizer with an initial learning rate of 0.1; ReduceLROnPlateau lowers it during training
optimizer = Adam(learning_rate=0.1)
# Compile model with binary crossentropy loss and accuracy metric
# Compile the model
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
return model
@@ -80,14 +82,16 @@ class FundamentalPredictor:
def train_model(self, X_train, y_train):
X_train = self.preprocess_data(X_train)
X_train = self.reshape_for_lstm(X_train)
#X_train = self.reshape_for_lstm(X_train)
checkpoint = ModelCheckpoint('ml_models/weights/fundamental_weights/weights.keras',
save_best_only=True, monitor='val_loss', mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
self.model.fit(X_train, y_train, epochs=250, batch_size=32,
validation_split=0.2, callbacks=[checkpoint, early_stopping])
save_best_only=True, save_freq='epoch', # val_loss only exists at epoch end, so per-batch saving cannot track it
monitor='val_loss', mode='min')
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=0.00001)
self.model.fit(X_train, y_train, epochs=100_000, batch_size=64,
validation_split=0.1, callbacks=[checkpoint, early_stopping, reduce_lr])
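# With these settings the learning rate (0.1 above) is halved whenever val_loss
# plateaus for 10 epochs; since 0.1 * 0.5**13 is roughly 1.2e-5, it reaches the
# min_lr floor of 0.00001 after about 14 reductions.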
self.model.save('ml_models/weights/fundamental_weights/weights.keras')
def evaluate_model(self, X_test, y_test):
@@ -113,7 +117,9 @@ class FundamentalPredictor:
'precision': round(test_precision * 100),
'sentiment': 'Bullish' if next_value_prediction == 1 else 'Bearish'}, test_predictions
def feature_selection(self, X_train, y_train, k=8):
def feature_selection(self, X_train, y_train, k=100):
print('feature selection:')
print(X_train.shape, y_train.shape)
selector = SelectKBest(score_func=f_classif, k=k)
selector.fit(X_train, y_train)
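# A typical follow-up to the fit above is to map the selector back to column names
# (a sketch, assuming the method is meant to return the selected feature names):
#   mask = selector.get_support()
#   return list(X_train.columns[mask])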