# Source: Hugging Face file view, commit 2efa720 ("Create pipeline.py") by mgbam, verified, 5.12 kB.
## `genesis/pipeline.py`
from __future__ import annotations
import os, json
from typing import Any, Dict, List
from pydantic import BaseModel
from openai import AsyncOpenAI
from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool
from .safety import SafetyGuard
from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool
# Env & client
# Credentials and endpoint; each falls back to the default shown when the variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")

# Copy GENESIS_DISABLE_TRACING (default "1") into OPENAI_AGENTS_DISABLE_TRACING.
os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.environ.get("GENESIS_DISABLE_TRACING", "1")

# Async OpenAI client with a 600-second timeout.
# NOTE(review): `client` is not referenced again in the visible code; confirm it is
# registered with the agents runtime or consumed by another module.
client = AsyncOpenAI(
    api_key=OPENAI_API_KEY,
    base_url=OPENAI_BASE_URL,
    timeout=600.0,
)

# Model names per pipeline stage, each overridable through its GENESIS_* variable.
DEEP_MODEL_PRIMARY = os.environ.get("GENESIS_DEEP_MODEL", "o3-deep-research")
DEEP_MODEL_FAST = os.environ.get("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
INSTRUCTION_MODEL = os.environ.get("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
TRIAGE_MODEL = os.environ.get("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
CLARIFY_MODEL = os.environ.get("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")

# Optional MCP server URL; None when GENESIS_MCP_URL is unset.
MCP_URL = os.environ.get("GENESIS_MCP_URL")

# Guard whose gate() is applied to incoming queries.
safety_guard = SafetyGuard()
class Clarifications(BaseModel):
    """Structured output of the clarifying agent: the questions to put to the user."""

    # Question texts, one list entry per question.
    questions: List[str]
# Instructions for the clarifying agent (one sentence per literal; adjacent
# literals are concatenated, so every trailing space is significant).
CLARIFY_PROMPT = (
    "Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
    "Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
    "Never request operational lab details. "
    "Friendly tone."
)
# Instructions for the agent that rewrites a user query into a deep-research brief
# (one sentence per literal; adjacent literals are concatenated, so every trailing
# space is significant).
INSTRUCTION_PROMPT = (
    "Rewrite the user query into detailed DEEP RESEARCH instructions in English. "
    "OUTPUT ONLY the instructions. "
    "Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
    "Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, "
    "Limitations, Risk & Ethics, References. "
    "Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). "
    "Strictly avoid wet-lab protocols."
)
# Tools
# Tool set handed to the research agent.
base_tools = [
    WebSearchTool(),
    OntologyTool(),
    PubMedTool(),
    StructureTool(),
    CrossrefTool(),
]

# Add the hosted MCP tool only when MCP_URL holds a non-empty value.
if MCP_URL:
    base_tools += [
        HostedMCPTool(
            tool_config={
                "type": "mcp",
                "server_label": "file_search",
                "server_url": MCP_URL,
                "require_approval": "never",
            }
        )
    ]
# Agents
# Research agent: runs on the primary deep-research model with the tool set above.
research_agent = Agent(
    name="Synthetic Biology Research Agent",
    instructions=(
        "Perform high-level empirical research with citations. "
        "Use tools judiciously. "
        "NEVER produce step-by-step lab instructions or protocols."
    ),
    model=DEEP_MODEL_PRIMARY,
    tools=base_tools,
)
# Instruction agent: driven by INSTRUCTION_PROMPT; may hand off to the research agent.
instruction_agent = Agent(
    name="Research Instruction Agent",
    instructions=INSTRUCTION_PROMPT,
    model=INSTRUCTION_MODEL,
    handoffs=[research_agent],
)
# Clarifying agent: driven by CLARIFY_PROMPT, emits a Clarifications object,
# and may hand off to the instruction agent.
clarifying_agent = Agent(
    name="Clarifying Questions Agent",
    instructions=CLARIFY_PROMPT,
    model=CLARIFY_MODEL,
    handoffs=[instruction_agent],
    output_type=Clarifications,
)
# Triage agent: pipeline entry point; routes the query to the clarifying agent
# or straight to the instruction agent.
triage_agent = Agent(
    name="Triage Agent",
    instructions=(
        "If the user query lacks essential context, "
        "handoff to Clarifying Questions Agent; "
        "otherwise handoff to Research Instruction Agent. "
        "Return EXACTLY one function call."
    ),
    model=TRIAGE_MODEL,
    handoffs=[clarifying_agent, instruction_agent],
)
async def _extract_citations(stream) -> List[Dict[str, str]]:
citations: List[Dict[str, str]] = []
try:
for item in reversed(stream.new_items):
if item.type == "message_output_item":
for content in getattr(item.raw_item, "content", []):
for ann in getattr(content, "annotations", []):
if getattr(ann, "type", None) == "url_citation":
citations.append({"title": getattr(ann, "title", ""), "url": getattr(ann, "url", "")})
break
except Exception:
pass
return citations
def _apply_rerank_order(citations: List[Dict[str, str]], order: Any) -> List[Dict[str, str]]:
    """Return *citations* reordered by the index list *order* without losing any.

    Indices that are not integers, fall outside ``0 <= i < len(citations)``, or
    repeat an earlier index are skipped. Citations not named in *order* are
    appended afterwards in their original relative order. If *order* yields no
    usable index, *citations* is returned unchanged.
    """
    import operator

    try:
        candidates = list(order) if order else []
    except TypeError:
        return citations

    used = set()
    ranked: List[Dict[str, str]] = []
    for raw in candidates:
        try:
            idx = operator.index(raw)
        except TypeError:
            continue
        if 0 <= idx < len(citations) and idx not in used:
            used.add(idx)
            ranked.append(citations[idx])
    if not ranked:
        return citations
    ranked.extend(c for i, c in enumerate(citations) if i not in used)
    return ranked


async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
    """Run one pass of the research pipeline and return its report and citations.

    Args:
        query: The user's research question.
        fast: When true, the research agent uses ``DEEP_MODEL_FAST``; otherwise
            ``DEEP_MODEL_PRIMARY``.
        rerank_model: Optional model id for ``HFRerankTool``. When given and
            citations were found, they are reordered by the returned ``order``.

    Returns:
        ``{"final_output": <stream.final_output>, "citations": [{"title", "url"}, ...]}``.
    """
    import logging

    log = logging.getLogger(__name__)

    # Safety gate input: a disallowed query is not rejected; it is wrapped so the
    # pipeline is asked for a literature synthesis only.
    decision = safety_guard.gate(query)
    if not decision.allowed:
        query = (
            "SAFE-ONLY REVIEW: "
            + query
            + "\nOnly produce high-level literature synthesis with citations."
        )

    # Pick the model explicitly on every call. `research_agent` is a shared
    # module-level object, so switching it only when `fast` is true would leave
    # every later call on the fast model.
    # NOTE(review): concurrent calls with different `fast` values still share
    # this attribute; confirm whether per-call agent instances are needed.
    research_agent.model = DEEP_MODEL_FAST if fast else DEEP_MODEL_PRIMARY

    # Run pipeline: drain the event stream so the run completes before reading results.
    stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
    async for _ in stream.stream_events():
        pass
    final_text = stream.final_output
    citations = await _extract_citations(stream)

    # Optional HF rerank to reorder citations by relevance to query
    if rerank_model and citations:
        rerank = HFRerankTool(model_id=rerank_model)
        docs = [c.get("title") or c.get("url", "") for c in citations]
        try:
            rr = await rerank.call(query, docs)
            citations = _apply_rerank_order(citations, rr.get("order"))
        except Exception:
            # Reranking is optional: keep the extracted order, but record the failure.
            log.warning(
                "Citation rerank with %r failed; keeping original order",
                rerank_model,
                exc_info=True,
            )

    return {"final_output": final_text, "citations": citations}