Spaces:
Sleeping
Sleeping
""" | |
Utility functions for Gradio pipeline results app. | |
This module contains common utility functions used across different components. | |
""" | |
import numpy as np | |
import pandas as pd | |
import json | |
import markdown | |
import plotly.graph_objects as go | |
import plotly.express as px | |
from typing import Dict, List, Any, Optional, Tuple | |
import html | |
import ast | |
# Conversation rendering helpers are now in a dedicated module for clarity | |
from . import conversation_display as _convdisp | |
from .conversation_display import ( | |
convert_to_openai_format, | |
display_openai_conversation_html, | |
pretty_print_embedded_dicts, | |
) | |
# NEW IMPLEMENTATION --------------------------------------------------- | |
from .metrics_adapter import get_model_clusters, get_all_models | |
# --------------------------------------------------------------------------- | |
# NEW helper utilities for FunctionalMetrics format | |
# --------------------------------------------------------------------------- | |
def format_confidence_interval(ci: dict | None, decimals: int = 3) -> str: | |
"""Return a pretty string for a CI dict of the form {"lower": x, "upper": y}.""" | |
if not ci or not isinstance(ci, dict): | |
return "N/A" | |
lower, upper = ci.get("lower"), ci.get("upper") | |
if lower is None or upper is None: | |
return "N/A" | |
return f"[{lower:.{decimals}f}, {upper:.{decimals}f}]" | |
def get_confidence_interval_width(ci: dict | None) -> float | None: | |
"""Return CI width (upper-lower) if possible.""" | |
if not ci or not isinstance(ci, dict): | |
return None | |
lower, upper = ci.get("lower"), ci.get("upper") | |
if lower is None or upper is None: | |
return None | |
return upper - lower | |
def has_confidence_intervals(record: dict | None) -> bool: | |
"""Simple check whether any *_ci key with lower/upper exists in a metrics record.""" | |
if not record or not isinstance(record, dict): | |
return False | |
for k, v in record.items(): | |
if k.endswith("_ci") and isinstance(v, dict) and {"lower", "upper"}.issubset(v.keys()): | |
return True | |
return False | |
def extract_quality_score(quality_field: Any) -> float | None: | |
"""Given a quality field that may be a dict of metric values or a scalar, return its mean.""" | |
if quality_field is None: | |
return None | |
if isinstance(quality_field, (int, float)): | |
return float(quality_field) | |
if isinstance(quality_field, dict) and quality_field: | |
return float(np.mean(list(quality_field.values()))) | |
return None | |
# --------------------------------------------------------------------------- | |
# UPDATED: get_top_clusters_for_model for FunctionalMetrics format | |
# --------------------------------------------------------------------------- | |
def get_top_clusters_for_model(metrics: Dict[str, Any], model_name: str, top_n: int = 10) -> List[Tuple[str, Dict[str, Any]]]:
    """Pick the most salient clusters of one model.

    Args:
        metrics: Metrics dictionary handed to ``get_model_clusters``.
        model_name: Model whose clusters are inspected.
        top_n: Maximum number of clusters returned.

    Returns:
        ``(cluster_name, cluster_dict)`` tuples ordered by descending
        ``proportion_delta`` (missing values count as 0). The
        ``"No properties"`` cluster is never included. An empty list is
        returned when the model has no clusters.
    """
    model_clusters = get_model_clusters(metrics, model_name)
    if not model_clusters:
        return []

    def _salience(item: Tuple[str, Dict[str, Any]]) -> Any:
        return item[1].get("proportion_delta", 0)

    ranked = [
        (name, data)
        for name, data in model_clusters.items()
        if name != "No properties"
    ]
    ranked.sort(key=_salience, reverse=True)
    return ranked[:top_n]
def compute_model_rankings_new(metrics: Dict[str, Any]) -> List[tuple]:
    """Rank models by their mean salience (``proportion_delta``).

    Args:
        metrics: Metrics dictionary handed to ``get_all_models`` and
            ``get_model_clusters``.

    Returns:
        A list of ``(model_name, summary_dict)`` tuples sorted by descending
        ``avg_salience``. Each summary holds ``avg_salience``,
        ``median_salience``, ``num_clusters``, ``top_salience`` and
        ``std_salience``. Models without any cluster (after dropping the
        ``"No properties"`` cluster) are omitted.
    """
    model_scores: Dict[str, Dict[str, float]] = {}
    for model in get_all_models(metrics):
        # Guard against an empty/None lookup result before filtering, the same
        # way the sibling helpers in this module do.
        clusters = get_model_clusters(metrics, model) or {}
        saliences = [
            cluster.get("proportion_delta", 0.0)
            for name, cluster in clusters.items()
            if name != "No properties"
        ]
        if not saliences:
            continue
        model_scores[model] = {
            "avg_salience": float(np.mean(saliences)),
            "median_salience": float(np.median(saliences)),
            "num_clusters": len(saliences),
            "top_salience": float(max(saliences)),
            "std_salience": float(np.std(saliences)),
        }
    return sorted(model_scores.items(), key=lambda item: item[1]["avg_salience"], reverse=True)
