# (Hugging Face Spaces status residue from page extraction — not part of the program)
# Spaces: Running
from huggingface_hub import HfFileSystem
import pandas as pd
from utils import logger
from datetime import datetime
import threading
import traceback
import json

# Shared filesystem handle for globbing/reading CI result files on the HF Hub.
# NOTE: if caching is an issue, try adding `use_listings_cache=False`
fs = HfFileSystem()
# Models whose CI results are surfaced in the dashboard, grouped by modality/era.
# Anything not in this list gets filtered out of the joined dataframe.
IMPORTANT_MODELS = [
    # Text — classics
    "auto",
    "bert",        # encoder only, old but dominant
    "gpt2",        # decoder, old
    "t5",          # encoder-decoder, old
    "modernbert",  # encoder only
    # Vision
    "vit",                # old
    "clip",               # old but dominant
    "detr",               # object detection, segmentation
    "table-transformer",  # object detection — maybe just detr?
    "got_ocr2",           # OCR
    # Audio
    "whisper",   # old but dominant
    "wav2vec2",  # old
    # Recent LLMs
    "llama",     # Meta, dominant
    "gemma3",    # Google
    "qwen2",     # Alibaba
    "mistral3",  # Mistral
    # Vision-language / video
    "qwen2_5_vl",
    "llava",     # many models derived from it
    "smolvlm",
    "internvl",
    # Omnimodal
    "gemma3n",
    "qwen2_5_omni",
]
# Columns retained after joining the AMD and NVIDIA dataframes: each metric
# appears once per device, metric-major order.
KEYS_TO_KEEP = [
    f"{metric}_{device}"
    for metric in (
        "success",
        "skipped",
        "failed_multi_no",
        "failed_single_no",
        "failures",
        "job_link",
    )
    for device in ("amd", "nvidia")
]
def read_one_dataframe(json_path: str, device_label: str) -> pd.DataFrame:
    """Load one CI ``model_results.json`` into a DataFrame indexed by model name.

    Args:
        json_path: Path/URL readable by ``pd.read_json`` (hf:// or https:// or local).
        device_label: Suffix ("amd" or "nvidia") appended to the derived failure-count columns.

    Returns:
        The raw results DataFrame with two extra columns,
        ``failed_multi_no_<device_label>`` and ``failed_single_no_<device_label>``,
        holding the number of multi-GPU / single-GPU failures per model.
    """
    logger.info(f"Reading df located at {json_path}")
    df = pd.read_json(json_path, orient="index")
    df.index.name = "model_name"
    # `failures` holds a dict that may contain "multi" and/or "single" lists of
    # failed tests; `.get` with an empty-list default replaces the manual
    # membership check of the original.
    df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x.get("multi", [])))
    df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x.get("single", [])))
    return df
def get_distant_data() -> pd.DataFrame:
    """Fetch the latest AMD and NVIDIA daily-CI results and join them.

    Returns:
        A DataFrame restricted to ``KEYS_TO_KEEP`` columns and to the models in
        ``IMPORTANT_MODELS`` (case-insensitive index match).
    """
    # Retrieve AMD dataframe: descending sort puts the newest run first.
    amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
    files_amd = sorted(fs.glob(amd_src), reverse=True)
    df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd")
    # Retrieve NVIDIA dataframe
    nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/**/ci_results_run_models_gpu/model_results.json"
    files_nvidia = sorted(fs.glob(nvidia_src), reverse=True)
    # FIX: the original used `lstrip`, which strips any run of the *characters*
    # in its argument (so e.g. a date directory starting with "d", "a", "t", ...
    # could lose leading characters). `removeprefix` removes the literal prefix.
    nvidia_path = files_nvidia[0].removeprefix("datasets/hf-internal-testing/transformers_daily_ci/")
    nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path
    df_nvidia = read_one_dataframe(nvidia_path, "nvidia")
    # Join both dataframes (outer join keeps models present on only one device)
    joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
    joined = joined[KEYS_TO_KEEP]
    joined.index = joined.index.str.replace("^models_", "", regex=True)
    # Filter out all but important models
    important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
    filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
    # Warn for each missing important model (logger used for consistency with the
    # rest of the module instead of a bare print)
    for model in IMPORTANT_MODELS:
        if model not in filtered_joined.index:
            logger.warning(f"Model {model} was missing from index.")
    return filtered_joined
