File size: 18,562 Bytes
ad9da03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
import hashlib
import re
from threading import Lock
from typing import Dict, List, Optional, Tuple, Union
import gradio as gr
from gemini_model_api import call_gemini_api

MODERATION_PROMPT = """
You are a content safety AI. Your only job is to analyze the user's message and determine if it violates content policies.
Check for hate speech, harassment, bullying, self-harm encouragement, and explicit content.
Your output MUST be a single word: either `[OK]` or `[VIOLATION]`.
"""
TRIAGE_PROMPT = """
You are a fast, logical decision-making AI. Your only job is to analyze a conversation history and decide if the AI participant named 'Gemini' should speak.
CRITERIA FOR RESPONDING (You should respond if ANY of these are true):
- **Direct Mention:** Gemini is addressed directly by name, even with typos (e.g., "Gemini", "Gmni").
- **Implicit Reference:** Gemini is clearly referred to implicitly as part of a group (e.g., "what about you guys?").
- **Question to Group:** A user asks a direct question to the group that is not directed at a specific person.
- **Reply to Your Question:** A user's message is a direct and logical answer to a question YOU (Gemini) asked in the previous turn.
- **Request for Help:** A user expresses a clear need for help or information.
CRITERIA FOR IGNORING:
- The conversation is a simple social exchange between other users.
- A question is clearly directed from one specific user to another.
Your output MUST be a single word: either `[RESPOND]` or `[IGNORE]`.
"""
SYSTEM_PROMPT_ACTOR = """
You are a helpful and friendly AI assistant named Gemini, participating in a group chat. You will act as a human-like participant.
**CONTEXTUAL AWARENESS (This is how you understand the conversation):**
- When you see the name "Gemini" in the text, it is referring to **YOU**.
- Your task is to formulate a response based on the last few messages, where you were mentioned.
**RESPONSE RULES (This is how you MUST formulate your answer):**
1.  **Grounding:** You are a language model. You do not have a physical body, personal experiences, or feelings. **Do not invent stories about yourself** (like falling down stairs or having stomach aches). If asked about a personal experience, politely clarify that as an AI, you don't have them, but you can help with information.
2.  **No Prefix:** **ABSOLUTELY DO NOT** start your response with your name (e.g., "Gemini:"). This is a strict rule.
3.  **No Meta-Commentary:** Do not make comments about your own thought process.
4.  **Language:** Respond in the same language as the conversation.
"""
SUMMARY_PROMPT = """
You are a factual reporting tool. Your only task is to read the following chat history and summarize **who said what**.
ABSOLUTE RULES:
1.  Your response **MUST** be in the primary language used in the conversation.
2.  **DO NOT** provide any opinion, analysis, or interpretation.
3.  Your output **MUST** be a list of key points, attributing each point to the user who made it.
Example output format:
- **Alice** asked for a way to cook eggs without the oil splashing.
- **Gemini** explained that this happens due to water in the pan and suggested drying it first.
- **Eliseu** understood the advice and said he would try it.
Now, generate a factual summary for the following conversation:
"""
OPINION_PROMPT = """
You are a social and emotional intelligence analyst. Your only task is to read the following chat history and provide your opinion on the **dynamics and mood** of the conversation.
ABSOLUTE RULES:
1.  Your response **MUST** be in the primary language used in the conversation.
2.  **DO NOT** summarize who said what. Focus only on the underlying feeling and interaction style.
3.  **DO NOT** be academic or technical. Speak like an insightful person.
4.  Your output **MUST** be a short, reflective paragraph.
Focus on answering questions like:
- What was the overall tone? (e.g., helpful, tense, humorous)
- How were the participants interacting? (e.g., collaboratively, arguing, supporting each other)
- What is your general emotional takeaway from the exchange?
Now, provide your opinion on the following conversation:
"""

# --- State and Helper functions ---
history_lock = Lock()
AVAILABLE_CHANNELS_LIST = ["general", "dev", "agents", "mcp"]
chat_histories = {
    channel: [{"role": "assistant", "content": f"Welcome to the #{channel} channel!"}]
    for channel in AVAILABLE_CHANNELS_LIST
}
active_users = {channel: set() for channel in AVAILABLE_CHANNELS_LIST}
USER_COLORS = [
    "#FF6347",
    "#4682B4",
    "#32CD32",
    "#FFD700",
    "#6A5ACD",
    "#FF69B4",
    "chocolate",
    "indigo",
]


