backend/app/cron_profile.py
2024-12-13 16:55:50 +01:00

239 lines
8.5 KiB
Python

from datetime import datetime, timedelta
import orjson
import time
import sqlite3
import pandas as pd
import numpy as np
from collections import defaultdict
import asyncio
import aiohttp
from tqdm import tqdm
from dotenv import load_dotenv
import os
import re
# Populate the process environment from a .env file (python-dotenv) before
# the API key is read on the next line.
load_dotenv()
# Financial Modeling Prep API key; os.getenv returns None when FMP_API_KEY is unset.
api_key = os.getenv('FMP_API_KEY')
# Parameterized query selecting the `profile` column of the `stocks` row
# whose symbol matches the single bound parameter.
query_template = """
SELECT
profile
FROM
stocks
WHERE
symbol = ?
"""
# Zero-padded month number ("01" .. "12") -> English month name.
MONTH_MAP = {
    f"{number:02}": month_name
    for number, month_name in enumerate(
        (
            'January', 'February', 'March', 'April',
            'May', 'June', 'July', 'August',
            'September', 'October', 'November', 'December',
        ),
        start=1,
    )
}
# Two-letter US state abbreviation -> full state name (50 states).
STATE_MAP = {
    'AL': 'Alabama',
    'AK': 'Alaska',
    'AZ': 'Arizona',
    'AR': 'Arkansas',
    'CA': 'California',
    'CO': 'Colorado',
    'CT': 'Connecticut',
    'DE': 'Delaware',
    'FL': 'Florida',
    'GA': 'Georgia',
    'HI': 'Hawaii',
    'ID': 'Idaho',
    'IL': 'Illinois',
    'IN': 'Indiana',
    'IA': 'Iowa',
    'KS': 'Kansas',
    'KY': 'Kentucky',
    'LA': 'Louisiana',
    'ME': 'Maine',
    'MD': 'Maryland',
    'MA': 'Massachusetts',
    'MI': 'Michigan',
    'MN': 'Minnesota',
    'MS': 'Mississippi',
    'MO': 'Missouri',
    'MT': 'Montana',
    'NE': 'Nebraska',
    'NV': 'Nevada',
    'NH': 'New Hampshire',
    'NJ': 'New Jersey',
    'NM': 'New Mexico',
    'NY': 'New York',
    'NC': 'North Carolina',
    'ND': 'North Dakota',
    'OH': 'Ohio',
    'OK': 'Oklahoma',
    'OR': 'Oregon',
    'PA': 'Pennsylvania',
    'RI': 'Rhode Island',
    'SC': 'South Carolina',
    'SD': 'South Dakota',
    'TN': 'Tennessee',
    'TX': 'Texas',
    'UT': 'Utah',
    'VT': 'Vermont',
    'VA': 'Virginia',
    'WA': 'Washington',
    'WV': 'West Virginia',
    'WI': 'Wisconsin',
    'WY': 'Wyoming',
}
async def save_json(symbol, data):
    """Serialize ``data`` with orjson and write it to ``json/profile/<symbol>.json``.

    The file is opened in binary mode so the UTF-8 bytes returned by
    ``orjson.dumps`` land on disk unchanged. Decoding them and writing in
    text mode would re-encode with the platform's default encoding, which
    can corrupt or reject non-ASCII text on non-UTF-8 locales.

    Args:
        symbol: Ticker symbol; used verbatim as the file name stem.
        data: JSON-serializable object to persist.

    Raises:
        OSError: If the target file cannot be opened or written (for example
            when the ``json/profile`` directory does not exist).
    """
    with open(f"json/profile/{symbol}.json", 'wb') as file:
        file.write(orjson.dumps(data))
def custom_sort(entry):
    """Return the sort key that ranks an executive by the seniority of their title.

    Args:
        entry: Mapping with a ``'position'`` value (job title, may be
            ``None``) and a ``'name'`` value.

    Returns:
        A ``(tier, rank, name)`` tuple: ``(0, 0, name)`` for a CEO,
        ``(0, 1, name)`` for the other listed chief officers, and
        ``(1, 0, name)`` for everyone else. Ties within a group are broken
        by name.
    """
    # A missing title (None) is treated as an empty string so that a single
    # incomplete record cannot make the whole sort raise AttributeError.
    title_lower = (entry['position'] or '').lower()

    # Highest priority: CEO or Chief Executive Officer
    ceo_keywords = ('ceo', 'chief executive officer')
    if any(keyword in title_lower for keyword in ceo_keywords):
        return (0, 0, entry['name'])

    # Second priority: other chief-level positions
    chief_keywords = (
        'chief financial officer',
        'chief operating officer',
        'chief technology officer',
        'chief information officer',
        'chief marketing officer',
        'chief legal officer',
        'chief people officer',
    )
    if any(keyword in title_lower for keyword in chief_keywords):
        return (0, 1, entry['name'])

    # Lowest priority: every other position
    return (1, 0, entry['name'])
