DeepResearch-Leaderboard / tabs /data_viewer_tab.py
Ayanami0730's picture
Add DeepResearch Bench application with LFS support
927e909
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data-Viewer tab ---- 美化·修正版
"""
import gradio as gr
import pandas as pd
import json, random
from pathlib import Path
import re # 导入re模块
# ---------- 路径 ----------
BASE_DIR = Path(__file__).resolve().parent.parent
DATA_VIEWER_FILE = BASE_DIR / "data" / "data_viewer.jsonl"
# ---------- 工具 ----------
def load_data_viewer_data() -> pd.DataFrame:
    """Load the data-viewer records from ``DATA_VIEWER_FILE`` into a DataFrame.

    The file is read as JSON Lines: every line is parsed with ``json.loads``
    and lines that fail to parse are skipped.

    Returns:
        A DataFrame of the parsed records with the ``id`` column converted to
        ``str``.  If the file does not exist, yields no records, or any of the
        required columns is absent, an empty DataFrame that has exactly the
        required columns is returned instead, so callers can test ``df.empty``.
    """
    required_columns = [
        "model_name", "id", "prompt", "article", "overall_score",
        "comprehensiveness_score", "insight_score",
        "instruction_following_score", "readability_score",
    ]

    parsed_rows = []
    if DATA_VIEWER_FILE.exists():
        raw_text = DATA_VIEWER_FILE.read_text(encoding="utf-8")
        for raw_line in raw_text.splitlines():
            try:
                parsed_rows.append(json.loads(raw_line))
            except json.JSONDecodeError:
                # A line that is not valid JSON is ignored instead of
                # aborting the whole load.
                continue

    frame = pd.DataFrame(parsed_rows)
    has_all_columns = all(column in frame.columns for column in required_columns)
    if frame.empty or not has_all_columns:
        # Hand back an empty frame that still exposes every expected column
        # so later column lookups do not fail.
        return pd.DataFrame(columns=required_columns)

    frame["id"] = frame["id"].astype(str)
    return frame
def make_user_task_markdown(item_id, prompt):
    """Return the Markdown shown in the "User Task" card.

    Args:
        item_id: Task identifier, interpolated after the "Task ID" label.
        prompt: Task description, interpolated after the "Description" label.

    Returns:
        A three-line Markdown string: heading, task id line, description line.
    """
    heading = "### User Task 🎯"
    id_line = f"**Task ID:** {item_id}"
    description_line = f"**Description:** {prompt}"
    return "\n".join((heading, id_line, description_line))
def _insert_paragraph_breaks(text: str) -> str:
    """Add a blank line between adjacent non-empty plain-text lines.

    No blank line is added after a line that starts with ``*`` or ``#``
    (after stripping), nor before a line that starts with ``*``.
    """
    source_lines = text.split('\n')
    final_index = len(source_lines) - 1
    spaced = []
    for index, current in enumerate(source_lines):
        spaced.append(current)
        if index == final_index:
            continue
        here = current.strip()
        following = source_lines[index + 1].strip()
        if not here or not following:
            # One side is already blank, so the break exists.
            continue
        if here.startswith(('*', '#')) or following.startswith('*'):
            # Leave list items and the line right after a heading alone.
            continue
        spaced.append('')
    return '\n'.join(spaced)


def make_article_markdown(article: str) -> str:
    """Return the Markdown shown in the "Generated Article" card.

    For a non-empty string the text is reformatted in this order:

    1. Runs of two or more newlines are collapsed to exactly two.
    2. Table-like spans (text from a ``|`` to the end of its line, plus any
       following separator / ``|``-leading lines) are swapped for
       placeholders so the later steps do not touch them.
    3. ``* **Title**:`` list markers are pushed onto their own line, adjacent
       markers are split apart, and a ``[n...]`` citation directly followed
       by a list marker is replaced by a fresh list marker on a new line.
    4. A blank line is inserted between adjacent plain-text lines.
    5. The stashed tables are put back.

    Any other value is emitted as-is under the heading (``None`` becomes an
    empty string).

    Args:
        article: The article text; may be ``None`` or a non-string value.

    Returns:
        The heading line followed by the (possibly reformatted) article.
    """
    heading = "### Generated Article 📖"

    if not (article and isinstance(article, str)):
        fallback = "" if article is None else article
        return f"{heading}\n{fallback}"

    # Normalize pre-existing runs of newlines first.
    text = re.sub(r'\n{2,}', '\n\n', article)

    # Protect table regions behind placeholders.
    stashed_tables = []

    def _stash(match):
        stashed_tables.append(match.group(1))
        return f'__TABLE_PLACEHOLDER_{len(stashed_tables)-1}__'

    text = re.sub(r'(\|[^\n]*\n(?:[\|\s\-:]+\n)?(?:\|[^\n]*\n)*)', _stash, text)

    # (pattern, replacement) pairs, applied in order.
    list_rules = (
        # "* **Title:** content" markers must be preceded by a line break.
        (r'(?<!\n)\*\s*\*\*([^*]+?)\*\*:', r'\n\n* **\1**:'),
        # A marker immediately followed by another marker: split them.
        (r'\*\s*\*\*([^*]+?)\*\*:\s*([^*]*?)\s*\*\s*\*\*', r'* **\1**: \2\n * **'),
        # A citation tag glued to a marker: start the marker on a new line.
        (r'(?<!\n)\[\d+[^\]]*\]\*\s*\*\*', r'\n\n* **'),
    )
    for pattern, replacement in list_rules:
        text = re.sub(pattern, replacement, text)

    # Turn remaining single line breaks between plain lines into paragraph
    # breaks without disturbing the list structure created above.
    text = _insert_paragraph_breaks(text)

    # Restore the tables.
    for position, table in enumerate(stashed_tables):
        text = text.replace(f'__TABLE_PLACEHOLDER_{position}__', table)

    return f"{heading}\n{text}"
def make_scores_html(overall, comprehensiveness, insight, instruction, readability):
    """Return the HTML card that shows the five scores side by side.

    Each score is rendered under its title in a flex row.  A score that is
    ``None`` or a float NaN is shown as ``N/A``; any other value is
    interpolated as-is.

    Args:
        overall: Overall score.
        comprehensiveness: Comprehensiveness score.
        insight: Insight score.
        instruction: Instruction-following score.
        readability: Readability score.

    Returns:
        An HTML string: an outer card ``<div>`` wrapping a flex container
        with one cell per score.
    """
    labelled_scores = (
        ("Overall Score", overall),
        ("Comprehensiveness Score", comprehensiveness),
        ("Insight Score", insight),
        ("Instruction-Following Score", instruction),
        ("Readability Score", readability),
    )

    def _display(score):
        # Values taken from a pandas column are NaN (a float), not None, when
        # missing.  NaN is the only float that compares unequal to itself.
        if score is None or (isinstance(score, float) and score != score):
            return "N/A"
        return score

    item_template = (
        "\n"
        '<div style="text-align: center; padding: 8px 5px; flex-grow: 1; flex-basis: 0;">\n'
        '<h4 style="margin: 0 0 6px 0; font-size: 1.2em; color: #4a4a4a; font-weight: 600;">{title}</h4>\n'
        '<p style="margin: 0; font-size: 1.2em; font-weight: bold; color: #333;">{value}</p>\n'
        "</div>\n"
    )
    html_items_str = "".join(
        item_template.format(title=title, value=_display(score))
        for title, score in labelled_scores
    )

    # Outer container styled to mimic the .card class from the main CSS block
    return (
        "\n"
        '<div style="background:#fff; border:1px solid #e0e0e0; border-radius:8px; padding: 18px 15px; margin:18px 0; box-shadow:0 2px 4px rgba(0,0,0,.06);">\n'
        '<div style="display: flex; justify-content: space-between; align-items: flex-start;">\n'
        f"{html_items_str}\n"
        "</div>\n"
        "</div>"
    )
# ---------- 生成 Tab ----------
def create_data_viewer_tab():
    """Build the "Data Viewer" Gradio tab.

    The tab injects the card CSS, loads the records via
    ``load_data_viewer_data`` and, when data is available, renders a model
    dropdown, a task dropdown, a user-task card, an article card and a score
    panel.  A randomly chosen model/task pair is displayed initially, and
    changing either dropdown re-runs ``fetch`` to refresh all three outputs.
    When no data is available only a warning message is rendered.
    """
    with gr.Tab("🔍Data Viewer"):
        gr.HTML(
            """
