mgbam committed on
Commit
2efa720
·
verified ·
1 Parent(s): 8e7e06c

Create pipeline.py

Browse files
Files changed (1) hide show
  1. genesis/pipeline.py +127 -54
genesis/pipeline.py CHANGED
@@ -1,61 +1,134 @@
1
- ## `genesis/providers.py`
2
 
3
  from __future__ import annotations
4
  import os, json
5
- import httpx
6
- import google.generativeai as genai
7
  from typing import Any, Dict, List
 
8
 
9
- # Optional post-processors for polishing final summaries (NO lab steps)
10
-
11
- async def gemini_postprocess(text: str, citations: List[dict]) -> str:
12
- api_key = os.getenv("GEMINI_API_KEY")
13
- if not api_key:
14
- return text
15
- genai.configure(api_key=api_key)
16
- model = genai.GenerativeModel("gemini-1.5-flash")
17
- prompt = (
18
- "Polish the following high-level scientific synthesis for clarity and flow. "
19
- "Do NOT add wet-lab procedures or operational details. Maintain citations list context.
20
-
21
- " + text
22
- )
23
- resp = await model.asynchronous.generate_content_async(prompt)
24
- return resp.text or text
25
-
26
- async def deepseek_postprocess(text: str, citations: List[dict]) -> str:
27
- # Generic OpenAI-compatible chat completions call
28
- base = os.getenv("DEEPSEEK_BASE_URL")
29
- key = os.getenv("DEEPSEEK_API_KEY")
30
- if not base or not key:
31
- return text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  try:
33
- async with httpx.AsyncClient(timeout=60.0) as http:
34
- r = await http.post(
35
- f"{base.rstrip('/')}/v1/chat/completions",
36
- headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"},
37
- json={
38
- "model": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
39
- "messages": [
40
- {"role": "system", "content": "You are a scientific editor. Never add lab protocols."},
41
- {"role": "user", "content": (
42
- "Polish the following high-level synthesis without adding operational details.
43
-
44
- " + text
45
- )},
46
- ],
47
- "temperature": 0.3,
48
- },
49
- )
50
- data = r.json()
51
- return data.get("choices", [{}])[0].get("message", {}).get("content", text)
52
  except Exception:
53
- return text
54
-
55
- async def postprocess_summary(base_text: str, citations: List[dict], engine: str = "none") -> str:
56
- engine = (engine or "none").lower()
57
- if engine == "gemini":
58
- return await gemini_postprocess(base_text, citations)
59
- if engine == "deepseek":
60
- return await deepseek_postprocess(base_text, citations)
61
- return base_text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ## `genesis/pipeline.py`
2
 
3
  from __future__ import annotations
4
  import os, json
 
 
5
  from typing import Any, Dict, List
6
+ from pydantic import BaseModel
7
 
8
+ from openai import AsyncOpenAI
9
+ from agents import Agent, Runner, RunConfig, WebSearchTool, HostedMCPTool
10
+
11
+ from .safety import SafetyGuard
12
+ from .tools import OntologyTool, PubMedTool, StructureTool, CrossrefTool, HFRerankTool
13
+
14
+ # Env & client
15
+ OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
16
+ OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
17
+ os.environ["OPENAI_AGENTS_DISABLE_TRACING"] = os.getenv("GENESIS_DISABLE_TRACING", "1")
18
+ client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL, timeout=600.0)
19
+
20
+ DEEP_MODEL_PRIMARY = os.getenv("GENESIS_DEEP_MODEL", "o3-deep-research")
21
+ DEEP_MODEL_FAST = os.getenv("GENESIS_DEEP_FAST_MODEL", "o4-mini-deep-research")
22
+ INSTRUCTION_MODEL = os.getenv("GENESIS_INSTRUCTION_MODEL", "gpt-4o-mini")
23
+ TRIAGE_MODEL = os.getenv("GENESIS_TRIAGE_MODEL", "gpt-4o-mini")
24
+ CLARIFY_MODEL = os.getenv("GENESIS_CLARIFY_MODEL", "gpt-4o-mini")
25
+ MCP_URL = os.getenv("GENESIS_MCP_URL")
26
+
27
+ safety_guard = SafetyGuard()
28
+
29
class Clarifications(BaseModel):
    """Structured output of the clarifying agent: follow-up questions for the user."""

    # CLARIFY_PROMPT asks for at most 3 questions; the schema itself does not
    # enforce the count, only the list-of-strings shape.
    questions: List[str]
31
+
32
# Instructions for the clarifying agent: up to 3 scoping questions, never
# requesting operational lab detail.
CLARIFY_PROMPT = (
    "Ask at most 3 essential questions to improve a high-level synthetic biology research brief. "
    "Focus only on: organism/system, target (gene/protein/pathway), timeframe, preferred outputs. "
    "Never request operational lab details. Friendly tone."
)

# Instructions for the instruction agent: expand the user query into a full
# deep-research brief with fixed report headers.
INSTRUCTION_PROMPT = (
    "Rewrite the user query into detailed DEEP RESEARCH instructions in English. OUTPUT ONLY the instructions. "
    "Include dimensions: organism/system, target, scope/timeframe, evaluation axes, required tables. "
    "Format as a report with headers: Abstract, Background, Findings, Synthesis, Open Questions, Limitations, Risk & Ethics, References. "
    "Prefer primary literature (PubMed/Crossref) and databases (UMLS/BioPortal/RCSB). Strictly avoid wet-lab protocols."
)
44
+
45
# --- Tool registry for the research agent ---
# Web search plus domain tools (ontology lookup, PubMed, RCSB structures, Crossref).
base_tools = [WebSearchTool(), OntologyTool(), PubMedTool(), StructureTool(), CrossrefTool()]
# Optionally attach a hosted MCP file-search server when GENESIS_MCP_URL is set.
if MCP_URL:
    base_tools.append(HostedMCPTool(tool_config={
        "type": "mcp", "server_label": "file_search", "server_url": MCP_URL, "require_approval": "never"
    }))
