# Page header captured with this file (not part of the program):
#   author: pagezyhf (HF Staff)
#   commit message: data refresh
#   commit: bfb9a2f
import gradio as gr
import pandas as pd
from datasets import load_dataset
from datetime import datetime, date
import numpy as np
from functools import lru_cache
# Load the dataset
@lru_cache(maxsize=1)
def _load_trending_models_cached():
    """Download the trending-models dataset and return it as a DataFrame.

    Failures propagate as exceptions instead of being turned into a fallback
    value here, so ``lru_cache`` only ever memoises a successful load.
    """
    print("Loading dataset from hf-azure-internal/trending-models-analysis...")

    # Load without naming a split so we can inspect which splits exist.
    dataset_info = load_dataset("hf-azure-internal/trending-models-analysis")
    print(f"Available splits: {list(dataset_info.keys())}")

    # Prefer "models", then "train", then whichever split is listed first.
    if "models" in dataset_info:
        split_name = "models"
    elif "train" in dataset_info:
        split_name = "train"
    else:
        split_name = list(dataset_info.keys())[0]
    print(f"Using '{split_name}' split...")
    dataset = dataset_info[split_name]

    print(f"Dataset loaded. Type: {type(dataset)}")
    df = dataset.to_pandas()
    print(f"Converted to pandas. Shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")

    # Normalise collected_at to datetime so callers can use the .dt accessor.
    if 'collected_at' in df.columns:
        print("collected_at column found. Sample values:")
        print(df['collected_at'].head(3).tolist())
        df['collected_at'] = pd.to_datetime(df['collected_at'])
        print(f"After conversion, dtype: {df['collected_at'].dtype}")

        unique_dates = df['collected_at'].dt.date.unique()
        print(f"Unique dates in dataset: {sorted(unique_dates)}")
    else:
        print("No 'collected_at' column found!")

    return df


def load_trending_models_data():
    """Load the trending models dataset from Hugging Face.

    Returns:
        The dataset as a pandas DataFrame. A successful load is cached and
        reused until ``load_trending_models_data.cache_clear()`` is called.
        If loading fails for any reason, the error is printed and an empty
        DataFrame with the expected columns is returned; that fallback is
        NOT cached, so the next call tries to load again.
    """
    try:
        return _load_trending_models_cached()
    except Exception as e:
        print(f"Error loading dataset: {e}")
        # Empty frame with the expected columns keeps the UI code working.
        return pd.DataFrame(columns=[
            'id', 'trending_rank', 'author', 'tags', 'license', 'library_name',
            'gated', 'task', 'is_in_catalog', 'is_custom_code', 'is_excluded_org',
            'is_supported_license', 'is_supported_library', 'is_safetensors',
            'is_supported_task', 'is_securely_scanned', 'collected_at', 'model_status'
        ])


# Keep the lru_cache management API available on the public name so existing
# callers of load_trending_models_data.cache_clear() keep working.
load_trending_models_data.cache_clear = _load_trending_models_cached.cache_clear
load_trending_models_data.cache_info = _load_trending_models_cached.cache_info
def clear_data_cache():
    """Drop the memoised dataset so the next load fetches it again."""
    reset_cache = load_trending_models_data.cache_clear
    reset_cache()
    print("Dataset cache cleared!")
def load_trending_models_data_fresh():
    """Invalidate the cached dataset, then load it again and return it."""
    clear_data_cache()
    fresh_df = load_trending_models_data()
    return fresh_df
def get_status_emoji(value):
    """Convert a boolean-like flag to an emoji indicator.

    Args:
        value: A scalar flag; may be missing (None/NaN).

    Returns:
        "❓" when ``value`` is missing according to ``pd.isna``, "🟢" when it
        is truthy, and "🔴" otherwise.
    """
    if pd.isna(value):
        return "❓"
    return "🟢" if value else "🔴"
