backend/app/cron_fda_calendar.py
2024-06-27 21:57:25 +02:00

86 lines
2.8 KiB
Python

import ujson
import asyncio
import aiohttp
import os
from dotenv import load_dotenv
import requests
# Load environment variables
load_dotenv()
benzinga_api_key = os.getenv('BENZINGA_API_KEY_EXTRA')
fmp_api_key = os.getenv('FMP_API_KEY')
url = "https://api.benzinga.com/api/v2.1/calendar/fda"
querystring = {"token":benzinga_api_key}
headers = {"accept": "application/json"}
async def save_json(data):
with open(f"json/fda-calendar/data.json", 'w') as file:
ujson.dump(data, file)
async def get_quote_of_stocks(ticker):
async with aiohttp.ClientSession() as session:
url = f"https://financialmodelingprep.com/api/v3/quote/{ticker}?apikey={fmp_api_key}"
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
return {}
async def get_data():
try:
response = requests.request("GET", url, headers=headers, params=querystring)
data = ujson.loads(response.text)['fda']
# New list to store the extracted information
extracted_data = []
# Iterate over the original data to extract required fields
for entry in data:
try:
symbol = entry['companies'][0]['securities'][0]['symbol']
name = entry['companies'][0]['name']
drug_name = entry['drug']['name'].capitalize()
indication = entry['drug']['indication_symptom']
outcome = entry['outcome']
source_type = entry['source_type']
status = entry['status']
target_date = entry['target_date']
changes_percentage = round((await get_quote_of_stocks(symbol))[0]['changesPercentage'] ,2)
# Create a new dictionary with the extracted information
new_entry = {
'symbol': symbol,
'name': name,
'drugName': drug_name,
'indication': indication,
'outcome': outcome,
'sourceType': source_type,
'status': status,
'targetDate': target_date,
'changesPercentage': changes_percentage
}
# Append the new dictionary to the new list
extracted_data.append(new_entry)
except:
pass
# Output the new list
return extracted_data
except Exception as e:
print(f"Error fetching data: {e}")
return []
async def run():
data = await get_data()
await save_json(data)
if __name__ == "__main__":
try:
asyncio.run(run())
except Exception as e:
print(f"An error occurred: {e}")