File size: 18,562 Bytes
ad9da03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
import hashlib
import re
from threading import Lock
from typing import Dict, List, Optional, Tuple, Union
import gradio as gr
from gemini_model_api import call_gemini_api
MODERATION_PROMPT = """
You are a content safety AI. Your only job is to analyze the user's message and determine if it violates content policies.
Check for hate speech, harassment, bullying, self-harm encouragement, and explicit content.
Your output MUST be a single word: either `[OK]` or `[VIOLATION]`.
"""
TRIAGE_PROMPT = """
You are a fast, logical decision-making AI. Your only job is to analyze a conversation history and decide if the AI participant named 'Gemini' should speak.
CRITERIA FOR RESPONDING (You should respond if ANY of these are true):
- **Direct Mention:** Gemini is addressed directly by name, even with typos (e.g., "Gemini", "Gmni").
- **Implicit Reference:** Gemini is clearly referred to implicitly as part of a group (e.g., "what about you guys?").
- **Question to Group:** A user asks a direct question to the group that is not directed at a specific person.
- **Reply to Your Question:** A user's message is a direct and logical answer to a question YOU (Gemini) asked in the previous turn.
- **Request for Help:** A user expresses a clear need for help or information.
CRITERIA FOR IGNORING:
- The conversation is a simple social exchange between other users.
- A question is clearly directed from one specific user to another.
Your output MUST be a single word: either `[RESPOND]` or `[IGNORE]`.
"""
SYSTEM_PROMPT_ACTOR = """
You are a helpful and friendly AI assistant named Gemini, participating in a group chat. You will act as a human-like participant.
**CONTEXTUAL AWARENESS (This is how you understand the conversation):**
- When you see the name "Gemini" in the text, it is referring to **YOU**.
- Your task is to formulate a response based on the last few messages, where you were mentioned.
**RESPONSE RULES (This is how you MUST formulate your answer):**
1. **Grounding:** You are a language model. You do not have a physical body, personal experiences, or feelings. **Do not invent stories about yourself** (like falling down stairs or having stomach aches). If asked about a personal experience, politely clarify that as an AI, you don't have them, but you can help with information.
2. **No Prefix:** **ABSOLUTELY DO NOT** start your response with your name (e.g., "Gemini:"). This is a strict rule.
3. **No Meta-Commentary:** Do not make comments about your own thought process.
4. **Language:** Respond in the same language as the conversation.
"""
SUMMARY_PROMPT = """
You are a factual reporting tool. Your only task is to read the following chat history and summarize **who said what**.
ABSOLUTE RULES:
1. Your response **MUST** be in the primary language used in the conversation.
2. **DO NOT** provide any opinion, analysis, or interpretation.
3. Your output **MUST** be a list of key points, attributing each point to the user who made it.
Example output format:
- **Alice** asked for a way to cook eggs without the oil splashing.
- **Gemini** explained that this happens due to water in the pan and suggested drying it first.
- **Eliseu** understood the advice and said he would try it.
Now, generate a factual summary for the following conversation:
"""
OPINION_PROMPT = """
You are a social and emotional intelligence analyst. Your only task is to read the following chat history and provide your opinion on the **dynamics and mood** of the conversation.
ABSOLUTE RULES:
1. Your response **MUST** be in the primary language used in the conversation.
2. **DO NOT** summarize who said what. Focus only on the underlying feeling and interaction style.
3. **DO NOT** be academic or technical. Speak like an insightful person.
4. Your output **MUST** be a short, reflective paragraph.
Focus on answering questions like:
- What was the overall tone? (e.g., helpful, tense, humorous)
- How were the participants interacting? (e.g., collaboratively, arguing, supporting each other)
- What is your general emotional takeaway from the exchange?
Now, provide your opinion on the following conversation:
"""
# --- State and Helper functions ---
history_lock = Lock()
AVAILABLE_CHANNELS_LIST = ["general", "dev", "agents", "mcp"]
chat_histories = {
channel: [{"role": "assistant", "content": f"Welcome to the #{channel} channel!"}]
for channel in AVAILABLE_CHANNELS_LIST
}
active_users = {channel: set() for channel in AVAILABLE_CHANNELS_LIST}
USER_COLORS = [
"#FF6347",
"#4682B4",
"#32CD32",
"#FFD700",
"#6A5ACD",
"#FF69B4",
"chocolate",
"indigo",
]
def get_user_color(username: str) -> str:
base_username = re.sub(r"_\d+$", "", username)
hash_object = hashlib.sha256(base_username.encode())
hash_digest = hash_object.hexdigest()
hash_int = int(hash_digest, 16)
color_index = hash_int % len(USER_COLORS)
return USER_COLORS[color_index]
def clean_html_for_llm(text: str) -> str:
clean_text = re.sub("<[^<]+?>", "", text)
clean_text = re.sub(r"^\s*\*\*[a-zA-Z0-9_]+:\*\*\s*", "", clean_text)
clean_text = clean_text.replace("**", "")
return clean_text.strip()
def consolidate_history_for_gemini(history: List[Dict]) -> List[Dict]:
if not history:
return []
prepared_history = []
for msg in history:
if msg.get("role") not in ["user", "assistant"]:
continue
role = "model" if msg.get("role") == "assistant" else "user"
content = (
f"{msg.get('username', '')}: {msg.get('content', '')}"
if msg.get("username")
else msg.get("content", "")
)
prepared_history.append(
{"role": role, "username": msg.get("username"), "content": clean_html_for_llm(content)}
)
if not prepared_history:
return []
consolidated = []
current_block = prepared_history[0]
for msg in prepared_history[1:]:
if (
msg["role"] == "user"
and current_block["role"] == "user"
and msg.get("username") == current_block.get("username")
):
current_block["content"] += "\n" + msg["content"]
else:
consolidated.append(current_block)
current_block = msg
consolidated.append(current_block)
for block in consolidated:
block.pop("username", None)
return consolidated
def moderate_with_llm(message_text: str) -> Optional[str]:
moderation_payload = [
{"role": "system", "content": MODERATION_PROMPT},
{"role": "user", "content": message_text},
]
decision = call_gemini_api(moderation_payload, stream=False, temperature=0.0)
if decision and "[VIOLATION]" in decision:
return "Message blocked by content safety policy."
return None
def login_user(channel: str, username: str) -> Tuple[str, str, List[Dict]]:
"""Handles login logic. Returns final username, channel, and the unformatted history."""
if not username:
username = "User"
final_channel = channel if channel else "general"
with history_lock:
if final_channel not in active_users:
active_users[final_channel] = set()
users_in_channel = active_users.get(final_channel)
final_username = username
i = 2
while final_username in users_in_channel:
final_username = f"{username}_{i}"
i += 1
users_in_channel.add(final_username)
join_message = {
"role": "system_join_leave",
"content": f"<em>{final_username} has joined the chat.</em>",
}
chat_histories.setdefault(final_channel, []).append(join_message)
updated_history = chat_histories.get(final_channel)
return final_username, final_channel, updated_history
def exit_chat(channel: str, username: str) -> bool:
"""Handles logout logic. Returns True on completion."""
with history_lock:
if channel in active_users and username in active_users[channel]:
active_users[channel].remove(username)
exit_message = {
"role": "system_join_leave",
"content": f"<em>{username} has left the chat.</em>",
}
if channel in chat_histories:
chat_histories[channel].append(exit_message)
return True
def send_message(channel: str, username: str, message: str) -> List[Dict]:
"""Handles new messages. Returns the full, unformatted history."""
if not message or not username:
with history_lock:
return chat_histories.get(channel, [])
moderation_result = moderate_with_llm(message)
if moderation_result:
with history_lock:
chat_histories[channel].append({"role": "system_error", "content": moderation_result})
return chat_histories.get(channel, [])
with history_lock:
chat_histories[channel].append({"role": "user", "username": username, "content": message})
history_for_llm = list(chat_histories[channel])
history_for_triage = [
{"role": "system", "content": TRIAGE_PROMPT}
] + consolidate_history_for_gemini(history_for_llm)
decision = call_gemini_api(history_for_triage, stream=False, temperature=0.0)
if decision and "[RESPOND]" in decision:
history_for_actor = [
{"role": "system", "content": SYSTEM_PROMPT_ACTOR}
] + consolidate_history_for_gemini(history_for_llm)
bot_response_text = call_gemini_api(history_for_actor, stream=False, temperature=0.7)
if (
bot_response_text
and "Error:" not in bot_response_text
and "[BLOCKED" not in bot_response_text
):
cleaned_response = re.sub(r"^\s*gemini:\s*", "", bot_response_text, flags=re.IGNORECASE)
with history_lock:
chat_histories[channel].append(
{"role": "assistant", "username": "Gemini", "content": cleaned_response}
)
with history_lock:
return chat_histories.get(channel, [])
def get_summary_or_opinion(channel: str, prompt_template: str) -> List[Dict]:
"""Handles summary and opnion chat tool. Returns the full, unformatted history."""
with history_lock:
history_copy = chat_histories.get(channel, []).copy()
history_for_llm = [
{"role": "system", "content": prompt_template}
] + consolidate_history_for_gemini(history_copy)
response_text = call_gemini_api(history_for_llm, stream=False)
is_summary = "summary" in prompt_template.lower()
role = "system_summary" if is_summary else "system_opinion"
content = (
response_text
if response_text and "Error:" not in response_text
else "Could not generate the response."
)
with history_lock:
chat_histories[channel].append({"role": role, "content": content})
return chat_histories.get(channel, [])
def format_history_for_display(history: List[Dict]) -> List[Dict]:
"""Applies HTML formatting to a clean history list for display."""
formatted_history = []
for msg in history:
new_msg = msg.copy()
role, content, username = (
new_msg.get("role"),
new_msg.get("content", ""),
new_msg.get("username"),
)
if role == "user" and username:
color = get_user_color(username)
new_msg["content"] = (
f"<span style='color:{color}; font-weight: bold;'>{username}:</span> {content}"
)
elif role == "assistant" and username:
new_msg["content"] = f"**{username}:** {content}"
elif role == "system_join_leave":
new_msg["content"] = f"<div style='text-align: center; color: grey;'>{content}</div>"
new_msg["role"] = "user"
elif role == "system_error":
new_msg["content"] = f"<span style='color:red;'>**System:** {content}</span>"
new_msg["role"] = "user"
elif role == "system_summary" or role == "system_opinion":
is_summary = role == "system_summary"
title = "Conversation Summary" if is_summary else "Gemini's Opinion"
color = "#6c757d" if is_summary else "#007bff"
response_content = content.replace("**", "")
if is_summary:
formatted_list = re.sub(r"-\s*", "<br>- ", response_content).strip()
if formatted_list.startswith("<br>- "):
formatted_list = formatted_list[5:]
response_content = "- " + formatted_list
new_msg["content"] = (
f"<div style='background-color:#f8f9fa;...'><b>{title}:</b><br>{response_content}</div>"
)
new_msg["role"] = "user"
formatted_history.append(new_msg)
return formatted_history
def get_and_format_history(
channel: str, current_ui_history: List[Dict]
) -> Union[List[Dict], gr.skip]:
"""UI helper: Intelligently gets and formats history."""
with history_lock:
backend_history = chat_histories.get(channel, [])
if len(backend_history) == len(current_ui_history):
return gr.skip()
else:
return format_history_for_display(backend_history)
def update_ui_after_login(
final_username: str, final_channel: str, unformatted_history: List[Dict]
) -> Tuple:
"""UI-only function to switch views and update components after login."""
return (
gr.update(visible=False),
gr.update(visible=True),
final_username,
final_channel,
format_history_for_display(unformatted_history),
)
def update_ui_after_logout() -> Tuple:
"""UI-only function to switch views after logout."""
return gr.update(visible=True), gr.update(visible=False)
def get_summary(channel):
return get_summary_or_opinion(channel, SUMMARY_PROMPT)
def get_opinion(channel):
return get_summary_or_opinion(channel, OPINION_PROMPT)
def clear_textbox():
return ""
with gr.Blocks(theme=gr.themes.Ocean(), title="Multi-Agent Chat") as demo:
with gr.Column(visible=True) as login_view:
gr.Markdown("# π Welcome to Multi-Agent Chat")
username_input_login = gr.Textbox(label="Your Name", placeholder="e.g., Lucy")
channel_choice_dropdown = gr.Dropdown(
choices=AVAILABLE_CHANNELS_LIST, label="Choose a Channel", value="general"
)
login_button = gr.Button("Enter Chat", variant="primary")
with gr.Column(visible=False) as chat_view:
gr.Markdown("# π Welcome to Multi-Agent Chat")
gr.Markdown("""### π¬ Interacting with the Gemini Agent
The AI agent, Gemini, is always listening to the conversation but is designed to be reserved. To get its attention, you need to address it directly.
- **To ask a question or get a response:** Simply mention **"Gemini"** in your message. The agent is smart enough to understand context and even some typos!
> **Example:** "That's a great point, Lucy. What do you think, **Gemini**?" π€
- **For general chat:** Just talk normally with other users. Gemini will remain silent unless it feels its participation is highly valuable.
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## βοΈ Session Data")
username_display = gr.Textbox(label="Logged in as", interactive=False)
channel_display = gr.Textbox(label="Current Channel", interactive=False)
gr.Markdown("## π€ MCP Tools")
summary_button = gr.Button("π Generate Chat Summary")
opinion_button = gr.Button("π€ Ask for LLM's Opinion")
exit_button = gr.Button("πͺ Exit Chat")
with gr.Column(scale=3):
chatbot = gr.Chatbot(
label="Conversation",
height=600,
type="messages",
group_consecutive_messages=False,
)
with gr.Row():
msg_input = gr.Textbox(
show_label=False, placeholder="Type your message...", scale=5
)
send_button = gr.Button("Send", variant="primary", scale=1)
chat_timer = gr.Timer(5)
chat_timer.tick(fn=get_and_format_history, inputs=[channel_display, chatbot], outputs=chatbot)
unformatted_history_state = gr.State()
dumb_state = gr.State(value=None)
login_event = login_button.click(
fn=login_user,
inputs=[channel_choice_dropdown, username_input_login],
outputs=[
username_display,
channel_display,
unformatted_history_state,
],
api_name="login_user",
)
login_event.then(
fn=update_ui_after_login,
inputs=[username_display, channel_display, unformatted_history_state],
outputs=[login_view, chat_view, username_display, channel_display, chatbot],
)
exit_event = exit_button.click(
fn=exit_chat,
inputs=[channel_display, username_display],
outputs=dumb_state,
api_name="exit_chat",
)
exit_event.then(fn=update_ui_after_logout, inputs=None, outputs=[login_view, chat_view])
summary_event = summary_button.click(
fn=get_summary, inputs=[channel_display], outputs=dumb_state, api_name="get_summary"
).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
opinion_event = opinion_button.click(
fn=get_opinion, inputs=[channel_display], outputs=dumb_state, api_name="get_opinion"
).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
send_event = (
send_button.click(
fn=send_message,
inputs=[channel_display, username_display, msg_input],
outputs=dumb_state,
api_name="send_message",
)
.then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
.then(fn=clear_textbox, inputs=None, outputs=msg_input)
)
submit_event = (
msg_input.submit(
fn=send_message,
inputs=[channel_display, username_display, msg_input],
outputs=dumb_state,
api_name="send_message",
)
.then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot)
.then(fn=clear_textbox, inputs=None, outputs=msg_input)
)
if __name__ == "__main__":
demo.launch(mcp_server=True)
|