From 7df28cd8f9880db2c13c113b71e310b1d020cf0c Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Sun, 20 Oct 2024 18:42:16 +0200
Subject: [PATCH] update metrics cron job

---
 app/cron_business_metrics.py | 358 ++++++++++++++++++++++++++++++-----
 app/main.py                  |  29 +++
 2 files changed, 340 insertions(+), 47 deletions(-)

diff --git a/app/cron_business_metrics.py b/app/cron_business_metrics.py
index d8cc995..57229a1 100644
--- a/app/cron_business_metrics.py
+++ b/app/cron_business_metrics.py
@@ -1,76 +1,340 @@
 from edgar import *
 import ast
+import ujson
 from tqdm import tqdm
 from datetime import datetime
+from collections import defaultdict
+
+
+# Tell the SEC who you are
+set_identity("Max Mustermann max.mustermann@indigo.com")
+
 # Define quarter-end dates for a given year
 # The Q4 result is not reported in any quarterly SEC filing.
 # But the 10-K (e.g. https://www.sec.gov/Archives/edgar/data/1045810/000104581024000029/nvda-20240128.htm)
 # reports the full-year total, from which the Q1-Q3 results can be subtracted to obtain Q4
 # (clunky, but it works).
+def add_value_growth(data):
+    """
+    Adds a new key 'valueGrowth' to each entry in the data list.
+
+    Parameters:
+    - data (list): A list of dictionaries containing date and value lists.
+
+    Returns:
+    - list: A new list with the 'valueGrowth' key added to each dictionary.
+    """
+    # Initialize a new list for the output data
+    updated_data = []
+
+    # Loop through the data from the latest to the oldest
+    for i in range(len(data)):
+        try:
+            current_entry = data[i].copy()  # Create a copy of the current entry
+            current_values = current_entry['value']
+
+            # Initialize the growth percentages list
+            if i < len(data) - 1:  # Only compute growth if there is a next entry
+                next_values = data[i + 1]['value']
+                growth_percentages = []
+
+                for j in range(len(current_values)):
+                    # Convert values to integers if they are strings
+                    next_value = int(next_values[j]) if isinstance(next_values[j], (int, str)) else 0
+                    current_value = int(current_values[j]) if isinstance(current_values[j], (int, str)) else 0
+
+                    # Calculate growth percentage if next_value is not zero
+                    if next_value != 0:
+                        growth = round(((current_value - next_value) / next_value) * 100, 2)
+                    else:
+                        growth = None  # Cannot calculate growth if next value is zero
+
+                    growth_percentages.append(growth)
+
+                current_entry['valueGrowth'] = growth_percentages  # Add the growth percentages
+            else:
+                current_entry['valueGrowth'] = [None] * len(current_values)  # No growth for the last entry
+
+            updated_data.append(current_entry)  # Append the updated entry to the output list
+        except Exception:
+            # Skip entries whose values cannot be parsed
+            pass
+
+    return updated_data
+
+def sort_by_latest_date_and_highest_value(data):
+    # Define a key function to convert the date string to a datetime object
+    # and use the negative of the integer value for descending order
+    def sort_key(item):
+        date = datetime.strptime(item['date'], '%Y-%m-%d')
+        value = -int(item['value'])  # Negative for descending order
+        return (date, value)
+
+    # Sort the list
+    sorted_data = sorted(data, key=sort_key, reverse=True)
+
+    return sorted_data
+
+def aggregate_other_values(data):
+    aggregated = defaultdict(int)
+    result = []
+
+    # First pass: aggregate 'Other' values and keep non-'Other' items
+    for item in data:
+        date = item['date']
+        value = int(item['value'])
+        if item['name'] == 'Other':
+            aggregated[date] += value
+        else:
+            result.append(item)
+
+    # Second pass: add aggregated 'Other' values
+    for date, value in aggregated.items():
+        result.append({'name': 'Other', 'value': int(value), 'date': date})
+
+    return sorted(result, key=lambda x: (x['date'], x['name']))
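+
+# A minimal sketch of how aggregate_other_values behaves (the rows below are
+# invented for illustration, not taken from any filing): duplicate 'Other'
+# rows are summed per date, named rows pass through untouched, and the
+# result is sorted by (date, name):
+#
+#   aggregate_other_values([
+#       {'name': 'Other', 'value': '10', 'date': '2024-03-31'},
+#       {'name': 'Other', 'value': '5',  'date': '2024-03-31'},
+#       {'name': 'China', 'value': 7,    'date': '2024-03-31'},
+#   ])
+#   # -> [{'name': 'China', 'value': 7,  'date': '2024-03-31'},
+#   #     {'name': 'Other', 'value': 15, 'date': '2024-03-31'}]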
+
+# Define quarter-end dates for a given year
 def closest_quarter_end(date_str):
     date = datetime.strptime(date_str, "%Y-%m-%d")
     year = date.year
-    # Define quarter end dates for the year
+    # Define quarter end dates for the current year
     q1 = datetime(year, 3, 31)
     q2 = datetime(year, 6, 30)
     q3 = datetime(year, 9, 30)
     q4 = datetime(year, 12, 31)
-
-    # Find the closest quarter date
-    closest = min([q1, q2, q3, q4], key=lambda d: abs(d - date))
-
+
+    # If the date is in January, return the last day of Q4 of the previous year
+    # (fiscal years such as NVDA's end in late January)
+    if date.month == 1:
+        closest = datetime(year - 1, 12, 31)  # Last quarter of the previous year
+    else:
+        # Adjust to next year's Q4 if the date is in the last quarter of the current year
+        if date >= q4:
+            closest = q4.replace(year=year + 1)  # Next year's last quarter
+        else:
+            # Find the closest quarter date
+            closest = min([q1, q2, q3, q4], key=lambda d: abs(d - date))
+
     # Return the closest quarter date in 'YYYY-MM-DD' format
     return closest.strftime("%Y-%m-%d")
-
-# Tell the SEC who you are
-set_identity("Michael Mccallum mike.mccalum@indigo.com")
-
-symbol = 'NVDA'
-revenue_sources = []
-geography_sources = []
-filings = Company(symbol).get_filings(form=["10-K","10-Q"]).latest(50)
-#print(filings[0].xbrl())
-
-for i in range(0,17):
-    try:
-        filing_xbrl = filings[i].xbrl()
-        facts = filing_xbrl.facts.data
-        latest_rows = facts.groupby('dimensions').head(1)
-        for index, row in latest_rows.iterrows():
-            dimensions_str = row.get("dimensions", "{}")
-            try:
-                dimensions_dict = ast.literal_eval(dimensions_str) if isinstance(dimensions_str, str) else dimensions_str
-            except (ValueError, SyntaxError):
-                dimensions_dict = {}
-
-            for column_name in ["srt:StatementGeographicalAxis","srt:ProductOrServiceAxis"]:
-
-                product_dimension = dimensions_dict.get(column_name) if isinstance(dimensions_dict, dict) else None
-                #print(product_dimension)
-                #print(row["namespace"], row["fact"], product_dimension, row["value"])
-
-                if column_name == "srt:ProductOrServiceAxis":
-                    if row["namespace"] == "us-gaap" and product_dimension is not None and (product_dimension.startswith(symbol.lower() + ":") or product_dimension.startswith('country' + ":")):
-                        revenue_sources.append({
-                            "name": product_dimension.replace("Member", "").replace(f"{symbol.lower()}:", ""),
-                            "value": row["value"], "date": row["end_date"]
-                        })
-
-                else:
-                    if row["namespace"] == "us-gaap" and product_dimension is not None and (product_dimension.startswith(symbol.lower() + ":") or product_dimension.startswith('country' + ":")):
-                        geography_sources.append({
-                            "name": product_dimension.replace("Member", "").replace(f"{symbol.lower()}:", ""),
-                            "value": row["value"], "date": row["end_date"]
-                        })
-
-    except Exception as e:
-        print(e)
-
-#print(revenue_sources)
-print(geography_sources)
+def compute_q4_results(dataset):
+    # Group data by year and name
+    yearly_data = defaultdict(lambda: defaultdict(dict))
+    for item in dataset:
+        date = datetime.strptime(item['date'], '%Y-%m-%d')
+        year = date.year
+        quarter = (date.month - 1) // 3 + 1
+        yearly_data[year][item['name']][quarter] = item['value']
+
+    # Calculate Q4 results and update dataset
+    for year in sorted(yearly_data.keys(), reverse=True):
+        for name, quarters in yearly_data[year].items():
+            if 4 in quarters:  # The 10-K total lands on Dec 31, i.e. in "quarter 4"
+                total = quarters[4]
+                q1 = quarters.get(1, 0)
+                q2 = quarters.get(2, 0)
+                q3 = quarters.get(3, 0)
+                q4_value = total - (q1 + q2 + q3)
+
+                # Update the original dataset
+                for item in dataset:
+                    if item['name'] == name and item['date'] == f'{year}-12-31':
+                        item['value'] = q4_value
+                        break
+
+    return dataset
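+
+# Worked example for the Q4 trick described at the top of this file, with
+# invented numbers: if the 10-K reports a full-year 'Data Center' total of
+# 100 (mapped by closest_quarter_end to 2023-12-31, i.e. "quarter 4") and
+# the 10-Qs contributed Q1=20, Q2=25 and Q3=30, then compute_q4_results
+# rewrites the 2023-12-31 entry to 100 - (20 + 25 + 30) = 25.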
row["value"], "date": row["end_date"] - }) + return dataset - except Exception as e: - print(e) + +def generate_revenue_dataset(dataset): + name_replacements = { + "datacenter": "Data Center", + "professionalvisualization": "Visualization", + "oemandother": "OEM & Other", + "automotive": "Automotive", + "oemip": "OEM & Other", + "gaming": "Gaming" + } + dataset = [revenue for revenue in dataset if revenue['name'] not in ['Compute', 'Networking']] -#print(revenue_sources) -print(geography_sources) + for item in dataset: + item['date'] = closest_quarter_end(item['date']) + + name = item.get('name').lower() + value = int(item.get('value')) + if name in name_replacements: + item['name'] = name_replacements[name] + item['value'] = int(value) + + # Custom order for specific countries + custom_order = { + 'Data Center': 4, + 'Gaming': 3, + 'Visualization': 2, + 'Automotive': 1, + 'OEM & Other': 0 + } + + dataset = sorted( + dataset, + key=lambda item: (datetime.strptime(item['date'], '%Y-%m-%d'), custom_order.get(item['name'], 4)), + reverse = True + ) + + dataset = compute_q4_results(dataset) + unique_names = sorted( + list(set(item['name'] for item in dataset if item['name'] not in {'CloudServiceAgreements'})), + key=lambda item: custom_order.get(item, 4), # Use 4 as default for items not in custom_order + reverse=True) + + result = {} + + # Iterate through the original data + for item in dataset: + # Get the date and value + date = item['date'] + value = item['value'] + + # Initialize the dictionary for the date if not already done + if date not in result: + result[date] = {'date': date, 'value': []} + + # Append the value to the list + result[date]['value'].append(value) + + # Convert the result dictionary to a list + res_list = list(result.values()) + + # Print the final result + res_list = add_value_growth(res_list) + + final_result = {'names': unique_names, 'history': res_list} + return final_result + +def generate_geography_dataset(dataset): + + country_replacements = { + "country:us": "United States", + "country:cn": "China", + "chinaincludinghongkong": "China" + } + + # Custom order for specific countries + custom_order = { + 'United States': 2, + 'China': 1, + 'Other': 0 + } + + for item in dataset: + item['date'] = closest_quarter_end(item['date']) + name = item.get('name').lower() + value = int(float(item.get('value'))) + if name in country_replacements: + item['name'] = country_replacements[name] + item['value'] = value + else: + item['name'] = 'Other' + item['value'] = value + + dataset = aggregate_other_values(dataset) + dataset = sorted( + dataset, + key=lambda item: (datetime.strptime(item['date'], '%Y-%m-%d'), custom_order.get(item['name'], 3)), + reverse = True + ) + + dataset = compute_q4_results(dataset) + result = {} + + unique_names = sorted( + list(set(item['name'] for item in dataset if item['name'] not in {'CloudServiceAgreements'})), + key=lambda item: custom_order.get(item, 4), # Use 4 as default for items not in custom_order + reverse=True) + + result = {} + + # Iterate through the original data + for item in dataset: + # Get the date and value + date = item['date'] + value = item['value'] + + # Initialize the dictionary for the date if not already done + if date not in result: + result[date] = {'date': date, 'value': []} + + # Append the value to the list + result[date]['value'].append(value) + + # Convert the result dictionary to a list + res_list = list(result.values()) + + # Print the final result + res_list = add_value_growth(res_list) + + final_result = 
+
+
+def run(symbol):
+
+    revenue_sources = []
+    geography_sources = []
+    filings = Company(symbol).get_filings(form=["10-K","10-Q"]).latest(20)
+    #print(filings[0].xbrl())
+
+    # Walk the most recent filings; 17 covers roughly four years of 10-Qs plus 10-Ks
+    for i in range(0,17):
+        try:
+            filing_xbrl = filings[i].xbrl()
+            facts = filing_xbrl.facts.data
+            latest_rows = facts.groupby('dimensions').head(1)
+
+            for index, row in latest_rows.iterrows():
+                dimensions_str = row.get("dimensions", "{}")
+                try:
+                    dimensions_dict = ast.literal_eval(dimensions_str) if isinstance(dimensions_str, str) else dimensions_str
+                except (ValueError, SyntaxError):
+                    dimensions_dict = {}
+
+                for column_name in ["srt:StatementGeographicalAxis","srt:ProductOrServiceAxis"]:
+
+                    product_dimension = dimensions_dict.get(column_name) if isinstance(dimensions_dict, dict) else None
+                    #print(product_dimension)
+                    #print(row["namespace"], row["fact"], product_dimension, row["value"])
+
+                    if column_name == "srt:ProductOrServiceAxis":
+                        if row["namespace"] == "us-gaap" and product_dimension is not None and (product_dimension.startswith(symbol.lower() + ":") or product_dimension.startswith('country' + ":")):
+                            revenue_sources.append({
+                                "name": product_dimension.replace("Member", "").replace(f"{symbol.lower()}:", ""),
+                                "value": row["value"], "date": row["end_date"]
+                            })
+
+                    else:
+                        if row["namespace"] == "us-gaap" and product_dimension is not None and (product_dimension.startswith(symbol.lower() + ":") or product_dimension.startswith('country' + ":")):
+                            geography_sources.append({
+                                "name": product_dimension.replace("Member", "").replace(f"{symbol.lower()}:", ""),
+                                "value": row["value"], "date": row["end_date"]
+                            })
+
+        except Exception as e:
+            print(e)
+
+    revenue_dataset = generate_revenue_dataset(revenue_sources)
+    geographic_dataset = generate_geography_dataset(geography_sources)
+
+    final_dataset = {'revenue': revenue_dataset, 'geographic': geographic_dataset}
+    print(final_dataset)
+    with open(f"json/business-metrics/{symbol}.json", "w") as file:
+        ujson.dump(final_dataset, file)
+
+if __name__ == "__main__":
+    symbol = 'NVDA'
+    run(symbol)
+
diff --git a/app/main.py b/app/main.py
index 61ab226..ee20b66 100755
--- a/app/main.py
+++ b/app/main.py
@@ -4054,6 +4054,35 @@ async def get_fomc_impact(api_key: str = Security(get_api_key)):
         headers={"Content-Encoding": "gzip"}
     )
 
+@app.post("/business-metrics")
+async def get_business_metrics(data: TickerData, api_key: str = Security(get_api_key)):
+    ticker = data.ticker
+    cache_key = f"business-metrics-{ticker}"
+    cached_result = redis_client.get(cache_key)
+    if cached_result:
+        return StreamingResponse(
+            io.BytesIO(cached_result),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+    try:
+        with open(f"json/business-metrics/{ticker}.json", 'rb') as file:
+            res = orjson.loads(file.read())
+    except Exception:
+        # Fall back to an empty payload if the cron job has not written the file yet
+        res = {}
+
+    data = orjson.dumps(res)
+    compressed_data = gzip.compress(data)
+
+    redis_client.set(cache_key, compressed_data)
+    redis_client.expire(cache_key, 3600*3600)
+
+    return StreamingResponse(
+        io.BytesIO(compressed_data),
+        media_type="application/json",
+        headers={"Content-Encoding": "gzip"}
+    )
+
 @app.get("/newsletter")
 async def get_newsletter():
     try:
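
Example call against the new endpoint (an illustrative sketch, not part of
the patch; it assumes a local deployment, that TickerData exposes the
`ticker` field used above, and that get_api_key reads an X-API-KEY header --
adjust the header to your auth setup):

    import requests

    r = requests.post(
        "http://localhost:8000/business-metrics",
        json={"ticker": "NVDA"},
        headers={"X-API-KEY": "<your-key>"},
    )
    business_metrics = r.json()  # requests transparently inflates the gzip body

The response body is the same JSON that cron_business_metrics.py wrote to
json/business-metrics/NVDA.json.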