Add more TA rules

This commit is contained in:
MuslemRahimi 2024-09-09 23:53:19 +02:00
parent 8bfeda405c
commit 30a4f1c23d
2 changed files with 63 additions and 84 deletions


@@ -11,26 +11,31 @@ from aiohttp import TCPConnector
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
-def date_range_days(steps=20):
-    end_date = datetime.utcnow()
-    start_date = end_date - timedelta(days=180)  # 6 months ago
-    while start_date < end_date:
-        next_date = start_date + timedelta(days=steps)
-        yield start_date.strftime("%Y-%m-%d"), min(next_date, end_date).strftime("%Y-%m-%d")
-        start_date = next_date
-
-async def get_data_batch(session, symbol, url_list):
-    tasks = []
-    for url in url_list:
-        tasks.append(fetch_data(session, url))
-    results = await asyncio.gather(*tasks)
-    data = []
-    for result in results:
-        if result:
-            data.extend(result)
-    return data
+# Rate limiting
+MAX_REQUESTS_PER_MINUTE = 100
+request_semaphore = asyncio.Semaphore(MAX_REQUESTS_PER_MINUTE)
+last_request_time = datetime.min
+
+async def fetch_data(session, url):
+    global last_request_time
+    async with request_semaphore:
+        # Ensure at least 60 seconds between batches of MAX_REQUESTS_PER_MINUTE
+        current_time = datetime.now()
+        if (current_time - last_request_time).total_seconds() < 60:
+            await asyncio.sleep(60 - (current_time - last_request_time).total_seconds())
+        last_request_time = datetime.now()
+        try:
+            async with session.get(url) as response:
+                if response.status == 200:
+                    return await response.json()
+                else:
+                    print(f"Error status {response.status} for URL: {url}")
+                    return []
+        except Exception as e:
+            print(f"Error fetching data from {url}: {e}")
+            return []
 
 def get_existing_data(symbol, interval):
     file_path = f"json/export/price/{interval}/{symbol}.json"
     if os.path.exists(file_path):
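The throttle added above funnels every request through one shared semaphore and makes each new acquisition wait out the remainder of a 60-second window. A minimal standalone sketch of that pattern, with the window shortened so it finishes quickly (fake_request, throttled, and WINDOW_SECONDS are illustrative names, not part of the commit):

import asyncio
from datetime import datetime

MAX_CONCURRENT = 100
WINDOW_SECONDS = 2  # the commit uses 60
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
last_request_time = datetime.min

async def fake_request(n):
    # Stand-in for the aiohttp GET inside fetch_data
    await asyncio.sleep(0.01)
    return n

async def throttled(n):
    global last_request_time
    async with semaphore:
        # Same idea as fetch_data: wait until the window since the
        # previous request has elapsed before firing the next one
        elapsed = (datetime.now() - last_request_time).total_seconds()
        if elapsed < WINDOW_SECONDS:
            await asyncio.sleep(WINDOW_SECONDS - elapsed)
        last_request_time = datetime.now()
        return await fake_request(n)

async def main():
    print(await asyncio.gather(*(throttled(n) for n in range(3))))

asyncio.run(main())

Note that coroutines already holding the semaphore can read the timestamp before anyone updates it, so this throttles batches rather than strictly serializing individual requests, which matches the commit's behavior.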
@@ -38,65 +43,37 @@ def get_existing_data(symbol, interval):
         return ujson.load(file)
     return []
 
-def get_missing_date_ranges(existing_data, start_date, end_date):
-    existing_dates = {item['date'].split()[0] for item in existing_data}  # Use a set for O(1) lookup time
-    start_date = datetime.strptime(start_date, "%Y-%m-%d")
-    end_date = datetime.strptime(end_date, "%Y-%m-%d")
-    missing_ranges = []
-    current_date = start_date
-    range_start = None
-    while current_date <= end_date:
-        date_str = current_date.strftime("%Y-%m-%d")
-        if date_str not in existing_dates:
-            if range_start is None:
-                range_start = current_date
-        else:
-            # If we found an existing date, and we have a start for a missing range, add it
-            if range_start:
-                missing_ranges.append((range_start.strftime("%Y-%m-%d"), date_str))
-                range_start = None
-        current_date += timedelta(days=1)
-    # If the loop ends and we still have an open range, add it
-    if range_start:
-        missing_ranges.append((range_start.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")))
-    return missing_ranges
-
-async def fetch_data(session, url):
-    try:
-        async with session.get(url) as response:
-            if response.status == 200:
-                return await response.json()
-            else:
-                return []
-    except Exception as e:
-        print(f"Error fetching data from {url}: {e}")
-        return []
-
 async def get_data(session, symbol, time_period):
-    steps = 20 if time_period == '30min' else 40
     existing_data = get_existing_data(symbol, time_period)
-    res_list = existing_data
-    urls_to_fetch = []
-
-    for start_date, end_date in date_range_days(steps=steps):
-        missing_ranges = get_missing_date_ranges(existing_data, start_date, end_date)
-        for missing_start, missing_end in missing_ranges:
-            url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={missing_start}&to={missing_end}&apikey={api_key}"
-            urls_to_fetch.append(url)
-
-    if urls_to_fetch:
-        fetched_data = await get_data_batch(session, symbol, urls_to_fetch)
-        res_list.extend(fetched_data)
-
-    if res_list:
-        current_datetime = datetime.utcnow()
-        filtered_data = {item['date']: item for item in res_list if datetime.strptime(item['date'], "%Y-%m-%d %H:%M:%S") <= current_datetime}
-        sorted_data = sorted(filtered_data.values(), key=lambda x: x['date'], reverse=False)
-        await save_json(symbol, sorted_data, time_period)
+    if not existing_data:
+        return await fetch_all_data(session, symbol, time_period)
+    last_date = datetime.strptime(existing_data[-1]['date'], "%Y-%m-%d %H:%M:%S")
+    current_date = datetime.utcnow()
+    if (current_date - last_date).days < 1:
+        return  # Data is up to date, skip to next symbol
+    # Fetch only missing data
+    start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
+    end_date = current_date.strftime("%Y-%m-%d")
+    url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date}&to={end_date}&apikey={api_key}"
+    new_data = await fetch_data(session, url)
+    if new_data:
+        existing_data.extend(new_data)
+        existing_data.sort(key=lambda x: x['date'])
+        await save_json(symbol, existing_data, time_period)
+
+async def fetch_all_data(session, symbol, time_period):
+    end_date = datetime.utcnow()
+    start_date = end_date - timedelta(days=180)
+    url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date.strftime('%Y-%m-%d')}&to={end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
+    data = await fetch_data(session, url)
+    if data:
+        data.sort(key=lambda x: x['date'])
+        await save_json(symbol, data, time_period)
 
 async def save_json(symbol, data, interval):
     os.makedirs(f"json/export/price/{interval}", exist_ok=True)
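The rewritten get_data drops per-range gap scanning in favor of a simple resume point: read the last stored bar's timestamp and request only from the following day onward. A small sketch of that date arithmetic, using two made-up records shaped like the saved payload:

from datetime import datetime, timedelta

# Hypothetical stored bars in the same shape the script saves
existing_data = [
    {"date": "2024-09-05 15:30:00"},
    {"date": "2024-09-06 15:30:00"},
]

last_date = datetime.strptime(existing_data[-1]["date"], "%Y-%m-%d %H:%M:%S")
if (datetime.utcnow() - last_date).days < 1:
    print("up to date, nothing to fetch")
else:
    start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = datetime.utcnow().strftime("%Y-%m-%d")
    print(f"fetch window: {start_date} .. {end_date}")  # only the missing tail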
@@ -104,7 +81,6 @@ async def save_json(symbol, data, interval):
         ujson.dump(data, file)
 
 async def process_symbol(session, symbol):
-    # Process both 30min and 60min intervals
     await get_data(session, symbol, '30min')
     await get_data(session, symbol, '1hour')
@@ -125,14 +101,13 @@ async def run():
     total_symbols = stock_symbols + etf_symbols
 
-    # Use aiohttp connector with a higher limit for performance
-    connector = TCPConnector(limit=100)
+    connector = TCPConnector(limit=MAX_REQUESTS_PER_MINUTE)
     async with aiohttp.ClientSession(connector=connector) as session:
-        for i, symbol in enumerate(tqdm(total_symbols), 1):
-            await process_symbol(session, symbol)
-            if i % 100 == 0:
-                print(f'Sleeping after processing {i} symbols')
-                await asyncio.sleep(60)
+        tasks = [process_symbol(session, symbol) for symbol in total_symbols]
+        for i, task in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
+            await task
+            if i % MAX_REQUESTS_PER_MINUTE == 0:
+                print(f'Processed {i} symbols')
+                await asyncio.sleep(60)  # Sleep for 60 seconds after every MAX_REQUESTS_PER_MINUTE symbols
 
 if __name__ == "__main__":
     asyncio.run(run())
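The reworked run() schedules every symbol up front and drains the coroutines with asyncio.as_completed, which yields awaitables in completion order; each yielded item still has to be awaited or nothing executes. A toy version of the draining loop (work stands in for process_symbol, and the batch size of 5 is only for the demo):

import asyncio
from tqdm import tqdm

async def work(n):
    await asyncio.sleep(0.01)
    return n

async def run_all():
    tasks = [work(n) for n in range(10)]
    for i, task in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
        await task  # without this await, the coroutine never runs
        if i % 5 == 0:
            print(f"processed {i} items")

asyncio.run(run_all())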


@@ -26,9 +26,13 @@ class TASignals:
     def run(self):
         ta_df = pd.DataFrame()
+        ta_df['sma_20'] = sma_indicator(self.data["Close"], window=20)
         ta_df['sma_50'] = sma_indicator(self.data["Close"], window=50)
+        ta_df['sma_100'] = sma_indicator(self.data["Close"], window=100)
         ta_df['sma_200'] = sma_indicator(self.data["Close"], window=200)
+        ta_df['ema_20'] = ema_indicator(self.data['Close'], window=20)
         ta_df['ema_50'] = ema_indicator(self.data['Close'], window=50)
-        ta_df['sma_100'] = sma_indicator(self.data["Close"], window=100)
         ta_df['ema_200'] = ema_indicator(self.data['Close'], window=200)
         ta_df['rsi'] = rsi(self.data['Close'], window=14)
         ta_df['stoch_rsi'] = stochrsi_k(self.data['Close'], window=14, smooth1=3, smooth2=3) * 100
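The indicator calls in this hunk come from the ta package (ta.trend and ta.momentum). A self-contained sketch of the same calls on synthetic prices, assuming those imports (the close Series is made up; the class feeds in self.data['Close']):

import pandas as pd
from ta.trend import sma_indicator, ema_indicator
from ta.momentum import rsi, stochrsi_k

# Synthetic close prices, just for illustration
close = pd.Series([100 + 0.1 * i + (i % 7) for i in range(300)], dtype=float)

ta_df = pd.DataFrame()
ta_df["sma_20"] = sma_indicator(close, window=20)
ta_df["ema_20"] = ema_indicator(close, window=20)
ta_df["rsi"] = rsi(close, window=14)
ta_df["stoch_rsi"] = stochrsi_k(close, window=14, smooth1=3, smooth2=3) * 100
print(ta_df.tail())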