def get_user_color(username: str) -> str:
    base_username = re.sub(r"_\d+$", "", username)
    hash_object = hashlib.sha256(base_username.encode())
    hash_digest = hash_object.hexdigest()
    hash_int = int(hash_digest, 16)
    color_index = hash_int % len(USER_COLORS)
    return USER_COLORS[color_index]


def clean_html_for_llm(text: str) -> str:
    clean_text = re.sub("<[^<]+?>", "", text)
    clean_text = re.sub(r"^\s*\*\*[a-zA-Z0-9_]+:\*\*\s*", "", clean_text)
    clean_text = clean_text.replace("**", "")
    return clean_text.strip()


def consolidate_history_for_gemini(history: List[Dict]) -> List[Dict]:
    if not history:
        return []
    prepared_history = []
    for msg in history:
        if msg.get("role") not in ["user", "assistant"]:
            continue
        role = "model" if msg.get("role") == "assistant" else "user"
        content = (
            f"{msg.get('username', '')}: {msg.get('content', '')}"
            if msg.get("username")
            else msg.get("content", "")
        )
        prepared_history.append(
            {"role": role, "username": msg.get("username"), "content": clean_html_for_llm(content)}
        )
    if not prepared_history:
        return []
    consolidated = []
    current_block = prepared_history[0]
    for msg in prepared_history[1:]:
        if (
            msg["role"] == "user"
            and current_block["role"] == "user"
            and msg.get("username") == current_block.get("username")
        ):
            current_block["content"] += "\n" + msg["content"]
        else:
            consolidated.append(current_block)
            current_block = msg
    consolidated.append(current_block)
    for block in consolidated:
        block.pop("username", None)
    return consolidated


def moderate_with_llm(message_text: str) -> Optional[str]:
    moderation_payload = [
        {"role": "system", "content": MODERATION_PROMPT},
        {"role": "user", "content": message_text},
    ]
    decision = call_gemini_api(moderation_payload, stream=False, temperature=0.0)
    if decision and "[VIOLATION]" in decision:
        return "Message blocked by content safety policy."
    return None


def login_user(channel: str, username: str) -> Tuple[str, str, List[Dict]]:
    """Handles login logic. Returns final username, channel, and the unformatted history."""

    if not username:
        username = "User"
    final_channel = channel if channel else "general"
    with history_lock:
        if final_channel not in active_users:
            active_users[final_channel] = set()
        users_in_channel = active_users.get(final_channel)
        final_username = username
        i = 2
        while final_username in users_in_channel:
            final_username = f"{username}_{i}"
            i += 1
        users_in_channel.add(final_username)
        join_message = {
            "role": "system_join_leave",
            "content": f"<em>{final_username} has joined the chat.</em>",
        }
        chat_histories.setdefault(final_channel, []).append(join_message)
        updated_history = chat_histories.get(final_channel)
    return final_username, final_channel, updated_history


def exit_chat(channel: str, username: str) -> bool:
    """Handles logout logic. Returns True on completion."""
    with history_lock:
        if channel in active_users and username in active_users[channel]:
            active_users[channel].remove(username)
        exit_message = {
            "role": "system_join_leave",
            "content": f"<em>{username} has left the chat.</em>",
        }
        if channel in chat_histories:
            chat_histories[channel].append(exit_message)
    return True


