backend/app/cron_insider_tracker.py
2024-11-06 00:09:17 +01:00

173 lines
6.0 KiB
Python

import ujson
import asyncio
import aiohttp
import sqlite3
from datetime import datetime
from itertools import groupby
from operator import itemgetter
from aiofiles import open as async_open
from tqdm import tqdm
from dotenv import load_dotenv
import os
import time
# Let python-dotenv populate the process environment before the key is read.
load_dotenv()

# API key interpolated into the financialmodelingprep.com request URL;
# None when FMP_API_KEY is not set.
api_key = os.environ.get('FMP_API_KEY')
async def save_json(data):
    """Serialize ``data`` as JSON and write it to json/tracker/insider/data.json.

    The payload is encoded *before* the file is opened for writing, so a
    serialization error cannot leave a truncated output file behind. The
    target directory is created if it does not exist yet.

    Args:
        data: Any object that ``ujson.dumps`` can encode.
    """
    payload = ujson.dumps(data)
    # Opening in 'w' mode fails if the directory is missing, so create it first.
    os.makedirs("json/tracker/insider", exist_ok=True)
    async with async_open("json/tracker/insider/data.json", 'w') as file:
        await file.write(payload)
def format_name(name):
    """
    Reorder a "LASTNAME MIDDLE FIRSTNAME" name into "Firstname Middle Lastname".

    The first whitespace-separated word is treated as the last name and moved
    to the end; the remaining words are emitted in reverse order. Every word
    is passed through ``str.capitalize``, so only its first character stays
    uppercase (e.g. "SMITH-JONES" becomes "Smith-jones").

    Args:
        name (str): Name to reformat (e.g., "SINGLETON J MATTHEW")
    Returns:
        str: Reordered name (e.g., "Matthew J Singleton"); "" when the input
        contains no words.
    """
    words = name.split()
    if not words:
        return ""

    # Peel off the surname; whatever is left gets flipped around in front of it.
    surname, *given = words
    reordered = given[::-1] + [surname]
    return " ".join(word.capitalize() for word in reordered)
def aggregate_transactions(transactions, min_value=100_000):
    """Collapse transactions per (reportingName, symbol, transactionType).

    Each group yields one record carrying the mean of its ``value`` fields,
    the sum of its ``shares`` fields and its most recent ``filingDate``.
    Groups whose mean value is below ``min_value`` are dropped.

    Args:
        transactions: Iterable of dicts with the keys ``reportingName``,
            ``symbol``, ``transactionType``, ``value``, ``shares`` and
            ``filingDate`` (formatted as ``%Y-%m-%d %H:%M:%S``).
        min_value: Minimum mean value a group needs to be kept.

    Returns:
        list[dict]: Aggregated records, newest ``filingDate`` first, with
        ``reportingName`` passed through ``format_name``.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    group_key = itemgetter('reportingName', 'symbol', 'transactionType')

    aggregated = []
    # groupby only merges adjacent items, hence the sort on the same key.
    ordered = sorted(transactions, key=group_key)
    for (insider, ticker, side), members in groupby(ordered, key=group_key):
        rows = list(members)
        mean_value = sum(row['value'] for row in rows) / len(rows)
        share_count = sum(row['shares'] for row in rows)

        if not mean_value >= min_value:
            continue

        newest_filing = max(
            datetime.strptime(row['filingDate'], timestamp_format)
            for row in rows
        )
        aggregated.append({
            'reportingName': format_name(insider),
            'symbol': ticker,
            'transactionType': side,
            'filingDate': newest_filing.strftime(timestamp_format),
            'avgValue': mean_value,
            'totalShares': share_count,
        })

    # Timestamps in this format order correctly as plain strings.
    aggregated.sort(key=lambda entry: entry['filingDate'], reverse=True)
    return aggregated
# Maps the acquisition/disposition code in the API response to the label
# stored in the output records.
_TRANSACTION_TYPES = {"A": "Buy", "D": "Sell"}


def _normalize_transactions(rows):
    """Trim one page of raw API rows down to the fields used downstream.

    A row is kept only when its code is "A" or "D" and both its price and
    its share count are positive. Missing (None) prices or share counts are
    treated as zero, so such rows are skipped instead of raising.
    """
    records = []
    for row in rows:
        # The misspelled key is the one read from the response; keep it as is.
        direction = row.get("acquistionOrDisposition")
        shares = row.get("securitiesTransacted") or 0
        price = row.get("price") or 0
        if direction in _TRANSACTION_TYPES and price > 0 and shares > 0:
            records.append({
                "reportingName": row.get("reportingName"),
                "symbol": row.get("symbol"),
                "filingDate": row.get("filingDate"),
                "shares": shares,
                "value": round(shares * price, 2),
                "transactionType": _TRANSACTION_TYPES[direction],
            })
    return records


def _attach_quote(item):
    """Return a copy of ``item`` extended with fields from its quote file.

    Reads json/quote/<symbol>.json and adds ``marketCap``, ``price`` and
    ``changesPercentage``. Returns None when the file is missing, is not
    valid JSON, or lacks usable values for those fields.
    """
    try:
        with open(f"json/quote/{item['symbol']}.json") as file:
            quote = ujson.load(file)
        return {
            **item,
            'marketCap': quote['marketCap'],
            'price': round(quote['price'], 2),
            'changesPercentage': round(quote['changesPercentage'], 2),
        }
    except (OSError, ValueError, KeyError, TypeError):
        # Best effort: symbols without a usable quote file are left out.
        return None


async def get_data(session, symbols):
    """Fetch recent insider trades, aggregate them and attach quote data.

    Requests pages 0-19 of the insider-trading endpoint one after another.
    A non-200 response is reported and skipped; any exception while
    requesting or processing a page is reported and stops pagination, and
    whatever was collected up to that point is still processed.

    Args:
        session: aiohttp client session used for the requests.
        symbols: Not read by this function; kept so existing callers keep
            working.

    Returns:
        list[dict]: Records from ``aggregate_transactions`` for which a quote
        file could be read, each extended with ``marketCap``, ``price`` and
        ``changesPercentage``.
    """
    res_list = []
    for page in range(0, 20):  # Adjust the number of pages as needed
        url = f"https://financialmodelingprep.com/api/v4/insider-trading?page={page}&apikey={api_key}"
        try:
            # The request itself sits inside the try so that connection
            # errors are handled like any other per-page failure.
            async with session.get(url) as response:
                if response.status == 200:
                    res_list += _normalize_transactions(await response.json())
                else:
                    print(f"Failed to fetch data. Status code: {response.status}")
        except Exception as e:
            print(f"Error while fetching data: {e}")
            break

    enriched = (_attach_quote(item) for item in aggregate_transactions(res_list))
    return [item for item in enriched if item is not None]
async def run():
    """Load the stock symbols, fetch insider-trading data and save it.

    Symbols are read from the ``stocks`` table of ``stocks.db`` (those
    containing a "." are excluded) and handed to ``get_data``. The result is
    written with ``save_json`` only when it is non-empty.
    """
    con = sqlite3.connect('stocks.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stock_symbols = [row[0] for row in cursor.fetchall()]
    finally:
        # Release the connection even if one of the statements above fails.
        con.close()

    async with aiohttp.ClientSession() as session:
        data = await get_data(session, stock_symbols)

    if data:
        print(f"Fetched {len(data)} records.")
        await save_json(data)
# Script entry point: run the async job to completion and print any failure
# instead of letting the traceback escape.
try:
    job = run()
    asyncio.run(job)
except Exception as err:
    print(f"Error: {err}")