From 06f8a3ec79aa76bf5eeaa223a750720fdc2657fa Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Fri, 27 Dec 2024 12:00:42 +0100 Subject: [PATCH] add sector flow cron job --- app/cron_dark_pool_flow.py | 2 - app/cron_sector_flow.py | 90 ++++++++++++++++++++++++++++++++++++++ app/main.py | 52 +++++++++++++++++++++- 3 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 app/cron_sector_flow.py diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py index 96cb6c9..225799e 100644 --- a/app/cron_dark_pool_flow.py +++ b/app/cron_dark_pool_flow.py @@ -95,8 +95,6 @@ def main(): # Fetch new data from the API data = get_data() - print(data[0]) - res = [] for item in data: symbol = item['ticker'] diff --git a/app/cron_sector_flow.py b/app/cron_sector_flow.py new file mode 100644 index 0000000..cd37209 --- /dev/null +++ b/app/cron_sector_flow.py @@ -0,0 +1,90 @@ +import os +import pandas as pd +import orjson +from dotenv import load_dotenv +import sqlite3 +from datetime import datetime +import pytz +import requests # Add missing import + + +load_dotenv() +api_key = os.getenv('UNUSUAL_WHALES_API_KEY') +headers = { + "Accept": "application/json, text/plain", + "Authorization": api_key +} + + +def save_json(data): + directory = "json/sector-flow" + os.makedirs(directory, exist_ok=True) # Ensure the directory exists + with open(f"{directory}/data.json", 'wb') as file: # Use binary mode for orjson + file.write(orjson.dumps(data)) + +def safe_round(value): + """Attempt to convert a value to float and round it. Return the original value if not possible.""" + try: + return round(float(value), 2) + except (ValueError, TypeError): + return value + +def calculate_neutral_premium(data_item): + """Calculate the neutral premium for a data item.""" + call_premium = float(data_item['call_premium']) + put_premium = float(data_item['put_premium']) + bearish_premium = float(data_item['bearish_premium']) + bullish_premium = float(data_item['bullish_premium']) + + total_premiums = bearish_premium + bullish_premium + observed_premiums = call_premium + put_premium + neutral_premium = observed_premiums - total_premiums + + return safe_round(neutral_premium) + +def get_sector_data(): + try: + url ="https://api.unusualwhales.com/api/market/sector-etfs" + response = requests.get(url, headers=headers) + data = response.json().get('data',[]) + res_list = [] + processed_data = [] + for item in data: + symbol = item['ticker'] + + + bearish_premium = float(item['bearish_premium']) + bullish_premium = float(item['bullish_premium']) + neutral_premium = calculate_neutral_premium(item) + + new_item = { + key if key != 'full_name' else 'name': safe_round(value) + for key, value in item.items() + if key != 'in_out_flow' + } + new_item['premium_ratio'] = [ + safe_round(bearish_premium), + neutral_premium, + safe_round(bullish_premium) + ] + + with open(f"json/quote/{symbol}.json") as file: + quote_data = orjson.loads(file.read()) + new_item['price'] = round(quote_data.get('price',0),2) + new_item['changesPercentage'] = round(quote_data.get('changesPercentage',0),2) + + processed_data.append(new_item) + + return processed_data + except Exception as e: + print(e) + return [] + +def main(): + sector_data = get_sector_data() + + if len(sector_data) > 0: + save_json(sector_data) + +if __name__ == '__main__': + main() diff --git a/app/main.py b/app/main.py index d3140ed..2a1f21e 100755 --- a/app/main.py +++ b/app/main.py @@ -2954,17 +2954,36 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)): @app.get("/dark-pool-flow-feed") async def get_dark_pool_feed(api_key: str = Security(get_api_key)): - directory = "json/dark-pool/historical-flow" - res_list = load_latest_json(directory) + cache_key = f"dark-pooll-flow-feed" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + try: + directory = "json/dark-pool/historical-flow" + res_list = load_latest_json(directory) + res_list = res_list[0:1000] + except Ex: + res_list = [] + data = orjson.dumps(res_list) compressed_data = gzip.compress(data) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key,60) + return StreamingResponse( io.BytesIO(compressed_data), media_type="application/json", headers={"Content-Encoding": "gzip"} ) + + @app.get("/options-zero-dte") async def get_options_flow_feed(api_key: str = Security(get_api_key)): try: @@ -4200,6 +4219,35 @@ async def get_statistics(data: TickerData, api_key: str = Security(get_api_key)) headers={"Content-Encoding": "gzip"} ) +@app.get("/sector-flow") +async def get_sector_flow(api_key: str = Security(get_api_key)): + cache_key = f"sector-flow" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + try: + with open(f"json/sector-flow/data.json", 'rb') as file: + res = orjson.loads(file.read()) + except: + res = {} + + data = orjson.dumps(res) + compressed_data = gzip.compress(data) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key,5*60) + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + @app.get("/newsletter") async def get_newsletter():