|
import gradio as gr |
|
import pandas as pd |
|
from datasets import load_dataset |
|
from datetime import datetime, date |
|
import numpy as np |
|
from functools import lru_cache |
|
|
|
|
|
# NOTE(review): lru_cache(maxsize=1) memoizes whatever this returns — including
# the empty fallback DataFrame on a failed load — until clear_data_cache() is
# called. Confirm caching failures is intended.
@lru_cache(maxsize=1)
def load_trending_models_data():
    """Load the trending models dataset from Hugging Face.

    Returns:
        pd.DataFrame: one row per trending model. On any load error, an empty
        DataFrame with the expected column names is returned instead of raising,
        so the dashboard keeps rendering.
    """
    try:
        print("Loading dataset from hf-azure-internal/trending-models-analysis...")

        # load_dataset() returns a dict-like keyed by split name.
        dataset_info = load_dataset("hf-azure-internal/trending-models-analysis")
        print(f"Available splits: {list(dataset_info.keys())}")

        # Prefer a 'models' split, then 'train', otherwise whichever split
        # happens to come first.
        if "models" in dataset_info:
            print("Using 'models' split...")
            dataset = dataset_info["models"]
        elif "train" in dataset_info:
            print("Using 'train' split...")
            dataset = dataset_info["train"]
        else:
            split_name = list(dataset_info.keys())[0]
            print(f"Using '{split_name}' split...")
            dataset = dataset_info[split_name]

        print(f"Dataset loaded. Type: {type(dataset)}")

        df = dataset.to_pandas()
        print(f"Converted to pandas. Shape: {df.shape}")
        print(f"Columns: {list(df.columns)}")

        # Normalize the snapshot timestamp so downstream .dt accessors work.
        if 'collected_at' in df.columns:
            print(f"collected_at column found. Sample values:")
            print(df['collected_at'].head(3).tolist())
            df['collected_at'] = pd.to_datetime(df['collected_at'])
            print(f"After conversion, dtype: {df['collected_at'].dtype}")

            # Debug aid: list which snapshot dates are present.
            unique_dates = df['collected_at'].dt.date.unique()
            print(f"Unique dates in dataset: {sorted(unique_dates)}")
        else:
            print("No 'collected_at' column found!")

        return df
    except Exception as e:
        print(f"Error loading dataset: {e}")

        # Best-effort fallback: empty frame with the schema the rest of the
        # dashboard expects, so column lookups do not KeyError.
        return pd.DataFrame(columns=[
            'id', 'trending_rank', 'author', 'tags', 'license', 'library_name',
            'gated', 'task', 'is_in_catalog', 'is_custom_code', 'is_excluded_org',
            'is_supported_license', 'is_supported_library', 'is_safetensors',
            'is_supported_task', 'is_securely_scanned', 'collected_at', 'model_status'
        ])
|
|
|
def clear_data_cache():
    """Drop the memoized dataset so the next load fetches from Hugging Face again."""
    load_trending_models_data.cache_clear()
    print("Dataset cache cleared!")
|
|
|
def load_trending_models_data_fresh():
    """Invalidate the dataset cache, then return a freshly loaded DataFrame."""
    clear_data_cache()
    return load_trending_models_data()
|
|
|
def get_status_emoji(value):
    """Convert boolean values to emoji indicators (green=True, red=False)."""
    if pd.isna(value):
        return "β"
    if value:
        return "π’"
    return "π΄"
|
|
|
def get_negative_status_emoji(value):
    """Convert boolean values to emoji indicators where True is bad (red) and False is good (green)."""
    if pd.isna(value):
        return "β"
    if value:
        return "π΄"
    return "π’"
|
|
|
def get_status_with_text(value, text_value=None):
    """Convert boolean values to emoji indicators with optional text."""
    emoji = "β" if pd.isna(value) else ("π’" if value else "π΄")

    # Unwrap numpy/pandas array-likes: empty -> no text, single element -> scalar.
    if text_value is not None and hasattr(text_value, '__len__') and hasattr(text_value, 'size'):
        if text_value.size == 0:
            text_value = None
        elif text_value.size == 1:
            text_value = text_value.item() if hasattr(text_value, 'item') else text_value[0]
        # NOTE: arrays with size > 1 fall through unchanged; callers pass
        # scalars in practice.

    has_text = (
        text_value is not None
        and not pd.isna(text_value)
        and bool(str(text_value).strip())
    )
    return f"{emoji} {text_value}" if has_text else emoji
