"""SynthesisAgent: the final composer in the multi-agent workflow.

Merges validated outputs stored in the shared workflow state into a single
user-facing answer, formatted via the ``answer_question`` tool.
"""

import os
import logging
from typing import Any

from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context
from llama_index.llms.google_genai import GoogleGenAI

# -----------------------------------------------------------------------------
# Context helper tools ---------------------------------------------------------
# -----------------------------------------------------------------------------

async def write_state(ctx: Context, key: str, value: Any) -> str:
    """Store ``value`` under ``key`` in the shared workflow state dict."""
    state_dict = await ctx.get("state")
    state_dict[key] = value
    await ctx.set("state", state_dict)
    return f"state['{key}'] written"

async def read_state(ctx: Context, key: str) -> Any:
    """Return the value stored under ``key``, or an empty string if missing."""
    state_dict = await ctx.get("state")
    return state_dict.get(key, "")

write_state_tool = FunctionTool.from_defaults(
    fn=write_state,
    name="write_state",
    description="Store or overwrite a value in the shared workflow state.",
)
read_state_tool = FunctionTool.from_defaults(
    fn=read_state,
    name="read_state",
    description="Retrieve a value from the shared workflow state.",
)
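
# Illustrative usage (a sketch only, nothing is executed at import time): during
# an AgentWorkflow run, another agent can persist an intermediate result and
# this agent can read it back, e.g.
#
#   await write_state(ctx, "evidence", ["fact 1", "fact 2"])
#   evidence = await read_state(ctx, "evidence")
#
# Both helpers assume the "state" key already exists in the Context; the
# surrounding workflow is expected to seed it (typically via ``initial_state={}``).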

# -----------------------------------------------------------------------------
# Fresh implementation of answer_question -------------------------------------
# -----------------------------------------------------------------------------

def answer_question(question: str) -> str:
    """Return chain-of-thought reasoning plus a 'FINAL ANSWER: ...' line in the strict template."""
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        # Degrade gracefully: without an API key we cannot call the LLM, so echo
        # the question in the required format instead of raising.
        logging.warning("GEMINI_API_KEY not set – returning fallback answer.")
        return f"Chain of thought: (api key missing)\n\nFINAL ANSWER: {question}"

    meta_prompt = (
        "You are a professional assistant. Respond with two sections:"
        "\n1. Chain of thought: concise reasoning (3–5 sentences)."
        "\n2. FINAL ANSWER: the concise answer following these rules:"
        "\n   • If numeric, no thousands separators or units unless requested."
        "\n   • If text, as few words as possible, no unnecessary articles."
        "\n   • If list, comma‑separate applying the above rules."
        "\n   • Must start exactly with 'FINAL ANSWER:' (uppercase)."
        f"\n\nQuestion: {question}\n\nAnswer:"
    )

    llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)
    return llm.complete(meta_prompt).text.strip()

answer_question_tool = FunctionTool.from_defaults(
    fn=answer_question,
    name="answer_question",
    description="Generate reasoning and emit 'FINAL ANSWER: ...' following the strict format rules.",
)
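
# Direct-call sketch (assumes GEMINI_API_KEY is exported in the environment);
# the returned text should end with a line beginning 'FINAL ANSWER:', e.g.
#
#   print(answer_question("What is the capital of France?"))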

# -----------------------------------------------------------------------------
# System prompt (unchanged) ----------------------------------------------------
# -----------------------------------------------------------------------------

SYNTHESIS_SYSTEM_PROMPT = r"""
You are SynthesisAgent, the final composer in a multi‑agent workflow.
Your goal is to merge validated outputs from specialised agents into a concise
user‑facing answer.

POTENTIAL STATE KEYS TO CONSULT
--------------------------------
objective                 – str  (restated user goal)
plan                      – dict (PlannerAgent JSON plan)
evidence                  – list[str]  (ResearchAgent facts)
calculations              – list[dict] (MathAgent results)
code_outputs              – list[dict] (CodeAgent execution)
image_analysis            – list[dict] (ImageAnalyzerAgent)
figure_interpretation     – list[dict] (FigureInterpretationAgent)
video_analysis            – list[dict] (VideoAnalyzerAgent)
text_analysis             – list[dict] (TextAnalyzerAgent)
role_draft                – str       (RoleAgent draft, optional)
reasoning                 – list[str]  (ReasoningAgent chain‑of‑thought)
validation                – list[dict] (AdvancedValidationAgent)

WORKFLOW
--------
1. Read every relevant key. Create a short internal outline.
2. If contradictions or missing evidence exist, hand off to
   advanced_validation_agent or research_agent.
3. Draft a clear, well‑structured answer (<= 200 words or 7 bullet points).
4. Call the tool `answer_question` with the **user question** to format the
   final output as required.

STYLE
-----
* Formal but approachable language; no internal state leakage.
* Cite numeric values plainly; no inline URLs.
* Prefer a short paragraph followed by bullets for details.

HANDOFF POLICY
--------------
Allowed targets when more work is required:
  • advanced_validation_agent  – contradictions or doubt
  • research_agent             – missing data
  • reasoning_agent            – reconcile complex logic
  • long_context_management_agent – compress oversized context before answering

If your response exceeds the maximum token limit and cannot be completed in a
single reply, conclude your output with the marker [CONTINUE]; you will then be
prompted with “continue” to produce the next portion of the response.
"""

# -----------------------------------------------------------------------------
# Factory ---------------------------------------------------------------------
# -----------------------------------------------------------------------------

def initialize_synthesis_agent() -> ReActAgent:
    logger = logging.getLogger(__name__)
    logger.info("Initialising SynthesisAgent …")

    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise ValueError("GEMINI_API_KEY required for SynthesisAgent")

    llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)

    agent = ReActAgent(
        name="synthesis_agent",
        description=(
            "Aggregates all validated information, resolves residual issues and "
            "produces the final user answer via answer_question, adhering to the "
            "required template."),
        tools=[write_state_tool, read_state_tool, answer_question_tool],
        llm=llm,
        system_prompt=SYNTHESIS_SYSTEM_PROMPT,
        can_handoff_to=[
            "advanced_validation_agent",
            "research_agent",
            "reasoning_agent",
            "long_context_management_agent",
        ],
    )
    return agent

# -----------------------------------------------------------------------------
# Stand‑alone test ------------------------------------------------------------
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    ag = initialize_synthesis_agent()
    print("SynthesisAgent ready.")