191 lines
6.7 KiB
Python
191 lines
6.7 KiB
Python
import ujson
|
|
import asyncio
|
|
import aiohttp
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from itertools import groupby
|
|
from operator import itemgetter
|
|
from aiofiles import open as async_open
|
|
from tqdm import tqdm
|
|
from dotenv import load_dotenv
|
|
import os
|
|
import time
|
|
import pandas as pd
|
|
|
|
# Pull credentials from a local .env file so the API key stays out of source control.
load_dotenv()
api_key = os.environ.get('FMP_API_KEY')
|
|
|
|
|
|
async def save_json(data, path="json/tracker/insider/data.json"):
    """
    Serialize *data* to JSON and write it asynchronously.

    Args:
        data: any ujson-serializable object (here: the aggregated
            insider-transaction records).
        path (str): destination file; defaults to the insider-tracker feed
            location, so existing callers are unaffected.
    """
    # NOTE(review): assumes the target directory already exists — confirm,
    # or the write will raise FileNotFoundError.
    async with async_open(path, 'w') as file:
        await file.write(ujson.dumps(data))
|
|
|
|
def remove_outliers(group, tolerance=0.5):
    """
    Filter out rows whose 'price' strays too far from the group median.

    Args:
        group (pd.DataFrame): frame containing a numeric 'price' column
            (typically one per-symbol group from a groupby).
        tolerance (float): allowed relative deviation from the median,
            default 0.5 (i.e. prices within +/-50% of the median survive).

    Returns:
        pd.DataFrame: the rows of *group* whose price lies inside
        [median * (1 - tolerance), median * (1 + tolerance)], inclusive.
    """
    center = group['price'].median()
    # Series.between is inclusive on both ends, matching >= / <= bounds.
    in_range = group['price'].between(center * (1 - tolerance),
                                      center * (1 + tolerance))
    return group[in_range]
|
|
|
|
|
|
def format_name(name):
    """
    Convert a name from "LASTNAME MIDDLE FIRSTNAME" order to
    "Firstname Middle Lastname" order, capitalizing each part.

    Args:
        name (str): name in uppercase, surname-first format
            (e.g. "SINGLETON J MATTHEW").

    Returns:
        str: the reordered, capitalized name (e.g. "Matthew J Singleton");
        "" for blank input, and a single capitalized word for one-part input.
    """
    tokens = name.strip().split()

    # Guard clauses: nothing to reorder.
    if not tokens:
        return ""
    if len(tokens) == 1:
        return tokens[0].capitalize()

    # First token is the surname; the rest are stored in reverse order,
    # so flipping them (tokens[:0:-1]) restores reading order.
    surname = tokens[0]
    given_parts = tokens[:0:-1]

    return " ".join(part.capitalize() for part in (*given_parts, surname))
|
|
|
|
def aggregate_transactions(transactions, min_value=100_000):
    """
    Collapse raw insider transactions into one record per
    (reportingName, symbol, transactionType) group.

    Groups whose average transaction value is below ``min_value`` are
    discarded. Each surviving record carries the formatted insider name,
    the most recent filing date in the group, the average value, and the
    total number of shares.

    Args:
        transactions (list[dict]): transaction dicts with keys
            'reportingName', 'symbol', 'transactionType', 'filingDate'
            ('%Y-%m-%d %H:%M:%S'), 'value' and 'shares'.
        min_value (int | float): minimum average group value to keep.

    Returns:
        list[dict]: aggregated records, newest filing date first.
    """
    group_key = itemgetter('reportingName', 'symbol', 'transactionType')

    aggregated = []
    # groupby requires its input pre-sorted by the same key.
    for (reporter, symbol, tx_type), members in groupby(
        sorted(transactions, key=group_key), key=group_key
    ):
        members = list(members)
        avg_value = sum(m['value'] for m in members) / len(members)

        # Skip small groups early instead of nesting the append.
        if avg_value < min_value:
            continue

        total_shares = sum(m['shares'] for m in members)
        newest_filing = max(
            datetime.strptime(m['filingDate'], '%Y-%m-%d %H:%M:%S')
            for m in members
        )

        aggregated.append({
            'reportingName': format_name(reporter),
            'symbol': symbol,
            'transactionType': tx_type,
            'filingDate': newest_filing.strftime('%Y-%m-%d %H:%M:%S'),
            'avgValue': avg_value,
            'totalShares': total_shares,
        })

    # Newest filings first; the timestamp format sorts correctly as text.
    aggregated.sort(key=itemgetter('filingDate'), reverse=True)
    return aggregated