def send_message(channel: str, username: str, message: str) -> List[Dict]:
    """Handles new messages. Returns the full, unformatted history."""

    if not message or not username:
        with history_lock:
            return chat_histories.get(channel, [])
    moderation_result = moderate_with_llm(message)
    if moderation_result:
        with history_lock:
            chat_histories[channel].append({"role": "system_error", "content": moderation_result})
        return chat_histories.get(channel, [])
    with history_lock:
        chat_histories[channel].append({"role": "user", "username": username, "content": message})
        history_for_llm = list(chat_histories[channel])
    history_for_triage = [
        {"role": "system", "content": TRIAGE_PROMPT}
    ] + consolidate_history_for_gemini(history_for_llm)
    decision = call_gemini_api(history_for_triage, stream=False, temperature=0.0)
    if decision and "[RESPOND]" in decision:
        history_for_actor = [
            {"role": "system", "content": SYSTEM_PROMPT_ACTOR}
        ] + consolidate_history_for_gemini(history_for_llm)
        bot_response_text = call_gemini_api(history_for_actor, stream=False, temperature=0.7)
        if (
            bot_response_text
            and "Error:" not in bot_response_text
            and "[BLOCKED" not in bot_response_text
        ):
            cleaned_response = re.sub(r"^\s*gemini:\s*", "", bot_response_text, flags=re.IGNORECASE)
            with history_lock:
                chat_histories[channel].append(
                    {"role": "assistant", "username": "Gemini", "content": cleaned_response}
                )
    with history_lock:
        return chat_histories.get(channel, [])


def get_summary_or_opinion(channel: str, prompt_template: str) -> List[Dict]:
    """Handles summary and opnion chat tool. Returns the full, unformatted history."""
    with history_lock:
        history_copy = chat_histories.get(channel, []).copy()
    history_for_llm = [
        {"role": "system", "content": prompt_template}
    ] + consolidate_history_for_gemini(history_copy)
    response_text = call_gemini_api(history_for_llm, stream=False)
    is_summary = "summary" in prompt_template.lower()
    role = "system_summary" if is_summary else "system_opinion"
    content = (
        response_text
        if response_text and "Error:" not in response_text
        else "Could not generate the response."
    )
    with history_lock:
        chat_histories[channel].append({"role": role, "content": content})
        return chat_histories.get(channel, [])


def format_history_for_display(history: List[Dict]) -> List[Dict]:
    """Applies HTML formatting to a clean history list for display."""
    formatted_history = []
    for msg in history:
        new_msg = msg.copy()
        role, content, username = (
            new_msg.get("role"),
            new_msg.get("content", ""),
            new_msg.get("username"),
        )
        if role == "user" and username:
            color = get_user_color(username)
            new_msg["content"] = (
                f"<span style='color:{color}; font-weight: bold;'>{username}:</span> {content}"
            )
        elif role == "assistant" and username:
            new_msg["content"] = f"**{username}:** {content}"
        elif role == "system_join_leave":
            new_msg["content"] = f"<div style='text-align: center; color: grey;'>{content}</div>"
            new_msg["role"] = "user"
        elif role == "system_error":
            new_msg["content"] = f"<span style='color:red;'>**System:** {content}</span>"
            new_msg["role"] = "user"
        elif role == "system_summary" or role == "system_opinion":
            is_summary = role == "system_summary"
            title = "Conversation Summary" if is_summary else "Gemini's Opinion"
            color = "#6c757d" if is_summary else "#007bff"
            response_content = content.replace("**", "")
            if is_summary:
                formatted_list = re.sub(r"-\s*", "<br>- ", response_content).strip()
                if formatted_list.startswith("<br>- "):
                    formatted_list = formatted_list[5:]
                response_content = "- " + formatted_list
            new_msg["content"] = (
                f"<div style='background-color:#f8f9fa;...'><b>{title}:</b><br>{response_content}</div>"
            )
            new_msg["role"] = "user"
        formatted_history.append(new_msg)
    return formatted_history


def get_and_format_history(
    channel: str, current_ui_history: List[Dict]
) -> Union[List[Dict], gr.skip]:
    """UI helper: Intelligently gets and formats history."""
    with history_lock:
        backend_history = chat_histories.get(channel, [])
    if len(backend_history) == len(current_ui_history):
        return gr.skip()
    else:
        return format_history_for_display(backend_history)


def update_ui_after_login(
    final_username: str, final_channel: str, unformatted_history: List[Dict]
) -> Tuple:
    """UI-only function to switch views and update components after login."""
    return (
        gr.update(visible=False),
        gr.update(visible=True),
        final_username,
        final_channel,
        format_history_for_display(unformatted_history),
    )


def update_ui_after_logout() -> Tuple:
    """UI-only function to switch views after logout."""
    return gr.update(visible=True), gr.update(visible=False)