def get_sample_data() -> pd.DataFrame:
    """Build the joined dataframe from local sample files (offline fallback).

    Mirrors `get_distant_data` but reads `sample_amd.json` / `sample_nvidia.json`
    from disk and prefixes every model name with "sample_".
    """
    # Retrieve sample dataframes from the local files
    amd = read_one_dataframe("sample_amd.json", "amd")
    nvidia = read_one_dataframe("sample_nvidia.json", "nvidia")
    # Outer join keeps models present on only one device, then trim the columns
    merged = amd.join(nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")[KEYS_TO_KEEP]
    merged.index = merged.index.str.replace("^models_", "", regex=True)
    # Keep only the important models (case-insensitive match)
    wanted = {name.lower() for name in IMPORTANT_MODELS}
    sample_df = merged[merged.index.str.lower().isin(wanted)]
    # Prefix all model names with "sample_" so they are distinguishable from real data
    sample_df.index = "sample_" + sample_df.index
    return sample_df
def safe_extract(row: pd.Series, key: str) -> int:
    """Return ``row[key]`` as an int, defaulting to 0 when absent or NaN.

    FIX: annotated ``row`` as ``pd.Series`` (it is a dataframe *row*, accessed
    via ``Series.get``), and the lookup is now performed once instead of twice.
    """
    value = row.get(key, 0)
    return int(value) if pd.notna(value) else 0
def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]:
    """Extract and process model data from a joined DataFrame row.

    Returns:
        (amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd,
        failed_multi_nvidia, failed_single_nvidia) where each stats dict has
        "passed"/"failed"/"skipped"/"error" integer entries.
    """
    # Pull every raw count once, NaN/missing coerced to 0 by safe_extract
    counts = {
        key: safe_extract(row, key)
        for key in (
            "success_amd", "success_nvidia",
            "skipped_amd", "skipped_nvidia",
            "failed_multi_no_amd", "failed_multi_no_nvidia",
            "failed_single_no_amd", "failed_single_no_nvidia",
        )
    }

    def stats_for(device: str) -> dict[str, int]:
        # "error" is not available in this dataset, so it is always 0
        return {
            "passed": counts[f"success_{device}"],
            "failed": counts[f"failed_multi_no_{device}"] + counts[f"failed_single_no_{device}"],
            "skipped": counts[f"skipped_{device}"],
            "error": 0,
        }

    return (
        stats_for("amd"),
        stats_for("nvidia"),
        counts["failed_multi_no_amd"],
        counts["failed_single_no_amd"],
        counts["failed_multi_no_nvidia"],
        counts["failed_single_no_nvidia"],
    )
class CIResults:
    """Holds the joined CI-results dataframe and keeps it refreshed in the background."""

    def __init__(self):
        self.df = pd.DataFrame()    # joined AMD/NVIDIA results, one row per model
        self.available_models = []  # self.df index as a plain list
        self.last_update_time = ""  # "HH:MM" timestamp of the last load

    def load_data(self) -> None:
        """Load data from the data source, falling back on sample data on failure."""
        # Try loading the distant data, and fall back on sample data for local tinkering
        try:
            logger.info("Loading distant data...")
            new_df = get_distant_data()
        except Exception:  # unused `as e` binding removed; traceback is logged below
            error_msg = [
                "Loading data failed:",
                "-" * 120,
                traceback.format_exc(),
                "-" * 120,
                "Falling back on sample data.",
            ]
            logger.error("\n".join(error_msg))
            new_df = get_sample_data()
        # Update attributes
        self.df = new_df
        self.available_models = new_df.index.tolist()
        self.last_update_time = datetime.now().strftime('%H:%M')
        # Log load status
        logger.info(f"Data loaded successfully: {len(self.available_models)} models")
        logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}")
        logger.info(f"Last update: {self.last_update_time}")
        self._log_preview()

    def _log_preview(self) -> None:
        """Log a truncated JSON preview of the first three rows of the dataframe."""
        msg = {}
        for model in self.available_models[:3]:
            msg[model] = {}
            for col in self.df.columns:
                value = self.df.loc[model, col]
                # Non-int values (lists, strings, numpy scalars) are stringified
                # and truncated so the preview stays readable
                if not isinstance(value, int):
                    value = str(value)
                    if len(value) > 10:
                        value = value[:10] + "..."
                msg[model][col] = value
        logger.info(json.dumps(msg, indent=4))

    def schedule_data_reload(self):
        """Start a self-rechaining 15-minute reload timer.

        FIX: the original duplicated the Timer construction (once for the
        initial schedule, once inside the callback); both paths now share one
        helper, with the original log messages preserved.
        """
        def _schedule(message: str) -> None:
            # Schedule the next reload in 15 minutes (900 seconds)
            timer = threading.Timer(900.0, _reload)
            timer.daemon = True  # dies when main thread dies
            timer.start()
            logger.info(message)

        def _reload() -> None:
            self.load_data()
            _schedule("Next data reload scheduled in 15 minutes")

        # Start the first reload timer
        _schedule("Data auto-reload scheduled every 15 minutes")