## `genesis/pipeline.py`
"""Multi-agent deep-research pipeline.

Wires four agents into a handoff chain
(triage -> clarifying questions -> instruction rewriting -> research) and
exposes :func:`research_once` as the entry point that runs the chain on a
single query and returns the final output together with its URL citations.
"""
from __future__ import annotations

import json
import logging
import os
from typing import Any, Dict, List

from pydantic import BaseModel
from openai import AsyncOpenAI
from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool

from .safety import SafetyGuard
from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool

logger = logging.getLogger(__name__)

# Env & client
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Tracing is disabled unless GENESIS_DISABLE_TRACING overrides the default "1".
os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.getenv("GENESIS_DISABLE_TRACING", "1")
client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL, timeout=600.0)

# Model names, each overridable through its own environment variable.
DEEP_MODEL_PRIMARY = os.getenv("GENESIS_DEEP_MODEL", "o3-deep-research")
DEEP_MODEL_FAST = os.getenv("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
INSTRUCTION_MODEL = os.getenv("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
TRIAGE_MODEL = os.getenv("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
CLARIFY_MODEL = os.getenv("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")
# Optional MCP server URL; when unset no hosted MCP tool is registered.
MCP_URL = os.getenv("GENESIS_MCP_URL")

safety_guard = SafetyGuard()


class Clarifications(BaseModel):
    """Structured output of the clarifying agent: the questions to ask."""

    questions: List[str]


CLARIFY_PROMPT = (
    "Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
    "Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
    "Never request operational lab details. Friendly tone."
)

INSTRUCTION_PROMPT = (
    "Rewrite the user query into detailed DEEP RESEARCH instructions in English. OUTPUT ONLY the instructions. "
    "Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
    "Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, Limitations, Risk & Ethics, References. "
    "Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). Strictly avoid wet-lab protocols."
)

# Tools
base_tools = [WebSearchTool(), OntologyTool(), PubMedTool(), StructureTool(), CrossrefTool()]
if MCP_URL:
    base_tools.append(
        HostedMCPTool(
            tool_config={
                "type": "mcp",
                "server_label": "file_search",
                "server_url": MCP_URL,
                "require_approval": "never",
            }
        )
    )

# Agents (declared leaf-first so each one can list its handoff targets)
research_agent = Agent(
    name="Synthetic Biology Research Agent",
    model=DEEP_MODEL_PRIMARY,
    instructions=(
        "Perform high-level empirical research with citations. Use tools judiciously. "
        "NEVER produce step-by-step lab instructions or protocols."
    ),
    tools=base_tools,
)

instruction_agent = Agent(
    name="Research Instruction Agent",
    model=INSTRUCTION_MODEL,
    instructions=INSTRUCTION_PROMPT,
    handoffs=[research_agent],
)

clarifying_agent = Agent(
    name="Clarifying Questions Agent",
    model=CLARIFY_MODEL,
    instructions=CLARIFY_PROMPT,
    output_type=Clarifications,
    handoffs=[instruction_agent],
)

triage_agent = Agent(
    name="Triage Agent",
    model=TRIAGE_MODEL,
    instructions=(
        "If the user query lacks essential context, handoff to Clarifying Questions Agent; "
        "otherwise handoff to Research Instruction Agent. Return EXACTLY one function call."
    ),
    handoffs=[clarifying_agent, instruction_agent],
)


async def _extract_citations(stream) -> List[Dict[str, str]]:
    """Collect URL citations from the last message item of a finished run.

    Walks ``stream.new_items`` from the end, takes the first item whose
    ``type`` is ``"message_output_item"`` and gathers every annotation of
    type ``"url_citation"`` found in its content parts.

    Args:
        stream: The streamed run result; only ``new_items`` is read.

    Returns:
        A list of ``{"title": ..., "url": ...}`` dicts in annotation order.
        Extraction is best-effort: on any error the citations gathered so far
        (possibly none) are returned instead of raising.
    """
    citations: List[Dict[str, str]] = []
    try:
        for item in reversed(stream.new_items):
            if item.type != "message_output_item":
                continue
            for content in getattr(item.raw_item, "content", []):
                for ann in getattr(content, "annotations", []):
                    if getattr(ann, "type", None) == "url_citation":
                        citations.append(
                            {
                                "title": getattr(ann, "title", ""),
                                "url": getattr(ann, "url", ""),
                            }
                        )
            # Only the most recent message item is inspected.
            break
    except Exception:
        # Citations are optional; never fail the whole run, but leave a trace.
        logger.debug("Citation extraction failed", exc_info=True)
    return citations


async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
    """Run the agent pipeline once on ``query``.

    Args:
        query: The user's research question.
        fast: When true the research agent runs on ``DEEP_MODEL_FAST``,
            otherwise on ``DEEP_MODEL_PRIMARY``.
        rerank_model: Optional model id passed to ``HFRerankTool``; when given
            and citations exist, they are reordered by the returned ``order``.

    Returns:
        ``{"final_output": <stream.final_output>, "citations": [...]}``.
    """
    # Safety gate input
    dec = safety_guard.gate(query)
    if not dec.allowed:
        # NOTE(review): a query the guard rejects is still sent on, wrapped in
        # a safe-only framing rather than refused — confirm this is intended.
        query = "SAFE-ONLY REVIEW: " + query + " Only produce high-level literature synthesis with citations."

    # Select the research model for THIS call. The agent is module-level
    # shared state, so it must be reset when fast is false; otherwise one
    # fast call would leave every later call on the fast model.
    # NOTE(review): concurrent calls with different `fast` values still share
    # this agent object — confirm callers do not rely on that.
    research_agent.model = DEEP_MODEL_FAST if fast else DEEP_MODEL_PRIMARY

    # Run pipeline; events are drained so the run completes before reading results.
    stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
    async for _ in stream.stream_events():
        pass

    final_text = stream.final_output
    citations = await _extract_citations(stream)

    # Optional HF rerank to reorder citations by relevance to query
    if rerank_model and citations:
        rerank = HFRerankTool(model_id=rerank_model)
        docs = [c.get("title") or c.get("url", "") for c in citations]
        try:
            rr = await rerank.call(query, docs)
            order = rr.get("order")
            if order:
                citations = [citations[i] for i in order]
        except Exception:
            # Reranking is best-effort: keep the original order on failure.
            logger.warning("Citation rerank with %r failed; keeping original order", rerank_model, exc_info=True)

    return {"final_output": final_text, "citations": citations}