def create_model_summary_card_new(
    model_name: str,
    metrics: Dict[str, Any],
    top_n: int = 3,
    score_significant_only: bool = False,
    quality_significant_only: bool = False,
    sort_by: str = "quality_asc",
    min_cluster_size: int = 1,
) -> str:
    """Generate a styled HTML summary card for a single model.

    The card consists of a header (model name, battle count, number of
    significant properties) followed by one block per selected cluster that
    shows its frequency, size, a distinctiveness text, per-metric quality
    deltas and FREQ/QUAL significance badges.

    Args:
        model_name: Model to summarise. The special name ``"all"`` disables
            everything based on ``proportion_delta`` (filtering, counting,
            salience sorting) and uses ``proportion`` instead.
        metrics: Metrics dictionary; clusters are read through
            ``get_model_clusters`` and totals from ``metrics["model_scores"]``.
        top_n: Number of clusters rendered.
        score_significant_only: Keep only clusters flagged
            ``proportion_delta_significant`` (ignored for ``"all"``).
        quality_significant_only: Keep only clusters with at least one truthy
            ``quality_delta_significant`` flag.
        sort_by: One of ``quality_asc``, ``quality_desc``, ``frequency_desc``,
            ``frequency_asc``, ``salience_desc``, ``salience_asc``; unknown
            values fall back to descending salience.
        min_cluster_size: Minimum ``size`` a cluster needs to be considered.

    Returns:
        An HTML fragment. When no cluster data exists or no cluster passes the
        filters, a short placeholder ``<div>`` is returned instead.
    """
    is_aggregate = model_name == "all"
    # The model name is interpolated into HTML in several places; escape once.
    safe_model_name = html.escape(model_name)

    clusters_dict = get_model_clusters(metrics, model_name)
    if not clusters_dict:
        return f"<div style='padding:20px'>No cluster data for {safe_model_name}</div>"

    # Filter out "No properties" clusters
    clusters_dict = {k: v for k, v in clusters_dict.items() if k != "No properties"}

    def _quality_flags(cluster: Dict[str, Any]) -> Dict[str, Any]:
        # Tolerate a missing or None-valued significance mapping.
        return cluster.get("quality_delta_significant") or {}

    # Keep (name, cluster) pairs so the name does not have to be looked up
    # again by identity when rendering.
    candidates = [
        (name, cluster)
        for name, cluster in clusters_dict.items()
        if cluster.get("size", 0) >= min_cluster_size
    ]

    # Count significant properties over all size-eligible clusters, i.e.
    # before the optional significance filters are applied.
    significant_frequency_count = 0
    significant_quality_count = 0
    for _, cluster in candidates:
        if not is_aggregate and cluster.get("proportion_delta_significant", False):
            significant_frequency_count += 1
        significant_quality_count += sum(bool(flag) for flag in _quality_flags(cluster).values())

    # Filter clusters ----------------------------------------------------
    # The "all" model has no proportion_delta_significant, so that filter is
    # skipped for it.
    if score_significant_only and not is_aggregate:
        candidates = [(n, c) for n, c in candidates if c.get("proportion_delta_significant", False)]
    if quality_significant_only:
        candidates = [(n, c) for n, c in candidates if any(_quality_flags(c).values())]
    if not candidates:
        return f"<div style='padding:20px'>No clusters pass filters for {safe_model_name}</div>"

    # Sort ---------------------------------------------------------------
    def _mean_quality(cluster: Dict[str, Any]) -> float:
        vals = list((cluster.get("quality") or {}).values())
        return float(np.mean(vals)) if vals else 0.0

    def _frequency(cluster: Dict[str, Any]) -> Any:
        return cluster.get("proportion", 0)

    def _salience(cluster: Dict[str, Any]) -> Any:
        # proportion_delta is not meaningful for "all"; fall back to proportion.
        return _frequency(cluster) if is_aggregate else cluster.get("proportion_delta", 0)

    sort_key_map = {
        "quality_asc": (_mean_quality, False),
        "quality_desc": (_mean_quality, True),
        "frequency_desc": (_frequency, True),
        "frequency_asc": (_frequency, False),
        "salience_desc": (_salience, True),
        "salience_asc": (_salience, False),
    }
    key_fn, reverse = sort_key_map.get(sort_by, (_salience, True))
    sorted_clusters = sorted(candidates, key=lambda pair: key_fn(pair[1]), reverse=reverse)[:top_n]

    # Determine total conversations for this model -----------------------
    model_scores = metrics.get("model_scores", {}) or {}
    if is_aggregate:
        # For "all", sum the individual model totals.
        total_battles = sum(model_data.get("size", 0) for model_data in model_scores.values())
    else:
        total_battles = (model_scores.get(model_name) or {}).get("size")
        if total_battles is None:
            # Fallback: sum of cluster sizes for this model.
            total_battles = sum(c.get("size", 0) for c in clusters_dict.values())

    # Card header --------------------------------------------------------
    html_parts: list[str] = [f"""
<div style="padding: 20px; border:1px solid #e0e0e0; border-radius:8px; margin-bottom:25px;">
<h3 style="margin-top:0; font-size: 20px;">{safe_model_name}</h3>
<p style="margin: 4px 0 8px 0; color:#555; font-size:13px;">
{total_battles} battles · Top clusters by frequency
</p>
<p style="margin: 0 0 18px 0; color:#666; font-size:12px;">
📊 {significant_frequency_count} significant frequency properties · {significant_quality_count} significant quality properties
</p>
"""]

    # Cluster blocks -----------------------------------------------------
    for i, (raw_name, cluster) in enumerate(sorted_clusters):
        name = html.escape(raw_name)
        prop = cluster.get("proportion", 0)
        freq_pct = prop * 100
        size = cluster.get("size", 0)

        # Significance badges
        is_proportion_significant = (
            not is_aggregate and cluster.get("proportion_delta_significant", False)
        )
        is_quality_significant = any(_quality_flags(cluster).values())
        significance_indicators = []
        if is_proportion_significant:
            significance_indicators.append('<span style="background: #28a745; color: white; padding: 2px 6px; border-radius: 4px; font-size: 10px; font-weight: bold;">FREQ</span>')
        if is_quality_significant:
            significance_indicators.append('<span style="background: #007bff; color: white; padding: 2px 6px; border-radius: 4px; font-size: 10px; font-weight: bold;">QUAL</span>')
        significance_html = " ".join(significance_indicators)

        # Distinctiveness text
        if is_aggregate:
            # For "all", proportion_delta doesn't make sense, so show proportion.
            distinct_text = f"{freq_pct:.1f}% of all conversations"
        else:
            sal = cluster.get("proportion_delta", 0)
            distinct_text = f"proportion delta: {sal:+.3f}"

        # Quality delta – show each metric separately; skip metrics without a value.
        quality_delta_parts = []
        for metric_name, delta_value in (cluster.get("quality_delta") or {}).items():
            if delta_value is None:
                continue
            color = "#28a745" if delta_value >= 0 else "#dc3545"
            quality_delta_parts.append(
                f'<div style="color:{color}; font-weight:500;">'
                f'{html.escape(str(metric_name))}: {delta_value:+.3f}</div>'
            )
        if quality_delta_parts:
            quality_delta_html = "".join(quality_delta_parts)
        else:
            quality_delta_html = '<span style="color:#666;">No quality data</span>'

        # Background colour for this cluster (helper defined elsewhere in the
        # module; it receives the escaped name, as before).
        cluster_color = get_light_color_for_cluster(name, i)
        html_parts.append(f"""
<div style="border-left: 4px solid #4c6ef5; padding: 12px 16px; margin-bottom: 10px; background:{cluster_color}; border-radius: 4px;">
<div style="display:flex; justify-content:space-between; align-items:flex-start;">
<div style="max-width:80%;">
<div style="margin-bottom:4px;">
<strong style="font-size:14px;">{name}</strong>
</div>
<span style="font-size:12px; color:#555;">{freq_pct:.1f}% frequency ({size} out of {total_battles} total) · {distinct_text}</span>
</div>
<div style="font-size:12px; font-weight:normal; white-space:nowrap; text-align:right;">
{quality_delta_html}
{significance_html}
</div>
</div>
</div>
""")

    # Close card div -----------------------------------------------------
    html_parts.append("</div>")
    return "\n".join(html_parts)
def format_cluster_dataframe(clustered_df: pd.DataFrame,
                             selected_models: Optional[List[str]] = None,
                             cluster_level: str = 'fine') -> pd.DataFrame:
    """Format cluster DataFrame for display in Gradio.

    Args:
        clustered_df: Source frame; it is copied, never modified.
        selected_models: When given, rows are restricted to these values of
            the ``model`` column. If the frame has no ``model`` column the
            filter is skipped instead of raising ``KeyError``.
        cluster_level: ``'fine'`` selects the fine cluster columns; any other
            value selects the coarse ones.

    Returns:
        A frame limited to ``question_id``, ``model``,
        ``property_description``, the cluster id/label columns of the chosen
        level (prefixed ``property_description_`` or, failing that, the
        unprefixed variant) and ``score`` -- whichever of these exist.
    """
    df = clustered_df.copy()
    has_model_col = 'model' in df.columns

    # Debug information
    print(f"DEBUG: format_cluster_dataframe called")
    print(f" - Input DataFrame shape: {df.shape}")
    print(f" - Selected models: {selected_models}")
    print(f" - Available models in data: {df['model'].unique().tolist() if has_model_col else 'No model column'}")

    # Filter by models if specified
    if selected_models and has_model_col:
        print(f" - Filtering by {len(selected_models)} selected models")
        df = df[df['model'].isin(selected_models)]
        print(f" - After filtering shape: {df.shape}")
        print(f" - Models after filtering: {df['model'].unique().tolist()}")
    elif selected_models:
        print(f" - Cannot filter by model: no 'model' column present")
    else:
        print(f" - No model filtering applied")

    # Cluster columns come in two naming patterns: with and without the
    # 'property_description_' prefix.
    level = 'fine' if cluster_level == 'fine' else 'coarse'
    id_col = f'property_description_{level}_cluster_id'
    label_col = f'property_description_{level}_cluster_label'
    alt_id_col = f'{level}_cluster_id'
    alt_label_col = f'{level}_cluster_label'

    if id_col in df.columns and label_col in df.columns:
        cluster_cols = [id_col, label_col]
    elif alt_id_col in df.columns and alt_label_col in df.columns:
        cluster_cols = [alt_id_col, alt_label_col]
    else:
        # Fall back to basic columns if cluster columns are missing
        cluster_cols = []
    cols = ['question_id', 'model', 'property_description', *cluster_cols, 'score']

    # Keep only existing columns
    available_cols = [col for col in cols if col in df.columns]
    df = df[available_cols]
    print(f" - Final DataFrame shape: {df.shape}")
    print(f" - Final columns: {df.columns.tolist()}")
    return df
def truncate_cluster_name(cluster_desc: str, max_length: int = 50) -> str:
    """Truncate cluster description to fit in table column.

    Descriptions longer than ``max_length`` are cut and suffixed with
    ``"..."`` so that the result is exactly ``max_length`` characters long.
    When ``max_length`` is too small to hold the ellipsis (< 3), the
    description is simply cut to ``max_length`` characters (empty for
    non-positive values) so the result never exceeds the limit.
    """
    if len(cluster_desc) <= max_length:
        return cluster_desc
    if max_length < 3:
        # A negative slice end would otherwise keep almost the whole string.
        return cluster_desc[:max(max_length, 0)]
    return cluster_desc[:max_length - 3] + "..."