def get_negative_status_emoji(value):
    """Convert a boolean-like flag to an emoji where True is the bad outcome.

    Args:
        value: A scalar flag; may be missing (None/NaN).

    Returns:
        "❓" when ``value`` is missing according to ``pd.isna``, "🔴" when it
        is truthy, and "🟢" otherwise.
    """
    if pd.isna(value):
        return "❓"
    return "🔴" if value else "🟢"
def _is_missing_scalar(item):
    """Return True only when ``pd.isna`` yields a scalar True for ``item``."""
    flag = pd.isna(item)
    # pd.isna returns an array for array-like input; treat that as "present".
    return isinstance(flag, (bool, np.bool_)) and bool(flag)


def _normalize_status_text(text_value):
    """Reduce ``text_value`` to something printable, or None if nothing to show.

    Array-likes (objects exposing both ``__len__`` and ``size``), lists and
    tuples are flattened; missing and blank entries are dropped. One remaining
    entry is returned as-is, several are joined with ", ".
    """
    if text_value is None:
        return None

    is_array_like = hasattr(text_value, '__len__') and hasattr(text_value, 'size')
    if is_array_like or isinstance(text_value, (list, tuple)):
        entries = [
            entry
            for entry in np.asarray(text_value, dtype=object).ravel().tolist()
            if entry is not None
            and not _is_missing_scalar(entry)
            and str(entry).strip()
        ]
        if not entries:
            return None
        if len(entries) == 1:
            text_value = entries[0]
        else:
            text_value = ", ".join(str(entry) for entry in entries)

    if _is_missing_scalar(text_value) or not str(text_value).strip():
        return None
    return text_value


def get_status_with_text(value, text_value=None):
    """Convert a boolean-like flag to an emoji, optionally followed by text.

    Args:
        value: A scalar flag; missing values (``pd.isna``) map to "❓",
            truthy values to "🟢" and falsy values to "🔴".
        text_value: Optional label. May be a scalar, or an array/list/tuple
            whose non-missing, non-blank entries are joined with ", ".

    Returns:
        ``"<emoji> <text>"`` when there is text to show, otherwise the emoji.
    """
    if pd.isna(value):
        emoji = "❓"
    else:
        emoji = "🟢" if value else "🔴"

    text = _normalize_status_text(text_value)
    if text is None:
        return emoji
    return f"{emoji} {text}"


def get_negative_status_with_text(value, text_value=None):
    """Like ``get_status_with_text`` but True is the bad outcome.

    Args:
        value: A scalar flag; missing values (``pd.isna``) map to "❓",
            truthy values to "🔴" and falsy values to "🟢".
        text_value: Optional label, normalised exactly as in
            ``get_status_with_text`` (arrays and lists are accepted).

    Returns:
        ``"<emoji> <text>"`` when there is text to show, otherwise the emoji.
    """
    if pd.isna(value):
        emoji = "❓"
    else:
        emoji = "🔴" if value else "🟢"

    text = _normalize_status_text(text_value)
    if text is None:
        return emoji
    return f"{emoji} {text}"
def create_clickable_model_id(model_id):
    """Convert a model ID to an HTML link pointing at ``https://hf.co/<id>``.

    Args:
        model_id: The repository id (e.g. ``"org/name"``); may be missing.

    Returns:
        An ``<a>`` element as a string, or ``""`` when ``model_id`` is
        missing or empty. The id is percent-encoded inside the URL and
        HTML-escaped in the link text, so unexpected characters in the data
        cannot break out of the markup. Ordinary ids are emitted unchanged.
    """
    # Local imports keep this block self-contained.
    from html import escape
    from urllib.parse import quote

    if pd.isna(model_id) or not model_id:
        return ""

    model_text = str(model_id)
    href = "https://hf.co/" + quote(model_text, safe="/")
    label = escape(model_text)
    return f'<a href="{href}" target="_blank" style="text-decoration: underline; color: #0066cc;">{label}</a>'
