add etf sector weighting
This commit is contained in:
parent
8f2cbae8ee
commit
2de3252a06
72
app/cron_etf_sector.py
Normal file
72
app/cron_etf_sector.py
Normal file
@ -0,0 +1,72 @@
|
||||
from datetime import datetime, timedelta
|
||||
import ujson
|
||||
import time
|
||||
import sqlite3
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import random
|
||||
from tqdm import tqdm
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
|
||||
load_dotenv()
|
||||
api_key = os.getenv('FMP_API_KEY')
|
||||
directory_path = "json/etf-sector"
|
||||
|
||||
|
||||
|
||||
async def get_data(session, symbol):
|
||||
url = f"https://financialmodelingprep.com/api/v3/etf-sector-weightings/{symbol}?apikey={api_key}"
|
||||
res_list = []
|
||||
try:
|
||||
async with session.get(url) as response:
|
||||
data = await response.json()
|
||||
if len(data) > 0:
|
||||
for item in data:
|
||||
try:
|
||||
if 'sector' in item and 'weightPercentage' in item:
|
||||
res_list.append({'sector': item['sector'], 'weightPercentage': round(float(item['weightPercentage'].replace("%","")),2)})
|
||||
except:
|
||||
pass
|
||||
res_list = sorted(res_list, key=lambda x: x['weightPercentage'], reverse=True)[0:5]
|
||||
if res_list:
|
||||
save_json(res_list, symbol) # Removed await since it's not async
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing {symbol}: {str(e)}")
|
||||
|
||||
def save_json(data, symbol):
|
||||
os.makedirs(directory_path, exist_ok=True)
|
||||
file_path = f"{directory_path}/{symbol}.json"
|
||||
try:
|
||||
with open(file_path, 'w') as file: # Changed to text mode since we're using ujson
|
||||
ujson.dump(data, file) # Added the file argument
|
||||
except Exception as e:
|
||||
print(f"Error saving JSON for {symbol}: {str(e)}")
|
||||
|
||||
async def run():
|
||||
con = sqlite3.connect('etf.db')
|
||||
cursor = con.cursor()
|
||||
cursor.execute("PRAGMA journal_mode = wal")
|
||||
cursor.execute("SELECT DISTINCT symbol FROM etfs")
|
||||
symbols = [row[0] for row in cursor.fetchall()]
|
||||
con.close()
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = []
|
||||
i = 0
|
||||
for symbol in tqdm(symbols):
|
||||
tasks.append(get_data(session, symbol))
|
||||
i += 1
|
||||
if i % 400 == 0:
|
||||
await asyncio.gather(*tasks)
|
||||
tasks = []
|
||||
print('sleeping mode: ', i)
|
||||
await asyncio.sleep(60) # Pause for 60 seconds
|
||||
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(run())
|
||||
39
app/main.py
39
app/main.py
@ -2017,33 +2017,36 @@ async def etf_holdings(data: TickerData, api_key: str = Security(get_api_key)):
|
||||
)
|
||||
|
||||
|
||||
@app.post("/etf-country-weighting")
|
||||
|
||||
@app.post("/etf-sector-weighting")
|
||||
async def etf_holdings(data: TickerData, api_key: str = Security(get_api_key)):
|
||||
ticker = data.ticker.upper()
|
||||
cache_key = f"etf-country-weighting-{ticker}"
|
||||
|
||||
cache_key = f"etf-sector-weighting-{ticker}"
|
||||
cached_result = redis_client.get(cache_key)
|
||||
if cached_result:
|
||||
return orjson.loads(cached_result)
|
||||
return StreamingResponse(
|
||||
io.BytesIO(cached_result),
|
||||
media_type="application/json",
|
||||
headers={"Content-Encoding": "gzip"}
|
||||
)
|
||||
|
||||
|
||||
query_template = f"SELECT country_weightings from etfs WHERE symbol = ?"
|
||||
df = pd.read_sql_query(query_template, etf_con, params=(ticker,))
|
||||
try:
|
||||
res = orjson.loads(df['country_weightings'].iloc[0])
|
||||
for item in res:
|
||||
if item["weightPercentage"] != 'NaN%':
|
||||
item["weightPercentage"] = float(item["weightPercentage"].rstrip('%'))
|
||||
else:
|
||||
item["weightPercentage"] = 0
|
||||
|
||||
# Sort the list by weightPercentage in descending order
|
||||
res = sorted(res, key=lambda x: x["weightPercentage"], reverse=True)
|
||||
with open(f"json/etf-sector/{ticker}.json", 'rb') as file:
|
||||
res = orjson.loads(file.read())
|
||||
except:
|
||||
res = []
|
||||
|
||||
redis_client.set(cache_key, orjson.dumps(res), 3600*3600) # Set cache expiration time to 1 hour
|
||||
return res
|
||||
data = orjson.dumps(res)
|
||||
compressed_data = gzip.compress(data)
|
||||
|
||||
redis_client.set(cache_key, compressed_data)
|
||||
redis_client.expire(cache_key,3600*3600)
|
||||
|
||||
return StreamingResponse(
|
||||
io.BytesIO(compressed_data),
|
||||
media_type="application/json",
|
||||
headers={"Content-Encoding": "gzip"}
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user