def create_frequency_comparison_table(model_stats: Dict[str, Any], | |
selected_models: List[str], | |
cluster_level: str = "fine", # Ignored – kept for backward-compat | |
top_n: int = 50, | |
selected_model: str | None = None, | |
selected_quality_metric: str | None = None) -> pd.DataFrame: | |
"""Create a comparison table for the new FunctionalMetrics format. | |
The old signature is kept (cluster_level arg is ignored) so that callers | |
can be updated incrementally. | |
""" | |
if not selected_models: | |
return pd.DataFrame() | |
# ------------------------------------------------------------------ | |
# 1. Collect per-model, per-cluster rows | |
# ------------------------------------------------------------------ | |
all_rows: List[dict] = [] | |
for model in selected_models: | |
model_clusters = get_model_clusters(model_stats, model) # type: ignore[arg-type] | |
if not model_clusters: | |
continue | |
# Optional filter by a single model after the fact | |
if selected_model and model != selected_model: | |
continue | |
for cluster_name, cdata in model_clusters.items(): | |
# Filter out "No properties" clusters | |
if cluster_name == "No properties": | |
continue | |
# Basic numbers | |
freq_pct = cdata.get("proportion", 0.0) * 100.0 | |
prop_ci = cdata.get("proportion_ci") | |
# Quality per metric dicts ------------------------------------------------ | |
quality_dict = cdata.get("quality", {}) or {} | |
quality_ci_dict = cdata.get("quality_ci", {}) or {} | |
# Significance flags | |
sal_sig = bool(cdata.get("proportion_delta_significant", False)) | |
quality_sig_flags = cdata.get("quality_delta_significant", {}) or {} | |
all_rows.append({ | |
"cluster": cluster_name, | |
"model": model, | |
"frequency": freq_pct, | |
"proportion_ci": prop_ci, | |
"quality": quality_dict, | |
"quality_ci": quality_ci_dict, | |
"score_significant": sal_sig, | |
"quality_significant_any": any(quality_sig_flags.values()), | |
"quality_significant_metric": quality_sig_flags.get(selected_quality_metric) if selected_quality_metric else None, | |
}) | |
if not all_rows: | |
return pd.DataFrame() | |
df_all = pd.DataFrame(all_rows) | |
# Aggregate frequency across models ---------------------------------- | |
freq_sum = df_all.groupby("cluster")["frequency"].sum().sort_values(ascending=False) | |
top_clusters = freq_sum.head(top_n).index.tolist() | |
df_top = df_all[df_all["cluster"].isin(top_clusters)].copy() | |
table_rows: List[dict] = [] | |
for clu in top_clusters: | |
subset = df_top[df_top["cluster"] == clu] | |
avg_freq = subset["frequency"].mean() | |
# Aggregate CI (mean of bounds) | |
ci_lowers = [ci.get("lower") for ci in subset["proportion_ci"] if isinstance(ci, dict)] | |
ci_uppers = [ci.get("upper") for ci in subset["proportion_ci"] if isinstance(ci, dict)] | |
freq_ci = { | |
"lower": float(np.mean(ci_lowers)) if ci_lowers else None, | |
"upper": float(np.mean(ci_uppers)) if ci_uppers else None, | |
} if ci_lowers and ci_uppers else None | |
# Quality aggregation ----------------------------------------------------- | |
q_vals: List[float] = [] | |
q_ci_l: List[float] = [] | |
q_ci_u: List[float] = [] | |
quality_sig_any = False | |
for _, row in subset.iterrows(): | |
q_dict = row["quality"] | |
if selected_quality_metric: | |
if selected_quality_metric in q_dict: | |
q_vals.append(q_dict[selected_quality_metric]) | |
ci_metric = row["quality_ci"].get(selected_quality_metric) if isinstance(row["quality_ci"], dict) else None | |
if ci_metric: | |
q_ci_l.append(ci_metric.get("lower")) | |
q_ci_u.append(ci_metric.get("upper")) | |
quality_sig_any = quality_sig_any or bool(row["quality_significant_metric"]) | |
else: | |
q_vals.extend(q_dict.values()) | |
for ci in row["quality_ci"].values(): | |
if isinstance(ci, dict): | |
q_ci_l.append(ci.get("lower")) | |
q_ci_u.append(ci.get("upper")) | |
quality_sig_any = quality_sig_any or row["quality_significant_any"] | |
quality_val = float(np.mean(q_vals)) if q_vals else None | |
quality_ci = { | |
"lower": float(np.mean(q_ci_l)), | |
"upper": float(np.mean(q_ci_u)), | |
} if q_ci_l and q_ci_u else None | |
score_sig = subset["score_significant"].any() | |
table_rows.append({ | |
"Cluster": clu, | |
"Frequency (%)": f"{avg_freq:.1f}", | |
"Freq CI": format_confidence_interval(freq_ci), | |
"Quality": f"{quality_val:.3f}" if quality_val is not None else "N/A", | |
"Quality CI": format_confidence_interval(quality_ci) if quality_ci else "N/A", | |
"Score Significance": "Yes" if score_sig else "No", | |
"Quality Significance": "Yes" if quality_sig_any else "No", | |
}) | |
return pd.DataFrame(table_rows) | |
def create_frequency_comparison_plots(model_stats: Dict[str, Any],
                                      selected_models: List[str],
                                      cluster_level: str = 'fine',
                                      top_n: int = 50,
                                      show_confidence_intervals: bool = False) -> Tuple[go.Figure, go.Figure]:
    """Create the grouped frequency chart and the companion quality chart.

    ``model_stats`` is read as ``model_stats[model][cluster_level]`` -> list of
    cluster dicts with the keys ``property_description``, ``proportion``,
    ``size``, ``cluster_size_global``, ``score_ci``, ``quality_score`` and
    ``quality_score_ci``.

    Args:
        model_stats: Per-model statistics as described above.
        selected_models: Models shown in the frequency chart.
        cluster_level: Key selecting the cluster list inside each model entry.
        top_n: Maximum number of clusters shown (ranked by summed frequency).
        show_confidence_intervals: Whether error bars are visible.

    Returns:
        ``(frequency_figure, quality_figure)``. When there is nothing to plot,
        the same annotated empty figure is returned in both positions.
    """
    print(f"\nDEBUG: Plotting function called with:")
    print(f" - Selected models: {selected_models}")
    print(f" - Cluster level: {cluster_level}")
    print(f" - Top N: {top_n}")
    print(f" - Available models in stats: {list(model_stats.keys())}")

    def _no_data_figure(message: str) -> go.Figure:
        """Build an empty figure that only shows *message*."""
        placeholder = go.Figure()
        placeholder.add_annotation(text=message, xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
        return placeholder

    def _selected_clusters():
        """Yield (model_name, cluster) for the selected models, skipping "No properties"."""
        for model_name, model_data in model_stats.items():
            if model_name not in selected_models:
                continue
            for cluster in model_data.get(cluster_level, []):
                if cluster.get('property_description') == "No properties":
                    continue
                yield model_name, cluster

    def _ci_label(lower: Any, upper: Any) -> str:
        """Hover text for one bar's interval."""
        if lower is None or upper is None:
            return "N/A"
        return format_confidence_interval({'lower': lower, 'upper': upper})

    # ------------------------------------------------------------------
    # 1. Collect one row per (model, cluster)
    # ------------------------------------------------------------------
    all_clusters_data = []
    for model_name, cluster in _selected_clusters():
        # Distinctiveness score confidence interval
        score_ci = cluster.get('score_ci', {})
        all_clusters_data.append({
            'property_description': cluster['property_description'],
            'model': model_name,
            'frequency': cluster.get('proportion', 0) * 100,  # Convert to percentage
            'size': cluster.get('size', 0),
            'cluster_size_global': cluster.get('cluster_size_global', 0),
            'has_ci': has_confidence_intervals(cluster),
            'ci_lower': score_ci.get('lower') if score_ci else None,
            'ci_upper': score_ci.get('upper') if score_ci else None,
            'has_quality_ci': bool(cluster.get('quality_score_ci', {})),
        })
    if not all_clusters_data:
        empty_fig = _no_data_figure("No data available")
        return empty_fig, empty_fig
    clusters_df = pd.DataFrame(all_clusters_data)

    # ------------------------------------------------------------------
    # 2. Choose the top clusters by summed frequency, then order by quality
    # ------------------------------------------------------------------
    total_clusters = len(clusters_df['property_description'].unique())
    top_n_for_chart = min(top_n, total_clusters)
    cluster_totals = clusters_df.groupby('property_description')['frequency'].sum().sort_values(ascending=False)
    top_clusters = cluster_totals.head(top_n_for_chart).index.tolist()

    top_cluster_set = set(top_clusters)
    quality_data_for_sorting = [
        {
            'property_description': cluster['property_description'],
            'quality_score': extract_quality_score(cluster.get('quality_score', 0)),
        }
        for _, cluster in _selected_clusters()
        if cluster['property_description'] in top_cluster_set
    ]
    if quality_data_for_sorting:
        quality_df_for_sorting = pd.DataFrame(quality_data_for_sorting)
        # Scores that could not be extracted are None; coerce so the mean
        # never runs on an object column.
        quality_df_for_sorting['quality_score'] = pd.to_numeric(
            quality_df_for_sorting['quality_score'], errors='coerce'
        )
        avg_quality_per_cluster = (
            quality_df_for_sorting.groupby('property_description')['quality_score']
            .mean()
            .sort_values(ascending=True)  # Low to high
        )
        # Reverse the order so low quality appears at top of chart
        top_clusters = avg_quality_per_cluster.index.tolist()[::-1]

    chart_data = clusters_df[clusters_df['property_description'].isin(top_clusters)]
    if chart_data.empty:
        empty_fig = _no_data_figure("No data available")
        return empty_fig, empty_fig

    # ------------------------------------------------------------------
    # 3. Frequency chart: one horizontal bar trace per model
    # ------------------------------------------------------------------
    models = chart_data['model'].unique()
    # Set1 avoids yellow and has good contrast; it only has a handful of
    # colours, so cycle through it rather than indexing past its end.
    palette = px.colors.qualitative.Set1
    fig = go.Figure()
    for i, model in enumerate(models):
        model_data = chart_data[chart_data['model'] == model]
        # Sort by cluster order (same as top_clusters); clusters this model
        # lacks become rows of NaN.
        model_data = model_data.set_index('property_description').reindex(top_clusters).reset_index()
        # Fill NaN values with 0 for missing clusters
        model_data['frequency'] = model_data['frequency'].fillna(0)
        model_data['has_ci'] = model_data['has_ci'].fillna(False)
        # For CI columns, replace NaN with None using where() instead of fillna(None)
        model_data['ci_lower'] = model_data['ci_lower'].where(pd.notna(model_data['ci_lower']), None)
        model_data['ci_upper'] = model_data['ci_upper'].where(pd.notna(model_data['ci_upper']), None)
        # Ensure frequency is numeric and non-negative
        model_data['frequency'] = pd.to_numeric(model_data['frequency'], errors='coerce').fillna(0)
        model_data['frequency'] = model_data['frequency'].clip(lower=0)

        if i == 0:  # Only print for first model to avoid spam
            print(f"DEBUG: Model {model} data sample:")
            print(f" - Clusters: {len(model_data)}")
            print(f" - Frequency range: {model_data['frequency'].min():.2f} - {model_data['frequency'].max():.2f}")
            print(f" - Non-zero frequencies: {(model_data['frequency'] > 0).sum()}")
            if len(model_data) > 0:
                print(f" - Sample row: {model_data.iloc[0][['property_description', 'frequency']].to_dict()}")

        model_data = model_data.dropna(subset=['property_description'])

        # Derive approximate frequency bounds for the error bars.
        ci_lower = []
        ci_upper = []
        for _, row in model_data.iterrows():
            freq_value = row.get('frequency', 0)
            if (row.get('has_ci', False) and
                    pd.notna(row.get('ci_lower')) and
                    pd.notna(row.get('ci_upper')) and
                    freq_value > 0):  # Only calculate CIs for non-zero frequencies
                # IMPORTANT: these are distinctiveness score CIs, not
                # frequency CIs. Their width is used as a heuristic for the
                # frequency uncertainty (10% of width x frequency).
                distinctiveness_ci_width = row['ci_upper'] - row['ci_lower']
                freq_uncertainty = distinctiveness_ci_width * freq_value * 0.1
                ci_lower.append(max(0, freq_value - freq_uncertainty))
                ci_upper.append(freq_value + freq_uncertainty)
            else:
                ci_lower.append(None)
                ci_upper.append(None)

        frequencies = model_data['frequency'].tolist()
        # Plotly expects error lengths measured from the bar value: `array`
        # is the plus side (upper - value), `arrayminus` the minus side.
        err_plus = [u - f if u is not None else None for f, u in zip(frequencies, ci_upper)]
        err_minus = [f - l if l is not None else None for f, l in zip(frequencies, ci_lower)]

        print(f"DEBUG: Adding trace for model {model}:")
        print(f" - Y values (clusters): {model_data['property_description'].tolist()[:3]}...")  # First 3 clusters
        print(f" - X values (frequencies): {frequencies[:3]}...")  # First 3 frequencies
        print(f" - Total data points: {len(model_data)}")

        fig.add_trace(go.Bar(
            y=model_data['property_description'],
            x=model_data['frequency'],
            name=model,
            orientation='h',
            marker_color=palette[i % len(palette)],
            error_x=dict(
                type='data',
                array=err_plus,
                arrayminus=err_minus,
                visible=show_confidence_intervals,
                thickness=1,
                width=3,
                color='rgba(0,0,0,0.3)'
            ),
            hovertemplate='<b>%{y}</b><br>' +
                          f'Model: {model}<br>' +
                          'Frequency: %{x:.1f}%<br>' +
                          'CI: %{customdata[0]}<extra></extra>',
            # customdata needs one entry per bar; each entry is a one-element
            # list so that %{customdata[0]} resolves for every point.
            customdata=[[_ci_label(l, u)] for l, u in zip(ci_lower, ci_upper)]
        ))

    fig.update_layout(
        title=f"Model Frequencies in Top {len(top_clusters)} Clusters",
        xaxis_title="Frequency (%)",
        yaxis_title="Cluster Description",
        barmode='group',  # Group bars side by side
        height=max(600, len(top_clusters) * 25),  # Adjust height based on number of clusters
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        )
    )
    # Update y-axis to show truncated cluster names
    fig.update_yaxes(
        tickmode='array',
        ticktext=[truncate_cluster_name(desc, 60) for desc in top_clusters],
        tickvals=top_clusters
    )

    # ------------------------------------------------------------------
    # 4. Quality chart: a single bar per cluster
    # ------------------------------------------------------------------
    def _first_matching_cluster(cluster_desc: str):
        """Return the first cluster dict with this description, scanning every model entry."""
        for model_data in model_stats.values():
            for cluster in model_data.get(cluster_level, []):
                if cluster.get('property_description') == cluster_desc:
                    return cluster
        return None

    def _first_complete_ci(quality_ci: Any):
        """Return (lower, upper) of the first metric CI having both bounds, else (None, None)."""
        if isinstance(quality_ci, dict):
            for ci_data in quality_ci.values():
                if isinstance(ci_data, dict):
                    lower, upper = ci_data.get('lower'), ci_data.get('upper')
                    if lower is not None and upper is not None:
                        return lower, upper
        return None, None

    quality_descs = []
    quality_scores = []
    ci_lower = []
    ci_upper = []
    for cluster_desc in top_clusters:
        cluster = _first_matching_cluster(cluster_desc)
        if cluster is None:
            continue
        lower, upper = _first_complete_ci(cluster.get('quality_score_ci', {}))
        quality_descs.append(cluster_desc)
        quality_scores.append(extract_quality_score(cluster.get('quality_score', 0)))
        ci_lower.append(lower)
        ci_upper.append(upper)

    if quality_descs:
        fig_quality = go.Figure()
        err_plus = [
            u - q if u is not None and q is not None else None
            for q, u in zip(quality_scores, ci_upper)
        ]
        err_minus = [
            q - l if l is not None and q is not None else None
            for q, l in zip(quality_scores, ci_lower)
        ]
        fig_quality.add_trace(go.Bar(
            y=[truncate_cluster_name(desc, 60) for desc in quality_descs],
            x=quality_scores,
            orientation='h',
            marker_color='lightblue',  # Single color for all bars
            name='Quality Score',
            showlegend=False,
            error_x=dict(
                type='data',
                array=err_plus,
                arrayminus=err_minus,
                visible=show_confidence_intervals,
                thickness=1,
                width=3,
                color='rgba(0,0,0,0.3)'
            ),
            hovertemplate='<b>%{y}</b><br>' +
                          'Quality Score: %{x:.3f}<br>' +
                          'CI: %{customdata[0]}<extra></extra>',
            customdata=[[_ci_label(l, u)] for l, u in zip(ci_lower, ci_upper)]
        ))
        fig_quality.update_layout(
            title=f"Quality Scores",
            xaxis_title="Quality Score",
            yaxis_title="",  # No y-axis title to save space
            height=max(600, len(top_clusters) * 25),  # Same height as main chart
            showlegend=False,
            yaxis=dict(showticklabels=False)  # Hide y-axis labels to save space
        )
    else:
        fig_quality = _no_data_figure("No quality score data available")
    return fig, fig_quality
