|
import hashlib |
|
import re |
|
from threading import Lock |
|
from typing import Dict, List, Optional, Tuple, Union |
|
import gradio as gr |
|
from gemini_model_api import call_gemini_api |
|
|
|
MODERATION_PROMPT = """ |
|
You are a content safety AI. Your only job is to analyze the user's message and determine if it violates content policies. |
|
Check for hate speech, harassment, bullying, self-harm encouragement, and explicit content. |
|
Your output MUST be a single word: either `[OK]` or `[VIOLATION]`. |
|
""" |
|
TRIAGE_PROMPT = """ |
|
You are a fast, logical decision-making AI. Your only job is to analyze a conversation history and decide if the AI participant named 'Gemini' should speak. |
|
CRITERIA FOR RESPONDING (You should respond if ANY of these are true): |
|
- **Direct Mention:** Gemini is addressed directly by name, even with typos (e.g., "Gemini", "Gmni"). |
|
- **Implicit Reference:** Gemini is clearly referred to implicitly as part of a group (e.g., "what about you guys?"). |
|
- **Question to Group:** A user asks a direct question to the group that is not directed at a specific person. |
|
- **Reply to Your Question:** A user's message is a direct and logical answer to a question YOU (Gemini) asked in the previous turn. |
|
- **Request for Help:** A user expresses a clear need for help or information. |
|
CRITERIA FOR IGNORING: |
|
- The conversation is a simple social exchange between other users. |
|
- A question is clearly directed from one specific user to another. |
|
Your output MUST be a single word: either `[RESPOND]` or `[IGNORE]`. |
|
""" |
|
SYSTEM_PROMPT_ACTOR = """ |
|
You are a helpful and friendly AI assistant named Gemini, participating in a group chat. You will act as a human-like participant. |
|
**CONTEXTUAL AWARENESS (This is how you understand the conversation):** |
|
- When you see the name "Gemini" in the text, it is referring to **YOU**. |
|
- Your task is to formulate a response based on the last few messages, where you were mentioned. |
|
**RESPONSE RULES (This is how you MUST formulate your answer):** |
|
1. **Grounding:** You are a language model. You do not have a physical body, personal experiences, or feelings. **Do not invent stories about yourself** (like falling down stairs or having stomach aches). If asked about a personal experience, politely clarify that as an AI, you don't have them, but you can help with information. |
|
2. **No Prefix:** **ABSOLUTELY DO NOT** start your response with your name (e.g., "Gemini:"). This is a strict rule. |
|
3. **No Meta-Commentary:** Do not make comments about your own thought process. |
|
4. **Language:** Respond in the same language as the conversation. |
|
""" |
|
SUMMARY_PROMPT = """ |
|
You are a factual reporting tool. Your only task is to read the following chat history and summarize **who said what**. |
|
ABSOLUTE RULES: |
|
1. Your response **MUST** be in the primary language used in the conversation. |
|
2. **DO NOT** provide any opinion, analysis, or interpretation. |
|
3. Your output **MUST** be a list of key points, attributing each point to the user who made it. |
|
Example output format: |
|
- **Alice** asked for a way to cook eggs without the oil splashing. |
|
- **Gemini** explained that this happens due to water in the pan and suggested drying it first. |
|
- **Eliseu** understood the advice and said he would try it. |
|
Now, generate a factual summary for the following conversation: |
|
""" |
|
OPINION_PROMPT = """ |
|
You are a social and emotional intelligence analyst. Your only task is to read the following chat history and provide your opinion on the **dynamics and mood** of the conversation. |
|
ABSOLUTE RULES: |
|
1. Your response **MUST** be in the primary language used in the conversation. |
|
2. **DO NOT** summarize who said what. Focus only on the underlying feeling and interaction style. |
|
3. **DO NOT** be academic or technical. Speak like an insightful person. |
|
4. Your output **MUST** be a short, reflective paragraph. |
|
Focus on answering questions like: |
|
- What was the overall tone? (e.g., helpful, tense, humorous) |
|
- How were the participants interacting? (e.g., collaboratively, arguing, supporting each other) |
|
- What is your general emotional takeaway from the exchange? |
|
Now, provide your opinion on the following conversation: |
|
""" |
|
|
|
|
|
history_lock = Lock() |
|
AVAILABLE_CHANNELS_LIST = ["general", "dev", "agents", "mcp"] |
|
chat_histories = { |
|
channel: [{"role": "assistant", "content": f"Welcome to the #{channel} channel!"}] |
|
for channel in AVAILABLE_CHANNELS_LIST |
|
} |
|
active_users = {channel: set() for channel in AVAILABLE_CHANNELS_LIST} |
|
USER_COLORS = [ |
|
"#FF6347", |
|
"#4682B4", |
|
"#32CD32", |
|
"#FFD700", |
|
"#6A5ACD", |
|
"#FF69B4", |
|
"chocolate", |
|
"indigo", |
|
] |
|
|
|
|
|
def get_user_color(username: str) -> str: |
|
base_username = re.sub(r"_\d+$", "", username) |
|
hash_object = hashlib.sha256(base_username.encode()) |
|
hash_digest = hash_object.hexdigest() |
|
hash_int = int(hash_digest, 16) |
|
color_index = hash_int % len(USER_COLORS) |
|
return USER_COLORS[color_index] |
|
|
|
|
|
def clean_html_for_llm(text: str) -> str: |
|
clean_text = re.sub("<[^<]+?>", "", text) |
|
clean_text = re.sub(r"^\s*\*\*[a-zA-Z0-9_]+:\*\*\s*", "", clean_text) |
|
clean_text = clean_text.replace("**", "") |
|
return clean_text.strip() |
|
|
|
|
|
def consolidate_history_for_gemini(history: List[Dict]) -> List[Dict]: |
|
if not history: |
|
return [] |
|
prepared_history = [] |
|
for msg in history: |
|
if msg.get("role") not in ["user", "assistant"]: |
|
continue |
|
role = "model" if msg.get("role") == "assistant" else "user" |
|
content = ( |
|
f"{msg.get('username', '')}: {msg.get('content', '')}" |
|
if msg.get("username") |
|
else msg.get("content", "") |
|
) |
|
prepared_history.append( |
|
{"role": role, "username": msg.get("username"), "content": clean_html_for_llm(content)} |
|
) |
|
if not prepared_history: |
|
return [] |
|
consolidated = [] |
|
current_block = prepared_history[0] |
|
for msg in prepared_history[1:]: |
|
if ( |
|
msg["role"] == "user" |
|
and current_block["role"] == "user" |
|
and msg.get("username") == current_block.get("username") |
|
): |
|
current_block["content"] += "\n" + msg["content"] |
|
else: |
|
consolidated.append(current_block) |
|
current_block = msg |
|
consolidated.append(current_block) |
|
for block in consolidated: |
|
block.pop("username", None) |
|
return consolidated |
|
|
|
|
|
def moderate_with_llm(message_text: str) -> Optional[str]: |
|
moderation_payload = [ |
|
{"role": "system", "content": MODERATION_PROMPT}, |
|
{"role": "user", "content": message_text}, |
|
] |
|
decision = call_gemini_api(moderation_payload, stream=False, temperature=0.0) |
|
if decision and "[VIOLATION]" in decision: |
|
return "Message blocked by content safety policy." |
|
return None |
|
|
|
|
|
def login_user(channel: str, username: str) -> Tuple[str, str, List[Dict]]: |
|
"""Handles login logic. Returns final username, channel, and the unformatted history.""" |
|
|
|
if not username: |
|
username = "User" |
|
final_channel = channel if channel else "general" |
|
with history_lock: |
|
if final_channel not in active_users: |
|
active_users[final_channel] = set() |
|
users_in_channel = active_users.get(final_channel) |
|
final_username = username |
|
i = 2 |
|
while final_username in users_in_channel: |
|
final_username = f"{username}_{i}" |
|
i += 1 |
|
users_in_channel.add(final_username) |
|
join_message = { |
|
"role": "system_join_leave", |
|
"content": f"<em>{final_username} has joined the chat.</em>", |
|
} |
|
chat_histories.setdefault(final_channel, []).append(join_message) |
|
updated_history = chat_histories.get(final_channel) |
|
return final_username, final_channel, updated_history |
|
|
|
|
|
def exit_chat(channel: str, username: str) -> bool: |
|
"""Handles logout logic. Returns True on completion.""" |
|
with history_lock: |
|
if channel in active_users and username in active_users[channel]: |
|
active_users[channel].remove(username) |
|
exit_message = { |
|
"role": "system_join_leave", |
|
"content": f"<em>{username} has left the chat.</em>", |
|
} |
|
if channel in chat_histories: |
|
chat_histories[channel].append(exit_message) |
|
return True |
|
|
|
|
|
def send_message(channel: str, username: str, message: str) -> List[Dict]: |
|
"""Handles new messages. Returns the full, unformatted history.""" |
|
|
|
if not message or not username: |
|
with history_lock: |
|
return chat_histories.get(channel, []) |
|
moderation_result = moderate_with_llm(message) |
|
if moderation_result: |
|
with history_lock: |
|
chat_histories[channel].append({"role": "system_error", "content": moderation_result}) |
|
return chat_histories.get(channel, []) |
|
with history_lock: |
|
chat_histories[channel].append({"role": "user", "username": username, "content": message}) |
|
history_for_llm = list(chat_histories[channel]) |
|
history_for_triage = [ |
|
{"role": "system", "content": TRIAGE_PROMPT} |
|
] + consolidate_history_for_gemini(history_for_llm) |
|
decision = call_gemini_api(history_for_triage, stream=False, temperature=0.0) |
|
if decision and "[RESPOND]" in decision: |
|
history_for_actor = [ |
|
{"role": "system", "content": SYSTEM_PROMPT_ACTOR} |
|
] + consolidate_history_for_gemini(history_for_llm) |
|
bot_response_text = call_gemini_api(history_for_actor, stream=False, temperature=0.7) |
|
if ( |
|
bot_response_text |
|
and "Error:" not in bot_response_text |
|
and "[BLOCKED" not in bot_response_text |
|
): |
|
cleaned_response = re.sub(r"^\s*gemini:\s*", "", bot_response_text, flags=re.IGNORECASE) |
|
with history_lock: |
|
chat_histories[channel].append( |
|
{"role": "assistant", "username": "Gemini", "content": cleaned_response} |
|
) |
|
with history_lock: |
|
return chat_histories.get(channel, []) |
|
|
|
|
|
def get_summary_or_opinion(channel: str, prompt_template: str) -> List[Dict]: |
|
"""Handles summary and opnion chat tool. Returns the full, unformatted history.""" |
|
with history_lock: |
|
history_copy = chat_histories.get(channel, []).copy() |
|
history_for_llm = [ |
|
{"role": "system", "content": prompt_template} |
|
] + consolidate_history_for_gemini(history_copy) |
|
response_text = call_gemini_api(history_for_llm, stream=False) |
|
is_summary = "summary" in prompt_template.lower() |
|
role = "system_summary" if is_summary else "system_opinion" |
|
content = ( |
|
response_text |
|
if response_text and "Error:" not in response_text |
|
else "Could not generate the response." |
|
) |
|
with history_lock: |
|
chat_histories[channel].append({"role": role, "content": content}) |
|
return chat_histories.get(channel, []) |
|
|
|
|
|
def format_history_for_display(history: List[Dict]) -> List[Dict]: |
|
"""Applies HTML formatting to a clean history list for display.""" |
|
formatted_history = [] |
|
for msg in history: |
|
new_msg = msg.copy() |
|
role, content, username = ( |
|
new_msg.get("role"), |
|
new_msg.get("content", ""), |
|
new_msg.get("username"), |
|
) |
|
if role == "user" and username: |
|
color = get_user_color(username) |
|
new_msg["content"] = ( |
|
f"<span style='color:{color}; font-weight: bold;'>{username}:</span> {content}" |
|
) |
|
elif role == "assistant" and username: |
|
new_msg["content"] = f"**{username}:** {content}" |
|
elif role == "system_join_leave": |
|
new_msg["content"] = f"<div style='text-align: center; color: grey;'>{content}</div>" |
|
new_msg["role"] = "user" |
|
elif role == "system_error": |
|
new_msg["content"] = f"<span style='color:red;'>**System:** {content}</span>" |
|
new_msg["role"] = "user" |
|
elif role == "system_summary" or role == "system_opinion": |
|
is_summary = role == "system_summary" |
|
title = "Conversation Summary" if is_summary else "Gemini's Opinion" |
|
color = "#6c757d" if is_summary else "#007bff" |
|
response_content = content.replace("**", "") |
|
if is_summary: |
|
formatted_list = re.sub(r"-\s*", "<br>- ", response_content).strip() |
|
if formatted_list.startswith("<br>- "): |
|
formatted_list = formatted_list[5:] |
|
response_content = "- " + formatted_list |
|
new_msg["content"] = ( |
|
f"<div style='background-color:#f8f9fa;...'><b>{title}:</b><br>{response_content}</div>" |
|
) |
|
new_msg["role"] = "user" |
|
formatted_history.append(new_msg) |
|
return formatted_history |
|
|
|
|
|
def get_and_format_history( |
|
channel: str, current_ui_history: List[Dict] |
|
) -> Union[List[Dict], gr.skip]: |
|
"""UI helper: Intelligently gets and formats history.""" |
|
with history_lock: |
|
backend_history = chat_histories.get(channel, []) |
|
if len(backend_history) == len(current_ui_history): |
|
return gr.skip() |
|
else: |
|
return format_history_for_display(backend_history) |
|
|
|
|
|
def update_ui_after_login( |
|
final_username: str, final_channel: str, unformatted_history: List[Dict] |
|
) -> Tuple: |
|
"""UI-only function to switch views and update components after login.""" |
|
return ( |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
final_username, |
|
final_channel, |
|
format_history_for_display(unformatted_history), |
|
) |
|
|
|
|
|
def update_ui_after_logout() -> Tuple: |
|
"""UI-only function to switch views after logout.""" |
|
return gr.update(visible=True), gr.update(visible=False) |
|
|
|
|
|
def get_summary(channel): |
|
return get_summary_or_opinion(channel, SUMMARY_PROMPT) |
|
|
|
|
|
def get_opinion(channel): |
|
return get_summary_or_opinion(channel, OPINION_PROMPT) |
|
|
|
|
|
def clear_textbox(): |
|
return "" |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Ocean(), title="Multi-Agent Chat") as demo: |
|
|
|
with gr.Column(visible=True) as login_view: |
|
gr.Markdown("# π Welcome to Multi-Agent Chat") |
|
username_input_login = gr.Textbox(label="Your Name", placeholder="e.g., Lucy") |
|
channel_choice_dropdown = gr.Dropdown( |
|
choices=AVAILABLE_CHANNELS_LIST, label="Choose a Channel", value="general" |
|
) |
|
login_button = gr.Button("Enter Chat", variant="primary") |
|
|
|
with gr.Column(visible=False) as chat_view: |
|
gr.Markdown("# π Welcome to Multi-Agent Chat") |
|
gr.Markdown("""### π¬ Interacting with the Gemini Agent |
|
The AI agent, Gemini, is always listening to the conversation but is designed to be reserved. To get its attention, you need to address it directly. |
|
- **To ask a question or get a response:** Simply mention **"Gemini"** in your message. The agent is smart enough to understand context and even some typos! |
|
> **Example:** "That's a great point, Lucy. What do you think, **Gemini**?" π€ |
|
- **For general chat:** Just talk normally with other users. Gemini will remain silent unless it feels its participation is highly valuable. |
|
""") |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
gr.Markdown("## βοΈ Session Data") |
|
username_display = gr.Textbox(label="Logged in as", interactive=False) |
|
channel_display = gr.Textbox(label="Current Channel", interactive=False) |
|
gr.Markdown("## π€ MCP Tools") |
|
summary_button = gr.Button("π Generate Chat Summary") |
|
opinion_button = gr.Button("π€ Ask for LLM's Opinion") |
|
exit_button = gr.Button("πͺ Exit Chat") |
|
with gr.Column(scale=3): |
|
chatbot = gr.Chatbot( |
|
label="Conversation", |
|
height=600, |
|
type="messages", |
|
group_consecutive_messages=False, |
|
) |
|
with gr.Row(): |
|
msg_input = gr.Textbox( |
|
show_label=False, placeholder="Type your message...", scale=5 |
|
) |
|
send_button = gr.Button("Send", variant="primary", scale=1) |
|
|
|
chat_timer = gr.Timer(5) |
|
chat_timer.tick(fn=get_and_format_history, inputs=[channel_display, chatbot], outputs=chatbot) |
|
unformatted_history_state = gr.State() |
|
dumb_state = gr.State(value=None) |
|
|
|
login_event = login_button.click( |
|
fn=login_user, |
|
inputs=[channel_choice_dropdown, username_input_login], |
|
outputs=[ |
|
username_display, |
|
channel_display, |
|
unformatted_history_state, |
|
], |
|
api_name="login_user", |
|
) |
|
login_event.then( |
|
fn=update_ui_after_login, |
|
inputs=[username_display, channel_display, unformatted_history_state], |
|
outputs=[login_view, chat_view, username_display, channel_display, chatbot], |
|
) |
|
|
|
exit_event = exit_button.click( |
|
fn=exit_chat, |
|
inputs=[channel_display, username_display], |
|
outputs=dumb_state, |
|
api_name="exit_chat", |
|
) |
|
exit_event.then(fn=update_ui_after_logout, inputs=None, outputs=[login_view, chat_view]) |
|
|
|
summary_event = summary_button.click( |
|
fn=get_summary, inputs=[channel_display], outputs=dumb_state, api_name="get_summary" |
|
).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot) |
|
|
|
opinion_event = opinion_button.click( |
|
fn=get_opinion, inputs=[channel_display], outputs=dumb_state, api_name="get_opinion" |
|
).then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot) |
|
|
|
send_event = ( |
|
send_button.click( |
|
fn=send_message, |
|
inputs=[channel_display, username_display, msg_input], |
|
outputs=dumb_state, |
|
api_name="send_message", |
|
) |
|
.then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot) |
|
.then(fn=clear_textbox, inputs=None, outputs=msg_input) |
|
) |
|
|
|
submit_event = ( |
|
msg_input.submit( |
|
fn=send_message, |
|
inputs=[channel_display, username_display, msg_input], |
|
outputs=dumb_state, |
|
api_name="send_message", |
|
) |
|
.then(fn=format_history_for_display, inputs=chatbot, outputs=chatbot) |
|
.then(fn=clear_textbox, inputs=None, outputs=msg_input) |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch(mcp_server=True) |
|
|