diff --git a/app/cron_clinical_trial.py b/app/cron_clinical_trial.py
new file mode 100644
index 0000000..7a03743
--- /dev/null
+++ b/app/cron_clinical_trial.py
@@ -0,0 +1,81 @@
+from pytrials.client import ClinicalTrials
+import ujson
+import asyncio
+import aiohttp
+import sqlite3
+from tqdm import tqdm
+from datetime import datetime, timedelta
+import os
+from dotenv import load_dotenv
+from concurrent.futures import ThreadPoolExecutor
+import pandas as pd
+import time
+
+
+ct = ClinicalTrials()
+
+async def get_data(company_name):
+    try:
+        get_ct_data = ct.get_study_fields(
+            search_expr=f"{company_name}",
+            fields=["Study Results", "Funder Type", "Start Date", "Completion Date", "Study Status", "Study Title", "Phases", "Brief Summary", "Age", "Sex", "Enrollment", "Study Type", "Sponsor", "Study URL", "NCT Number"],
+            max_studies=1000,
+        )
+        df = pd.DataFrame.from_records(get_ct_data[1:], columns=get_ct_data[0])
+        df['Completion Date'] = pd.to_datetime(df['Completion Date'], errors='coerce')
+        df_sorted = df.sort_values(by='Completion Date', ascending=False)
+        # Convert 'Completion Date' back to string format
+        df_sorted['Completion Date'] = df_sorted['Completion Date'].apply(lambda x: x.strftime('%Y-%m-%d') if pd.notnull(x) else None)
+        df_sorted['Phases'] = df_sorted['Phases'].replace('PHASE2|PHASE3', 'Phase 2/3')
+        df_sorted['Phases'] = df_sorted['Phases'].replace('PHASE1|PHASE2', 'Phase 1/2')
+        df_sorted['Phases'] = df_sorted['Phases'].replace('EARLY_PHASE1', 'Phase 1')
+
+        df_sorted['Study Status'] = df_sorted['Study Status'].replace('ACTIVE_NOT_RECRUITING', 'Active')
+        df_sorted['Study Status'] = df_sorted['Study Status'].replace('NOT_YET_RECRUITING', 'Active')
+        df_sorted['Study Status'] = df_sorted['Study Status'].replace('UNKNOWN', '-')
+
+        data = df_sorted.to_dict('records')
+        return data
+    except Exception as e:
+        print(f"Error fetching data for {company_name}: {e}")
+        return []
+
+async def save_json(symbol, data):
+    # Write in a worker thread to avoid blocking the event loop; the with-statement closes the file handle
+    def write_file():
+        with open(f"json/clinical-trial/companies/{symbol}.json", 'w') as file: ujson.dump(data, file)
+    await asyncio.get_event_loop().run_in_executor(None, write_file)
+
+async def process_ticker(symbol, name):
+    data = await get_data(name)
+    if len(data) > 0:
+        await save_json(symbol, data)
+
+async def run():
+    con = sqlite3.connect('stocks.db')
+
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol, name FROM stocks WHERE industry = 'Biotechnology' AND symbol NOT LIKE '%.%'")
+    company_data = [{'symbol': row[0], 'name': row[1]} for row in cursor.fetchall()]
+    con.close()
+    #test mode
+    #company_data = [{'symbol': 'DSGN', 'name': 'Design Therapeutics, Inc.'}]
+
+    async with aiohttp.ClientSession() as session:
+        tasks = []
+        for item in company_data:
+            tasks.append(process_ticker(item['symbol'], item['name']))
+
+        # Run tasks concurrently in batches to avoid too many open connections
+        batch_size = 10  # Adjust based on your system's capacity
+        for i in tqdm(range(0, len(tasks), batch_size)):
+            batch = tasks[i:i + batch_size]
+            await asyncio.gather(*batch)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(run())
+    except Exception as e:
+        print(f"An error occurred: {e}")
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index bf7f35e..289cbca 100755
--- a/app/main.py
+++ b/app/main.py
@@ -2901,4 +2901,34 @@ async def get_market_maker(data:TickerData):
     redis_client.set(cache_key, ujson.dumps(res))
     redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
 
-    return res
\ No newline at end of file
+    return res
+
+@app.post("/clinical-trial")
+async def get_clinical_trial(data:TickerData):
+    ticker = data.ticker.upper()
+    cache_key = f"clinical-trial-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+
+    try:
+        with open(f"json/clinical-trial/companies/{ticker}.json", 'r') as file:
+            res = ujson.load(file)
+    except Exception:
+        res = []
+
+    data = ujson.dumps(res).encode('utf-8')
+    compressed_data = gzip.compress(data)
+
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*24) # Set cache expiration time to 1 day
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
\ No newline at end of file
diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py
index 5e03b48..3d48dad 100755
--- a/app/primary_cron_job.py
+++ b/app/primary_cron_job.py
@@ -331,6 +331,17 @@ def run_ownership_stats():
     ]
     subprocess.run(command)
 
+def run_clinical_trial():
+    week = datetime.today().weekday()
+    if week <= 5:
+        subprocess.run(["python3", "cron_clinical_trial.py"])
+        command = [
+            "sudo", "rsync", "-avz", "-e", "ssh",
+            "/root/backend/app/json/clinical-trial",
+            f"root@{useast_ip_address}:/root/backend/app/json"
+        ]
+        subprocess.run(command)
+
 # Create functions to run each schedule in a separate thread
 def run_threaded(job_func):
     job_thread = threading.Thread(target=job_func)
@@ -352,6 +363,7 @@ schedule.every().day.at("10:15").do(run_threaded, run_share_statistics).tag('sha
 schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job')
 schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job')
 schedule.every().day.at("11:30").do(run_threaded, run_retail_volume).tag('retail_volume_job')
+schedule.every().day.at("11:45").do(run_threaded, run_clinical_trial).tag('clinical_trial_job')
 schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job')
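
For reviewers, a minimal sketch of how the new /clinical-trial endpoint could be exercised once this branch is deployed. The host, port, and use of the requests library are assumptions for illustration, not part of this change; "DSGN" is the ticker from the commented-out test mode in cron_clinical_trial.py.

import requests

# Hypothetical local instance of the FastAPI app; host/port are assumptions.
url = "http://localhost:8000/clinical-trial"

# The endpoint expects a TickerData payload with a "ticker" field.
response = requests.post(url, json={"ticker": "DSGN"})
response.raise_for_status()

# The body is gzip-compressed and the Content-Encoding header is set,
# so requests transparently decompresses it before .json() parses it.
trials = response.json()
for trial in trials[:5]:
    print(trial.get("Study Title"), "|", trial.get("Study Status"), "|", trial.get("Phases"))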