def search_clusters_by_text(clustered_df: pd.DataFrame,
                            search_term: str,
                            search_in: str = 'description') -> pd.DataFrame:
    """Return up to 100 rows whose text contains ``search_term``.

    Matching is a case-insensitive *literal* substring test, so terms such as
    ``"c++"`` or ``"(beta)"`` are safe to pass straight from a search box.

    Args:
        clustered_df: Clustered results, one row per property.
        search_term: Text to look for. An empty term returns the first 100 rows.
        search_in: Which columns to search:
            ``'description'`` -> ``property_description``;
            ``'model'`` -> ``model``;
            ``'cluster_label'`` -> the fine and coarse cluster label columns;
            any other value -> all of the above.

    Returns:
        The matching rows (at most 100), keeping the original index.
        Columns that are absent from ``clustered_df`` are simply not searched.
    """
    if not search_term:
        return clustered_df.head(100)  # Return first 100 if no search

    needle = search_term.lower()
    label_cols = [
        'property_description_fine_cluster_label',
        'property_description_coarse_cluster_label',
    ]
    if search_in == 'description':
        columns = ['property_description']
    elif search_in == 'model':
        columns = ['model']
    elif search_in == 'cluster_label':
        columns = label_cols
    else:
        columns = ['property_description', 'model', *label_cols]

    # The mask must share the frame's index: a fresh RangeIndex would be
    # re-aligned by ``|=`` and break boolean indexing on filtered frames.
    mask = pd.Series(False, index=clustered_df.index, dtype=bool)
    for col in columns:
        if col in clustered_df.columns:
            mask |= clustered_df[col].str.lower().str.contains(needle, na=False, regex=False)
    return clustered_df[mask].head(100)
