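"""Build per-symbol business-metric JSON files.

For every symbol in the local stocks.db database this script pulls quarterly
revenue-by-product and revenue-by-geography segmentation from the Financial
Modeling Prep (FMP) API, combines it with operating-expense figures from the
previously saved quarterly income statements, computes period-over-period
growth, and writes the result to json/business-metrics/<symbol>.json.
"""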
from datetime import datetime

import os
import sqlite3
import asyncio

import aiohttp
import orjson
from tqdm import tqdm
from dotenv import load_dotenv

load_dotenv()
api_key = os.getenv('FMP_API_KEY')


def standardize_strings(string_list):
    """Title-case each label, e.g. 'UNITED STATES' -> 'United States'."""
    return [string.title() for string in string_list]


def convert_to_dict(data):
    """Flatten a list of {date: {category: amount}} entries into one dict keyed by date."""
    result = {}
    for entry in data:
        for date, categories in entry.items():
            if date not in result:
                result[date] = {}
            for category, amount in categories.items():
                result[date][category] = amount
    return result


async def save_json(data, symbol):
    with open(f"json/business-metrics/{symbol}.json", 'wb') as file:
        file.write(orjson.dumps(data))

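
# save_json assumes the json/business-metrics/ directory already exists. If that
# is not guaranteed in your environment (an assumption, not something this script
# handles), it can be created once up front, e.g.:
#
#   os.makedirs("json/business-metrics", exist_ok=True)
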
def prepare_expense_dataset(data):
    data = convert_to_dict(data)
    operating_name_list = []
    operating_history_list = []
    index = 0
    for date, info in data.items():
        value_list = []
        for name, val in info.items():
            # Take the category names from the first period and keep that order
            # for every later period.
            if index == 0:
                operating_name_list.append(name)
            if name in operating_name_list:
                value_list.append(val)
        if len(value_list) > 0:
            operating_history_list.append({'date': date, 'value': value_list})
        index += 1

    operating_history_list = sorted(operating_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))

    # Initialize 'valueGrowth' as None for all entries
    for item in operating_history_list:
        item['valueGrowth'] = [None] * len(item['value'])

    # Calculate valueGrowth for each item based on the previous date's value
    for i in range(1, len(operating_history_list)):  # Start from the second item
        current_item = operating_history_list[i]
        prev_item = operating_history_list[i - 1]

        value_growth = []
        for cur_value, prev_value in zip(current_item['value'], prev_item['value']):
            try:
                growth = round(((cur_value - prev_value) / prev_value) * 100, 2)
            except (TypeError, ZeroDivisionError):
                growth = None
            value_growth.append(growth)

        current_item['valueGrowth'] = value_growth

    operating_history_list = sorted(operating_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'), reverse=True)

    return {'operatingExpenses': {'names': operating_name_list, 'history': operating_history_list}}


def prepare_geo_dataset(data):
    data = convert_to_dict(data)
    geo_name_list = []
    geo_history_list = []
    index = 0
    for date, info in data.items():
        value_list = []
        for name, val in info.items():
            if index == 0:
                geo_name_list.append(name)
            if name in geo_name_list:
                value_list.append(val)
        if len(value_list) > 0:
            geo_history_list.append({'date': date, 'value': value_list})
        index += 1

    geo_history_list = sorted(geo_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))

    # Initialize 'valueGrowth' as None for all entries
    for item in geo_history_list:
        item['valueGrowth'] = [None] * len(item['value'])

    # Calculate valueGrowth for each item based on the previous date's value
    for i in range(1, len(geo_history_list)):  # Start from the second item
        current_item = geo_history_list[i]
        prev_item = geo_history_list[i - 1]

        value_growth = []
        for cur_value, prev_value in zip(current_item['value'], prev_item['value']):
            try:
                growth = round(((cur_value - prev_value) / prev_value) * 100, 2)
            except (TypeError, ZeroDivisionError):
                growth = None
            value_growth.append(growth)

        current_item['valueGrowth'] = value_growth

    geo_history_list = sorted(geo_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'), reverse=True)

    return {'geographic': {'names': standardize_strings(geo_name_list), 'history': geo_history_list}}


def prepare_dataset(data, geo_data, income_data, symbol):
    data = convert_to_dict(data)
    revenue_name_list = []
    revenue_history_list = []
    index = 0
    for date, info in data.items():
        value_list = []
        for name, val in info.items():
            if index == 0:
                revenue_name_list.append(name)
            if name in revenue_name_list:
                value_list.append(val)
        if len(value_list) > 0:
            revenue_history_list.append({'date': date, 'value': value_list})
        index += 1

    revenue_history_list = sorted(revenue_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))

    # Initialize 'valueGrowth' as None for all entries
    for item in revenue_history_list:
        item['valueGrowth'] = [None] * len(item['value'])

    # Calculate valueGrowth for each item based on the previous date's value
    for i in range(1, len(revenue_history_list)):  # Start from the second item
        current_item = revenue_history_list[i]
        prev_item = revenue_history_list[i - 1]

        value_growth = []
        for cur_value, prev_value in zip(current_item['value'], prev_item['value']):
            try:
                growth = round(((cur_value - prev_value) / prev_value) * 100, 2)
            except (TypeError, ZeroDivisionError):
                growth = None
            value_growth.append(growth)

        current_item['valueGrowth'] = value_growth

    revenue_history_list = sorted(revenue_history_list, key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'), reverse=True)

    res_list = {'revenue': {'names': revenue_name_list, 'history': revenue_history_list}}

    geo_data = prepare_geo_dataset(geo_data)
    operating_expense_data = prepare_expense_dataset(income_data)

    res_list = {**res_list, **geo_data, **operating_expense_data}
    return res_list


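# Note: prepare_expense_dataset, prepare_geo_dataset, and the revenue section of
# prepare_dataset all follow the same pattern (pivot by date, then compute
# period-over-period growth). A shared helper along these lines could replace the
# duplicated logic; this is only a sketch and is not called anywhere in this script.
def build_segment_history(raw_data):
    data = convert_to_dict(raw_data)
    names = []
    history = []
    for index, (date, info) in enumerate(data.items()):
        values = []
        for name, val in info.items():
            if index == 0:
                names.append(name)
            if name in names:
                values.append(val)
        if values:
            history.append({'date': date, 'value': values})

    history.sort(key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'))
    for item in history:
        item['valueGrowth'] = [None] * len(item['value'])

    # Growth of each series versus the previous period, oldest to newest.
    for prev_item, current_item in zip(history, history[1:]):
        growth_list = []
        for cur_value, prev_value in zip(current_item['value'], prev_item['value']):
            try:
                growth_list.append(round(((cur_value - prev_value) / prev_value) * 100, 2))
            except (TypeError, ZeroDivisionError):
                growth_list.append(None)
        current_item['valueGrowth'] = growth_list

    history.sort(key=lambda x: datetime.strptime(x['date'], '%Y-%m-%d'), reverse=True)
    return names, history

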
async def get_data(session, total_symbols):
    batch_size = 300  # Process 300 symbols at a time
    for i in tqdm(range(0, len(total_symbols), batch_size)):
        batch = total_symbols[i:i + batch_size]
        for symbol in batch:
            try:
                with open(f"json/financial-statements/income-statement/quarter/{symbol}.json", 'r') as file:
                    income_data = orjson.loads(file.read())

                include_selling_and_marketing = income_data[0].get('sellingAndMarketingExpenses', 0) > 0 if income_data else False
                # Keep the expense categories of interest for quarters after 2015-01-01
                income_data = [
                    {
                        'date': entry['date'],
                        'Selling, General, and Administrative': entry.get('sellingGeneralAndAdministrativeExpenses', 0),
                        'Research and Development': entry.get('researchAndDevelopmentExpenses', 0),
                        **({'Sales and Marketing': entry.get('sellingAndMarketingExpenses', 0)} if include_selling_and_marketing else {})
                    }
                    for entry in income_data
                    if datetime.strptime(entry['date'], '%Y-%m-%d') > datetime(2015, 1, 1)
                ]

                # Reshape to the {date: {category: amount}} form expected by convert_to_dict
                income_data = [
                    {
                        entry['date']: {
                            key: value
                            for key, value in entry.items()
                            if key != 'date'
                        }
                    }
                    for entry in income_data
                ]
            except Exception:
                income_data = []

            product_data = []
            geo_data = []

            urls = [
                f"https://financialmodelingprep.com/api/v4/revenue-product-segmentation?symbol={symbol}&structure=flat&period=quarter&apikey={api_key}",
                f"https://financialmodelingprep.com/api/v4/revenue-geographic-segmentation?symbol={symbol}&structure=flat&apikey={api_key}"
            ]

            for url in urls:
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            data = await response.json()
                            if "product" in url:
                                product_data = data
                            else:
                                geo_data = data
                except Exception as e:
                    print(f"Error fetching data for {symbol}: {e}")

            if len(product_data) > 0 and len(geo_data) > 0:
                data = prepare_dataset(product_data, geo_data, income_data, symbol)
                await save_json(data, symbol)

        # Wait 60 seconds after processing each batch of 300 symbols
        if i + batch_size < len(total_symbols):
            print(f"Processed {i + batch_size} symbols, waiting 60 seconds...")
            await asyncio.sleep(60)


async def run():
    con = sqlite3.connect('stocks.db')
    cursor = con.cursor()
    cursor.execute("PRAGMA journal_mode = wal")
    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
    total_symbols = [row[0] for row in cursor.fetchall()]
    #total_symbols = ['TSLA']  # For testing purposes
    con.close()

    async with aiohttp.ClientSession() as session:
        await get_data(session, total_symbols)


if __name__ == "__main__":
    asyncio.run(run())