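# Rebuilds backup_db/institute.db from the Financial Modeling Prep institutional-ownership API:
# it loads the known stock/ETF/crypto symbols from the local SQLite databases, pulls the 13F
# portfolio holdings and summary for every institutional filer (CIK), and stores the results
# in a dynamically extended `institutes` table.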
import aiohttp
import asyncio
import sqlite3
import certifi
import json
import pandas as pd
from tqdm import tqdm
import re
from datetime import datetime
import subprocess
import time
import warnings
from dotenv import load_dotenv
import os


# Filter out the specific RuntimeWarning
warnings.filterwarnings("ignore", category=RuntimeWarning, message="invalid value encountered in scalar divide")


def normalize_name(name):
    return re.sub(r'\s+', ' ', re.sub(r'[^\w\s]', '', name.lower())).strip()
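# Examples of what the normalizer produces (illustrative inputs, kept as a comment):
#   normalize_name("Apple Inc.")               -> "apple inc"
#   normalize_name("iShares Core S&P 500 ETF") -> "ishares core sp 500 etf"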

con = sqlite3.connect('stocks.db')
etf_con = sqlite3.connect('etf.db')
crypto_con = sqlite3.connect('crypto.db')

cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol, name FROM stocks WHERE symbol NOT LIKE '%.%'")
stock_data = [{
    'symbol': row[0],
    'name': row[1],
} for row in cursor.fetchall()]
# Create a dictionary from stock_data for quick lookup
stock_dict = {normalize_name(stock['name']): stock['symbol'] for stock in stock_data}
stock_symbols = [item['symbol'] for item in stock_data]

etf_cursor = etf_con.cursor()
etf_cursor.execute("PRAGMA journal_mode = wal")
etf_cursor.execute("SELECT DISTINCT symbol, name FROM etfs")
etf_data = [{
    'symbol': row[0],
    'name': row[1],
} for row in etf_cursor.fetchall()]
# Create a dictionary from etf_data for quick lookup
etf_dict = {normalize_name(etf['name']): etf['symbol'] for etf in etf_data}
etf_symbols = [item['symbol'] for item in etf_data]
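# stock_dict / etf_dict map a normalized security name to its ticker (e.g. "apple inc" -> "AAPL",
# an illustrative pair) and are used later to recover symbols for 13F rows that arrive without one.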

crypto_cursor = crypto_con.cursor()
crypto_cursor.execute("PRAGMA journal_mode = wal")
crypto_cursor.execute("SELECT DISTINCT symbol FROM cryptos")
crypto_symbols = [row[0] for row in crypto_cursor.fetchall()]

total_symbols = stock_symbols + etf_symbols + crypto_symbols
con.close()
etf_con.close()
crypto_con.close()
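# total_symbols is the whitelist used below to drop any reported holding whose symbol is not
# covered by the local stock/ETF/crypto databases.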

load_dotenv()
api_key = os.getenv('FMP_API_KEY')
quarter_date = '2024-03-31'
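# quarter_date is the 13F reporting-period end (a calendar quarter end) passed to the holdings
# endpoint; it typically needs updating each quarter to pull a fresh snapshot.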

if os.path.exists("backup_db/institute.db"):
    os.remove('backup_db/institute.db')
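# The old backup is removed so the rebuild starts from an empty institutes table.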

def get_jsonparsed_data(data):
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return {}
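# Returns {} instead of raising when the response body is not valid JSON (e.g. an HTML error
# page); downstream code treats an empty result as "no data".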


class InstituteDatabase:
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        self.cursor.execute("PRAGMA journal_mode = wal")
        self.conn.commit()
        self._create_table()

    def close_connection(self):
        self.cursor.close()
        self.conn.close()

    def _create_table(self):
        self.cursor.execute("""
        CREATE TABLE IF NOT EXISTS institutes (
            cik TEXT PRIMARY KEY,
            name TEXT
        )
        """)

    def get_column_type(self, value):
        column_type = ""

        if isinstance(value, str):
            column_type = "TEXT"
        elif isinstance(value, int):
            column_type = "INTEGER"
        elif isinstance(value, float):
            column_type = "REAL"
        else:
            # Handle other data types or customize based on your specific needs
            column_type = "TEXT"

        return column_type
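    # e.g. get_column_type(12.5) -> "REAL", get_column_type("n/a") -> "TEXT"; None falls through
    # to the TEXT default, which matches the 'n/a' placeholder used by remove_null below.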

    def remove_null(self, value):
        # Replace missing values with a text placeholder so the UPDATE below never writes NULL;
        # get_column_type maps None to TEXT, so 'n/a' is consistent with the column type.
        if value is None:
            value = 'n/a'

        return value

    async def save_portfolio_data(self, session, cik):
        try:
            # Fetch summary data
            summary_url = f"https://financialmodelingprep.com/api/v4/institutional-ownership/portfolio-holdings-summary?cik={cik}&page=0&apikey={api_key}"
            async with session.get(summary_url) as response:
                summary_data = await response.text()
                summary_parsed_data = get_jsonparsed_data(summary_data)

            portfolio_data = {}
            holdings_data = []

            # Fetch paginated holdings data until a page comes back empty
            for page in range(0, 100):
                holdings_url = f"https://financialmodelingprep.com/api/v4/institutional-ownership/portfolio-holdings?cik={cik}&date={quarter_date}&page={page}&apikey={api_key}"
                async with session.get(holdings_url) as response:
                    data = await response.text()
                    parsed_data = get_jsonparsed_data(data)

                if not parsed_data:  # Stop if no more data
                    break

                if isinstance(parsed_data, list):
                    # Recover missing symbols by matching the security name against the local lookups
                    for item in parsed_data:
                        if item.get('symbol') is None:
                            normalized_security_name = normalize_name(item['securityName'])
                            if normalized_security_name in stock_dict:
                                item['symbol'] = stock_dict[normalized_security_name]
                            elif normalized_security_name in etf_dict:
                                item['symbol'] = etf_dict[normalized_security_name]

                    # Keep only holdings with a known symbol and tag each one with its asset type
                    parsed_data = [
                        {**item, 'type': ('stocks' if item['symbol'] in stock_symbols else
                                          'crypto' if item['symbol'] in crypto_symbols else
                                          'etf' if item['symbol'] in etf_symbols else None)}
                        for item in parsed_data
                        if 'symbol' in item and item['symbol'] is not None and item['symbol'] in total_symbols
                    ]

                    holdings_data.extend(parsed_data)

            # Serialize the full holdings list once, after all pages have been fetched
            portfolio_data['holdings'] = json.dumps(holdings_data)

            if not holdings_data:
                # No usable holdings for this filer, so drop it from the table
                self.cursor.execute("DELETE FROM institutes WHERE cik = ?", (cik,))
                self.conn.commit()
                return

            number_of_stocks = len(holdings_data)
            # Compute the win rate while performancePercentage is still present on each holding
            performance_percentages = [item.get("performancePercentage") or 0 for item in holdings_data]
            positive_performance_count = sum(1 for percentage in performance_percentages if percentage > 0)
            win_rate = round(positive_performance_count / len(performance_percentages) * 100, 2) if performance_percentages else 0

            # Filter information out that is not needed (yet)!
            holdings_data = [{
                "symbol": item["symbol"],
                "securityName": item["securityName"],
                "weight": item["weight"],
                "sharesNumber": item["sharesNumber"],
                "changeInSharesNumberPercentage": item["changeInSharesNumberPercentage"],
                "putCallShare": item["putCallShare"],
                "marketValue": item["marketValue"],
                "avgPricePaid": item["avgPricePaid"],
            } for item in holdings_data]

            portfolio_data.update({
                'winRate': win_rate,
                'numberOfStocks': number_of_stocks
            })

            # Process and add summary data
            if isinstance(summary_parsed_data, list) and summary_parsed_data:
                portfolio_data['summary'] = json.dumps(summary_parsed_data)
                data_dict = {
                    'marketValue': summary_parsed_data[0].get('marketValue', 0),
                    'averageHoldingPeriod': summary_parsed_data[0].get('averageHoldingPeriod', 0),
                    'turnover': summary_parsed_data[0].get('turnover', 0),
                    'performancePercentage': summary_parsed_data[0].get('performancePercentage', 0),
                    'performancePercentage3year': summary_parsed_data[0].get('performancePercentage3year', 0),
                    'performancePercentage5year': summary_parsed_data[0].get('performancePercentage5year', 0),
                    'performanceSinceInceptionPercentage': summary_parsed_data[0].get('performanceSinceInceptionPercentage', 0)
                }
                portfolio_data.update(data_dict)

            # Add any columns that do not exist yet, then write this filer's values
            self.cursor.execute("PRAGMA table_info(institutes)")
            columns = {column[1]: column[2] for column in self.cursor.fetchall()}

            column_definitions = {
                key: (self.get_column_type(portfolio_data.get(key, None)), self.remove_null(portfolio_data.get(key, None)))
                for key in portfolio_data
            }

            for column, (column_type, value) in column_definitions.items():
                if column not in columns and column_type:
                    self.cursor.execute(f"ALTER TABLE institutes ADD COLUMN {column} {column_type}")

                self.cursor.execute(f"UPDATE institutes SET {column} = ? WHERE cik = ?", (value, cik))

            self.conn.commit()

        except Exception as e:
            print(f"Failed to fetch portfolio data for cik {cik}: {str(e)}")

    async def save_institute(self, institutes):

        institute_data = []

        for item in institutes:
            cik = item.get('cik', '')
            name = item.get('name', '')

            institute_data.append((cik, name))

        self.cursor.execute("BEGIN TRANSACTION")  # Begin a transaction

        for data in institute_data:
            cik, name = data
            self.cursor.execute("""
                INSERT OR IGNORE INTO institutes (cik, name)
                VALUES (?, ?)
            """, (cik, name))
            self.cursor.execute("""
                UPDATE institutes SET name = ?
                WHERE cik = ?
            """, (name, cik))

        self.cursor.execute("COMMIT")  # Commit the transaction
        self.conn.commit()

        # Fetch and store the portfolio data for each institute concurrently,
        # in batches of 400 requests with a 60-second pause between batches
        async with aiohttp.ClientSession() as session:
            tasks = []
            i = 0
            for item in tqdm(institute_data):
                cik, name = item
                tasks.append(self.save_portfolio_data(session, cik))

                i += 1
                if i % 400 == 0:
                    await asyncio.gather(*tasks)
                    tasks = []
                    print('sleeping mode: ', i)
                    await asyncio.sleep(60)  # Pause for 60 seconds

            if tasks:
                await asyncio.gather(*tasks)


url = f"https://financialmodelingprep.com/api/v4/institutional-ownership/list?apikey={api_key}"


async def fetch_tickers():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.text()
            return get_jsonparsed_data(data)
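# The list endpoint is expected to return one entry per institutional filer; save_institute only
# reads the 'cik' and 'name' fields from each entry.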


db = InstituteDatabase('backup_db/institute.db')
all_tickers = asyncio.run(fetch_tickers())
#all_tickers = [{'cik': '0001364742', 'name': "GARDA CAPITAL PARTNERS LP"}]  # single-filer test case
asyncio.run(db.save_institute(all_tickers))
db.close_connection()