File size: 5,120 Bytes
2efa720
fb9918b
2689723
 
 
2efa720
2689723
2efa720
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2689723
2efa720
 
 
 
 
 
 
2689723
2efa720
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
## `genesis/pipeline.py`

from __future__ import annotations
import os, json
from typing import Any, Dict, List
from pydantic import BaseModel

from openai import AsyncOpenAI
from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool

from .safety import SafetyGuard
from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool

# Env & client
# Credentials and endpoint come from the environment; empty key is allowed so
# the module can be imported (e.g. for tests) without credentials.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Import-time side effect: agents-SDK tracing is disabled by default ("1")
# unless GENESIS_DISABLE_TRACING overrides it.
os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.getenv("GENESIS_DISABLE_TRACING", "1")
# Long timeout: deep-research runs can take many minutes.
client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL, timeout=600.0)

# Model selection, all overridable via environment variables.
DEEP_MODEL_PRIMARY = os.getenv("GENESIS_DEEP_MODEL", "o3-deep-research")
DEEP_MODEL_FAST = os.getenv("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
INSTRUCTION_MODEL = os.getenv("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
TRIAGE_MODEL = os.getenv("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
CLARIFY_MODEL = os.getenv("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")
# Optional hosted-MCP endpoint; when unset, no MCP tool is registered (see base_tools).
MCP_URL = os.getenv("GENESIS_MCP_URL")

# Shared input gate used by research_once() to constrain disallowed queries.
safety_guard = SafetyGuard()

class Clarifications(BaseModel):
    """Structured output schema for the clarifying-questions agent."""

    # Clarifying questions for the user; CLARIFY_PROMPT caps them at 3.
    questions: List[str]

# System prompt for the clarifying agent: at most 3 scoping questions,
# never operational lab detail (safety constraint mirrored in research_agent).
CLARIFY_PROMPT = (
    "Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
    "Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
    "Never request operational lab details. Friendly tone."
)

# System prompt for the instruction agent: converts the raw user query into a
# deep-research brief with a fixed report structure and safe-only scope.
INSTRUCTION_PROMPT = (
    "Rewrite the user query into detailed DEEP RESEARCH instructions in English. OUTPUT ONLY the instructions. "
    "Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
    "Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, Limitations, Risk & Ethics, References. "
    "Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). Strictly avoid wet-lab protocols."
)

# Tools
# Toolset given to the research agent: web search plus domain tools
# (ontology/PubMed/structure/Crossref) defined in .tools.
base_tools = [WebSearchTool(), OntologyTool(), PubMedTool(), StructureTool(), CrossrefTool()]
if MCP_URL:
    # Optional hosted MCP file-search server, only wired in when
    # GENESIS_MCP_URL is set; approval is skipped for unattended runs.
    base_tools.append(HostedMCPTool(tool_config={
        "type": "mcp", "server_label": "file_search", "server_url": MCP_URL, "require_approval": "never"
    }))

# Agents
# Pipeline wiring (via handoffs): triage -> (clarify ->) instruction -> research.

# Terminal agent: performs the deep research itself using base_tools.
research_agent = Agent(
    name="Synthetic Biology Research Agent",
    model=DEEP_MODEL_PRIMARY,  # may be swapped at runtime by research_once(fast=True)
    instructions=(
        "Perform high-level empirical research with citations. Use tools judiciously. "
        "NEVER produce step-by-step lab instructions or protocols."
    ),
    tools=base_tools,
)

# Rewrites the user query into a structured research brief (INSTRUCTION_PROMPT),
# then hands off to the research agent.
instruction_agent = Agent(
    name="Research Instruction Agent",
    model=INSTRUCTION_MODEL,
    instructions=INSTRUCTION_PROMPT,
    handoffs=[research_agent],
)

# Asks up to 3 scoping questions (typed output: Clarifications) before
# handing off to the instruction agent.
clarifying_agent = Agent(
    name="Clarifying Questions Agent",
    model=CLARIFY_MODEL,
    instructions=CLARIFY_PROMPT,
    output_type=Clarifications,
    handoffs=[instruction_agent],
)

# Entry-point agent: routes to clarification when context is missing,
# otherwise straight to instruction rewriting.
triage_agent = Agent(
    name="Triage Agent",
    model=TRIAGE_MODEL,
    instructions=(
        "If the user query lacks essential context, handoff to Clarifying Questions Agent; "
        "otherwise handoff to Research Instruction Agent. Return EXACTLY one function call."
    ),
    handoffs=[clarifying_agent, instruction_agent],
)

async def _extract_citations(stream) -> List[Dict[str, str]]:
    citations: List[Dict[str, str]] = []
    try:
        for item in reversed(stream.new_items):
            if item.type == "message_output_item":
                for content in getattr(item.raw_item, "content", []):
                    for ann in getattr(content, "annotations", []):
                        if getattr(ann, "type", None) == "url_citation":
                            citations.append({"title": getattr(ann, "title", ""), "url": getattr(ann, "url", "")})
                break
    except Exception:
        pass
    return citations

async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
    """Run the triage -> clarify/instruct -> deep-research pipeline once.

    Args:
        query: User research question. When the SafetyGuard rejects it, the
            query is prefixed with a safe-only review preamble instead of
            being refused outright.
        fast: Use the fast deep-research model for this call instead of the
            primary one.
        rerank_model: Optional Hugging Face reranker model id; when given,
            citations are reordered by relevance to the query (best-effort).

    Returns:
        Dict with "final_output" (the report text) and "citations"
        (list of {"title", "url"} dicts).
    """
    # Safety gate input
    dec = safety_guard.gate(query)
    if not dec.allowed:
        # FIX: the original concatenated a string literal containing a raw
        # (unescaped) newline — a SyntaxError. Use an explicit "\n" escape.
        query = (
            "SAFE-ONLY REVIEW: " + query
            + "\nOnly produce high-level literature synthesis with citations."
        )

    # Pick the deep-research model for this call.
    # FIX: the original only switched TO the fast model, so a single
    # fast=True call left all later fast=False calls on the fast model.
    # NOTE(review): this still mutates the shared module-level agent, so
    # concurrent calls with differing `fast` values can race — confirm the
    # pipeline is used single-flight.
    desired_model = DEEP_MODEL_FAST if fast else DEEP_MODEL_PRIMARY
    if research_agent.model != desired_model:
        research_agent.model = desired_model

    # Run pipeline: drain the event stream; only the final state is needed.
    stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
    async for _ in stream.stream_events():
        pass

    final_text = stream.final_output
    citations = await _extract_citations(stream)

    # Optional HF rerank to reorder citations by relevance to query.
    # Best-effort: any rerank failure leaves the original order intact.
    if rerank_model and citations:
        # HFRerankTool is already imported at module level (from .tools);
        # the redundant local re-import was removed.
        rerank = HFRerankTool(model_id=rerank_model)
        docs = [c.get("title") or c.get("url", "") for c in citations]
        try:
            rr = await rerank.call(query, docs)
            order = rr.get("order")
            if order:
                citations = [citations[i] for i in order]
        except Exception:
            pass

    return {"final_output": final_text, "citations": citations}