51
+
52
# --- Agent graph ---
# Flow (via handoffs): triage -> (clarifying ->) instruction -> research.

# Terminal agent: runs the deep-research model with the shared tool registry.
research_agent = Agent(
    name="Synthetic Biology Research Agent",
    model=DEEP_MODEL_PRIMARY,
    instructions=(
        "Perform high-level empirical research with citations. Use tools judiciously. "
        "NEVER produce step-by-step lab instructions or protocols."
    ),
    tools=base_tools,
)

# Rewrites the user's query into a structured research brief, then hands off
# to the research agent.
instruction_agent = Agent(
    name="Research Instruction Agent",
    model=INSTRUCTION_MODEL,
    instructions=INSTRUCTION_PROMPT,
    handoffs=[research_agent],
)

# Asks up to 3 scoping questions (structured as Clarifications) before
# handing off to the instruction agent.
clarifying_agent = Agent(
    name="Clarifying Questions Agent",
    model=CLARIFY_MODEL,
    instructions=CLARIFY_PROMPT,
    output_type=Clarifications,
    handoffs=[instruction_agent],
)

# Entry point: routes either to clarification (missing context) or straight
# to instruction writing.
triage_agent = Agent(
    name="Triage Agent",
    model=TRIAGE_MODEL,
    instructions=(
        "If the user query lacks essential context, handoff to Clarifying Questions Agent; "
        "otherwise handoff to Research Instruction Agent. Return EXACTLY one function call."
    ),
    handoffs=[clarifying_agent, instruction_agent],
)
87
+
88
async def _extract_citations(stream) -> List[Dict[str, str]]:
    """Collect url_citation annotations from the latest message in a run stream.

    Walks ``stream.new_items`` newest-first and, for the first (i.e. most
    recent) ``message_output_item`` found, gathers every ``url_citation``
    annotation as ``{"title": ..., "url": ...}``. Best-effort: any error
    returns whatever was collected so far (possibly an empty list).

    NOTE(review): contains no awaits; kept ``async`` to match its awaited
    call site in research_once.
    """
    citations: List[Dict[str, str]] = []
    try:
        # Newest-first so only the final assistant message is inspected.
        for item in reversed(stream.new_items):
            if item.type == "message_output_item":
                # getattr guards: raw item shape varies across SDK versions.
                for content in getattr(item.raw_item, "content", []):
                    for ann in getattr(content, "annotations", []):
                        if getattr(ann, "type", None) == "url_citation":
                            citations.append({"title": getattr(ann, "title", ""), "url": getattr(ann, "url", "")})
                # Stop after the latest message output; older items are ignored.
                break
    except Exception:
        # Citation extraction is non-critical; never fail the pipeline over it.
        pass
    return citations
101
+
102
async def research_once(query: str, fast: bool = False, rerank_model: str | None = None) -> Dict[str, Any]:
    """Run the full triage -> instruction -> research pipeline for one query.

    Parameters
    ----------
    query:
        The user's research question.
    fast:
        When True, run the research agent on the faster deep-research model
        (DEEP_MODEL_FAST) instead of DEEP_MODEL_PRIMARY.
    rerank_model:
        Optional Hugging Face model id; when given, citations are reordered by
        relevance to the query. Rerank failures are silently ignored.

    Returns
    -------
    dict
        ``{"final_output": <report text>, "citations": [{"title", "url"}, ...]}``
    """
    # Safety gate: disallowed queries are rewritten into a safe, review-only
    # request instead of being rejected outright.
    dec = safety_guard.gate(query)
    if not dec.allowed:
        # Fix: the original embedded a raw newline inside the string literal
        # (a syntax error); use an explicit escape.
        query = ("SAFE-ONLY REVIEW: " + query +
                 "\nOnly produce high-level literature synthesis with citations.")

    # Select the model per call. Fix: previously the fast model was set once
    # and never reverted, so a single fast=True call made every subsequent
    # non-fast call use the fast model too.
    # NOTE(review): mutating the shared module-level agent is not safe under
    # concurrent calls — acceptable for single-request use; confirm callers.
    research_agent.model = DEEP_MODEL_FAST if fast else DEEP_MODEL_PRIMARY

    # Run the agent pipeline, draining the event stream to completion.
    stream = Runner.run_streamed(triage_agent, query, run_config=RunConfig(tracing_disabled=True))
    async for _ in stream.stream_events():
        pass

    final_text = stream.final_output
    citations = await _extract_citations(stream)

    # Optional HF rerank to reorder citations by relevance to the query.
    # (HFRerankTool is already imported at module level; the redundant local
    # import was removed.)
    if rerank_model and citations:
        rerank = HFRerankTool(model_id=rerank_model)
        docs = [c.get("title") or c.get("url", "") for c in citations]
        try:
            rr = await rerank.call(query, docs)
            order = rr.get("order")
            if order:
                citations = [citations[i] for i in order]
        except Exception:
            # Rerank is best-effort; keep the original citation order on failure.
            pass

    return {"final_output": final_text, "citations": citations}