def get_summary(channel):
    return get_summary_or_opinion(channel, SUMMARY_PROMPT)


def get_opinion(channel):
    return get_summary_or_opinion(channel, OPINION_PROMPT)


def clear_textbox():
    return ""


with gr.Blocks(theme=gr.themes.Ocean(), title="Multi-Agent Chat") as demo:

    with gr.Column(visible=True) as login_view:
        gr.Markdown("# πŸš€ Welcome to Multi-Agent Chat")       
        username_input_login = gr.Textbox(label="Your Name", placeholder="e.g., Lucy")
        channel_choice_dropdown = gr.Dropdown(
            choices=AVAILABLE_CHANNELS_LIST, label="Choose a Channel", value="general"
        )
        login_button = gr.Button("Enter Chat", variant="primary")

    with gr.Column(visible=False) as chat_view:
        gr.Markdown("# πŸš€ Welcome to Multi-Agent Chat")
        gr.Markdown("""### πŸ’¬ Interacting with the Gemini Agent
                    The AI agent, Gemini, is always listening to the conversation but is designed to be reserved. To get its attention, you need to address it directly.
                    -   **To ask a question or get a response:** Simply mention **"Gemini"** in your message. The agent is smart enough to understand context and even some typos!
                        > **Example:** "That's a great point, Lucy. What do you think, **Gemini**?" πŸ€”
                    -   **For general chat:** Just talk normally with other users. Gemini will remain silent unless it feels its participation is highly valuable.
                    """)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## βš™οΈ Session Data")
                username_display = gr.Textbox(label="Logged in as", interactive=False)
                channel_display = gr.Textbox(label="Current Channel", interactive=False)
                gr.Markdown("## πŸ€– MCP Tools")
                summary_button = gr.Button("πŸ“„ Generate Chat Summary")
                opinion_button = gr.Button("πŸ€” Ask for LLM's Opinion")
                exit_button = gr.Button("πŸšͺ Exit Chat")
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=600,
                    type="messages",
                    group_consecutive_messages=False,
                )
                with gr.Row():
                    msg_input = gr.Textbox(
                        show_label=False, placeholder="Type your message...", scale=5
                    )
                    send_button = gr.Button("Send", variant="primary", scale=1)

    chat_timer = gr.Timer(5)
    chat_timer.tick(fn=get_and_format_history, inputs=[channel_display, chatbot], outputs=chatbot)
    unformatted_history_state = gr.State()
    dumb_state = gr.State(value=None)

    login_event = login_button.click(
        fn=login_user,
        inputs=[channel_choice_dropdown, username_input_login],
        outputs=[
            username_display,
            channel_display,
            unformatted_history_state,
        ],
        api_name="login_user",
    )
    login_event.then(
        fn=update_ui_after_login,
        inputs=[username_display, channel_display, unformatted_history_state],
        outputs=[login_view, chat_view, username_display, channel_display, chatbot],
    )

    exit_event = exit_button.click(
        fn=exit_chat,
        inputs=[channel_display, username_display],
        outputs=dumb_state,
        api_name="exit_chat",
    )
    exit_event.then(fn=update_ui_after_logout, inputs=None, outputs=[login_view, chat_view])

    summary_event = summary_button.click(
        fn=get_summary, inputs=[channel_display], outputs=dumb_state, api_name="get_summary"
    ).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)

    opinion_event = opinion_button.click(
        fn=get_opinion, inputs=[channel_display], outputs=dumb_state, api_name="get_opinion"
    ).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)

    send_event = (
        send_button.click(
            fn=send_message,
            inputs=[channel_display, username_display, msg_input],
            outputs=dumb_state,
            api_name="send_message",
        )
        .then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
        .then(fn=clear_textbox, inputs=None, outputs=msg_input)
    )

    submit_event = (
        msg_input.submit(
            fn=send_message,
            inputs=[channel_display, username_display, msg_input],
            outputs=dumb_state,
            api_name="send_message",
        )
        .then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
        .then(fn=clear_textbox, inputs=None, outputs=msg_input)
    )

if __name__ == "__main__":
    demo.launch(mcp_server=True)