import glob
import json
import logging
import os
import subprocess
import sys
import tempfile
from collections import deque
from pathlib import Path
from typing import Annotated, Any, ClassVar, Iterator, Literal, Optional

import pytest
from agent_protocol_client import AgentApi, ApiClient
from agent_protocol_client import Configuration as ClientConfig
from agent_protocol_client import Step
from colorama import Fore, Style
from openai import _load_client as get_openai_client
from pydantic import (
    BaseModel,
    Field,
    StringConstraints,
    ValidationInfo,
    field_validator,
)

from agbenchmark.agent_api_interface import download_agent_artifacts_into_folder
from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
from agbenchmark.utils.prompts import (
    END_PROMPT,
    FEW_SHOT_EXAMPLES,
    PROMPT_MAP,
    SCORING_MAP,
)

from .base import BaseChallenge, ChallengeInfo

logger = logging.getLogger(__name__)

with open(Path(__file__).parent / "optional_categories.json") as f:
    OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"]


class BuiltinChallengeSpec(BaseModel):
    eval_id: str = ""
    name: str
    task: str
    category: list[Category]
    dependencies: list[str]
    cutoff: int

    class Info(BaseModel):
        difficulty: DifficultyLevel
        description: Annotated[
            str, StringConstraints(pattern=r"^Tests if the agent can.*")
        ]
        side_effects: list[str] = Field(default_factory=list)

    info: Info

    class Ground(BaseModel):
        answer: str
        should_contain: Optional[list[str]] = None
        should_not_contain: Optional[list[str]] = None
        files: list[str]
        case_sensitive: Optional[bool] = True

        class Eval(BaseModel):
            type: str
            scoring: Optional[Literal["percentage", "scale", "binary"]] = None
            template: Optional[
                Literal["rubric", "reference", "question", "custom"]
            ] = None
            examples: Optional[str] = None

            @field_validator("scoring", "template", "examples")
            @classmethod
            def validate_eval_fields(cls, value, info: ValidationInfo):
                field_name = info.field_name
                if "type" in info.data and info.data["type"] == "llm":
                    if value is None:
                        raise ValueError(
                            f"{field_name} must be provided when eval type is 'llm'"
                        )
                else:
                    if value is not None:
                        raise ValueError(
                            f"{field_name} should only exist when eval type is 'llm'"
                        )
                return value

        eval: Eval

    ground: Ground
    metadata: Optional[dict[str, Any]] = None

    spec_file: Path | None = Field(None, exclude=True)
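
# Illustrative sketch (not part of the original module): rough shape of a
# challenges/**/data.json file parsed into BuiltinChallengeSpec. Values below are
# hypothetical; "category" and "difficulty" must be valid Category/DifficultyLevel
# enum values, and "description" must start with "Tests if the agent can".
#
#   {
#     "eval_id": "...",
#     "name": "ExampleReadFile",
#     "task": "Read the file called input.txt and write its content to output.txt",
#     "category": ["general"],
#     "dependencies": [],
#     "cutoff": 60,
#     "info": {
#       "difficulty": "interface",
#       "description": "Tests if the agent can read and write files.",
#       "side_effects": []
#     },
#     "ground": {
#       "answer": "The content of output.txt",
#       "should_contain": ["Hello World"],
#       "files": ["output.txt"],
#       "eval": {"type": "file"}
#     }
#   }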


class BuiltinChallenge(BaseChallenge):
    """
    Base class for AGBenchmark's built-in challenges (challenges/**/*.json).

    All of the logic is present in this class. Individual challenges are created as
    subclasses of `BuiltinChallenge` with challenge-specific values assigned to the
    ClassVars `_spec` etc.

    Dynamically constructing subclasses rather than class instances for the individual
    challenges makes them suitable for collection by Pytest, which will run their
    `test_method` like any regular test item.
    """

    _spec: ClassVar[BuiltinChallengeSpec]
    CHALLENGE_LOCATION: ClassVar[str]
    ARTIFACTS_LOCATION: ClassVar[str]

    SOURCE_URI_PREFIX = "__BUILTIN__"

    @classmethod
    def from_challenge_spec(
        cls, spec: BuiltinChallengeSpec
    ) -> type["BuiltinChallenge"]:
        if not spec.spec_file:
            raise ValueError("spec.spec_file not defined")

        challenge_info = ChallengeInfo(
            eval_id=spec.eval_id,
            name=spec.name,
            task=spec.task,
            task_artifacts_dir=spec.spec_file.parent,
            category=spec.category,
            difficulty=spec.info.difficulty,
            description=spec.info.description,
            dependencies=spec.dependencies,
            reference_answer=spec.ground.answer,
            source_uri=(
                f"__BUILTIN__/{spec.spec_file.relative_to(Path(__file__).parent)}"
            ),
        )

        challenge_class_name = f"Test{challenge_info.name}"
        logger.debug(f"Creating {challenge_class_name} from spec: {spec.spec_file}")
        return type(
            challenge_class_name,
            (BuiltinChallenge,),
            {
                "info": challenge_info,
                "_spec": spec,
                "CHALLENGE_LOCATION": str(spec.spec_file),
                "ARTIFACTS_LOCATION": str(spec.spec_file.resolve().parent),
            },
        )

    @classmethod
    def from_challenge_spec_file(cls, spec_file: Path) -> type["BuiltinChallenge"]:
        challenge_spec = BuiltinChallengeSpec.model_validate_json(spec_file.read_text())
        challenge_spec.spec_file = spec_file
        return cls.from_challenge_spec(challenge_spec)

    @classmethod
    def from_source_uri(cls, source_uri: str) -> type["BuiltinChallenge"]:
        if not source_uri.startswith(cls.SOURCE_URI_PREFIX):
            raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}")

        path = source_uri.split("/", 1)[1]
        spec_file = Path(__file__).parent / path
        return cls.from_challenge_spec_file(spec_file)
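
    # Illustrative sketch (not part of the original module) of how a spec file and its
    # source URI relate; the challenge path below is hypothetical:
    #
    #   spec_file = Path(__file__).parent / "verticals/code/1_example/data.json"
    #   challenge_cls = BuiltinChallenge.from_challenge_spec_file(spec_file)
    #   challenge_cls.info.source_uri
    #   # -> "__BUILTIN__/verticals/code/1_example/data.json"
    #   BuiltinChallenge.from_source_uri(challenge_cls.info.source_uri)  # same spec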

    @pytest.mark.asyncio
    async def test_method(
        self,
        config: AgentBenchmarkConfig,
        request: pytest.FixtureRequest,
        i_attempt: int,
    ) -> None:
        # if os.environ.get("HELICONE_API_KEY"):
        #     from helicone.lock import HeliconeLockManager
        #
        #     HeliconeLockManager.write_custom_property("challenge", self.info.name)

        timeout = self._spec.cutoff or 60

        if request.config.getoption("--nc"):
            timeout = 100000
        elif cutoff := request.config.getoption("--cutoff"):
            timeout = int(cutoff)  # type: ignore

        task_id = ""
        n_steps = 0
        timed_out = None
        agent_task_cost = None
        steps: list[Step] = []
        try:
            async for step in self.run_challenge(
                config, timeout, mock=bool(request.config.getoption("--mock"))
            ):
                if not task_id:
                    task_id = step.task_id
                n_steps += 1
                steps.append(step.model_copy())
                if step.additional_output:
                    agent_task_cost = step.additional_output.get(
                        "task_total_cost",
                        step.additional_output.get("task_cumulative_cost"),
                    )
            timed_out = False
        except TimeoutError:
            timed_out = True

        assert isinstance(request.node, pytest.Item)
        request.node.user_properties.append(("steps", steps))
        request.node.user_properties.append(("n_steps", n_steps))
        request.node.user_properties.append(("timed_out", timed_out))
        request.node.user_properties.append(("agent_task_cost", agent_task_cost))

        agent_client_config = ClientConfig(host=config.host)
        async with ApiClient(agent_client_config) as api_client:
            api_instance = AgentApi(api_client)
            eval_results = await self.evaluate_task_state(api_instance, task_id)

        if not eval_results:
            if timed_out:
                raise TimeoutError("Timed out, no results to evaluate")
            else:
                raise ValueError("No results to evaluate")

        request.node.user_properties.append(
            (
                "answers",
                [r.result for r in eval_results]
                if request.config.getoption("--keep-answers")
                else None,
            )
        )
        request.node.user_properties.append(("scores", [r.score for r in eval_results]))

        # FIXME: this allows partial failure
        assert any(r.passed for r in eval_results), (
            f"No passed evals: {eval_results}"
            if not timed_out
            else f"Timed out; no passed evals: {eval_results}"
        )
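
    # NOTE: the pytest options read above (--mock, --nc, --cutoff, --keep-answers) are
    # not registered in this module; they are presumably added by agbenchmark's own
    # pytest plugin / CLI entrypoint.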

    @classmethod
    async def evaluate_task_state(
        cls, agent: AgentApi, task_id: str
    ) -> list[EvalResult]:
        with tempfile.TemporaryDirectory() as workspace:
            workspace = Path(workspace)
            await download_agent_artifacts_into_folder(agent, task_id, workspace)
            if cls.info.task_artifacts_dir:
                copy_challenge_artifacts_into_workspace(
                    cls.info.task_artifacts_dir, "custom_python", workspace
                )

            return list(cls.evaluate_workspace_content(workspace))

    @classmethod
    def evaluate_workspace_content(cls, workspace: Path) -> Iterator[EvalResult]:
        result_ground = cls._spec.ground
        # NOTE: this is a generator; it is consumed by whichever eval branch applies.
        outputs_for_eval = cls.get_outputs_for_eval(workspace, result_ground)

        if result_ground.should_contain or result_ground.should_not_contain:
            for source, content in outputs_for_eval:
                score = cls.score_result(content, result_ground)
                if score is not None:
                    print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score)
                    yield EvalResult(
                        result=content,
                        result_source=str(source),
                        score=score,
                        passed=score > 0.9,  # FIXME: arbitrary threshold
                    )

        if result_ground.eval.type in ("python", "pytest"):
            for py_file, output in outputs_for_eval:
                yield EvalResult(
                    result=output,
                    result_source=str(py_file),
                    score=float(not output.startswith("Error:")),
                    passed=not output.startswith("Error:"),
                )

        if result_ground.eval.type == "llm":
            combined_results = "\n".join(output[1] for output in outputs_for_eval)
            llm_eval = cls.score_result_with_llm(combined_results, result_ground)
            print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval)

            if result_ground.eval.scoring == "percentage":
                score = llm_eval / 100
            elif result_ground.eval.scoring == "scale":
                score = llm_eval / 10
            else:
                score = llm_eval

            yield EvalResult(
                result=combined_results,
                result_source=", ".join(str(res[0]) for res in outputs_for_eval),
                score=score,
                passed=score > 0.9,  # FIXME: arbitrary threshold
            )
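
    # Worked example of the LLM score normalization above (illustrative, not from the
    # original module): a raw llm_eval of 85 with scoring="percentage" normalizes to
    # 0.85, an 8 with scoring="scale" to 0.8, and "binary" scores are used as-is; the
    # EvalResult only passes if the normalized score exceeds 0.9.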

    @staticmethod
    def get_outputs_for_eval(
        workspace: str | Path | dict[str, str], ground: BuiltinChallengeSpec.Ground
    ) -> Iterator[tuple[str | Path, str]]:
        if isinstance(workspace, dict):
            workspace = workspace["output"]

        script_dir = workspace

        for file_pattern in ground.files:
            # Check if it is a file extension
            if file_pattern.startswith("."):
                # Find all files with the given extension in the workspace
                matching_files = glob.glob(os.path.join(script_dir, "*" + file_pattern))
            else:
                # Otherwise, it is a specific file
                matching_files = [os.path.join(script_dir, file_pattern)]

            logger.debug(
                f"Files to evaluate for pattern `{file_pattern}`: {matching_files}"
            )

            for file_path in matching_files:
                relative_file_path = Path(file_path).relative_to(workspace)
                logger.debug(
                    f"Evaluating {relative_file_path} "
                    f"(eval type: {ground.eval.type})..."
                )
                if ground.eval.type == "python":
                    result = subprocess.run(
                        [sys.executable, file_path],
                        cwd=os.path.abspath(workspace),
                        capture_output=True,
                        text=True,
                    )
                    if "error" in result.stderr or result.returncode != 0:
                        yield relative_file_path, f"Error: {result.stderr}\n"
                    else:
                        yield relative_file_path, f"Output: {result.stdout}\n"
                else:
                    with open(file_path, "r") as f:
                        yield relative_file_path, f.read()
        else:
            # for-else: this runs once the loop over `ground.files` completes (there is
            # no `break` above), so a "pytest" eval runs a single pytest session in the
            # workspace rather than one per file.
            if ground.eval.type == "pytest":
                result = subprocess.run(
                    [sys.executable, "-m", "pytest"],
                    cwd=os.path.abspath(workspace),
                    capture_output=True,
                    text=True,
                )
                logger.debug(f"EXIT CODE: {result.returncode}")
                logger.debug(f"STDOUT: {result.stdout}")
                logger.debug(f"STDERR: {result.stderr}")
                if "error" in result.stderr or result.returncode != 0:
                    yield "pytest", f"Error: {result.stderr.strip() or result.stdout}\n"
                else:
                    yield "pytest", f"Output: {result.stdout}\n"

    @staticmethod
    def score_result(content: str, ground: BuiltinChallengeSpec.Ground) -> float | None:
        print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content)
        if ground.should_contain:
            for should_contain_word in ground.should_contain:
                if not ground.case_sensitive:
                    should_contain_word = should_contain_word.lower()
                    content = content.lower()
                print_content = (
                    f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}"
                    f" - {should_contain_word}:"
                )
                # NOTE: returns after evaluating only the first should_contain term
                if should_contain_word not in content:
                    print(print_content, "False")
                    return 0.0
                else:
                    print(print_content, "True")
                    return 1.0

        if ground.should_not_contain:
            for should_not_contain_word in ground.should_not_contain:
                if not ground.case_sensitive:
                    should_not_contain_word = should_not_contain_word.lower()
                    content = content.lower()
                print_content = (
                    f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}"
                    f" - {should_not_contain_word}:"
                )
                # NOTE: likewise, only the first should_not_contain term decides
                if should_not_contain_word in content:
                    print(print_content, "False")
                    return 0.0
                else:
                    print(print_content, "True")
                    return 1.0
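
    # Minimal usage sketch (not part of the original module); the Ground below is
    # hypothetical:
    #
    #   ground = BuiltinChallengeSpec.Ground(
    #       answer="Washington",
    #       should_contain=["Washington"],
    #       files=[".txt"],
    #       case_sensitive=False,
    #       eval=BuiltinChallengeSpec.Ground.Eval(type="file"),
    #   )
    #   BuiltinChallenge.score_result("washington d.c.", ground)  # -> 1.0
    #   BuiltinChallenge.score_result("new york", ground)         # -> 0.0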

    @classmethod
    def score_result_with_llm(
        cls, content: str, ground: BuiltinChallengeSpec.Ground, *, mock: bool = False
    ) -> float:
        if mock:
            return 1.0

        # the validation for this is done in the Eval BaseModel
        scoring = SCORING_MAP[ground.eval.scoring]  # type: ignore
        prompt = PROMPT_MAP[ground.eval.template].format(  # type: ignore
            task=cls._spec.task, scoring=scoring, answer=ground.answer, response=content
        )

        if ground.eval.examples:
            prompt += FEW_SHOT_EXAMPLES.format(examples=ground.eval.examples)

        prompt += END_PROMPT

        answer = get_openai_client().chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": prompt},
            ],
        )

        return float(answer.choices[0].message.content)  # type: ignore


def load_builtin_challenges() -> Iterator[type[BuiltinChallenge]]:
    logger.info("Loading built-in challenges...")

    challenges_path = Path(__file__).parent
    logger.debug(f"Looking for challenge spec files in {challenges_path}...")

    json_files = deque(challenges_path.rglob("data.json"))
    logger.debug(f"Found {len(json_files)} built-in challenges.")

    loaded, ignored = 0, 0
    while json_files:
        # Take and remove the first element from json_files
        json_file = json_files.popleft()

        if _challenge_should_be_ignored(json_file):
            ignored += 1
            continue

        challenge = BuiltinChallenge.from_challenge_spec_file(json_file)
        logger.debug(f"Generated test for {challenge.info.name}")
        yield challenge
        loaded += 1

    logger.info(
        f"Loading built-in challenges complete: loaded {loaded}, ignored {ignored}."
    )


def _challenge_should_be_ignored(json_file_path: Path):
    return (
        "challenges/deprecated" in json_file_path.as_posix()
        or "challenges/library" in json_file_path.as_posix()
    )
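

# Usage sketch (not part of the original module): one way the dynamically generated
# Test* classes could be exposed to pytest for collection; the collection hook shown
# here is hypothetical.
#
#   for challenge_cls in load_builtin_challenges():
#       globals()[challenge_cls.__name__] = challenge_cls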