From f8fef5235b8c3d0695993ad18831888c50f73054 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Wed, 26 Mar 2025 20:07:55 +0100 Subject: [PATCH] add analyst flow cron job --- app/cron_analyst_flow.py | 59 ++++++++++++++++++++++++++++++++++++++++ app/main.py | 29 ++++++++++++++++++++ app/primary_cron_job.py | 1 + 3 files changed, 89 insertions(+) create mode 100644 app/cron_analyst_flow.py diff --git a/app/cron_analyst_flow.py b/app/cron_analyst_flow.py new file mode 100644 index 0000000..3bb1d97 --- /dev/null +++ b/app/cron_analyst_flow.py @@ -0,0 +1,59 @@ +import orjson +from datetime import datetime, timedelta +import os +from tqdm import tqdm + +today = datetime.today().date() +threshold_date = today - timedelta(days=14) + +directory_path = "json/analyst" + +def save_json(data): + os.makedirs(directory_path, exist_ok=True) + with open(f"{directory_path}/flow-data.json", 'wb') as file: + file.write(orjson.dumps(data)) + +def get_analyst_from_directory(): + directory = "json/analyst/analyst-db/" + res = [] + try: + data = [file for file in os.listdir(directory) if file.endswith(".json")] + + for file_name in data: + try: + with open(f"{directory}{file_name}", "r") as file: + analyst_data = orjson.loads(file.read()) + if analyst_data['analystScore'] >= 3: + ratings = [item for item in analyst_data['ratingsList'] + if datetime.strptime(item["date"], "%Y-%m-%d").date() >= threshold_date] + if ratings: + for item_ratings in ratings: + try: + res.append({ + 'analystName': analyst_data['analystName'], + 'analystId': analyst_data['analystId'], + 'analystScore': analyst_data['analystScore'], + 'date': item_ratings['date'], + 'name': item_ratings['name'], + 'symbol': item_ratings['ticker'], + 'adjusted_pt_current': item_ratings['adjusted_pt_current'], + 'adjusted_pt_prior': item_ratings['adjusted_pt_prior'], + 'upside': item_ratings['upside'], + 'action': item_ratings['action_company'], + 'rating_current': item_ratings['rating_current'] + }) + except Exception as e: + print(e) + except Exception as e: + print(f"Error processing {file_name}: {e}") + except Exception as e: + print("Error reading directory:", e) + return [] + return res + + +if __name__ == "__main__": + data = get_analyst_from_directory() + sorted_data = sorted(data, key=lambda x: datetime.strptime(x['date'], "%Y-%m-%d"), reverse=True) + if sorted_data: + save_json(sorted_data) diff --git a/app/main.py b/app/main.py index fb9c90c..3af3f81 100755 --- a/app/main.py +++ b/app/main.py @@ -3143,6 +3143,35 @@ async def get_all_analysts(data:AnalystId, api_key: str = Security(get_api_key)) headers={"Content-Encoding": "gzip"} ) +@app.get("/analyst-flow") +async def get_all_analysts(api_key: str = Security(get_api_key)): + + cache_key = f"analyst-flow" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"}) + + try: + with open(f"json/analyst/flow-data.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = [] + + data = orjson.dumps(res) + compressed_data = gzip.compress(data) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 60*5) + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + @app.post("/wiim") async def get_wiim(data:TickerData, api_key: str = Security(get_api_key)): ticker = data.ticker.upper() diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index edfa0e7..2f8dc12 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -215,6 +215,7 @@ def run_analyst_rating(): run_command(["python3", "cron_analyst_insight.py"]) run_command(["python3", "cron_analyst_db.py"]) run_command(["python3", "cron_analyst_ticker.py"]) + run_command(["python3", "cron_analyst_flow.py"]) def run_market_moods(): week = datetime.today().weekday()