diff --git a/app/cron_implied_volatility.py b/app/cron_implied_volatility.py new file mode 100644 index 0000000..9979a0e --- /dev/null +++ b/app/cron_implied_volatility.py @@ -0,0 +1,95 @@ +import ujson +import asyncio +import aiohttp +import sqlite3 +from datetime import datetime,timedelta +from tqdm import tqdm +import pandas as pd +import time + +from dotenv import load_dotenv +import os +load_dotenv() +api_key = os.getenv('NASDAQ_API_KEY') + + +# Get today's date +today = datetime.today() +# Calculate the date six months ago +dates = [today - timedelta(days=i) for i in range(365)] #six months ago +date_str = ','.join(date.strftime('%Y-%m-%d') for date in dates) + +async def save_json(symbol, data): + with open(f"json/implied-volatility/companies/{symbol}.json", 'w') as file: + ujson.dump(data, file) + + +# Function to filter the list +def filter_past_six_months(data): + filtered_data = [] + for entry in data: + entry_date = datetime.strptime(entry['date'], '%Y-%m-%d') + if entry_date >= six_months_ago: + filtered_data.append(entry) + sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d')) + return sorted_data + +async def get_data(ticker_list): + ticker_str = ','.join(ticker_list) + async with aiohttp.ClientSession() as session: + url = url = f"https://data.nasdaq.com/api/v3/datatables/ORATS/OPT?date={date_str}&ticker={ticker_str}&api_key={api_key}" + async with session.get(url) as response: + if response.status == 200: + res = await response.json() + data = res['datatable']['data'] + columns = res['datatable']['columns'] + return data, columns + else: + return [], [] + + +async def run(): + con = sqlite3.connect('stocks.db') + etf_con = sqlite3.connect('etf.db') + + cursor = con.cursor() + cursor.execute("PRAGMA journal_mode = wal") + cursor.execute("SELECT DISTINCT symbol FROM stocks") + stocks_symbols = [row[0] for row in cursor.fetchall()] + + etf_cursor = etf_con.cursor() + etf_cursor.execute("PRAGMA journal_mode = wal") + etf_cursor.execute("SELECT DISTINCT symbol FROM etfs") + etf_symbols = [row[0] for row in etf_cursor.fetchall()] + + + + total_symbols = stocks_symbols+etf_symbols + + chunk_size = len(total_symbols) // 70 # Divide the list into N chunks + chunks = [total_symbols[i:i + chunk_size] for i in range(0, len(total_symbols), chunk_size)] + + for chunk in tqdm(chunks): + data, columns = await get_data(chunk) + transformed_data = [] + for element in tqdm(data): + # Assuming the number of columns matches the length of each element in `data` + transformed_data.append({columns[i]["name"]: element[i] for i in range(len(columns))}) + + + for symbol in chunk: + try: + filtered_data = [item for item in transformed_data if symbol == item['ticker']] + sorted_data = sorted(filtered_data, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d')) + if len(sorted_data) > 0: + await save_json(symbol, sorted_data) + except Exception as e: + print(e) + + con.close() + etf_con.close() + +try: + asyncio.run(run()) +except Exception as e: + print(e) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 5ed1339..1589f41 100755 --- a/app/main.py +++ b/app/main.py @@ -2983,4 +2983,34 @@ async def get_borrowed_share(data:TickerData): redis_client.set(cache_key, ujson.dumps(res)) redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day - return res \ No newline at end of file + return res + +@app.post("/implied-volatility") +async def get_clinical_trial(data:TickerData): + ticker = data.ticker.upper() + cache_key = f"implied-volatility-{ticker}" + cached_result = redis_client.get(cache_key) + if cached_result: + return StreamingResponse( + io.BytesIO(cached_result), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) + + try: + with open(f"json/implied-volatility/companies/{ticker}.json", 'r') as file: + res = ujson.load(file) + except: + res = [] + + data = ujson.dumps(res).encode('utf-8') + compressed_data = gzip.compress(data) + + redis_client.set(cache_key, compressed_data) + redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day + + return StreamingResponse( + io.BytesIO(compressed_data), + media_type="application/json", + headers={"Content-Encoding": "gzip"} + ) diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index d030835..f9f94e5 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -362,6 +362,15 @@ def run_borrowed_share(): ] subprocess.run(command) +def run_implied_volatility(): + subprocess.run(["python3", "cron_implied_volatility.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/implied-volatility", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + subprocess.run(command) + # Create functions to run each schedule in a separate thread def run_threaded(job_func): job_thread = threading.Thread(target=job_func) @@ -384,6 +393,7 @@ schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_fili schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job') schedule.every().day.at("11:30").do(run_threaded, run_retail_volume).tag('retail_volume_job') schedule.every().day.at("11:45").do(run_threaded, run_clinical_trial).tag('clinical_trial_job') +schedule.every().day.at("12:00").do(run_threaded, run_implied_volatility).tag('implied_volatility_job') schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job')