|
|
|
def get_negative_status_with_text(value, text_value=None):
    """Convert boolean values to emoji indicators where True is bad (red) and
    False is good (green), with optional text.

    Mirrors get_status_with_text(): numpy array-likes are unwrapped (empty ->
    no text, single element -> scalar) and whitespace-only text is ignored.
    The previous `if text_value` truthiness test raised ValueError on numpy
    arrays and silently dropped falsy-but-valid values such as 0.
    """
    if pd.isna(value):
        emoji = "β"
    else:
        emoji = "π΄" if value else "π’"

    # Unwrap numpy/pandas array-likes, same as get_status_with_text().
    if text_value is not None and hasattr(text_value, '__len__') and hasattr(text_value, 'size'):
        if text_value.size == 0:
            text_value = None
        elif text_value.size == 1:
            text_value = text_value.item() if hasattr(text_value, 'item') else text_value[0]

    if text_value is not None and not pd.isna(text_value) and str(text_value).strip():
        return f"{emoji} {text_value}"

    return emoji
|
|
|
def create_clickable_model_id(model_id):
    """Render a model ID as an HTML link to its Hugging Face page ('' if missing)."""
    if pd.isna(model_id) or not model_id:
        return ""
    url = f"https://hf.co/{model_id}"
    link_style = "text-decoration: underline; color: #0066cc;"
    return f'<a href="{url}" target="_blank" style="{link_style}">{model_id}</a>'
|
|
|
def get_status_with_color(status):
    """Wrap a model-status string in a color-coded HTML badge.

    'to add' -> blue, 'added' -> green, 'blocked' -> red (case-insensitive);
    anything else gets a neutral badge; missing/empty -> ''.
    """
    if pd.isna(status) or not status:
        return ""

    # Dispatch table: status keyword -> extra inline style for the badge.
    color_styles = {
        "to add": "color: #0066ff; font-weight: bold; background-color: #e6f3ff; ",
        "added": "color: #00aa00; font-weight: bold; background-color: #e6ffe6; ",
        "blocked": "color: #cc0000; font-weight: bold; background-color: #ffe6e6; ",
    }
    extra = color_styles.get(str(status).lower(), "")
    return f'<span style="{extra}padding: 2px 6px; border-radius: 4px;">{status}</span>'
