## `genesis/pipeline.py`

```python
from __future__ import annotations

import os, json
from typing import Any, Dict, List

from pydantic import BaseModel
from openai import AsyncOpenAI
from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool

from .safety import SafetyGuard
from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool
# Env & client
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.getenv("GENESIS_DISABLE_TRACING", "1")

client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL, timeout=600.0)

DEEP_MODEL_PRIMARY = os.getenv("GENESIS_DEEP_MODEL", "o3-deep-research")
DEEP_MODEL_FAST = os.getenv("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
INSTRUCTION_MODEL = os.getenv("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
TRIAGE_MODEL = os.getenv("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
CLARIFY_MODEL = os.getenv("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")
MCP_URL = os.getenv("GENESIS_MCP_URL")

safety_guard = SafetyGuard()
class Clarifications(BaseModel):
    questions: List[str]


CLARIFY_PROMPT = (
    "Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
    "Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
    "Never request operational lab details. Friendly tone."
)

INSTRUCTION_PROMPT = (
    "Rewrite the user query into detailed DEEP RESEARCH instructions in English. OUTPUT ONLY the instructions. "
    "Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
    "Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, Limitations, Risk & Ethics, References. "
    "Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). Strictly avoid wet-lab protocols."
)
# Tools
base_tools = [WebSearchTool(), OntologyTool(), PubMedTool(), StructureTool(), CrossrefTool()]
if MCP_URL:
    base_tools.append(HostedMCPTool(tool_config={
        "type": "mcp", "server_label": "file_search", "server_url": MCP_URL, "require_approval": "never"
    }))
# Agents
research_agent = Agent(
    name="Synthetic Biology Research Agent",
    model=DEEP_MODEL_PRIMARY,
    instructions=(
        "Perform high-level empirical research with citations. Use tools judiciously. "
        "NEVER produce step-by-step lab instructions or protocols."
    ),
    tools=base_tools,
)

instruction_agent = Agent(
    name="Research Instruction Agent",
    model=INSTRUCTION_MODEL,
    instructions=INSTRUCTION_PROMPT,
    handoffs=[research_agent],
)

clarifying_agent = Agent(
    name="Clarifying Questions Agent",
    model=CLARIFY_MODEL,
    instructions=CLARIFY_PROMPT,
    output_type=Clarifications,
    handoffs=[instruction_agent],
)

triage_agent = Agent(
    name="Triage Agent",
    model=TRIAGE_MODEL,
    instructions=(
        "If the user query lacks essential context, handoff to Clarifying Questions Agent; "
        "otherwise handoff to Research Instruction Agent. Return EXACTLY one function call."
    ),
    handoffs=[clarifying_agent, instruction_agent],
)
async def _extract_citations(stream) -> List[Dict[str, str]]:
    """Collect url_citation annotations from the newest message output item."""
    citations: List[Dict[str, str]] = []
    try:
        for item in reversed(stream.new_items):
            if item.type == "message_output_item":
                for content in getattr(item.raw_item, "content", []):
                    for ann in getattr(content, "annotations", []):
                        if getattr(ann, "type", None) == "url_citation":
                            citations.append({"title": getattr(ann, "title", ""), "url": getattr(ann, "url", "")})
                break
    except Exception:
        pass
    return citations
async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
    # Safety gate on the input query
    dec = safety_guard.gate(query)
    if not dec.allowed:
        query = (
            "SAFE-ONLY REVIEW: " + query
            + " | Only produce high-level literature synthesis with citations."
        )

    # Switch to the fast model if requested
    if fast and research_agent.model != DEEP_MODEL_FAST:
        research_agent.model = DEEP_MODEL_FAST

    # Run the pipeline, drain the event stream, then collect output and citations
    stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
    async for _ in stream.stream_events():
        pass
    final_text = stream.final_output
    citations = await _extract_citations(stream)
    # Optional HF rerank to reorder citations by relevance to the query
    if rerank_model and citations:
        rerank = HFRerankTool(model_id=rerank_model)
        docs = [c.get("title") or c.get("url", "") for c in citations]
        try:
            rr = await rerank.call(query, docs)
            order = rr.get("order")
            if order:
                citations = [citations[i] for i in order]
        except Exception:
            pass

    return {"final_output": final_text, "citations": citations}
```