def search_clusters_only(clustered_df: pd.DataFrame,
                         search_term: str,
                         cluster_level: str = 'fine') -> pd.DataFrame:
    """Filter rows by cluster label only, ignoring individual property descriptions.

    Matching is a case-insensitive *literal* substring test, so regex
    metacharacters in the term (``+``, ``(``, ``[`` ...) are matched verbatim
    instead of raising or silently changing the meaning of the search.

    Args:
        clustered_df: Clustered results, one row per property.
        search_term: Text to look for; an empty term returns ``clustered_df`` unchanged.
        cluster_level: ``'fine'`` searches the fine label column; any other
            value searches the coarse label column.

    Returns:
        The matching rows, or an empty ``DataFrame`` when neither the
        ``property_description_<level>_cluster_label`` nor the
        ``<level>_cluster_label`` column exists.
    """
    if not search_term:
        return clustered_df

    needle = search_term.lower()
    level = 'fine' if cluster_level == 'fine' else 'coarse'

    # Try both naming patterns, preferring the prefixed pipeline column.
    for label_col in (f'property_description_{level}_cluster_label', f'{level}_cluster_label'):
        if label_col in clustered_df.columns:
            mask = clustered_df[label_col].str.lower().str.contains(needle, na=False, regex=False)
            return clustered_df[mask]

    # If neither column exists, return empty DataFrame
    return pd.DataFrame()
def _viewer_pick_columns(columns, level: str) -> Optional[Tuple[str, str]]:
    """Return the first ``(id, label)`` column pair for *level* present in *columns*, else ``None``."""
    for prefix in ("property_description_", ""):
        id_col = f"{prefix}{level}_cluster_id"
        label_col = f"{prefix}{level}_cluster_label"
        if id_col in columns and label_col in columns:
            return id_col, label_col
    return None


def _viewer_metric_spans(values: Dict[str, Any], signed: bool, empty_text: str) -> str:
    """Render ``{metric: value}`` as coloured ``<span>`` elements joined by `` | ``."""
    if not values:
        return f'<span style="color:#666;">{empty_text}</span>'
    spans = []
    for metric_name, value in values.items():
        label = html.escape(str(metric_name))
        if isinstance(value, (int, float)):
            color = "#28a745" if value >= 0 else "#dc3545"
            text = f"{value:+.3f}" if signed else f"{value:.3f}"
        else:
            # Non-numeric entries (e.g. None) are shown as N/A instead of crashing.
            color = "#666"
            text = "N/A"
        spans.append(f'<span style="color:{color}; font-weight:500;">{label}: {text}</span>')
    return " | ".join(spans)


