refactor price alert to fastapi

This commit is contained in:
MuslemRahimi 2024-11-25 19:36:22 +01:00
parent 7d4f1c5af1
commit 42f9db45d3
3 changed files with 45 additions and 53 deletions

View File

@ -12,6 +12,7 @@ import numpy as np
import pandas as pd import pandas as pd
import orjson import orjson
import aiohttp import aiohttp
import aiofiles
import redis import redis
from dotenv import load_dotenv from dotenv import load_dotenv
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@ -1209,6 +1210,8 @@ async def get_indicator(data: IndicatorListData, api_key: str = Security(get_api
headers={"Content-Encoding": "gzip"} headers={"Content-Encoding": "gzip"}
) )
@app.post("/get-watchlist") @app.post("/get-watchlist")
async def get_watchlist(data: GetWatchList, api_key: str = Security(get_api_key)): async def get_watchlist(data: GetWatchList, api_key: str = Security(get_api_key)):
data = data.dict() data = data.dict()
@ -1281,6 +1284,48 @@ async def get_watchlist(data: GetWatchList, api_key: str = Security(get_api_key)
headers={"Content-Encoding": "gzip"} headers={"Content-Encoding": "gzip"}
) )
@app.post("/get-price-alert")
async def get_price_alert(data: UserId, api_key: str = Security(get_api_key)):
user_id = data.dict()['userId']
# Fetch all alerts for the user in a single database call
result = pb.collection("priceAlert").get_full_list(query_params={"filter": f"user='{user_id}' && triggered=false"})
# Function to read JSON file asynchronously
async def fetch_quote_data(item):
try:
async with aiofiles.open(f"json/quote/{item.symbol}.json", mode='r') as file:
quote_data = orjson.loads(await file.read())
return {
'symbol': item.symbol,
'name': item.name,
'assetType': item.asset_type,
'targetPrice': item.target_price,
'priceWhenCreated': item.price_when_created,
'price': quote_data.get("price"),
'changesPercentage': quote_data.get("changesPercentage"),
'volume': quote_data.get("volume"),
}
except Exception as e:
print(f"Error processing {item.symbol}: {e}")
return None
# Run all fetch_quote_data tasks concurrently
tasks = [fetch_quote_data(item) for item in result]
res_list = [res for res in await asyncio.gather(*tasks) if res]
# Serialize and compress the response data
res = orjson.dumps(res_list)
compressed_data = gzip.compress(res)
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
def process_option_activity(item): def process_option_activity(item):
item['put_call'] = 'Calls' if item['put_call'] == 'CALL' else 'Puts' item['put_call'] = 'Calls' if item['put_call'] == 'CALL' else 'Puts'

View File

@ -55,11 +55,6 @@ const corsMiddleware = (request, reply, done) => {
fastify.addHook("onRequest", corsMiddleware); fastify.addHook("onRequest", corsMiddleware);
fastify.register(require("./get-price-alert/server"), { pb, fs, path });
//fastify.register(require('./create-comment/server'), { pb });
function wait(ms) { function wait(ms) {
var start = new Date().getTime(); var start = new Date().getTime();
var end = start; var end = start;

View File

@ -1,48 +0,0 @@
module.exports = function (fastify, opts, done) {
const pb = opts.pb;
const fs = opts.fs
const path = opts.path
fastify.post('/get-price-alert', async (request, reply) => {
const data = request.body;
const userId = data?.userId;
let output;
try {
output = await pb.collection("priceAlert").getFullList({
filter: `user="${userId}" && triggered=false`
});
// Read the JSON file for each symbol in the output list
const itemsWithQuotes = await Promise.all(output.map(async (item) => {
const symbol = item.symbol;
try {
const filePath = path.join(__dirname, `../../app/json/quote/${symbol}.json`);
const fileData = fs.readFileSync(filePath, 'utf8');
const jsonData = JSON.parse(fileData);
// Extract only the desired fields from the JSON data
const { changesPercentage, price, volume } = jsonData;
return { ...item, changesPercentage, price, volume};
} catch (error) {
// Handle errors if file reading or parsing fails
console.error(`Error reading or parsing JSON for symbol ${symbol}: ${error}`);
return item;
}
}));
reply.send({ items: itemsWithQuotes });
} catch (e) {
console.error(e);
reply.send({ items: [] });
//reply.status(500).send({ error: "Internal Server Error" });
}
});
done();
};