def sort_executives(executives):
    """Return a new list of ``executives`` ordered by the ``custom_sort`` key.

    The input iterable is not modified.
    """
    ranked = list(executives)
    ranked.sort(key=custom_sort)
    return ranked
async def fetch_sec_filings(session, symbol):
    """Fetch SEC filings for ``symbol`` and reshape them for the profile file.

    Requests the first page of the ``sec_filings`` endpoint with
    ``limit=10`` and converts every returned entry.

    Args:
        session: HTTP client session whose ``get(url)`` is an async context
            manager yielding a response with an awaitable ``json()``.
        symbol: Ticker symbol inserted into the request URL.

    Returns:
        A list of dicts with the keys ``'date'`` (formatted like
        ``"Jan 05, 2024"``), ``'type'``, ``'title'`` and ``'link'``.
    """
    # Human-readable titles for the known form types; anything else is a
    # plain "Filing".
    form_titles = {
        "8-K": "Current Report",
        "10-Q": "Quarterly Report",
        "10-K": "Annual Report",
        "13F-HR": "Quarterly report filed by institutional managers, holdings",
        "SC 13G": "Statement of acquisition of beneficial ownership by individuals",
        "S-3ASR": "Automatic shelf registration statement of securities of well-known seasoned issuers",
    }

    def describe(form_type):
        # Amendments carry "/A" in the form type; strip it before the lookup
        # and flag the title instead.
        is_amendment = "/A" in form_type
        base_form = form_type.replace("/A", "") if is_amendment else form_type
        tag = "[Amend] " if is_amendment else ""
        return f"{tag}{form_titles.get(base_form, 'Filing')}"

    url = f"https://financialmodelingprep.com/api/v3/sec_filings/{symbol}?limit=10&page=0&apikey={api_key}"
    async with session.get(url) as response:
        payload = await response.json()

    filings = []
    for record in payload:
        # The payload key is read as 'fillingDate' (spelled that way here).
        filed_at = datetime.strptime(record['fillingDate'], "%Y-%m-%d %H:%M:%S")
        filings.append({
            'date': filed_at.strftime("%b %d, %Y"),
            'type': record['type'],
            'title': describe(record['type']),
            'link': record['finalLink'],
        })
    return filings
async def fetch_executives(session, symbol):
    """Fetch the key executives of ``symbol``, cleaned and sorted by seniority.

    Args:
        session: HTTP client session whose ``get(url)`` is an async context
            manager yielding a response with an awaitable ``json()``.
        symbol: Ticker symbol inserted into the request URL.

    Returns:
        A list of ``{'name': ..., 'position': ...}`` dicts ordered by
        ``sort_executives`` (CEO first). Entries that lack a usable name or
        title are skipped.
    """
    url = f"https://financialmodelingprep.com/api/v3/key-executives/{symbol}?apikey={api_key}"
    async with session.get(url) as response:
        data = await response.json()

    # Match honorifics only as whole words. Plain substring replacement would
    # also eat the start of real names ("Mrs Jane" -> "s Jane",
    # "Mrinal" -> "inal").
    honorific = re.compile(r'\b(?:Mrs|Mr|Ms)\b\.?')

    processed_executives = []
    for item in data:
        try:
            clean_name = honorific.sub('', item['name']).strip()
            processed_executives.append({'name': clean_name, 'position': item['title']})
        except (KeyError, TypeError, AttributeError):
            # Best effort: skip entries that are not dicts or that lack a
            # string name / a title, instead of hiding every possible error.
            continue

    # Sort executives to put the CEO first
    return sort_executives(processed_executives)
