From fc570571f0141185ddb92b4fa4375d556330bf13 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 23 Aug 2024 15:27:19 +0200 Subject: [PATCH] add dividends cron job --- app/cron_dividends.py | 105 ++++++++++++++++++++++++++++++++++++++++ app/main.py | 53 +++++++------------- app/primary_cron_job.py | 12 ++++- 3 files changed, 133 insertions(+), 37 deletions(-) create mode 100644 app/cron_dividends.py diff --git a/app/cron_dividends.py b/app/cron_dividends.py new file mode 100644 index 0000000..076b527 --- /dev/null +++ b/app/cron_dividends.py @@ -0,0 +1,105 @@ +import aiohttp +import ujson +import sqlite3 +import asyncio +import pandas as pd +from tqdm import tqdm +import orjson + +async def save_as_json(symbol, data): + with open(f"json/dividends/companies/{symbol}.json", 'w') as file: + ujson.dump(data, file) + + +async def get_data(ticker, con, etf_con, stock_symbols, etf_symbols): + try: + if ticker in etf_symbols: + table_name = 'etfs' + column_name = 'etf_dividend' + else: + table_name = 'stocks' + column_name = 'stock_dividend' + + query_template = f""" + SELECT + {column_name}, quote + FROM + {table_name} + WHERE + symbol = ? + """ + + df = pd.read_sql_query(query_template, etf_con if table_name == 'etfs' else con, params=(ticker,)) + + dividend_data = orjson.loads(df[column_name].iloc[0]) + + res = dividend_data.get('historical', []) + + filtered_res = [item for item in res if item['recordDate'] != '' and item['paymentDate'] != ''] + + # Calculate payout frequency based on dividends recorded in 2023 + payout_frequency = sum(1 for item in filtered_res if '2023' in item['recordDate']) + quote_data = orjson.loads(df['quote'].iloc[0])[0] + eps = quote_data.get('eps') + current_price = quote_data.get('price') + + amount = filtered_res[0]['adjDividend'] if filtered_res else 0 + annual_dividend = round(amount * payout_frequency, 2) + dividend_yield = round((annual_dividend / current_price) * 100, 2) if current_price else None + + payout_ratio = round((1 - (eps - annual_dividend) / eps) * 100, 2) if eps else None + + previous_index = next((i for i, item in enumerate(filtered_res) if '2023' in item['recordDate']), None) + + # Calculate previousAnnualDividend and dividendGrowth + previous_annual_dividend = (filtered_res[previous_index]['adjDividend'] * payout_frequency) if previous_index is not None else 0 + dividend_growth = round(((annual_dividend - previous_annual_dividend) / previous_annual_dividend) * 100, 2) if previous_annual_dividend else None + + + return { + 'payoutFrequency': payout_frequency, + 'annualDividend': annual_dividend, + 'dividendYield': dividend_yield, + 'payoutRatio': payout_ratio, + 'dividendGrowth': dividend_growth, + 'history': filtered_res, + } + + except: + res = {} + + return res + + +async def run(): + + con = sqlite3.connect('stocks.db') + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'") + stock_symbols = [row[0] for row in cursor.fetchall()] + + etf_con = sqlite3.connect('etf.db') + etf_cursor = etf_con.cursor() + etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") + etf_symbols = [row[0] for row in etf_cursor.fetchall()] + + total_symbols = stock_symbols + etf_symbols + + for ticker in tqdm(total_symbols): + res = await get_data(ticker, con, etf_con, stock_symbols, etf_symbols) + try: + if len(res.get('history')) > 0 and res.get('dividendGrowth') != None: + await save_as_json(ticker, res) + except: + pass + + + con.close() + etf_con.close() + + +try: + asyncio.run(run()) +except Exception as e: + print(e) diff --git a/app/main.py b/app/main.py index 2d16efc..ba2f9fc 100755 --- a/app/main.py +++ b/app/main.py @@ -611,46 +611,27 @@ async def stock_dividend(data: TickerData, api_key: str = Security(get_api_key)) cached_result = redis_client.get(cache_key) if cached_result: - return orjson.loads(cached_result) - - if ticker in etf_symbols: - table_name = 'etfs' - column_name = 'etf_dividend' - else: - table_name = 'stocks' - column_name = 'stock_dividend' - - query_template = f""" - SELECT - {column_name}, quote - FROM - {table_name} - WHERE - symbol = ? - """ - - df = pd.read_sql_query(query_template, etf_con if table_name == 'etfs' else con, params=(ticker,)) - - try: - dividend_data = orjson.loads(df[column_name].iloc[0]) - if column_name == 'stock_dividend': - res = dividend_data.get('historical', []) - else: - res = dividend_data.get('historical', []) - except: - res = [] + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) try: - quote_data = orjson.loads(df['quote'].iloc[0])[0] - eps = quote_data.get('eps') - current_price = quote_data.get('price') + with open(f"json/dividends/companies/{ticker}.json", 'rb') as file: + res = orjson.loads(file.read()) except: - eps = None - current_price = None + res = {'history': []} - final_res = [res, eps, current_price] - redis_client.set(cache_key, orjson.dumps(final_res), 3600*3600) # Set cache expiration time to 1 hour - return final_res + data = orjson.dumps(res) + compressed_data = gzip.compress(data) + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600*3600) + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 6bb95f8..3e5b8f0 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -511,6 +511,16 @@ def run_market_cap(): ] run_command(command) + +def run_dividends(): + run_command(["python3", "cron_dividends.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/dividends", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + run_command(command) + # Create functions to run each schedule in a separate thread def run_threaded(job_func): job_thread = threading.Thread(target=job_func) @@ -530,8 +540,8 @@ schedule.every().day.at("09:00").do(run_threaded, run_hedge_fund).tag('hedge_fun schedule.every().day.at("07:30").do(run_threaded, run_government_contract).tag('government_contract_job') schedule.every().day.at("07:30").do(run_threaded, run_financial_statements).tag('financial_statements_job') - schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag('insider_trading_job') +schedule.every().day.at("08:30").do(run_threaded, run_dividends).tag('dividends_job') schedule.every().day.at("09:00").do(run_threaded, run_congress_trading).tag('congress_job') schedule.every().day.at("10:00").do(run_threaded, run_shareholders).tag('shareholders_job') schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job')