"""
Data loading functionality for the LMM-Vibes Gradio app.
This module handles loading pipeline results and converting them to formats
suitable for the Gradio interface.
"""
import json
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
import os
from .state import app_state
from lmmvibes.metrics.plotting import create_model_cluster_dataframe
class DataCache:
    """Simple cache for loaded data to avoid re-loading."""

    _cache = {}

    @classmethod
    def get(cls, key: str):
        return cls._cache.get(key)

    @classmethod
    def set(cls, key: str, value: Any):
        cls._cache[key] = value

    @classmethod
    def clear(cls):
        cls._cache.clear()

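# Illustrative usage of DataCache (the key and value below are hypothetical):
#
#     DataCache.set("pipeline_results_/tmp/run1", loaded_results)
#     hit = DataCache.get("pipeline_results_/tmp/run1")  # None on a cache miss
#     DataCache.clear()  # e.g. after the results on disk have changed
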
def scan_for_result_subfolders(base_dir: str) -> List[str]:
    """Scan for subfolders that might contain pipeline results."""
    base_path = Path(base_dir)
    if not base_path.exists():
        return []

    # Look for subfolders that contain the required files
    subfolders = []
    for item in base_path.iterdir():
        if item.is_dir():
            # Check if this subfolder contains pipeline results
            required_files = [
                "model_cluster_scores.json",
                "cluster_scores.json",
                "model_scores.json",
                "clustered_results_lightweight.jsonl",
            ]
            if all((item / f).exists() for f in required_files):
                subfolders.append(item.name)
    return subfolders

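# Sketch of the expected call pattern (the "results" path is hypothetical):
#
#     candidates = scan_for_result_subfolders("results")
#     # -> e.g. ["run_a", "run_b"]; only subfolders containing all four
#     #    pipeline files are returned, everything else is skipped.
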
def validate_results_directory(results_dir: str) -> Tuple[bool, str]:
    """Validate that the results directory contains the expected files."""
    results_path = Path(results_dir)
    if not results_path.exists():
        return False, f"Directory does not exist: {results_dir}"
    if not results_path.is_dir():
        return False, f"Path is not a directory: {results_dir}"

    # Check for FunctionalMetrics format files
    required_files = [
        "model_cluster_scores.json",
        "cluster_scores.json",
        "model_scores.json",
    ]
    missing_files = []
    for filename in required_files:
        if not (results_path / filename).exists():
            missing_files.append(filename)

    # Check for clustered results
    if not (results_path / "clustered_results_lightweight.jsonl").exists():
        missing_files.append("clustered_results_lightweight.jsonl")

    if missing_files:
        return False, f"Missing required files: {', '.join(missing_files)}"
    return True, ""

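# A minimal usage sketch (the directory name is an assumption):
#
#     ok, error = validate_results_directory("results/run_a")
#     if not ok:
#         raise FileNotFoundError(error)  # or surface the message in the UI
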
def get_available_models(metrics: Dict[str, Any]) -> List[str]:
    """Extract the available model names from metrics data."""
    model_cluster_scores = metrics.get("model_cluster_scores", {})
    return list(model_cluster_scores.keys())


def get_all_models(metrics: Dict[str, Any]) -> List[str]:
    """Alias for get_available_models(); returns every model in the metrics data."""
    return get_available_models(metrics)

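# The metrics dict consumed here is the one assembled by load_pipeline_results
# below; a sketch of its shape (the model names are hypothetical):
#
#     metrics = {"model_cluster_scores": {"gpt-4": {...}, "claude-3": {...}},
#                "cluster_scores": {...}, "model_scores": {...}}
#     get_available_models(metrics)  # -> ["gpt-4", "claude-3"]
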
def load_pipeline_results(results_dir: str) -> Tuple[pd.DataFrame, Dict[str, Any], pd.DataFrame, Path]:
    """Load pipeline outputs (FunctionalMetrics format only).

    Returns:
        clustered_df: DataFrame of per-conversation data loaded from
            clustered_results_lightweight.jsonl
        metrics: Dict containing the three FunctionalMetrics score dictionaries
        model_cluster_df: DataFrame created from model_cluster_scores for
            plotting/analysis
        results_path: Path to the results directory
    """
    cache_key = f"pipeline_results_{results_dir}"
    cached = DataCache.get(cache_key)
    if cached is not None:
        return cached

    results_path = Path(results_dir)
    if not results_path.exists():
        raise FileNotFoundError(f"Results directory does not exist: {results_dir}")

    # ------------------------------------------------------------------
    # 1. Load FunctionalMetrics score files (must ALL be present)
    # ------------------------------------------------------------------
    required_files = [
        "model_cluster_scores.json",
        "cluster_scores.json",
        "model_scores.json",
    ]
    missing = [f for f in required_files if not (results_path / f).exists()]
    if missing:
        raise FileNotFoundError(
            f"Missing required metrics files in {results_dir}: {', '.join(missing)}"
        )

    with open(results_path / "model_cluster_scores.json") as f:
        model_cluster_scores = json.load(f)
    with open(results_path / "cluster_scores.json") as f:
        cluster_scores = json.load(f)
    with open(results_path / "model_scores.json") as f:
        model_scores = json.load(f)

    metrics = {
        "model_cluster_scores": model_cluster_scores,
        "cluster_scores": cluster_scores,
        "model_scores": model_scores,
    }

    # ------------------------------------------------------------------
    # 2. Load clustered conversation data (JSON Lines)
    # ------------------------------------------------------------------
    clustered_path = results_path / "clustered_results_lightweight.jsonl"
    if not clustered_path.exists():
        raise FileNotFoundError(f"clustered_results_lightweight.jsonl not found in {results_dir}")
    try:
        clustered_df = pd.read_json(clustered_path, lines=True)
    except Exception as e:
        raise ValueError(f"Could not load clustered results: {e}")

    # ------------------------------------------------------------------
    # 3. Create model_cluster_df from metrics for plotting/analysis
    # ------------------------------------------------------------------
    model_cluster_df = create_model_cluster_dataframe(model_cluster_scores)

    result = (clustered_df, metrics, model_cluster_df, results_path)
    DataCache.set(cache_key, result)
    return result

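# Typical call site (the directory name is illustrative). Repeated calls with
# the same directory hit the DataCache, so the files are only read from disk once:
#
#     clustered_df, metrics, model_cluster_df, results_path = \
#         load_pipeline_results("results/run_2024_01")
#     models = get_available_models(metrics)
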
def load_property_examples(results_path: Path, property_ids: List[str]) -> pd.DataFrame:
    """Load specific property examples on-demand."""
    if not property_ids:
        return pd.DataFrame()

    cache_key = f"examples_{results_path}_{hash(tuple(sorted(property_ids)))}"
    cached = DataCache.get(cache_key)
    if cached is not None:
        return cached

    # Load the full dataset to get prompt/response details
    clustered_path = results_path / "clustered_results_lightweight.jsonl"
    if not clustered_path.exists():
        raise FileNotFoundError(
            "Could not load example data - clustered_results_lightweight.jsonl not found"
        )
    try:
        full_df = pd.read_json(clustered_path, lines=True)
        result = full_df[full_df['id'].isin(property_ids)]
        DataCache.set(cache_key, result)
        return result
    except Exception as e:
        raise ValueError(f"Failed to load examples: {e}")

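# Example of the on-demand lookup (the ids are hypothetical; they must match
# values in the "id" column of clustered_results_lightweight.jsonl):
#
#     examples_df = load_property_examples(results_path, ["prop_001", "prop_042"])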