async def fetch_company_core_information(session, symbol):
    """Fetch core company information for ``symbol`` and trim it for output.

    Adds a ``'fiscalYearRange'`` entry of the form ``"<start month> - <end
    month>"`` when ``'fiscalYearEnd'`` is present, then drops the address,
    SIC description, registrant name, state of incorporation and raw
    fiscal-year-end fields.

    Args:
        session: HTTP client session whose ``get(url)`` is an async context
            manager yielding a response with an awaitable ``json()``.
        symbol: Ticker symbol inserted into the request URL.

    Returns:
        The filtered dict built from the first element of the response, or
        an empty dict when the response is empty.
    """
    endpoint = f"https://financialmodelingprep.com/api/v4/company-core-information?symbol={symbol}&apikey={api_key}"
    async with session.get(endpoint) as response:
        payload = await response.json()

    if not payload:
        return {}

    record = payload[0]

    fiscal_year_end = record.get('fiscalYearEnd')
    if fiscal_year_end:
        # The part before the first '-' is used as the two-digit end month.
        end_key = fiscal_year_end.split('-')[0]
        end_name = MONTH_MAP.get(end_key, '')
        # The fiscal year starts in the month after it ends; December (12)
        # wraps around to January (1).
        start_number = (int(end_key) % 12) + 1
        start_name = MONTH_MAP.get(f"{start_number:02}", '')
        record['fiscalYearRange'] = f"{start_name} - {end_name}"

    dropped_keys = (
        'businessAddress',
        'mailingAddress',
        'sicDescription',
        'registrantName',
        'stateOfIncorporation',
        'fiscalYearEnd',
    )
    trimmed = {}
    for key, value in record.items():
        if key in dropped_keys:
            continue
        trimmed[key] = value
    return trimmed
async def get_data(session, symbol, con):
    """Build the profile document for ``symbol`` and save it as JSON.

    Reads the stored profile from the database, expands the US state
    abbreviation, strips honorifics from the CEO name, keeps a fixed subset
    of profile fields, merges in SEC filings, executives and core company
    information fetched over HTTP, and writes the result via ``save_json``.

    Any exception is caught and reported on stdout so that one failing
    symbol does not abort the batch it runs in.

    Args:
        session: HTTP client session passed through to the fetch helpers.
        symbol: Ticker symbol to process.
        con: Open database connection used for the profile query.

    Returns:
        None. Nothing is written when the query returns no row.
    """
    try:
        df = pd.read_sql_query(query_template, con, params=(symbol,))
        if df.empty:
            return

        data = df.to_dict(orient='records')[0]
        company_profile = orjson.loads(data['profile'])[0]

        # A missing state is left alone rather than aborting the symbol.
        state = company_profile.get('state')
        if state in STATE_MAP:
            company_profile['state'] = STATE_MAP[state]

        # Strip honorifics as whole words only; substring replacement would
        # also eat the start of real names ("Mrs Jane" -> "s Jane",
        # "Mrinal" -> "inal"). A missing or non-string CEO is kept as-is.
        ceo = company_profile.get('ceo')
        if isinstance(ceo, str):
            company_profile['ceo'] = re.sub(r'\b(?:Mrs|Mr|Ms)\b\.?', '', ceo).strip()

        keys_to_keep = {
            'currency', 'country', 'description', 'isin', 'cusip', 'sector',
            'industry', 'ceo', 'website', 'fullTimeEmployees', 'address',
            'city', 'state', 'ipoDate',
        }
        # New dictionary holding only the wanted keys
        company_profile = {key: value for key, value in company_profile.items() if key in keys_to_keep}

        # Fetch SEC filings
        filings = await fetch_sec_filings(session, symbol)
        # Fetch executives
        executives = await fetch_executives(session, symbol)
        # Fetch company core information
        core_info = await fetch_company_core_information(session, symbol)

        # The merged dict always contains 'executives' and 'filings', so it
        # is never empty and can be saved unconditionally.
        res = {**company_profile, **core_info, 'executives': executives, 'filings': filings}
        await save_json(symbol, res)
    except Exception as e:
        print(f"Error processing {symbol}: {e}")
async def run():
    """Refresh the profile JSON of every symbol stored in ``stocks.db``.

    Symbols containing a dot are excluded by the query. Symbols are
    processed concurrently in batches of 100, with a 60-second pause between
    batches as rate limiting. The database connection is closed even when
    processing fails.
    """
    con = sqlite3.connect('stocks.db')
    try:
        cursor = con.cursor()
        cursor.execute("PRAGMA journal_mode = wal")
        cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
        symbols = [row[0] for row in cursor.fetchall()]
        total = len(symbols)

        async with aiohttp.ClientSession() as session:
            tasks = []
            for i, symbol in enumerate(tqdm(symbols), 1):
                tasks.append(get_data(session, symbol, con))
                # Batch processing and rate limiting
                if i % 100 == 0:
                    await asyncio.gather(*tasks)
                    tasks = []
                    # Only pause when more symbols follow; sleeping after
                    # the final batch would just delay the exit by a minute.
                    if i < total:
                        print(f'Processed {i} symbols, sleeping...')
                        await asyncio.sleep(60)
            # Process any remaining tasks
            if tasks:
                await asyncio.gather(*tasks)
    finally:
        con.close()
def main():
    """
    Script entry point: drive the async ``run`` job to completion.
    """
    job = run()
    asyncio.run(job)


if __name__ == "__main__":
    main()