import gradio as gr
import pandas as pd
from datasets import load_dataset
from datetime import datetime, date
import numpy as np
from functools import lru_cache


# Load the dataset
@lru_cache(maxsize=1)
def load_trending_models_data():
    """Load the trending models dataset from Hugging Face"""
    try:
        print("Loading dataset from hf-azure-internal/trending-models-analysis...")

        # First, check what splits are available
        dataset_info = load_dataset("hf-azure-internal/trending-models-analysis")
        print(f"Available splits: {list(dataset_info.keys())}")

        # Try to load the correct split
        if "models" in dataset_info:
            print("Using 'models' split...")
            dataset = dataset_info["models"]
        elif "train" in dataset_info:
            print("Using 'train' split...")
            dataset = dataset_info["train"]
        else:
            # Fall back to the first available split
            split_name = list(dataset_info.keys())[0]
            print(f"Using '{split_name}' split...")
            dataset = dataset_info[split_name]

        print(f"Dataset loaded. Type: {type(dataset)}")
        df = dataset.to_pandas()
        print(f"Converted to pandas. Shape: {df.shape}")
        print(f"Columns: {list(df.columns)}")

        # Convert collected_at to datetime if it's not already
        if 'collected_at' in df.columns:
            print("collected_at column found. Sample values:")
            print(df['collected_at'].head(3).tolist())
            df['collected_at'] = pd.to_datetime(df['collected_at'])
            print(f"After conversion, dtype: {df['collected_at'].dtype}")

            # Show unique dates
            unique_dates = df['collected_at'].dt.date.unique()
            print(f"Unique dates in dataset: {sorted(unique_dates)}")
        else:
            print("No 'collected_at' column found!")

        return df
    except Exception as e:
        print(f"Error loading dataset: {e}")
        # Return an empty dataframe with the expected columns for development
        return pd.DataFrame(columns=[
            'id', 'trending_rank', 'author', 'tags', 'license', 'library_name',
            'gated', 'task', 'is_in_catalog', 'is_custom_code', 'is_excluded_org',
            'is_supported_license', 'is_supported_library', 'is_safetensors',
            'is_supported_task', 'is_securely_scanned', 'collected_at', 'model_status'
        ])


def clear_data_cache():
    """Clear the cached dataset to force a reload"""
    load_trending_models_data.cache_clear()
    print("Dataset cache cleared!")


def load_trending_models_data_fresh():
    """Load fresh data by clearing the cache first"""
    clear_data_cache()
    return load_trending_models_data()


def get_status_emoji(value):
    """Convert boolean values to emoji indicators"""
    if pd.isna(value):
        return "❓"
    return "🟢" if value else "🔴"


def get_negative_status_emoji(value):
    """Convert boolean values to emoji indicators where True is bad (red) and False is good (green)"""
    if pd.isna(value):
        return "❓"
    return "🔴" if value else "🟢"


def get_status_with_text(value, text_value=None):
    """Convert boolean values to emoji indicators with optional text"""
    if pd.isna(value):
        emoji = "❓"
    else:
        emoji = "🟢" if value else "🔴"

    # Handle arrays and None values properly
    if text_value is not None:
        # Convert arrays to scalar if needed
        if hasattr(text_value, '__len__') and hasattr(text_value, 'size'):
            # It's likely a numpy array or similar
            if text_value.size == 0:
                text_value = None
            elif text_value.size == 1:
                text_value = text_value.item() if hasattr(text_value, 'item') else text_value[0]

    # Now check if we have a valid text value
    if text_value is not None and not pd.isna(text_value) and str(text_value).strip():
        return f"{emoji} {text_value}"
    return emoji


def get_negative_status_with_text(value, text_value=None):
    """Convert boolean values to emoji indicators where True is bad (red) and False is good (green), with optional text"""
    if pd.isna(value):
        emoji = "❓"
    else:
emoji = "🔴" if value else "🟢" if text_value and not pd.isna(text_value): return f"{emoji} {text_value}" else: return emoji def create_clickable_model_id(model_id): """Convert model ID to clickable link""" if pd.isna(model_id) or not model_id: return "" return f'{model_id}' def get_status_with_color(status): """Add color coding to status values""" if pd.isna(status) or not status: return "" status_lower = str(status).lower() if status_lower == "to add": return f'{status}' elif status_lower == "added": return f'{status}' elif status_lower == "blocked": return f'{status}' else: return f'{status}' def create_display_dataframe(df, selected_date): """Create a DataFrame for display""" if df.empty: return pd.DataFrame() # Filter by date if specified filtered_df = df.copy() if selected_date and 'collected_at' in df.columns: # Convert selected_date to just the date part for comparison if isinstance(selected_date, str): target_date = pd.to_datetime(selected_date).date() elif hasattr(selected_date, 'date'): target_date = selected_date.date() else: target_date = selected_date # Filter by comparing just the date parts (ignoring time) filtered_df = filtered_df[filtered_df['collected_at'].dt.date == target_date] if filtered_df.empty: return pd.DataFrame() # Create display dataframe with key columns display_df = filtered_df[['trending_rank', 'id', 'is_custom_code', 'is_excluded_org', 'is_supported_license', 'is_supported_library', 'is_safetensors', 'is_supported_task', 'is_securely_scanned', 'model_status']].copy() # Convert boolean columns to emojis for better display display_df['Custom Code'] = filtered_df['is_custom_code'].apply(get_negative_status_emoji) display_df['Excluded Org'] = filtered_df.apply(lambda row: get_negative_status_with_text(row['is_excluded_org'], row.get('author')), axis=1) display_df['Supported License'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_license'], row.get('license')), axis=1) display_df['Supported Library'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_library'], row.get('library_name')), axis=1) display_df['Safetensors'] = filtered_df['is_safetensors'].apply(get_status_emoji) display_df['Supported Task'] = filtered_df.apply(lambda row: get_status_with_text(row['is_supported_task'], row.get('task')), axis=1) display_df['Security Check'] = filtered_df['is_securely_scanned'].apply(get_status_emoji) # Create clickable model IDs and colored status display_df['Model ID'] = filtered_df['id'].apply(create_clickable_model_id) display_df['Status'] = filtered_df['model_status'].apply(get_status_with_color) # Rename and reorder columns display_df = display_df.rename(columns={ 'trending_rank': 'Rank' }) # Select final columns for display final_columns = ['Rank', 'Model ID', 'Custom Code', 'Excluded Org', 'Supported License', 'Supported Library', 'Safetensors', 'Supported Task', 'Security Check', 'Status'] display_df = display_df[final_columns] # Sort by rank and reset index to get clean row indices display_df = display_df.sort_values('Rank').reset_index(drop=True) return display_df def update_dashboard(selected_date, use_fresh_data=False): """Update the dashboard based on user selections""" if use_fresh_data: df = load_trending_models_data_fresh() else: df = load_trending_models_data() display_df = create_display_dataframe(df, selected_date) return display_df def get_available_dates(): """Get list of available dates from the dataset""" df = load_trending_models_data() if df.empty or 'collected_at' not in df.columns: return [], None, 
    dates = df['collected_at'].dt.date.unique()
    valid_dates = sorted([d for d in dates if pd.notna(d)], reverse=True)
    if not valid_dates:
        return [], None, None
    # Dates are sorted newest-first, so the last entry is the earliest date
    return valid_dates, valid_dates[-1], valid_dates[0]  # all_dates, min_date, max_date


def get_available_dates_fresh():
    """Get the list of available dates from a fresh dataset (clears the cache first)"""
    clear_data_cache()
    return get_available_dates()


# Create the Gradio interface
def create_interface():
    # Custom CSS for enhanced styling
    custom_css = """
    .dataframe-container {
        border-radius: 12px;
        overflow: hidden;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }
    .info-text {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 12px 16px;
        border-radius: 8px;
        text-align: center;
        font-weight: 500;
        margin: 8px 0;
    }
    """

    with gr.Blocks(title="Trending Models Dashboard", theme=gr.themes.Soft(), css=custom_css) as demo:
        gr.Markdown("""
        # Trending Models Support Dashboard

        **Data Source:** [hf-azure-internal/trending-models-analysis](https://huggingface.co/datasets/hf-azure-internal/trending-models-analysis)
        """)

        # Get date information
        available_dates, min_date, max_date = get_available_dates()

        # Controls row at the top
        with gr.Row():
            with gr.Column(scale=1):
                date_picker = gr.Textbox(
                    value=str(max_date) if max_date else "",
                    label="📅 Date Selection",
                    placeholder="2025-01-21",
                    info="Enter date in YYYY-MM-DD format"
                )
            with gr.Column(scale=1):
                refresh_btn = gr.Button("🔄 Refresh Data", variant="primary", size="lg")

        # Main dataframe display
        with gr.Row():
            dataframe_display = gr.Dataframe(
                label="📊 Trending Models Overview",
                interactive=False,
                wrap=True,
                elem_classes=["dataframe-container"],
                datatype=["number", "html", "str", "str", "str", "str", "str", "str", "str", "html"]
            )

        # Event handlers
        def update_dashboard_wrapper(selected_date_text):
            """Wrapper to handle the dashboard update"""
            selected_date = None
            if selected_date_text:
                try:
                    selected_date = pd.to_datetime(selected_date_text).date()
                except Exception as e:
                    print(f"Date conversion error: {e}, value: {selected_date_text}")
                    selected_date = None
            return update_dashboard(selected_date)

        # Wire up events
        date_picker.change(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

        def refresh_data(selected_date_text):
            """Refresh data and update dashboard"""
            print("Refreshing data - clearing cache and reloading dataset...")
            available_dates, _, max_date = get_available_dates_fresh()

            # Keep the user's date if it parses; otherwise fall back to the latest date
            selected_date = max_date
            if selected_date_text:
                try:
                    selected_date = pd.to_datetime(selected_date_text).date()
                except Exception as e:
                    print(f"Date conversion error in refresh: {e}, value: {selected_date_text}")
                    selected_date = max_date

            display_df = update_dashboard(selected_date, use_fresh_data=True)
            # Return the date actually shown so the picker and the table stay in sync
            return (
                str(selected_date) if selected_date else "",
                display_df
            )

        refresh_btn.click(
            fn=refresh_data,
            inputs=[date_picker],
            outputs=[date_picker, dataframe_display]
        )

        # Load initial data
        demo.load(
            fn=update_dashboard_wrapper,
            inputs=[date_picker],
            outputs=[dataframe_display]
        )

    return demo


# Launch the app
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
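
# Quick sanity check (a minimal sketch): the table-building logic can be exercised from a
# Python shell without launching the UI. This assumes the file is saved as app.py; adjust
# the module name to match your filename.
#
#   >>> from app import load_trending_models_data, create_display_dataframe
#   >>> df = load_trending_models_data()
#   >>> latest = df['collected_at'].dt.date.max()   # most recent snapshot date
#   >>> create_display_dataframe(df, latest).head()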