def get_status_with_color(status):
    """Wrap a status label in a styled ``<span>``.

    "to add", "added" and "blocked" (matched case-insensitively) get a bold,
    coloured badge; any other non-empty status gets a plain padded span.
    Missing or empty statuses yield an empty string.
    """
    if pd.isna(status) or not status:
        return ""

    # (text colour, background colour) for each recognised status.
    palette = {
        "to add": ("#0066ff", "#e6f3ff"),
        "added": ("#00aa00", "#e6ffe6"),
        "blocked": ("#cc0000", "#ffe6e6"),
    }
    box = "padding: 2px 6px; border-radius: 4px;"

    colors = palette.get(str(status).lower())
    if colors is None:
        style = box
    else:
        foreground, background = colors
        style = f"color: {foreground}; font-weight: bold; background-color: {background}; {box}"
    return f'<span style="{style}">{status}</span>'
def create_display_dataframe(df, selected_date):
    """Build the table shown in the dashboard for one collection date.

    Args:
        df: The trending-models DataFrame.
        selected_date: A date string, an object with a ``.date()`` method, or
            a date-like value compared directly. Falsy means "no date filter".

    Returns:
        A DataFrame with the display columns, sorted by rank with a fresh
        index, or an empty DataFrame when there is nothing to show.
    """
    if df.empty:
        return pd.DataFrame()

    rows = df.copy()
    if selected_date and 'collected_at' in df.columns:
        # Reduce whatever we were given to a plain date for the comparison.
        if isinstance(selected_date, str):
            wanted_day = pd.to_datetime(selected_date).date()
        elif hasattr(selected_date, 'date'):
            wanted_day = selected_date.date()
        else:
            wanted_day = selected_date
        # Compare on the date part only; the time of day is ignored.
        same_day = rows['collected_at'].dt.date == wanted_day
        rows = rows[same_day]

    if rows.empty:
        return pd.DataFrame()

    # Selecting every source column up front raises KeyError if one is absent.
    source_columns = ['trending_rank', 'id', 'is_custom_code', 'is_excluded_org',
                      'is_supported_license', 'is_supported_library', 'is_safetensors',
                      'is_supported_task', 'is_securely_scanned', 'model_status']
    base = rows[source_columns]

    def flag_with_text(formatter, flag_column, text_column):
        """Format a flag column row by row, with an optional text column."""
        return rows.apply(
            lambda record: formatter(record[flag_column], record.get(text_column)),
            axis=1,
        )

    # Keys are inserted in the final display order.
    table = pd.DataFrame({
        'Rank': base['trending_rank'],
        'Model ID': rows['id'].apply(create_clickable_model_id),
        'Custom Code': rows['is_custom_code'].apply(get_negative_status_emoji),
        'Excluded Org': flag_with_text(get_negative_status_with_text, 'is_excluded_org', 'author'),
        'Supported License': flag_with_text(get_status_with_text, 'is_supported_license', 'license'),
        'Supported Library': flag_with_text(get_status_with_text, 'is_supported_library', 'library_name'),
        'Safetensors': rows['is_safetensors'].apply(get_status_emoji),
        'Supported Task': flag_with_text(get_status_with_text, 'is_supported_task', 'task'),
        'Security Check': rows['is_securely_scanned'].apply(get_status_emoji),
        'Status': rows['model_status'].apply(get_status_with_color),
    })

    # Order by rank and discard the source index so rows are numbered cleanly.
    table = table.sort_values('Rank')
    return table.reset_index(drop=True)
def update_dashboard(selected_date, use_fresh_data=False):
    """Return the display table for ``selected_date``.

    When ``use_fresh_data`` is true the dataset cache is bypassed via
    ``load_trending_models_data_fresh``; otherwise the cached loader is used.
    """
    loader = load_trending_models_data_fresh if use_fresh_data else load_trending_models_data
    return create_display_dataframe(loader(), selected_date)
def get_available_dates():
    """List the collection dates present in the (cached) dataset.

    Returns:
        ``(all_dates, min_date, max_date)`` where ``all_dates`` is sorted
        newest first, or ``([], None, None)`` when no dates are available.
    """
    frame = load_trending_models_data()
    has_dates = not frame.empty and 'collected_at' in frame.columns
    if has_dates:
        days = [day for day in frame['collected_at'].dt.date.unique() if pd.notna(day)]
        days.sort(reverse=True)
        if days:
            newest, oldest = days[0], days[-1]
            return days, oldest, newest
    return [], None, None
