from huggingface_hub import HfFileSystem import pandas as pd from utils import logger import os from datetime import datetime import threading import traceback fs = HfFileSystem() IMPORTANT_MODELS = [ "auto", "bert", # old but dominant (encoder only) "gpt2", # old (decoder) "t5", # old (encoder-decoder) "modernbert", # (encoder only) "vit", # old (vision) - fixed comma "clip", # old but dominant (vision) "detr", # objection detection, segmentation (vision) "table-transformer", # objection detection (visioin) - maybe just detr? "got_ocr2", # ocr (vision) "whisper", # old but dominant (audio) "wav2vec2", # old (audio) "llama", # new and dominant (meta) "gemma3", # new (google) "qwen2", # new (Alibaba) "mistral3", # new (Mistral) - added missing comma "qwen2_5_vl", # new (vision) "llava", # many models from it (vision) "smolvlm", # new (video) "internvl", # new (video) "gemma3n", # new (omnimodal models) "qwen2_5_omni", # new (omnimodal models) ] def read_one_dataframe(json_path: str, device_label: str) -> pd.DataFrame: df = pd.read_json(json_path, orient="index") df.index.name = "model_name" df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0) df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0) return df def get_distant_data() -> pd.DataFrame: # Retrieve AMD dataframe amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" files_amd = sorted(fs.glob(amd_src), reverse=True) df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd") # Retrieve NVIDIA dataframe nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/**/ci_results_run_models_gpu/model_results.json" files_nvidia = sorted(fs.glob(nvidia_src), reverse=True) # NOTE: should this be removeprefix instead of lstrip? nvidia_path = files_nvidia[0].lstrip('datasets/hf-internal-testing/transformers_daily_ci/') nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path df_nvidia = read_one_dataframe(nvidia_path, "nvidia") # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[ [ "success_amd", "success_nvidia", "failed_multi_no_amd", "failed_multi_no_nvidia", "failed_single_no_amd", "failed_single_no_nvidia", "failures_amd", "failures_nvidia", "job_link_amd", "job_link_nvidia", ] ] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] return filtered_joined def get_sample_data() -> pd.DataFrame: # Retrieve sample dataframes df_amd = read_one_dataframe("sample_amd.json", "amd") df_nvidia = read_one_dataframe("sample_nvidia.json", "nvidia") # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[ [ "success_amd", "success_nvidia", "failed_multi_no_amd", "failed_multi_no_nvidia", "failed_single_no_amd", "failed_single_no_nvidia", "failures_amd", "failures_nvidia", "job_link_amd", "job_link_nvidia", ] ] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] # Prefix all model names with "sample_" filtered_joined.index = "sample_" + filtered_joined.index return filtered_joined def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]: """Extract and process model data from DataFrame row.""" # Handle missing values and get counts directly from dataframe success_amd = int(row.get('success_amd', 0)) if pd.notna(row.get('success_amd', 0)) else 0 success_nvidia = int(row.get('success_nvidia', 0)) if pd.notna(row.get('success_nvidia', 0)) else 0 failed_multi_amd = int(row.get('failed_multi_no_amd', 0)) if pd.notna(row.get('failed_multi_no_amd', 0)) else 0 failed_multi_nvidia = int(row.get('failed_multi_no_nvidia', 0)) if pd.notna(row.get('failed_multi_no_nvidia', 0)) else 0 failed_single_amd = int(row.get('failed_single_no_amd', 0)) if pd.notna(row.get('failed_single_no_amd', 0)) else 0 failed_single_nvidia = int(row.get('failed_single_no_nvidia', 0)) if pd.notna(row.get('failed_single_no_nvidia', 0)) else 0 # Calculate total failures total_failed_amd = failed_multi_amd + failed_single_amd total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia # Create stats dictionaries directly from dataframe values amd_stats = { 'passed': success_amd, 'failed': total_failed_amd, 'skipped': 0, # Not available in this dataset 'error': 0 # Not available in this dataset } nvidia_stats = { 'passed': success_nvidia, 'failed': total_failed_nvidia, 'skipped': 0, # Not available in this dataset 'error': 0 # Not available in this dataset } return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia class CIResults: def __init__(self): self.df = pd.DataFrame() self.available_models = [] self.last_update_time = "" def load_data(self) -> None: """Load data from the data source.""" # Try loading the distant data, and fall back on sample data for local tinkering try: logger.info("Loading distant data...") new_df = get_distant_data() except Exception as e: error_msg = [ "Loading data failed:", "-" * 120, traceback.format_exc(), "-" * 120, "Falling back on sample data." ] logger.error("\n".join(error_msg)) new_df = get_sample_data() # Update attributes self.df = new_df self.available_models = new_df.index.tolist() self.last_update_time = datetime.now().strftime('%H:%M') # Log and return distant load status logger.info(f"Data loaded successfully: {len(self.available_models)} models") logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}") def schedule_data_reload(self): """Schedule the next data reload.""" def reload_data(): self.load_data() # Schedule the next reload in 15 minutes (900 seconds) timer = threading.Timer(900.0, reload_data) timer.daemon = True # Dies when main thread dies timer.start() logger.info("Next data reload scheduled in 15 minutes") # Start the first reload timer timer = threading.Timer(900.0, reload_data) timer.daemon = True timer.start() logger.info("Data auto-reload scheduled every 15 minutes")