""" | |
Data loading functionality for the LMM-Vibes Gradio app. | |
This module handles loading pipeline results and converting them to formats | |
suitable for the Gradio interface. | |
""" | |
import json | |
import pandas as pd | |
from pathlib import Path | |
from typing import Dict, List, Any, Tuple, Optional | |
import os | |
from .state import app_state | |
from lmmvibes.metrics.plotting import create_model_cluster_dataframe | |
class DataCache:
    """Simple in-memory cache for loaded data to avoid re-loading."""

    _cache = {}

    @classmethod
    def get(cls, key: str):
        return cls._cache.get(key)

    @classmethod
    def set(cls, key: str, value: Any):
        cls._cache[key] = value

    @classmethod
    def clear(cls):
        cls._cache.clear()
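
# A minimal usage sketch for DataCache (the key and value below are
# illustrative, not values the app actually caches):
#
#     DataCache.set("expensive_result", {"rows": 1000})
#     DataCache.get("expensive_result")   # -> {"rows": 1000}
#     DataCache.get("missing_key")        # -> None
#     DataCache.clear()                   # e.g. when new results are loaded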
def scan_for_result_subfolders(base_dir: str) -> List[str]:
    """Scan for subfolders that might contain pipeline results."""
    base_path = Path(base_dir)
    if not base_path.exists():
        return []

    # A subfolder qualifies only if it contains every required results file.
    required_files = [
        "model_cluster_scores.json",
        "cluster_scores.json",
        "model_scores.json",
        "clustered_results_lightweight.jsonl",
    ]
    subfolders = []
    for item in base_path.iterdir():
        if item.is_dir() and all((item / f).exists() for f in required_files):
            subfolders.append(item.name)
    return subfolders
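
# Sketch: discover candidate run folders under a base directory, e.g. to
# populate a picker in the UI ("results/" is a hypothetical path):
#
#     for name in scan_for_result_subfolders("results/"):
#         print(name)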
def validate_results_directory(results_dir: str) -> Tuple[bool, str]:
    """Validate that the results directory contains the expected files."""
    results_path = Path(results_dir)

    if not results_path.exists():
        return False, f"Directory does not exist: {results_dir}"
    if not results_path.is_dir():
        return False, f"Path is not a directory: {results_dir}"

    # Check for the FunctionalMetrics score files and the clustered results.
    required_files = [
        "model_cluster_scores.json",
        "cluster_scores.json",
        "model_scores.json",
        "clustered_results_lightweight.jsonl",
    ]
    missing_files = [f for f in required_files if not (results_path / f).exists()]

    if missing_files:
        return False, f"Missing required files: {', '.join(missing_files)}"
    return True, ""
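
# Sketch: validate before loading and surface the message on failure
# ("results/my_run" is a hypothetical path):
#
#     is_valid, error = validate_results_directory("results/my_run")
#     if not is_valid:
#         print(f"Cannot load results: {error}")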
def get_available_models(metrics: Dict[str, Any]) -> List[str]:
    """Extract the available model names from the model_cluster_scores metrics."""
    model_cluster_scores = metrics.get("model_cluster_scores", {})
    return list(model_cluster_scores.keys())


def get_all_models(metrics: Dict[str, Any]) -> List[str]:
    """Alias for get_available_models."""
    return get_available_models(metrics)
def load_pipeline_results(results_dir: str) -> Tuple[pd.DataFrame, Dict[str, Any], pd.DataFrame, Path]:
    """Load pipeline outputs (FunctionalMetrics format only).

    Returns:
        clustered_df: DataFrame of per-conversation data loaded from
            clustered_results_lightweight.jsonl
        metrics: Dict containing the three FunctionalMetrics score dictionaries
        model_cluster_df: DataFrame created from model_cluster_scores for
            plotting/analysis
        results_path: Path to the results directory
    """
    cache_key = f"pipeline_results_{results_dir}"
    cached = DataCache.get(cache_key)
    if cached is not None:
        return cached

    results_path = Path(results_dir)
    if not results_path.exists():
        raise FileNotFoundError(f"Results directory does not exist: {results_dir}")

    # ------------------------------------------------------------------
    # 1. Load FunctionalMetrics score files (must ALL be present)
    # ------------------------------------------------------------------
    required_files = [
        "model_cluster_scores.json",
        "cluster_scores.json",
        "model_scores.json",
    ]
    missing = [f for f in required_files if not (results_path / f).exists()]
    if missing:
        raise FileNotFoundError(
            f"Missing required metrics files in {results_dir}: {', '.join(missing)}"
        )

    with open(results_path / "model_cluster_scores.json") as f:
        model_cluster_scores = json.load(f)
    with open(results_path / "cluster_scores.json") as f:
        cluster_scores = json.load(f)
    with open(results_path / "model_scores.json") as f:
        model_scores = json.load(f)

    metrics = {
        "model_cluster_scores": model_cluster_scores,
        "cluster_scores": cluster_scores,
        "model_scores": model_scores,
    }

    # ------------------------------------------------------------------
    # 2. Load clustered conversation data (JSON Lines)
    # ------------------------------------------------------------------
    clustered_path = results_path / "clustered_results_lightweight.jsonl"
    if not clustered_path.exists():
        raise FileNotFoundError(f"clustered_results_lightweight.jsonl not found in {results_dir}")
    try:
        clustered_df = pd.read_json(clustered_path, lines=True)
    except Exception as e:
        raise ValueError(f"Could not load clustered results: {e}")

    # ------------------------------------------------------------------
    # 3. Create model_cluster_df from metrics for plotting/analysis
    # ------------------------------------------------------------------
    model_cluster_df = create_model_cluster_dataframe(model_cluster_scores)

    result = (clustered_df, metrics, model_cluster_df, results_path)
    DataCache.set(cache_key, result)
    return result
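
# Sketch of a typical call ("results/my_run" is a hypothetical path); a second
# call with the same directory returns the cached tuple without touching disk:
#
#     clustered_df, metrics, model_cluster_df, results_path = \
#         load_pipeline_results("results/my_run")
#     models = get_available_models(metrics)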
def load_property_examples(results_path: Path, property_ids: List[str]) -> pd.DataFrame:
    """Load specific property examples on demand."""
    if not property_ids:
        return pd.DataFrame()

    cache_key = f"examples_{results_path}_{hash(tuple(sorted(property_ids)))}"
    cached = DataCache.get(cache_key)
    if cached is not None:
        return cached

    # Load the full dataset to get prompt/response details.
    clustered_path = results_path / "clustered_results_lightweight.jsonl"
    if not clustered_path.exists():
        raise FileNotFoundError(
            "Could not load example data - clustered_results_lightweight.jsonl not found"
        )
    try:
        full_df = pd.read_json(clustered_path, lines=True)
        result = full_df[full_df["id"].isin(property_ids)]
        DataCache.set(cache_key, result)
        return result
    except Exception as e:
        raise ValueError(f"Failed to load examples: {e}")