From 30a4f1c23db67a38ae11c82739f3a251e70742ce Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Mon, 9 Sep 2024 23:53:19 +0200
Subject: [PATCH] add more ta rules

---
 app/cron_export_price.py | 144 +++++++++++++++++------------------------
 app/ta_signal.py         |   5 ++++-
 2 files changed, 64 insertions(+), 85 deletions(-)

diff --git a/app/cron_export_price.py b/app/cron_export_price.py
index 61cbae1..4496f58 100644
--- a/app/cron_export_price.py
+++ b/app/cron_export_price.py
@@ -11,26 +11,31 @@ from aiohttp import TCPConnector
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
-def date_range_days(steps=20):
-    end_date = datetime.utcnow()
-    start_date = end_date - timedelta(days=180) # 6 months ago
-    while start_date < end_date:
-        next_date = start_date + timedelta(days=steps)
-        yield start_date.strftime("%Y-%m-%d"), min(next_date, end_date).strftime("%Y-%m-%d")
-        start_date = next_date
+# Rate limiting
+MAX_REQUESTS_PER_MINUTE = 100
+request_semaphore = asyncio.Semaphore(MAX_REQUESTS_PER_MINUTE)
+last_request_time = datetime.min
+
+async def fetch_data(session, url):
+    global last_request_time
+    async with request_semaphore:
+        # Ensure at least 60 seconds between batches of MAX_REQUESTS_PER_MINUTE
+        current_time = datetime.now()
+        if (current_time - last_request_time).total_seconds() < 60:
+            await asyncio.sleep(60 - (current_time - last_request_time).total_seconds())
+        last_request_time = datetime.now()
+
+        try:
+            async with session.get(url) as response:
+                if response.status == 200:
+                    return await response.json()
+                else:
+                    print(f"Error status {response.status} for URL: {url}")
+                    return []
+        except Exception as e:
+            print(f"Error fetching data from {url}: {e}")
+            return []
 
-async def get_data_batch(session, symbol, url_list):
-    tasks = []
-    for url in url_list:
-        tasks.append(fetch_data(session, url))
-
-    results = await asyncio.gather(*tasks)
-    data = []
-    for result in results:
-        if result:
-            data.extend(result)
-    return data
-
 def get_existing_data(symbol, interval):
     file_path = f"json/export/price/{interval}/{symbol}.json"
     if os.path.exists(file_path):
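
Note: the semaphore-plus-timestamp gate in fetch_data above releases requests
in roughly once-per-minute bursts (every coroutine that finds the shared
last_request_time fresh sleeps out the same window and then fires at once)
rather than spacing them across the minute. If smoother pacing is ever wanted,
a rolling-window limiter is a common alternative. The sketch below is
illustrative only; PerMinuteLimiter and its wiring are hypothetical, not part
of this commit:

# Sketch: allow at most max_per_minute acquisitions in any rolling
# 60-second window, instead of once-per-minute bursts.
import asyncio
import time
from collections import deque

class PerMinuteLimiter:  # hypothetical helper, not part of this commit
    def __init__(self, max_per_minute: int):
        self.max_per_minute = max_per_minute
        self.stamps = deque()       # monotonic times of recent acquisitions
        self.lock = asyncio.Lock()  # serialize bookkeeping across coroutines

    async def acquire(self):
        async with self.lock:
            while True:
                now = time.monotonic()
                # Drop acquisitions that have aged out of the window.
                while self.stamps and now - self.stamps[0] >= 60.0:
                    self.stamps.popleft()
                if len(self.stamps) < self.max_per_minute:
                    break
                # Wait until the oldest acquisition leaves the window.
                await asyncio.sleep(60.0 - (now - self.stamps[0]))
            self.stamps.append(time.monotonic())

With such a limiter, fetch_data would simply start with
`await limiter.acquire()`, and the shared last_request_time global could go
away.
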
@@ -38,65 +43,37 @@ def get_existing_data(symbol, interval):
         return ujson.load(file)
     return []
 
-def get_missing_date_ranges(existing_data, start_date, end_date):
-    existing_dates = {item['date'].split()[0] for item in existing_data} # Use a set for O(1) lookup time
-    start_date = datetime.strptime(start_date, "%Y-%m-%d")
-    end_date = datetime.strptime(end_date, "%Y-%m-%d")
-
-    missing_ranges = []
-    current_date = start_date
-    range_start = None
-
-    while current_date <= end_date:
-        date_str = current_date.strftime("%Y-%m-%d")
-        if date_str not in existing_dates:
-            if range_start is None:
-                range_start = current_date
-        else:
-            # If we found an existing date, and we have a start for a missing range, add it
-            if range_start:
-                missing_ranges.append((range_start.strftime("%Y-%m-%d"), date_str))
-                range_start = None
-        current_date += timedelta(days=1)
-
-    # If the loop ends and we still have an open range, add it
-    if range_start:
-        missing_ranges.append((range_start.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")))
-
-    return missing_ranges
-
-async def fetch_data(session, url):
-    try:
-        async with session.get(url) as response:
-            if response.status == 200:
-                return await response.json()
-            else:
-                return []
-    except Exception as e:
-        print(f"Error fetching data from {url}: {e}")
-        return []
-
 async def get_data(session, symbol, time_period):
-    steps = 20 if time_period == '30min' else 40
     existing_data = get_existing_data(symbol, time_period)
-
-    res_list = existing_data
-    urls_to_fetch = []
+    if not existing_data:
+        return await fetch_all_data(session, symbol, time_period)
+
+    last_date = datetime.strptime(existing_data[-1]['date'], "%Y-%m-%d %H:%M:%S")
+    current_date = datetime.utcnow()
+
+    if (current_date - last_date).days < 1:
+        return # Data is up to date, skip to next symbol
+
+    # Fetch only missing data
+    start_date = (last_date + timedelta(days=1)).strftime("%Y-%m-%d")
+    end_date = current_date.strftime("%Y-%m-%d")
+    url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date}&to={end_date}&apikey={api_key}"
+
+    new_data = await fetch_data(session, url)
+    if new_data:
+        existing_data.extend(new_data)
+        existing_data.sort(key=lambda x: x['date'])
+        await save_json(symbol, existing_data, time_period)
 
-    for start_date, end_date in date_range_days(steps=steps):
-        missing_ranges = get_missing_date_ranges(existing_data, start_date, end_date)
-        for missing_start, missing_end in missing_ranges:
-            url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={missing_start}&to={missing_end}&apikey={api_key}"
-            urls_to_fetch.append(url)
-
-    if urls_to_fetch:
-        fetched_data = await get_data_batch(session, symbol, urls_to_fetch)
-        res_list.extend(fetched_data)
-
-    if res_list:
-        current_datetime = datetime.utcnow()
-        filtered_data = {item['date']: item for item in res_list if datetime.strptime(item['date'], "%Y-%m-%d %H:%M:%S") <= current_datetime}
-        sorted_data = sorted(filtered_data.values(), key=lambda x: x['date'], reverse=False)
-        await save_json(symbol, sorted_data, time_period)
+async def fetch_all_data(session, symbol, time_period):
+    end_date = datetime.utcnow()
+    start_date = end_date - timedelta(days=180)
+    url = f"https://financialmodelingprep.com/api/v3/historical-chart/{time_period}/{symbol}?serietype=bar&extend=false&from={start_date.strftime('%Y-%m-%d')}&to={end_date.strftime('%Y-%m-%d')}&apikey={api_key}"
+
+    data = await fetch_data(session, url)
+    if data:
+        data.sort(key=lambda x: x['date'])
+        await save_json(symbol, data, time_period)
 
 async def save_json(symbol, data, interval):
     os.makedirs(f"json/export/price/{interval}", exist_ok=True)
@@ -104,7 +81,6 @@ async def save_json(symbol, data, interval):
         ujson.dump(data, file)
 
 async def process_symbol(session, symbol):
-    # Process both 30min and 60min intervals
     await get_data(session, symbol, '30min')
     await get_data(session, symbol, '1hour')
 
@@ -125,14 +101,14 @@ async def run():
 
     total_symbols = stock_symbols + etf_symbols
 
-    # Use aiohttp connector with a higher limit for performance
-    connector = TCPConnector(limit=100)
+    connector = TCPConnector(limit=MAX_REQUESTS_PER_MINUTE)
     async with aiohttp.ClientSession(connector=connector) as session:
-        for i, symbol in enumerate(tqdm(total_symbols), 1):
-            await process_symbol(session, symbol)
-            if i % 100 == 0:
-                print(f'Sleeping after processing {i} symbols')
-                await asyncio.sleep(60)
+        tasks = [process_symbol(session, symbol) for symbol in total_symbols]
+        for i, task in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks)), 1):
+            await task # each yielded awaitable must be awaited to run to completion
+            if i % MAX_REQUESTS_PER_MINUTE == 0:
+                print(f'Processed {i} symbols')
+                await asyncio.sleep(60) # Sleep for 60 seconds after every MAX_REQUESTS_PER_MINUTE symbols
 
 if __name__ == "__main__":
-    asyncio.run(run())
+    asyncio.run(run())
\ No newline at end of file
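
Note: asyncio.as_completed(tasks) schedules all of the coroutines immediately
and yields one awaitable per task in completion order; each yielded awaitable
must itself be awaited (the `await task` line above), otherwise the loop
finishes instantly and the ClientSession closes while requests are still in
flight. A self-contained toy of the pattern (the job coroutine is illustrative
only, not part of this commit):

import asyncio
import random

async def job(name: str) -> str:
    await asyncio.sleep(random.random())  # simulate variable I/O latency
    return name

async def main():
    tasks = [job(f"job-{n}") for n in range(5)]
    # as_completed yields awaitables in finish order; awaiting each one
    # retrieves its result (or re-raises its exception).
    for i, fut in enumerate(asyncio.as_completed(tasks), 1):
        result = await fut
        print(f"{i}: {result}")

asyncio.run(main())
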
diff --git a/app/ta_signal.py b/app/ta_signal.py
index 874f22d..ae80e42 100755
--- a/app/ta_signal.py
+++ b/app/ta_signal.py
@@ -26,9 +26,12 @@ class TASignals:
     def run(self):
         ta_df = pd.DataFrame()
+        ta_df['sma_20'] = sma_indicator(self.data["Close"], window=20)
         ta_df['sma_50'] = sma_indicator(self.data["Close"], window=50)
+        ta_df['sma_100'] = sma_indicator(self.data["Close"], window=100)
         ta_df['sma_200'] = sma_indicator(self.data["Close"], window=200)
+        ta_df['ema_20'] = ema_indicator(self.data['Close'], window=20)
         ta_df['ema_50'] = ema_indicator(self.data['Close'], window=50)
-        ta_df['ema_200'] = sma_indicator(self.data['Close'], window=200)
+        ta_df['ema_200'] = ema_indicator(self.data['Close'], window=200)
         ta_df['rsi'] = rsi(self.data['Close'], window=14)
         ta_df['stoch_rsi'] = stochrsi_k(self.data['Close'], window=14, smooth1 = 3, smooth2 =3)*100
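
Note: the indicator helpers used above follow the signatures of the `ta`
package (ta.trend.sma_indicator / ema_indicator, ta.momentum.rsi /
stochrsi_k). A self-contained toy example of the same calls, assuming that
package is installed; the synthetic price series is illustrative only:

import numpy as np
import pandas as pd
from ta.momentum import rsi, stochrsi_k
from ta.trend import ema_indicator, sma_indicator

# Synthetic close prices: a seeded random walk, for illustration only.
rng = np.random.default_rng(42)
close = pd.Series(100 + rng.normal(0, 1, 300).cumsum())

ta_df = pd.DataFrame({
    "sma_20": sma_indicator(close, window=20),   # simple moving average
    "ema_20": ema_indicator(close, window=20),   # exponential moving average
    "rsi": rsi(close, window=14),                # relative strength index
    "stoch_rsi_k": stochrsi_k(close, window=14, smooth1=3, smooth2=3) * 100,
})
print(ta_df.tail())
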