def create_interactive_cluster_viewer(clustered_df: pd.DataFrame,
                                      selected_models: Optional[List[str]] = None,
                                      cluster_level: str = 'fine') -> str:
    """Build the HTML for the expandable per-cluster viewer.

    Each cluster becomes a ``<details>`` card showing its label, size, models,
    frequency and quality metrics (read from ``app_state["metrics"]["cluster_scores"]``,
    keyed by cluster label) followed by its unique property descriptions.

    Args:
        clustered_df: Clustered results with ``property_description``, ``model``
            and cluster id/label columns (``property_description_<level>_cluster_*``
            or ``<level>_cluster_*``).
        selected_models: If given, only rows whose ``model`` is in this list are shown.
        cluster_level: ``'fine'`` or ``'coarse'``. When coarse columns are
            missing but fine ones exist, fine clusters are shown with a note.

    Returns:
        An HTML string. Problems (no data, missing columns, grouping errors,
        no clusters) are reported as small HTML messages rather than raised.
        All data-derived text is HTML-escaped.
    """
    if clustered_df.empty:
        return "<p>No cluster data available</p>"

    df = clustered_df.copy()
    has_model_col = 'model' in df.columns

    # Debug information
    print("DEBUG: create_interactive_cluster_viewer called")
    print(f" - Input DataFrame shape: {df.shape}")
    print(f" - Selected models: {selected_models}")
    print(f" - Available models in data: {df['model'].unique().tolist() if has_model_col else 'No model column'}")

    # Filter by models if specified (only possible when a model column exists)
    if selected_models and has_model_col:
        print(f" - Filtering by {len(selected_models)} selected models")
        df = df[df['model'].isin(selected_models)]
        print(f" - After filtering shape: {df.shape}")
        print(f" - Models after filtering: {df['model'].unique().tolist()}")
    elif selected_models:
        print(" - No model column in data; model filtering skipped")
    else:
        print(" - No model filtering applied")

    if df.empty:
        model_names = ', '.join(html.escape(str(m)) for m in (selected_models or []))
        return f"<p>No data found for selected models: {model_names}</p>"

    # Resolve the id/label columns, trying both naming patterns.
    requested_level = 'fine' if cluster_level == 'fine' else 'coarse'
    fell_back_to_fine = False
    picked = _viewer_pick_columns(df.columns, requested_level)
    if picked is None and cluster_level == 'coarse':
        # Coarse clusters are not available: fall back to fine clusters if present.
        picked = _viewer_pick_columns(df.columns, 'fine')
        if picked is not None:
            cluster_level = 'fine'  # Update the cluster level for display
            fell_back_to_fine = True

    if picked is None:
        if cluster_level == 'coarse':
            heading = "Missing cluster columns in data"
        else:
            heading = f"Missing {html.escape(str(cluster_level))} cluster columns in data"
        expected = (
            f"property_description_{requested_level}_cluster_id, "
            f"property_description_{requested_level}_cluster_label OR "
            f"{requested_level}_cluster_id, {requested_level}_cluster_label"
        )
        available_cols = ', '.join(html.escape(str(c)) for c in df.columns)
        return f"""
        <div style="padding: 20px; background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 8px;">
            <h4>❌ {heading}</h4>
            <p><strong>Expected:</strong> {expected}</p>
            <p><strong>Available columns:</strong> {available_cols}</p>
            <p>Please ensure your data contains clustering results from the LMM-Vibes pipeline.</p>
        </div>
        """
    id_col, label_col = picked

    # Group by cluster to get cluster information
    try:
        print(f" - Grouping by cluster columns: {id_col}, {label_col}")
        cluster_groups = df.groupby([id_col, label_col]).agg({
            'property_description': ['count', lambda x: x.unique().tolist()],
            'model': lambda x: x.unique().tolist(),
        }).reset_index()
        # Flatten column names
        cluster_groups.columns = [id_col, label_col, 'size', 'property_descriptions', 'models']
        # Sort by size (largest first)
        cluster_groups = cluster_groups.sort_values('size', ascending=False)
        # Filter out "No properties" clusters
        cluster_groups = cluster_groups[cluster_groups[label_col] != "No properties"]
        print(f" - Found {len(cluster_groups)} clusters")
        print(f" - Cluster sizes: {cluster_groups['size'].tolist()}")
        print(f" - Models per cluster: {[len(models) for models in cluster_groups['models']]}")
    except Exception as e:  # UI boundary: report the problem instead of crashing the tab
        return f"""
        <div style="padding: 20px; background: #f8d7da; border: 1px solid #f5c6cb; border-radius: 8px;">
            <h4>❌ Error processing cluster data</h4>
            <p><strong>Error:</strong> {html.escape(str(e))}</p>
            <p>Please check your data format and try again.</p>
        </div>
        """

    if len(cluster_groups) == 0:
        return """
        <div style="padding: 20px; background: #d1ecf1; border: 1px solid #bee5eb; border-radius: 8px;">
            <h4>ℹ️ No clusters found</h4>
            <p>No clusters match your current filters. Try selecting different models or adjusting your search.</p>
        </div>
        """

    # Get cluster scores data for quality and frequency information
    from .state import app_state
    cluster_scores = (app_state.get("metrics") or {}).get("cluster_scores") or {}

    # Collect fragments and join once at the end.
    parts: List[str] = [f"""
    <div style="max-width: 1600px; margin: 0 auto;">
        <h3>🔍 Interactive Cluster Viewer ({html.escape(str(cluster_level)).title()} Level)</h3>
        <p style="color: #666; margin-bottom: 20px;">
            Click on clusters below to explore their property descriptions.
            Showing {len(cluster_groups)} clusters sorted by size.
        </p>
    """]

    # Add a note if we fell back from coarse to fine clusters
    if fell_back_to_fine:
        parts.append("""
        <div style="padding: 15px; background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 8px; margin-bottom: 20px;">
            <strong>Note:</strong> Coarse clusters not available in this dataset. Showing fine clusters instead.
        </div>
        """)

    for row_index, row in cluster_groups.iterrows():
        cluster_id = row[id_col]
        cluster_label = row[label_col]
        cluster_size = row['size']
        property_descriptions = row['property_descriptions']
        models_in_cluster = row['models']

        # Get quality and frequency information from cluster_scores
        cluster_metrics = cluster_scores.get(cluster_label) or {}
        frequency_pct = (cluster_metrics.get("proportion") or 0) * 100
        quality_scores = cluster_metrics.get("quality") or {}
        quality_delta = cluster_metrics.get("quality_delta") or {}

        # Build per-metric header display: "metric: score (delta)"
        header_quality_text = "N/A"
        if quality_scores or quality_delta:
            header_parts: List[str] = []
            for metric_name in sorted(set(quality_scores) | set(quality_delta)):
                score_val = quality_scores.get(metric_name)
                delta_val = quality_delta.get(metric_name)
                metric_label = html.escape(str(metric_name))
                score_str = f"{score_val:.3f}" if isinstance(score_val, (int, float)) else "N/A"
                if isinstance(delta_val, (int, float)):
                    color = "#28a745" if delta_val >= 0 else "#dc3545"
                    header_parts.append(
                        f'{metric_label}: {score_str} <span style="color: {color};">({delta_val:+.3f})</span>'
                    )
                else:
                    header_parts.append(f"{metric_label}: {score_str}")
            header_quality_text = "\n".join(header_parts)

        # Detailed quality scores and deltas (relative to average)
        quality_html = _viewer_metric_spans(quality_scores, signed=False, empty_text="No quality data")
        quality_delta_html = _viewer_metric_spans(quality_delta, signed=True, empty_text="No delta data")

        # Get light color for this cluster (matching overview style)
        cluster_color = get_light_color_for_cluster(cluster_label, row_index)

        label_text = html.escape(str(cluster_label))
        id_text = html.escape(str(cluster_id))
        models_text = ', '.join(html.escape(str(m)) for m in models_in_cluster)

        # Create expandable cluster card with overview-style design
        parts.append(f"""
        <details style="margin: 15px 0; border: 1px solid #e0e0e0; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
            <summary style="
                padding: 15px;
                background: {cluster_color};
                color: #333;
                cursor: pointer;
                font-weight: 600;
                font-size: 16px;
                user-select: none;
                list-style: none;
                display: flex;
                justify-content: space-between;
                align-items: center;
                border-bottom: 1px solid #dee2e6;
            ">
                <div style="max-width: 80%;">
                    <div style="margin-bottom: 4px;">
                        <strong style="font-size: 14px;">{label_text}</strong>
                    </div>
                    <span style="font-size: 12px; color: #555;">
                        {frequency_pct:.1f}% frequency ({cluster_size} properties) · {len(models_in_cluster)} models
                    </span>
                </div>
                <div style="font-size: 12px; font-weight: normal; white-space: nowrap; text-align: right;">
                    <div style="margin-bottom: 4px;">
                        <span style="font-weight: 500;">{header_quality_text}</span>
                    </div>
                    <div style="color: #6c757d;">
                        {frequency_pct:.1f}% frequency
                    </div>
                </div>
            </summary>
            <div style="padding: 20px; background: #f8f9fa;">
                <div style="margin-bottom: 15px;">
                    <strong>Cluster ID:</strong> {id_text}<br>
                    <strong>Size:</strong> {cluster_size} properties<br>
                    <strong>Models:</strong> {models_text}<br>
                    <strong>Frequency:</strong> {frequency_pct:.1f}% of all conversations<br>
                    <strong>Quality Scores:</strong> {quality_html}<br>
                    <strong>Quality vs Average:</strong> {quality_delta_html}
                </div>
                <h4 style="color: #333; margin: 15px 0 10px 0;">
                    Property Descriptions ({len(property_descriptions)})
                </h4>
                <div style="max-height: 300px; overflow-y: auto; background: white; border: 1px solid #ddd; border-radius: 4px; padding: 10px;">
        """)

        # Display property descriptions
        for position, desc in enumerate(property_descriptions, 1):
            parts.append(f"""
                    <div style="
                        padding: 8px;
                        margin: 4px 0;
                        background: #f8f9fa;
                        border-left: 3px solid #667eea;
                        border-radius: 2px;
                    ">
                        <strong>{position}.</strong> {html.escape(str(desc))}
                    </div>
            """)

        parts.append("""
                </div>
            </div>
        </details>
        """)

    parts.append("</div>")
    return "".join(parts)
def get_cluster_statistics(clustered_df: pd.DataFrame,
                           selected_models: Optional[List[str]] = None) -> Dict[str, Any]:
    """Compute summary statistics about properties and clusters for display.

    Args:
        clustered_df: Clustered results, one row per property.
        selected_models: If given and a ``model`` column exists, only rows for
            these models are counted. Without a ``model`` column the filter is
            skipped.

    Returns:
        ``{}`` for an empty input frame. Otherwise a dict with
        ``total_properties`` and ``total_models`` (0 when there is no ``model``
        column) and, for each of ``fine`` / ``coarse`` whose id column exists
        (``property_description_<level>_cluster_id`` preferred over
        ``<level>_cluster_id``): ``<level>_clusters`` and
        ``min/max/avg_properties_per_<level>_cluster`` (0 when no rows remain).
    """
    if clustered_df.empty:
        return {}

    df = clustered_df
    has_model_col = 'model' in df.columns

    # Filter by models if specified; guarded the same way as total_models below.
    if selected_models and has_model_col:
        df = df[df['model'].isin(selected_models)]

    stats: Dict[str, Any] = {
        'total_properties': len(df),
        'total_models': df['model'].nunique() if has_model_col else 0,
    }

    for level in ('fine', 'coarse'):
        # Try both naming patterns, preferring the prefixed pipeline column.
        candidates = (f'property_description_{level}_cluster_id', f'{level}_cluster_id')
        id_col = next((col for col in candidates if col in df.columns), None)
        if id_col is None:
            continue

        cluster_sizes = df.groupby(id_col).size()
        has_clusters = not cluster_sizes.empty
        stats[f'{level}_clusters'] = df[id_col].nunique()
        stats[f'min_properties_per_{level}_cluster'] = cluster_sizes.min() if has_clusters else 0
        stats[f'max_properties_per_{level}_cluster'] = cluster_sizes.max() if has_clusters else 0
        stats[f'avg_properties_per_{level}_cluster'] = cluster_sizes.mean() if has_clusters else 0

    return stats