<style>
.card{background:#fff;border:1px solid #e0e0e0;border-radius:8px;padding:22px 24px;margin:18px 0;box-shadow:0 2px 4px rgba(0,0,0,.06);}
.scrollable-sm{max-height:260px;overflow-y:auto;}
.scrollable-lg{max-height:700px;overflow-y:auto;} /* 调整高度为分数区域腾出空间 */
.card p{color:#424242 !important;line-height:1.75;margin:0 0 14px 0;text-align:justify;}
.card ul,.card ol{margin:12px 0 12px 24px;color:#424242 !important;}
.card li{margin:4px 0;color:#424242 !important;}
.card blockquote{border-left:4px solid #3498db;margin:18px 0;padding:14px 18px;background:#f8f9fa;font-style:italic;color:#555 !important;}
.card pre{background:#f8f8f8;color:#333 !important;padding:18px;border-radius:6px;overflow-x:auto;border:1px solid #e0e0e0;}
.card strong,.card b{font-weight:700 !important;}
.card::-webkit-scrollbar{width:10px}
.card::-webkit-scrollbar-track{background:#f5f5f5;border-radius:5px}
.card::-webkit-scrollbar-thumb{background:#c0c0c0;border-radius:5px}
.card::-webkit-scrollbar-thumb:hover{background:#a0a0a0}
</style>
"""
        )

        df = load_data_viewer_data()
        if df.empty:
            gr.Markdown("## ⚠️ 没有可用数据 \n请确认 `data/data_viewer.jsonl` 存在且字段齐全(包括所有分数)。")
            return

        models = sorted(df["model_name"].unique())

        # One row per distinct (id, prompt) pair, ordered by numeric task id.
        unique_tasks = df[["id", "prompt"]].drop_duplicates()
        unique_tasks = unique_tasks.assign(id_num=unique_tasks["id"].astype(int))
        unique_tasks = unique_tasks.sort_values("id_num")

        # Dropdown labels look like "<id>. <prompt preview>"; tasks with an
        # id of at most 50 get a 30-character preview, the rest get 60.
        task_choices = []
        for task_id, task_prompt in zip(unique_tasks["id"], unique_tasks["prompt"]):
            limit = 60 if int(task_id) > 50 else 30
            preview = task_prompt[:limit]
            if len(task_prompt) > limit:
                preview += "…"
            task_choices.append(f"{task_id}. {preview}")

        init_model = random.choice(models) if models else None
        init_task = random.choice(task_choices) if task_choices else None

        with gr.Row():
            model_dd = gr.Dropdown(label="Select Model", choices=models, value=init_model, interactive=True)
            task_dd = gr.Dropdown(label="Select Task", choices=task_choices, value=init_task, interactive=True)

        user_md = gr.Markdown(elem_classes=["card", "scrollable-sm"])
        article_md = gr.Markdown(elem_classes=["card", "scrollable-lg"])
        scores_html = gr.HTML()  # HTML component that displays the scores

        score_columns = (
            "overall_score",
            "comprehensiveness_score",
            "insight_score",
            "instruction_following_score",
            "readability_score",
        )

        def fetch(model, task_disp):
            """Return (task markdown, article markdown, scores HTML) for a selection."""
            if not (model and task_disp):
                hint = "请选择模型和任务。"
                return make_user_task_markdown("--", hint), make_article_markdown(hint), ""

            # The task id is the part of the dropdown label before the first ".".
            item_id = task_disp.partition(".")[0].strip()
            matches = df[(df["model_name"] == model) & (df["id"] == item_id)]
            if matches.empty:
                err = f"未找到模型 **{model}** 对应任务 **{item_id}** 的内容或分数。"
                return make_user_task_markdown(item_id, err), make_article_markdown(err), ""

            task_text = matches["prompt"].iloc[0]
            article_text = matches["article"].iloc[0]
            # Pull the five scores from the first matching row.
            score_values = [matches[column].iloc[0] for column in score_columns]
            return (
                make_user_task_markdown(item_id, task_text),
                make_article_markdown(article_text),
                make_scores_html(*score_values),
            )

        # Initial render
        if init_model and init_task:
            initial_outputs = fetch(init_model, init_task)
        else:
            initial_outputs = (
                make_user_task_markdown("--", "请选择模型和任务。"),
                make_article_markdown("请选择模型和任务。"),
                "",
            )
        user_md.value, article_md.value, scores_html.value = initial_outputs

        # Either dropdown changing refreshes all three output components.
        for dropdown in (model_dd, task_dd):
            dropdown.change(
                fetch,
                inputs=[model_dd, task_dd],
                outputs=[user_md, article_md, scores_html],
            )