update insider cron job

parent b7909036d2
commit 4ab4a7e70c
@@ -4,13 +4,18 @@ import aiohttp
 import sqlite3
 from datetime import datetime
 from aiofiles import open as async_open
+from tqdm import tqdm
 from dotenv import load_dotenv
 import os
 
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
 
+
+keys_to_remove_insider_history = {"symbol", "link", "filingDate", "reportingCik"}
+keys_to_remove_insider_statistics = {"symbol", "cik", "purchases", "sales", "pPurchases", "sSales"}
+
 # Function to check if the year is at least 2015
 def is_at_least_2015(date_string):
     year = datetime.strptime(date_string, "%Y-%m-%d").year
@@ -39,6 +44,8 @@ async def get_insider_trading_endpoints(session, symbol):
         filtered_data = [item for item in aggregated_data if is_at_least_2015(item["transactionDate"][:10])]
 
         if len(filtered_data) > 0:
+            filtered_data = [{k: v for k, v in item.items() if k not in keys_to_remove_insider_history} for item in filtered_data]
+
             await save_insider_trading_as_json(symbol, filtered_data)
 
 
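For reference, the new key-stripping step is a plain dict comprehension over each record; a minimal, self-contained sketch with a made-up insider-trading record (the field values here are hypothetical, only the key names come from the diff):

keys_to_remove_insider_history = {"symbol", "link", "filingDate", "reportingCik"}

record = {  # hypothetical record shape
    "symbol": "AAPL",
    "transactionDate": "2023-05-01",
    "transactionType": "P-Purchase",
    "link": "https://example.com/filing",
    "filingDate": "2023-05-02",
    "reportingCik": "0001214156",
}

filtered_data = [{k: v for k, v in item.items() if k not in keys_to_remove_insider_history}
                 for item in [record]]
print(filtered_data)
# [{'transactionDate': '2023-05-01', 'transactionType': 'P-Purchase'}]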
@@ -51,28 +58,6 @@ async def save_insider_trading_as_json(symbol, data):
     async with async_open(f"json/insider-trading/history/{symbol}.json", 'w') as file:
         await file.write(ujson.dumps(data))
 
-async def aggregate_statistics(symbol, data):
-    aggregated_data = {}
-    for entry in data:
-        year = entry['year']
-        if year >= 2015:
-            if year not in aggregated_data:
-                aggregated_data[year] = {
-                    'year': year,
-                    'totalBought': 0,
-                    'totalSold': 0,
-                    'purchases': 0,
-                    'sales': 0
-                }
-            aggregated_data[year]['totalBought'] += entry['totalBought']
-            aggregated_data[year]['totalSold'] += entry['totalSold']
-            aggregated_data[year]['purchases'] += entry['purchases']
-            aggregated_data[year]['sales'] += entry['sales']
-
-    res = list(aggregated_data.values())
-
-    await save_statistics_as_json(symbol, res)
-
 
 async def process_symbols(session, symbols):
     #History
@@ -85,7 +70,8 @@ async def process_symbols(session, symbols):
     results = await asyncio.gather(*tasks)
     for symbol, data in results:
         if data:
-            await aggregate_statistics(symbol, data)
+            filtered_data = [{k: v for k, v in item.items() if k not in keys_to_remove_insider_statistics} for item in data]
+            await save_statistics_as_json(symbol, filtered_data)
 
 
 async def run():
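The surrounding process_symbols relies on asyncio.gather returning results in task-submission order, so each (symbol, data) pair can be unpacked directly. A standalone sketch of that pattern, with a stubbed fetch in place of the cron job's real request helper:

import asyncio

async def fetch_stats(symbol):
    # Stand-in for the real per-symbol API call; returns a (symbol, data) pair.
    await asyncio.sleep(0)
    return symbol, [{"year": 2023, "totalBought": 1000, "totalSold": 500}]

async def main():
    tasks = [fetch_stats(s) for s in ["AAPL", "MSFT"]]
    results = await asyncio.gather(*tasks)  # results arrive in task order
    for symbol, data in results:
        if data:  # skip symbols that returned nothing
            print(symbol, data)

asyncio.run(main())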
@@ -100,12 +86,10 @@ async def run():
     chunks = [stock_symbols[i:i + chunk_size] for i in range(0, len(stock_symbols), chunk_size)]
 
     async with aiohttp.ClientSession() as session:
-        for chunk in chunks:
+        for chunk in tqdm(chunks):
             await process_symbols(session, chunk)
             await asyncio.sleep(60)
-            print('sleep')
-
 
 try:
     asyncio.run(run())
 except Exception as e:
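The chunk-and-sleep loop is a crude rate limiter: process one batch of symbols, pause, repeat, and wrapping it in tqdm replaces the old print('sleep') with a progress bar. A minimal sketch under assumed values (the chunk size and pause here are illustrative; the real ones come from elsewhere in the script):

import asyncio
from tqdm import tqdm

async def process_chunk(chunk):
    print("processing", chunk)  # placeholder for the real per-chunk work

async def run(symbols, chunk_size=100, pause=60):
    chunks = [symbols[i:i + chunk_size] for i in range(0, len(symbols), chunk_size)]
    for chunk in tqdm(chunks):      # progress bar over batches
        await process_chunk(chunk)
        await asyncio.sleep(pause)  # back off between batches to respect API limits

asyncio.run(run(["AAPL", "MSFT", "NVDA"], chunk_size=2, pause=1))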
@@ -2236,9 +2236,9 @@ async def get_insider_trading_statistics(data:TickerData, api_key: str = Securit
 
     try:
         with open(f"json/insider-trading/statistics/{ticker}.json", 'rb') as file:
-            res = orjson.loads(file.read())
+            res = orjson.loads(file.read())[0]
     except:
-        res = []
+        res = {}
 
     redis_client.set(cache_key, orjson.dumps(res))
     redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
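On the API side, the endpoint now serves a single statistics object instead of a list, with {} as the matching empty fallback. A hedged sketch of the read-through cache pattern in use here (the Redis connection and the cache-key format are assumptions, not taken from the diff):

import orjson
import redis

redis_client = redis.Redis()  # assumed local default connection

def get_insider_statistics(ticker: str):
    cache_key = f"insider-trading-statistics-{ticker}"  # hypothetical key format
    cached = redis_client.get(cache_key)
    if cached:
        return orjson.loads(cached)
    try:
        with open(f"json/insider-trading/statistics/{ticker}.json", 'rb') as file:
            # The JSON file stores a list; the endpoint now serves only its first record.
            res = orjson.loads(file.read())[0]
    except Exception:
        res = {}
    redis_client.set(cache_key, orjson.dumps(res))
    redis_client.expire(cache_key, 3600 * 24)  # cache for one day
    return res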