def get_available_dates_fresh():
    """Get the available dates after forcing a dataset reload.

    Clears the dataset cache and then delegates to ``get_available_dates``,
    so the date-extraction logic lives in one place only.

    Returns:
        The same ``(all_dates, min_date, max_date)`` tuple as
        ``get_available_dates``.
    """
    clear_data_cache()
    return get_available_dates()
# Create the Gradio interface
def create_interface():
    """Build the Gradio Blocks app for the trending-models dashboard.

    The layout is a date textbox and a refresh button above a read-only
    table. Changing the date, clicking refresh, and the initial page load all
    re-render the table.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    # Custom CSS for enhanced styling
    custom_css = """
    .dataframe-container {
        border-radius: 12px;
        overflow: hidden;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }
    .info-text {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 12px 16px;
        border-radius: 8px;
        text-align: center;
        font-weight: 500;
        margin: 8px 0;
    }
    """

    def parse_date_text(date_text, fallback, error_label):
        """Parse the textbox content into a date; return ``fallback`` on failure."""
        if not date_text:
            return fallback
        try:
            return pd.to_datetime(date_text).date()
        except Exception as e:
            print(f"{error_label}: {e}, value: {date_text}")
            return fallback

    with gr.Blocks(title="Trending Models Dashboard", theme=gr.themes.Soft(), css=custom_css) as demo:
        gr.Markdown(
            "\n# Trending Models Support Dashboard\n"
            "**Data Source:** [hf-azure-internal/trending-models-analysis]"
            "(https://huggingface.co/datasets/hf-azure-internal/trending-models-analysis)\n"
        )

        # Only the newest date is needed to pre-fill the textbox.
        _, _, max_date = get_available_dates()

        # Controls row at the top
        with gr.Row():
            with gr.Column(scale=1):
                date_picker = gr.Textbox(
                    value=str(max_date) if max_date else "",
                    label="📅 Date Selection",
                    placeholder="2025-01-21",
                    info="Enter date in YYYY-MM-DD format"
                )
            with gr.Column(scale=1):
                refresh_btn = gr.Button("🔄 Refresh Data", variant="primary", size="lg")

        # Main dataframe display
        with gr.Row():
            dataframe_display = gr.Dataframe(
                label="📊 Trending Models Overview",
                interactive=False,
                wrap=True,
                elem_classes=["dataframe-container"],
                # One entry per display column: Rank, Model ID, seven flags, Status.
                datatype=["number", "html", "str", "str", "str", "str", "str", "str", "str", "html"]
            )

        # Event handlers
        def update_dashboard_wrapper(selected_date_text):
            """Render the table for the date typed in the textbox.

            An empty or unparseable value means "no date filter".
            """
            selected_date = parse_date_text(selected_date_text, None, "Date conversion error")
            return update_dashboard(selected_date)

        # Wire up events
        date_picker.change(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

        def refresh_data(selected_date_text):
            """Reload the dataset and return the new textbox value and table."""
            print("Refreshing data - clearing cache and reloading dataset...")
            # This clears the cache and reloads the dataset once.
            _, _, max_date = get_available_dates_fresh()
            selected_date = parse_date_text(
                selected_date_text, max_date, "Date conversion error in refresh"
            )
            # The cache was refreshed just above, so a plain (cached) read is
            # enough here; forcing another fresh load would download twice.
            display_df = update_dashboard(selected_date)
            # NOTE(review): the textbox is reset to the newest date even when
            # the table was built for a user-entered date - confirm intent.
            return (
                str(max_date) if max_date else "",
                display_df
            )

        refresh_btn.click(
            fn=refresh_data,
            inputs=[date_picker],
            outputs=[date_picker, dataframe_display]
        )

        # Load initial data
        demo.load(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

    return demo
# Launch the app
if __name__ == "__main__":
    # Build the interface and launch it only when this file is run as a
    # script; importing the module has no such side effect.
    demo = create_interface()
    demo.launch()