|
|
|
def create_display_dataframe(df, selected_date):
    """Build the HTML/emoji-decorated DataFrame shown in the dashboard table.

    Args:
        df: raw dataset with the boolean support-flag columns and a
            datetime64 'collected_at' column.
        selected_date: optional date filter; accepts a 'YYYY-MM-DD' string,
            a datetime-like (anything with .date()), or a datetime.date.

    Returns:
        pd.DataFrame sorted ascending by 'Rank' with the fixed column order
        the gr.Dataframe expects, or an empty DataFrame when nothing matches.
    """
    if df.empty:
        return pd.DataFrame()

    # Keep only rows collected on the requested snapshot date.
    filtered_df = df.copy()
    if selected_date and 'collected_at' in df.columns:
        if isinstance(selected_date, str):
            target_date = pd.to_datetime(selected_date).date()
        elif hasattr(selected_date, 'date'):
            target_date = selected_date.date()
        else:
            target_date = selected_date

        filtered_df = filtered_df[filtered_df['collected_at'].dt.date == target_date]

    if filtered_df.empty:
        return pd.DataFrame()

    # Build the display frame directly. (The previous version first copied the
    # raw flag columns into display_df only to overwrite and then drop every
    # one of them via the final column selection.)
    display_df = pd.DataFrame(index=filtered_df.index)
    display_df['Rank'] = filtered_df['trending_rank']
    display_df['Model ID'] = filtered_df['id'].apply(create_clickable_model_id)
    display_df['Custom Code'] = filtered_df['is_custom_code'].apply(get_negative_status_emoji)
    display_df['Excluded Org'] = filtered_df.apply(lambda row: get_negative_status_with_text(row['is_excluded_org'], row.get('author')), axis=1)
    display_df['Supported License'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_license'], row.get('license')), axis=1)
    display_df['Supported Library'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_library'], row.get('library_name')), axis=1)
    display_df['Safetensors'] = filtered_df['is_safetensors'].apply(get_status_emoji)
    display_df['Supported Task'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_task'], row.get('task')), axis=1)
    display_df['Security Check'] = filtered_df['is_securely_scanned'].apply(get_status_emoji)
    display_df['Status'] = filtered_df['model_status'].apply(get_status_with_color)

    # Fixed order must match the gr.Dataframe `datatype` list in the UI.
    final_columns = ['Rank', 'Model ID', 'Custom Code', 'Excluded Org', 'Supported License',
                     'Supported Library', 'Safetensors', 'Supported Task', 'Security Check', 'Status']
    display_df = display_df[final_columns]

    return display_df.sort_values('Rank').reset_index(drop=True)
|
|
|
def update_dashboard(selected_date, use_fresh_data=False):
    """Return the display table for `selected_date`; optionally bypass the cache."""
    loader = load_trending_models_data_fresh if use_fresh_data else load_trending_models_data
    return create_display_dataframe(loader(), selected_date)
|
|
|
def get_available_dates():
    """Return (dates newest-first, oldest date, newest date) from the cached dataset.

    Returns ([], None, None) when the dataset is empty, lacks 'collected_at',
    or holds no valid dates.
    """
    df = load_trending_models_data()
    if df.empty or 'collected_at' not in df.columns:
        return [], None, None

    candidates = df['collected_at'].dt.date.unique()
    valid_dates = sorted((d for d in candidates if pd.notna(d)), reverse=True)

    if not valid_dates:
        return [], None, None

    # List is descending, so the last element is the oldest snapshot.
    return valid_dates, valid_dates[-1], valid_dates[0]
|
|
|
def get_available_dates_fresh():
    """Return (dates newest-first, oldest date, newest date) after a forced reload.

    Same contract as get_available_dates(), but the dataset cache is cleared
    first so the dates reflect the latest upstream data.
    """
    df = load_trending_models_data_fresh()
    if df.empty or 'collected_at' not in df.columns:
        return [], None, None

    candidates = df['collected_at'].dt.date.unique()
    valid_dates = sorted((d for d in candidates if pd.notna(d)), reverse=True)

    if not valid_dates:
        return [], None, None

    # List is descending, so the last element is the oldest snapshot.
    return valid_dates, valid_dates[-1], valid_dates[0]
|
|
|
|
|
def create_interface():
    """Build and return the Gradio Blocks app: date filter, refresh button, results table."""

    # Styling for the results table container; .info-text is defined but not
    # referenced by any component below.
    custom_css = """
    .dataframe-container {
        border-radius: 12px;
        overflow: hidden;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }

    .info-text {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 12px 16px;
        border-radius: 8px;
        text-align: center;
        font-weight: 500;
        margin: 8px 0;
    }
    """

    with gr.Blocks(title="Trending Models Dashboard", theme=gr.themes.Soft(), css=custom_css) as demo:
        gr.Markdown("""
        # Trending Models Support Dashboard
        **Data Source:** [hf-azure-internal/trending-models-analysis](https://huggingface.co/datasets/hf-azure-internal/trending-models-analysis)
        """)

        # Loads the dataset (via the lru_cache) to seed the date textbox with
        # the newest snapshot date. available_dates / min_date are unused here.
        available_dates, min_date, max_date = get_available_dates()

        with gr.Row():
            with gr.Column(scale=1):
                date_picker = gr.Textbox(
                    value=str(max_date) if max_date else "",
                    label="π
 Date Selection",
                    placeholder="2025-01-21",
                    info="Enter date in YYYY-MM-DD format"
                )

            with gr.Column(scale=1):
                refresh_btn = gr.Button("π Refresh Data", variant="primary", size="lg")

        with gr.Row():
            # datatype list must match create_display_dataframe's column order;
            # 'Model ID' and 'Status' are rendered as raw HTML.
            dataframe_display = gr.Dataframe(
                label="π Trending Models Overview",
                interactive=False,
                wrap=True,
                elem_classes=["dataframe-container"],
                datatype=["number", "html", "str", "str", "str", "str", "str", "str", "str", "html"]
            )

        def update_dashboard_wrapper(selected_date_text):
            """Wrapper to handle the dashboard update.

            Parses the free-text date; unparseable input falls back to "no
            date filter" (full dataset) rather than erroring.
            """
            selected_date = None
            if selected_date_text:
                try:
                    selected_date = pd.to_datetime(selected_date_text).date()
                except Exception as e:
                    print(f"Date conversion error: {e}, value: {selected_date_text}")
                    selected_date = None

            return update_dashboard(selected_date)

        # Re-render the table whenever the date text changes.
        date_picker.change(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

        def refresh_data(selected_date_text):
            """Refresh data and update dashboard (clears cache, reloads dataset)."""
            print("Refreshing data - clearing cache and reloading dataset...")
            available_dates, _, max_date = get_available_dates_fresh()

            # Keep the user's typed date if it parses; otherwise fall back to
            # the newest date in the refreshed data.
            selected_date = max_date
            if selected_date_text:
                try:
                    selected_date = pd.to_datetime(selected_date_text).date()
                except Exception as e:
                    print(f"Date conversion error in refresh: {e}, value: {selected_date_text}")
                    selected_date = max_date

            display_df = update_dashboard(selected_date, use_fresh_data=True)
            # NOTE(review): the textbox is always reset to max_date even when
            # the table was filtered by the user's typed date, and writing into
            # date_picker may re-trigger its .change handler — confirm both are
            # acceptable.
            return (
                str(max_date) if max_date else "",
                display_df
            )

        refresh_btn.click(
            fn=refresh_data,
            inputs=[date_picker],
            outputs=[date_picker, dataframe_display]
        )

        # Populate the table once on initial page load.
        demo.load(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

    return demo
|
|
|
|
|
if __name__ == "__main__":
    # Build the UI and start the local Gradio server (blocking call).
    demo = create_interface()
    demo.launch()