def get_unique_values_for_dropdowns(clustered_df: pd.DataFrame) -> Dict[str, List[str]]:
    """Collect the sorted unique choices offered by the prompt/model/property dropdowns.

    Returns a dict with three lists:
        ``prompts``: values of the first available column among ``prompt``,
            ``question``, ``input``, ``user_prompt``.
        ``models``: values of ``model``, or the union of ``model_a`` and
            ``model_b`` for side-by-side data.
        ``properties``: fine cluster labels (either naming pattern), falling
            back to ``property_description``; ``"No properties"`` is dropped.

    Prompts and properties longer than 100 characters are cut to 100 characters
    plus ``"..."``. Missing columns yield empty lists.
    """
    if clustered_df.empty:
        return {'prompts': [], 'models': [], 'properties': []}

    available = clustered_df.columns

    def _clip(text):
        # Keep dropdown entries short.
        return text[:100] + "..." if len(text) > 100 else text

    def _distinct(column):
        return clustered_df[column].dropna().unique().tolist()

    def _first_available(candidates):
        return next((name for name in candidates if name in available), None)

    # Prompts: the first matching column wins.
    prompt_col = _first_available(('prompt', 'question', 'input', 'user_prompt'))
    prompts = []
    if prompt_col is not None:
        prompts = [_clip(value) for value in sorted(_distinct(prompt_col))]

    # Models: single-model data, else both sides of a side-by-side dataset.
    if 'model' in available:
        models = sorted(_distinct('model'))
    elif 'model_a' in available and 'model_b' in available:
        models = sorted(set(_distinct('model_a') + _distinct('model_b')))
    else:
        models = []

    # Properties: cluster labels preferred over raw property descriptions.
    property_col = _first_available((
        'property_description_fine_cluster_label',
        'fine_cluster_label',
        'property_description',
    ))
    properties = []
    if property_col is not None:
        kept = [value for value in _distinct(property_col) if value != "No properties"]
        properties = [_clip(value) for value in sorted(kept)]

    return {
        'prompts': prompts,
        'models': models,
        'properties': properties,
    }
# --------------------------------------------------------------------------- | |
# Example data extraction (restored) | |
# --------------------------------------------------------------------------- | |
def get_example_data( | |
clustered_df: pd.DataFrame, | |
selected_prompt: str | None = None, | |
selected_model: str | None = None, | |
selected_property: str | None = None, | |
max_examples: int = 5, | |
show_unexpected_behavior: bool = False, | |
randomize: bool = False, | |
) -> List[Dict[str, Any]]: | |
"""Return a list of example rows filtered by prompt / model / property. | |
This function was accidentally removed during a refactor; it is required by | |
*examples_tab.py* and other parts of the UI. | |
Args: | |
clustered_df: DataFrame containing the clustered results data | |
selected_prompt: Prompt to filter by (None for all) | |
selected_model: Model to filter by (None for all) | |
selected_property: Property description to filter by (None for all) | |
max_examples: Maximum number of examples to return | |
show_unexpected_behavior: If True, filter to only show unexpected behavior | |
randomize: If True, sample randomly from the filtered set instead of taking the first rows | |
Returns: | |
List of example dictionaries with extracted data | |
""" | |
if clustered_df.empty: | |
return [] | |
df = clustered_df.copy() | |
# Filter by unexpected behavior if requested | |
if show_unexpected_behavior: | |
if "unexpected_behavior" in df.columns: | |
# Assuming True/1 means unexpected behavior | |
df = df[df["unexpected_behavior"].isin([True, 1, "True", "true"])] | |
else: | |
# If no unexpected_behavior column, return empty (or could return all) | |
return [] | |
# Filter by prompt | |
if selected_prompt: | |
prompt_cols = ["prompt", "question", "input", "user_prompt"] | |
for col in prompt_cols: | |
if col in df.columns: | |
df = df[df[col].str.contains(selected_prompt, case=False, na=False)] | |
break | |
# Filter by model - handle both single model and side-by-side datasets | |
if selected_model: | |
if "model" in df.columns: | |
# Single model datasets | |
df = df[df["model"] == selected_model] | |
elif "model_a" in df.columns and "model_b" in df.columns: | |
# Side-by-side datasets - filter where either model_a or model_b matches | |
df = df[(df["model_a"] == selected_model) | (df["model_b"] == selected_model)] | |
# Filter by property | |
if selected_property: | |
property_cols = ["property_description", "cluster", "fine_cluster_label", "property_description_fine_cluster_label"] | |
for col in property_cols: | |
if col in df.columns: | |
df = df[df[col].str.contains(selected_property, case=False, na=False)] | |
break | |
# Limit to max_examples (randomized if requested) | |
if randomize: | |
if len(df) > max_examples: | |
df = df.sample(n=max_examples) | |
else: | |
df = df.sample(frac=1) | |
else: | |
df = df.head(max_examples) | |
examples: List[Dict[str, Any]] = [] | |
for _, row in df.iterrows(): | |
prompt_val = next( | |
(row.get(col) for col in ["prompt", "question", "input", "user_prompt"] if row.get(col) is not None), | |
"N/A", | |
) | |
# Check if this is a side-by-side dataset | |
is_side_by_side = ('model_a_response' in row and 'model_b_response' in row and | |
row.get('model_a_response') is not None and row.get('model_b_response') is not None) | |
if is_side_by_side: | |
# For side-by-side datasets, store both responses separately | |
response_val = "SIDE_BY_SIDE" # Special marker | |
model_val = f"{row.get('model_a', 'Model A')} vs {row.get('model_b', 'Model B')}" | |
else: | |
# For single response datasets, use the existing logic | |
response_val = next( | |
( | |
row.get(col) | |
for col in [ | |
"model_response", | |
"model_a_response", | |
"model_b_response", | |
"responses", | |
"response", | |
"output", | |
] | |
if row.get(col) is not None | |
), | |
"N/A", | |
) | |
model_val = row.get("model", "N/A") | |
# Try both naming patterns for cluster data | |
fine_cluster_id = row.get("property_description_fine_cluster_id", row.get("fine_cluster_id", "N/A")) | |
fine_cluster_label = row.get("property_description_fine_cluster_label", row.get("fine_cluster_label", "N/A")) | |
coarse_cluster_id = row.get("property_description_coarse_cluster_id", row.get("coarse_cluster_id", "N/A")) | |
coarse_cluster_label = row.get("property_description_coarse_cluster_label", row.get("coarse_cluster_label", "N/A")) | |
example_dict = { | |
"id": row.get("id", "N/A"), | |
"model": model_val, | |
"prompt": prompt_val, | |
"response": response_val, | |
"property_description": row.get("property_description", "N/A"), | |
"score": row.get("score", "N/A"), | |
"fine_cluster_id": fine_cluster_id, | |
"fine_cluster_label": fine_cluster_label, | |
"coarse_cluster_id": coarse_cluster_id, | |
"coarse_cluster_label": coarse_cluster_label, | |
"category": row.get("category", "N/A"), | |
"type": row.get("type", "N/A"), | |
"impact": row.get("impact", "N/A"), | |
"reason": row.get("reason", "N/A"), | |
"evidence": row.get("evidence", "N/A"), | |
"user_preference_direction": row.get("user_preference_direction", "N/A"), | |
"raw_response": row.get("raw_response", "N/A"), | |
"contains_errors": row.get("contains_errors", "N/A"), | |
"unexpected_behavior": row.get("unexpected_behavior", "N/A"), | |
} | |
# Add side-by-side specific fields if applicable | |
if is_side_by_side: | |
example_dict.update({ | |
"is_side_by_side": True, | |
"model_a": row.get("model_a", "Model A"), | |
"model_b": row.get("model_b", "Model B"), | |
"model_a_response": row.get("model_a_response", "N/A"), | |
"model_b_response": row.get("model_b_response", "N/A"), | |
"winner": row.get("winner", None), | |
}) | |
else: | |
example_dict["is_side_by_side"] = False | |
examples.append(example_dict) | |
return examples | |
def _example_value_present(value: Any) -> bool:
    """Return True unless *value* is None, NaN, or an ``"N/A"`` / ``"None"`` placeholder."""
    if value is None:
        return False
    if isinstance(value, float) and value != value:  # NaN never equals itself
        return False
    if isinstance(value, str):
        return value not in ("N/A", "None")
    return True


