Spaces:
Sleeping
Sleeping
File size: 5,120 Bytes
2efa720 fb9918b 2689723 2efa720 2689723 2efa720 2689723 2efa720 2689723 2efa720 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
## `genesis/pipeline.py`
from __future__ import annotations
import os, json
from typing import Any, Dict, List
from pydantic import BaseModel
from openai import AsyncOpenAI
from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool
from .safety import SafetyGuard
from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool
# Env & client
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.getenv("GENESIS_DISABLE_TRACING", "1")
client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL, timeout=600.0)
DEEP_MODEL_PRIMARY = os.getenv("GENESIS_DEEP_MODEL", "o3-deep-research")
DEEP_MODEL_FAST = os.getenv("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
INSTRUCTION_MODEL = os.getenv("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
TRIAGE_MODEL = os.getenv("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
CLARIFY_MODEL = os.getenv("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")
MCP_URL = os.getenv("GENESIS_MCP_URL")
safety_guard = SafetyGuard()
class Clarifications(BaseModel):
questions: List[str]
CLARIFY_PROMPT = (
"Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
"Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
"Never request operational lab details. Friendly tone."
)
INSTRUCTION_PROMPT = (
"Rewrite the user query into detailed DEEP RESEARCH instructions in English. OUTPUT ONLY the instructions. "
"Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
"Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, Limitations, Risk & Ethics, References. "
"Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). Strictly avoid wet-lab protocols."
)
# Tools
base_tools = [WebSearchTool(), OntologyTool(), PubMedTool(), StructureTool(), CrossrefTool()]
if MCP_URL:
base_tools.append(HostedMCPTool(tool_config={
"type": "mcp", "server_label": "file_search", "server_url": MCP_URL, "require_approval": "never"
}))
# Agents
research_agent = Agent(
name="Synthetic Biology Research Agent",
model=DEEP_MODEL_PRIMARY,
instructions=(
"Perform high-level empirical research with citations. Use tools judiciously. "
"NEVER produce step-by-step lab instructions or protocols."
),
tools=base_tools,
)
instruction_agent = Agent(
name="Research Instruction Agent",
model=INSTRUCTION_MODEL,
instructions=INSTRUCTION_PROMPT,
handoffs=[research_agent],
)
clarifying_agent = Agent(
name="Clarifying Questions Agent",
model=CLARIFY_MODEL,
instructions=CLARIFY_PROMPT,
output_type=Clarifications,
handoffs=[instruction_agent],
)
triage_agent = Agent(
name="Triage Agent",
model=TRIAGE_MODEL,
instructions=(
"If the user query lacks essential context, handoff to Clarifying Questions Agent; "
"otherwise handoff to Research Instruction Agent. Return EXACTLY one function call."
),
handoffs=[clarifying_agent, instruction_agent],
)
async def _extract_citations(stream) -> List[Dict[str, str]]:
citations: List[Dict[str, str]] = []
try:
for item in reversed(stream.new_items):
if item.type == "message_output_item":
for content in getattr(item.raw_item, "content", []):
for ann in getattr(content, "annotations", []):
if getattr(ann, "type", None) == "url_citation":
citations.append({"title": getattr(ann, "title", ""), "url": getattr(ann, "url", "")})
break
except Exception:
pass
return citations
async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
# Safety gate input
dec = safety_guard.gate(query)
if not dec.allowed:
query = "SAFE-ONLY REVIEW: " + query + "
Only produce high-level literature synthesis with citations."
# Switch to fast model if requested
if fast and research_agent.model != DEEP_MODEL_FAST:
research_agent.model = DEEP_MODEL_FAST
# Run pipeline
stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
async for _ in stream.stream_events():
pass
final_text = stream.final_output
citations = await _extract_citations(stream)
# Optional HF rerank to reorder citations by relevance to query
if rerank_model and citations:
from .tools import HFRerankTool
rerank = HFRerankTool(model_id=rerank_model)
docs = [c.get("title") or c.get("url", "") for c in citations]
try:
rr = await rerank.call(query, docs)
order = rr.get("order")
if order:
citations = [citations[i] for i in order]
except Exception:
pass
return {"final_output": final_text, "citations": citations} |