|
|
|
|
|
|
|
|
async def get_data(session, symbols):
    """
    Fetch recent insider-trading records from FMP, clean them, and enrich
    them with locally cached quote data.

    Pages through the FMP v4 insider-trading endpoint, keeps only priced
    acquisition/disposition rows, removes per-symbol price outliers,
    aggregates by (insider, symbol, type), then attaches marketCap/price/
    changesPercentage from json/quote/<symbol>.json files.

    Args:
        session (aiohttp.ClientSession): open HTTP session.
        symbols (list[str]): currently unused; kept for interface
            compatibility with callers.

    Returns:
        list[dict]: enriched aggregated transactions; [] when nothing
        usable was fetched.
    """
    res_list = []
    for page in range(0, 20):  # Adjust the number of pages as needed
        url = f"https://financialmodelingprep.com/api/v4/insider-trading?page={page}&apikey={api_key}"
        async with session.get(url) as response:
            try:
                if response.status == 200:
                    data = await response.json()

                    # NOTE: "acquistionOrDisposition" is the API's own
                    # misspelling — do not "fix" the key name.
                    # BUG FIX: price/securitiesTransacted can be null in the
                    # payload; `None > 0` raised TypeError and silently
                    # aborted the page loop. Coerce null to 0 first.
                    filtered_data = [
                        {
                            "reportingName": item.get("reportingName"),
                            "symbol": item.get("symbol"),
                            "filingDate": item.get("filingDate"),
                            "shares": item.get("securitiesTransacted"),
                            "value": round(item.get("securitiesTransacted", 0) * item.get("price", 0), 2),
                            "price": item.get("price", 0),
                            "transactionType": "Buy" if item.get("acquistionOrDisposition") == "A"
                                               else "Sell" if item.get("acquistionOrDisposition") == "D"
                                               else None,  # None if neither "A" nor "D"
                        }
                        for item in data
                        if item.get("acquistionOrDisposition") in ["A", "D"]
                        and (item.get('price') or 0) > 0
                        and (item.get("securitiesTransacted") or 0) > 0
                    ]

                    res_list += filtered_data
                else:
                    print(f"Failed to fetch data. Status code: {response.status}")
            except Exception as e:
                print(f"Error while fetching data: {e}")
                break

    # BUG FIX: with no rows, pd.DataFrame([]) has no 'symbol' column and
    # the groupby below raised KeyError. Bail out early instead.
    if not res_list:
        return []

    df = pd.DataFrame(res_list)

    # Drop per-symbol price outliers before aggregating.
    filtered_df = df.groupby('symbol', group_keys=False).apply(remove_outliers)
    res_list = aggregate_transactions(filtered_df.to_dict('records'))

    new_data = []
    for item in res_list:
        symbol = item['symbol']
        try:
            with open(f"json/quote/{symbol}.json") as file:
                stock_data = ujson.load(file)
            item['marketCap'] = stock_data['marketCap']
            item['price'] = round(stock_data['price'], 2)
            item['changesPercentage'] = round(stock_data['changesPercentage'], 2)
        except (FileNotFoundError, KeyError, TypeError, ValueError):
            # Best effort: skip records whose quote file is missing or
            # incomplete (was a bare `except: pass`, which also hid bugs).
            continue
        new_data.append({**item})

    return new_data
|
|
|
|
|
|
async def run():
    """
    Entry point: load stock symbols from SQLite, fetch and aggregate
    insider transactions, and persist the result to JSON.
    """
    # Connect to SQLite; close in `finally` so a failing query cannot
    # leak the connection (previously con.close() was skipped on error).
    con = sqlite3.connect('stocks.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")

        # Fetch stock symbols (skip dotted tickers like BRK.B).
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        stock_symbols = [row[0] for row in cursor.fetchall()]
    finally:
        con.close()

    # Fetch data asynchronously using aiohttp.
    async with aiohttp.ClientSession() as session:
        data = await get_data(session, stock_symbols)
        if len(data) > 0:
            print(f"Fetched {len(data)} records.")
            await save_json(data)
|
|
|
|
|
|
# Guard the entry point so importing this module does not trigger the
# full fetch pipeline (it previously ran unconditionally on import).
if __name__ == "__main__":
    try:
        asyncio.run(run())
    except Exception as e:
        # Top-level boundary: report rather than crash with a traceback.
        print(f"Error: {e}")