From 282da7d2cfe19135e666e48bfafbb3852bc9b364 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Sat, 5 Oct 2024 17:43:33 +0200
Subject: [PATCH] add batch learning

---
 app/cron_ai_score.py                          | 207 +++++++-----------
 .../__pycache__/score_model.cpython-310.pyc   | Bin 3758 -> 4529 bytes
 app/ml_models/score_model.py                  |  49 ++++-
 3 files changed, 116 insertions(+), 140 deletions(-)

diff --git a/app/cron_ai_score.py b/app/cron_ai_score.py
index fb747ef..f66126a 100644
--- a/app/cron_ai_score.py
+++ b/app/cron_ai_score.py
@@ -100,54 +100,21 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading):
         f"json/financial-statements/owner-earnings/quarter/{ticker}.json",
     ]
 
-    # Helper function to load JSON data asynchronously
-    async def load_json_from_file(path):
-        async with aiofiles.open(path, 'r') as f:
-            content = await f.read()
-            return orjson.loads(content)
-
-    # Helper function to filter data based on keys and year
-    async def filter_data(data, ignore_keys, year_threshold=2000):
-        return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= year_threshold]
-
-    # Define keys to ignore
+    # Keys to strip from every statement entry before feature building
     ignore_keys = ["symbol", "reportedCurrency", "calendarYear", "fillingDate", "acceptedDate", "period", "cik", "link", "finalLink","pbRatio","ptbRatio"]
 
+    # Load one JSON file and filter it in a single async step: drop ignored
+    # keys per entry, and keep only entries dated 2000 or later
+    async def load_and_filter_json(path):
+        async with aiofiles.open(path, 'r') as f:
+            data = orjson.loads(await f.read())
+        return [{k: v for k, v in item.items() if k not in ignore_keys} for item in data if int(item["date"][:4]) >= 2000]
 
+    # Load all files concurrently
+    data = await asyncio.gather(*(load_and_filter_json(s) for s in statements))
+    ratios, key_metrics, cashflow, income, balance, income_growth, balance_growth, cashflow_growth, owner_earnings = data
 
     #Threshold of enough datapoints needed!
     if len(ratios) < 50:
         return
 
-    key_metrics = await load_json_from_file(statements[1])
-    key_metrics = await filter_data(key_metrics, ignore_keys)
-
-
-    cashflow = await load_json_from_file(statements[2])
-    cashflow = await filter_data(cashflow, ignore_keys)
-
-    income = await load_json_from_file(statements[3])
-    income = await filter_data(income, ignore_keys)
-
-    balance = await load_json_from_file(statements[4])
-    balance = await filter_data(balance, ignore_keys)
-
-    income_growth = await load_json_from_file(statements[5])
-    income_growth = await filter_data(income_growth, ignore_keys)
-
-    balance_growth = await load_json_from_file(statements[6])
-    balance_growth = await filter_data(balance_growth, ignore_keys)
-
-    cashflow_growth = await load_json_from_file(statements[7])
-    cashflow_growth = await filter_data(cashflow_growth, ignore_keys)
-
-    owner_earnings = await load_json_from_file(statements[8])
-    owner_earnings = await filter_data(owner_earnings, ignore_keys)
-
     # Combine all the data
     combined_data = defaultdict(dict)
 
@@ -171,126 +138,106 @@ async def download_data(ticker, con, start_date, end_date, skip_downloading):
     df_ta = generate_ta_features(df)
 
-    # Filter columns in df_stats and df_ta that are not in df
+    # Drop columns from df_stats and df_ta that already exist in df
     df_stats_filtered = df_stats.drop(columns=df_columns.intersection(df_stats.columns), errors='ignore')
     df_ta_filtered = df_ta.drop(columns=df_columns.intersection(df_ta.columns), errors='ignore')
+
+    # Extract the column names for indicators
    ta_columns = df_ta_filtered.columns.tolist()
    stats_columns = df_stats_filtered.columns.tolist()
 
     # Concatenate df with the filtered df_stats and df_ta
     df = pd.concat([df, df_ta_filtered, df_stats_filtered], axis=1)
 
+    # Index rows by date for fast lookup of close prices and indicator columns
+    df_dict = df.set_index('date').to_dict(orient='index')
 
-    # Match each combined data entry with the closest available stock price in df
-    for item in combined_data:
-        target_date = item['date']
+    # Helper function to find the closest prior date within max_attempts days
+    def find_closest_date(target_date, max_attempts=10):
         counter = 0
-        max_attempts = 10
-
-        # Look for the closest matching date in the stock data
-        while target_date not in df['date'].values and counter < max_attempts:
+        while target_date not in df_dict and counter < max_attempts:
             target_date = (pd.to_datetime(target_date) - pd.Timedelta(days=1)).strftime('%Y-%m-%d')
             counter += 1
+        return target_date if target_date in df_dict else None
 
-        # If max attempts are reached and no matching date is found, skip the entry
-        if counter == max_attempts:
+    # Match combined data entries with stock data
+    for item in combined_data:
+        target_date = item['date']
+        closest_date = find_closest_date(target_date)
+
+        # Skip if no matching date is found
+        if not closest_date:
             continue
 
-        # Find the close price for the matching date
-        close_price = round(df[df['date'] == target_date]['close'].values[0], 2)
-        item['price'] = close_price
+        # Fetch data from the dictionary for the closest matching date
+        data = df_dict[closest_date]
 
-        # Dynamically add all indicator values to the combined_data entry
-
-        for column in ta_columns:
-            column_value = df[df['date'] == target_date][column].values[0]
-            item[column] = column_value # Add the column value to the combined_data entry
-        for column in stats_columns:
-            column_value = df[df['date'] == target_date][column].values[0]
-            item[column] = column_value # Add the column value to the combined_data entry
-
+        # Add close price to the item
+        item['price'] = round(data['close'], 2)
+
+        # Dynamically add indicator values from ta_columns and stats_columns
+        for column in ta_columns + stats_columns:
+            item[column] = data.get(column, None)
 
     # Sort the combined data by date
     combined_data = sorted(combined_data, key=lambda x: x['date'])
 
-    # Convert combined data into a DataFrame
+
+    # Convert combined data to a DataFrame and drop rows with NaN values
     df_combined = pd.DataFrame(combined_data).dropna()
+
     fundamental_columns = [
-        'revenue',
-        'costOfRevenue',
-        'grossProfit',
-        'netIncome',
-        'operatingIncome',
-        'operatingExpenses',
-        'researchAndDevelopmentExpenses',
-        'ebitda',
-        'freeCashFlow',
-        'incomeBeforeTax',
-        'incomeTaxExpense',
-        'debtRepayment',
-        'dividendsPaid',
-        'depreciationAndAmortization',
-        'netCashUsedProvidedByFinancingActivities',
-        'changeInWorkingCapital',
-        'stockBasedCompensation',
-        'deferredIncomeTax',
-        'commonStockRepurchased',
-        'operatingCashFlow',
-        'capitalExpenditure',
-        'accountsReceivables',
-        'purchasesOfInvestments',
-        'cashAndCashEquivalents',
-        'shortTermInvestments',
-        'cashAndShortTermInvestments',
-        'longTermInvestments',
-        'otherCurrentLiabilities',
-        'totalCurrentLiabilities',
-        'longTermDebt',
-        'totalDebt',
-        'netDebt',
-        'commonStock',
-        'totalEquity',
-        'totalLiabilitiesAndStockholdersEquity',
-        'totalStockholdersEquity',
-        'totalInvestments',
-        'taxAssets',
-        'totalAssets',
-        'inventory',
-        'propertyPlantEquipmentNet',
-        'ownersEarnings',
+        'revenue', 'costOfRevenue', 'grossProfit', 'netIncome', 'operatingIncome', 'operatingExpenses',
+        'researchAndDevelopmentExpenses', 'ebitda', 'freeCashFlow', 'incomeBeforeTax', 'incomeTaxExpense',
+        'debtRepayment', 'dividendsPaid', 'depreciationAndAmortization', 'netCashUsedProvidedByFinancingActivities',
+        'changeInWorkingCapital', 'stockBasedCompensation', 'deferredIncomeTax', 'commonStockRepurchased',
+        'operatingCashFlow', 'capitalExpenditure', 'accountsReceivables', 'purchasesOfInvestments',
+        'cashAndCashEquivalents', 'shortTermInvestments', 'cashAndShortTermInvestments', 'longTermInvestments',
+        'otherCurrentLiabilities', 'totalCurrentLiabilities', 'longTermDebt', 'totalDebt', 'netDebt', 'commonStock',
+        'totalEquity', 'totalLiabilitiesAndStockholdersEquity', 'totalStockholdersEquity', 'totalInvestments',
+        'taxAssets', 'totalAssets', 'inventory', 'propertyPlantEquipmentNet', 'ownersEarnings',
     ]
 
-    # Compute ratios for all combinations of key elements
-    new_columns = {}
-    # Loop over combinations of column pairs
-    for columns in [fundamental_columns, stats_columns, ta_columns]:
-        for num, denom in combinations(columns, 2):
-            # Compute ratio and reverse ratio
-            ratio = df_combined[num] / df_combined[denom]
-            reverse_ratio = round(df_combined[denom] / df_combined[num],2)
+    # Compute ratio and reverse-ratio columns for every pair within a group
+    def compute_column_ratios(columns, df, new_columns):
+        column_combinations = list(combinations(columns, 2))
+
+        for num, denom in column_combinations:
+            with np.errstate(divide='ignore', invalid='ignore'):
+                # Compute ratio and reverse ratio safely
+                ratio = df[num] / df[denom]
+                reverse_ratio = df[denom] / df[num]
 
             # Define column names for both ratios
             column_name = f'{num}_to_{denom}'
             reverse_column_name = f'{denom}_to_{num}'
 
-            # Store the new columns in the dictionary, replacing invalid values with 0
+            # Assign values to new columns, replacing invalid values with 0
             new_columns[column_name] = np.nan_to_num(ratio, nan=0, posinf=0, neginf=0)
             new_columns[reverse_column_name] = np.nan_to_num(reverse_ratio, nan=0, posinf=0, neginf=0)
-    # Add all new columns to the original DataFrame at once
-    df_combined = pd.concat([df_combined, pd.DataFrame(new_columns)], axis=1)
-
-    # To defragment the DataFrame, make a copy
-    df_combined = df_combined.copy()
-    df_combined = df_combined.dropna()
-    df_combined = df_combined.where(~df_combined.isin([np.inf, -np.inf]), 0)
-
+    # Create an empty dictionary for the new columns
+    new_columns = {}
+
+    # Compute combinations for each group of columns
+    compute_column_ratios(fundamental_columns, df_combined, new_columns)
+    compute_column_ratios(stats_columns, df_combined, new_columns)
+    compute_column_ratios(ta_columns, df_combined, new_columns)
+
+    # Concatenate the new ratio columns with the original DataFrame
+    df_combined = pd.concat([df_combined, pd.DataFrame(new_columns, index=df_combined.index)], axis=1)
+
+    # Clean up and replace invalid values
+    df_combined = df_combined.replace([np.inf, -np.inf], 0).dropna()
+
+    # Create 'Target' column to indicate if the next price is higher than the current one
     df_combined['Target'] = ((df_combined['price'].shift(-1) - df_combined['price']) / df_combined['price'] > 0).astype(int)
 
-    df_copy = df_combined.copy()
-    df_copy = df_copy.map(lambda x: round(x, 2) if isinstance(x, float) else x)
+    # Copy DataFrame and round float values
+    df_copy = df_combined.copy().map(lambda x: round(x, 2) if isinstance(x, float) else x)
 
-    if df_copy.shape[0] > 0:
+    # Save to a file if there are rows in the DataFrame
+    if not df_copy.empty:
         with open(file_path, 'wb') as file:
             file.write(orjson.dumps(df_copy.to_dict(orient='records')))
 
@@ -354,7 +301,8 @@ async def warm_start_training(tickers, con, skip_downloading):
 
     predictor = ScorePredictor()
     selected_features = [col for col in df_train if col not in ['price', 'date', 'Target']] #top_uncorrelated_features(df_train, top_n=200)
-    predictor.warm_start_training(df_train[selected_features], df_train['Target'])
+    #predictor.warm_start_training(df_train[selected_features], df_train['Target'])
+    predictor.batch_train_model(df_train[selected_features], df_train['Target'], batch_size=1000)
 
     predictor.evaluate_model(df_test[selected_features], df_test['Target'])
     return predictor
@@ -400,20 +348,19 @@ async def run():
 
     if train_mode:
         # Warm start training
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%' AND symbol NOT LIKE '%-%'")
         warm_start_symbols = [row[0] for row in cursor.fetchall()]
-        print('Warm Start Training')
+        print(f'Warm Start Training: {len(warm_start_symbols)}')
         predictor = await warm_start_training(warm_start_symbols, con, skip_downloading)
     else:
         # Fine-tuning and evaluation for all stocks
-        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 1E9 AND symbol NOT LIKE '%.%'")
+        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E6 AND symbol NOT LIKE '%.%'")
         stock_symbols = [row[0] for row in cursor.fetchall()]
         print(f"Total tickers for fine-tuning: {len(stock_symbols)}")
 
     start_date = datetime(1995, 1, 1).strftime("%Y-%m-%d")
     end_date = datetime.today().strftime("%Y-%m-%d")
 
-    tasks = []
     for ticker in tqdm(stock_symbols):
         await fine_tune_and_evaluate(ticker, con, start_date, end_date, skip_downloading)

diff --git a/app/ml_models/__pycache__/score_model.cpython-310.pyc b/app/ml_models/__pycache__/score_model.cpython-310.pyc
index 0974412fbd2d2519905c40c315c69ffcf09a0e68..30e3db846755c1fa797582c1f9f6d4db85f21699 100644
Binary files a/app/ml_models/__pycache__/score_model.cpython-310.pyc and b/app/ml_models/__pycache__/score_model.cpython-310.pyc differ
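The diffstat counts 49 insertions in app/ml_models/score_model.py, where the new batch_train_model method lives, but that hunk is not included in this excerpt; only the call site in warm_start_training is visible. As orientation, here is a minimal sketch of what a batch_train_model/evaluate_model pair of this shape could look like, assuming an incremental scikit-learn estimator. The class name ScorePredictorSketch, the SGDClassifier and StandardScaler choices, and everything apart from the batch_size=1000 signature visible in the diff are assumptions for illustration, not the author's implementation.

# Hypothetical stand-in for ScorePredictor; not the patch's actual model.
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

class ScorePredictorSketch:
    def __init__(self):
        # SGD with log loss supports partial_fit, i.e. batch-by-batch training
        self.scaler = StandardScaler()
        self.model = SGDClassifier(loss="log_loss", random_state=42)

    def batch_train_model(self, X, y, batch_size=1000):
        # Scale once on the full training set, then feed fixed-size batches
        X = self.scaler.fit_transform(np.asarray(X, dtype=np.float64))
        y = np.asarray(y)
        classes = np.unique(y)  # partial_fit must see the full label set up front
        for start in range(0, len(X), batch_size):
            end = start + batch_size
            self.model.partial_fit(X[start:end], y[start:end], classes=classes)

    def evaluate_model(self, X_test, y_test):
        # Apply the training-set scaling, then report held-out accuracy
        X_test = self.scaler.transform(np.asarray(X_test, dtype=np.float64))
        preds = self.model.predict(X_test)
        print(f"Accuracy: {accuracy_score(y_test, preds):.4f}")

Training in fixed-size batches via partial_fit keeps only one slice of the feature matrix active per step, which is the practical payoff here: the pairwise-ratio expansion in download_data grows the column count quadratically, so a single full-matrix fit can be memory-prohibitive.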