def format_examples_display(examples: List[Dict[str, Any]],
                            selected_prompt: Optional[str] = None,
                            selected_model: Optional[str] = None,
                            selected_property: Optional[str] = None,
                            use_accordion: bool = True,
                            pretty_print_dicts: bool = True) -> str:
    """Format examples for HTML display with proper conversation rendering.

    Each example becomes an expandable ``<details>`` card (the first one is
    open) with a metadata header and either a single-model conversation or a
    side-by-side comparison. Text taken from the examples and filters is
    HTML-escaped; the conversation HTML produced by the display helpers is
    inserted as-is.

    Args:
        examples: List of example dictionaries (as built by ``get_example_data``)
        selected_prompt: Currently selected prompt filter
        selected_model: Currently selected model filter
        selected_property: Currently selected property filter
        use_accordion: If True, group system and info messages in collapsible accordions
        pretty_print_dicts: If True, pretty-print embedded dictionaries

    Returns:
        HTML string for display
    """
    if not examples:
        return "<p style='color: #e74c3c; padding: 20px;'>No examples found matching the current filters.</p>"

    # Imported only once there is something to render.
    from .conversation_display import convert_to_openai_format, display_openai_conversation_html
    from .side_by_side_display import display_side_by_side_responses

    def _text(value: Any) -> str:
        return html.escape(str(value))

    # Create filter summary
    filter_parts = []
    if selected_prompt and selected_prompt != "All Prompts":
        filter_parts.append(f"Prompt: {_text(selected_prompt)}")
    if selected_model and selected_model != "All Models":
        filter_parts.append(f"Model: {_text(selected_model)}")
    if selected_property and selected_property != "All Clusters":
        filter_parts.append(f"Cluster: {_text(selected_property)}")

    filter_summary = ""
    if filter_parts:
        filter_summary = f"""
        <div style="background: #e3f2fd; padding: 15px; border-radius: 8px; margin-bottom: 20px; border-left: 4px solid #2196f3;">
            <strong>🔍 Active Filters:</strong> {" • ".join(filter_parts)}
        </div>
        """

    parts: List[str] = [f"""
    <div style="font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
        <h3 style="color: #333; margin-bottom: 15px;">📋 Examples ({len(examples)} found)</h3>
        {filter_summary}
    """]

    for position, example in enumerate(examples, 1):
        is_side_by_side = bool(example.get('is_side_by_side', False))

        if is_side_by_side:
            # Use side-by-side display for comparison datasets
            conversation_html = display_side_by_side_responses(
                model_a=example['model_a'],
                model_b=example['model_b'],
                model_a_response=example['model_a_response'],
                model_b_response=example['model_b_response'],
                use_accordion=use_accordion,
                pretty_print_dicts=pretty_print_dicts,
                score=example.get('score', 'N/A'),
                winner=example.get('winner'),
            )
        else:
            # Convert response to OpenAI format for proper display (single model)
            response_data = example.get('response', 'N/A')
            if _example_value_present(response_data):
                openai_conversation = convert_to_openai_format(response_data)
                conversation_html = display_openai_conversation_html(
                    openai_conversation,
                    use_accordion=use_accordion,
                    pretty_print_dicts=pretty_print_dicts,
                    evidence=example.get('evidence'),
                )
            else:
                conversation_html = "<p style='color: #dc3545; font-style: italic;'>No response data available</p>"

        # Score badge for the summary (side-by-side cards receive the score via the display helper)
        score_badge = ""
        raw_score = example.get('score', 'N/A')
        if not is_side_by_side and _example_value_present(raw_score):
            try:
                score_val = float(raw_score)
            except (TypeError, ValueError):
                score_val = None  # non-numeric score (e.g. a dict of metrics): no badge
            if score_val is not None:
                score_color = '#28a745' if score_val >= 0 else '#dc3545'
                score_badge = f"""
                <span style="
                    background: {score_color};
                    color: white;
                    padding: 4px 8px;
                    border-radius: 12px;
                    font-size: 12px;
                    font-weight: bold;
                    margin-left: 10px;
                ">
                    Score: {score_val:.3f}
                </span>
                """

        # Create short preview of prompt for summary (prompt may be a non-string value)
        prompt_text = str(example.get('prompt', 'N/A'))
        prompt_preview = prompt_text[:80] + "..." if len(prompt_text) > 80 else prompt_text

        model_text = _text(example.get('model', 'N/A'))

        # Short metadata shown in a wrapping row
        meta_rows = [
            f"<div><strong>Model:</strong> {model_text}</div>",
            f"<div><strong>ID:</strong> {_text(example.get('id', 'N/A'))}</div>",
        ]
        for caption, key in (("Category", "category"), ("Type", "type"), ("Impact", "impact")):
            value = example.get(key, "N/A")
            if _example_value_present(value):
                meta_rows.append(f"<div><strong>{caption}:</strong> {_text(value)}</div>")

        # Longer free-text fields, one per line
        detail_rows = []
        for caption, key in (("Property", "property_description"), ("Reason", "reason"), ("Evidence", "evidence")):
            value = example.get(key, "N/A")
            if _example_value_present(value):
                detail_rows.append(
                    f'<div style="margin-top: 10px;"><strong>{caption}:</strong> {_text(value)}</div>'
                )

        section_title = "Response Comparison" if is_side_by_side else "Conversation"
        # First example is expanded by default
        open_attr = "open" if position == 1 else ""
        meta_html = "\n".join(meta_rows)
        detail_html = "\n".join(detail_rows)

        # Create expandable example card
        parts.append(f"""
        <details {open_attr} style="border: 1px solid #dee2e6; border-radius: 8px; margin-bottom: 15px; background: white; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
            <summary style="
                padding: 15px;
                cursor: pointer;
                font-weight: 600;
                color: #495057;
                background: linear-gradient(90deg, #f8f9fa 0%, #e9ecef 100%);
                border-radius: 8px 8px 0 0;
                border-bottom: 1px solid #dee2e6;
                display: flex;
                align-items: center;
                justify-content: space-between;
            ">
                <span>
                    <span style="background: #6c757d; color: white; padding: 4px 8px; border-radius: 4px; font-size: 12px; margin-right: 10px;">#{position}</span>
                    {_text(prompt_preview)}
                </span>
                <span style="font-size: 12px; color: #6c757d;">
                    {model_text}{score_badge}
                </span>
            </summary>
            <div style="padding: 20px;">
                <div style="margin-bottom: 15px; padding: 15px; background: #f8f9fa; border-radius: 6px; border-left: 4px solid #17a2b8;">
                    <div style="display: flex; flex-wrap: wrap; gap: 15px; margin-top: 15px; font-size: 13px; color: #666;">
                        {meta_html}
                    </div>
                    <div style="margin-top: 10px;">
                        {detail_html}
                    </div>
                </div>
                <div style="margin-bottom: 15px;">
                    <h5 style="margin: 0 0 8px 0; color: #333; font-size: 14px;">💬 {section_title}</h5>
                    <div style="border-radius: 6px; font-size: 13px; line-height: 1.5;">
                        {conversation_html}
                    </div>
                </div>
            </div>
        </details>
        """)

    parts.append("</div>")
    return "".join(parts)
# --------------------------------------------------------------------------- | |
# Legacy function aliases (backward compatibility) | |
# --------------------------------------------------------------------------- | |
def compute_model_rankings(*args, **kwargs):
    """Backward-compatible name for ``compute_model_rankings_new``.

    All positional and keyword arguments are forwarded unchanged and the
    result is returned as-is.
    """
    rankings = compute_model_rankings_new(*args, **kwargs)
    return rankings
def create_model_summary_card(*args, **kwargs):
    """Backward-compatible name for ``create_model_summary_card_new``.

    All positional and keyword arguments are forwarded unchanged and the
    result is returned as-is.
    """
    card = create_model_summary_card_new(*args, **kwargs)
    return card
def get_total_clusters_count(metrics: Dict[str, Any]) -> int:
    """Count the entries of ``metrics["cluster_scores"]``, ignoring "No properties".

    A missing ``cluster_scores`` key counts as zero clusters.
    """
    cluster_names = metrics.get("cluster_scores", {})
    # The "No properties" placeholder is not a real cluster.
    return sum(1 for name in cluster_names if name != "No properties")
def get_light_color_for_cluster(cluster_name: str, index: int) -> str:
    """Return the background colour used for cluster boxes.

    Every cluster gets the same very light dusty blue; ``cluster_name`` and
    ``index`` are accepted but do not affect the result.
    """
    dusty_blue = "#f0f4f8"  # Very light dusty blue
    return dusty_blue
# Public API: the names exported by a star-import of this module.
# NOTE(review): get_light_color_for_cluster is defined in this module but is
# not listed here - confirm whether that omission is intentional.
__all__ = [
    # Imported from .metrics_adapter at the top of the file
    "get_model_clusters",
    "get_all_models",
    "get_all_clusters",
    "format_confidence_interval",
    "get_confidence_interval_width",
    "has_confidence_intervals",
    "extract_quality_score",
    "get_top_clusters_for_model",
    "compute_model_rankings_new",
    "create_model_summary_card_new",
    "format_cluster_dataframe",
    "truncate_cluster_name",
    "create_frequency_comparison_table",
    "create_frequency_comparison_plots",
    "search_clusters_by_text",
    "search_clusters_only",
    "create_interactive_cluster_viewer",
    "get_cluster_statistics",
    "get_unique_values_for_dropdowns",
    "get_example_data",
    "format_examples_display",
    # Legacy aliases that forward to the *_new implementations
    "compute_model_rankings",
    "create_model_summary_card",
    "get_total_clusters_count",
]