|
import hmac
import json
import logging
import os
import time
import uuid
from datetime import datetime

import gradio as gr
from huggingface_hub import InferenceClient
|
|
|
|
|
# Application-wide logging: records at INFO and above are written both to
# chatbot_logs.log and to the console stream.
logging.basicConfig(
    handlers=[logging.FileHandler("chatbot_logs.log"), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Named logger used by the functions in this module.
logger = logging.getLogger("CompanyChatbot")
|
|
|
|
|
# Deployment settings; each one can be overridden through the environment.
HF_MODEL = os.getenv("HF_MODEL", "mistralai/Mixtral-8x7B-Instruct-v0.1")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")  # no fallback: None when unset
COMPANY_NAME = os.getenv("COMPANY_NAME", "RS")

# The fallback prompt embeds COMPANY_NAME, so it is built after that value.
DEFAULT_SYSTEM_PROMPT = os.getenv(
    "DEFAULT_SYSTEM_PROMPT",
    f"You are {COMPANY_NAME}'s professional AI assistant. Be helpful, accurate, and concise.",
)
|
|
|
|
|
# Fail fast at import time: the client below cannot be built without a token.
if not HF_API_TOKEN:
    raise RuntimeError("HF_API_TOKEN environment variable is not set.")

# Build the single inference client that respond() streams completions from.
try:
    client = InferenceClient(model=HF_MODEL, token=HF_API_TOKEN)
except Exception as exc:
    # logger.exception logs at ERROR level and appends the traceback;
    # "from exc" keeps the original failure attached as the explicit cause.
    logger.exception("Failed to initialize InferenceClient: %s", exc)
    raise RuntimeError(f"Failed to initialize the model: {exc}") from exc
else:
    logger.info("Successfully initialized InferenceClient with model: %s", HF_MODEL)
|
|
|
|
|
class ConfigState:
    """Mutable generation settings held in one place.

    Attributes:
        system_message: System prompt placed first in each request.
        max_tokens: Value passed as ``max_tokens`` to the completion call.
        temperature: Value passed as ``temperature`` to the completion call.
        top_p: Value passed as ``top_p`` to the completion call.
    """

    def __init__(self):
        # Start from the module-level default prompt and fixed sampling values.
        self.system_message = DEFAULT_SYSTEM_PROMPT
        self.max_tokens, self.temperature, self.top_p = 512, 0.7, 0.95


# Single module-level instance: update_config() writes it, respond() reads it.
config_state = ConfigState()
|
|
|
|
|
def save_conversation(user_id, conversation):
    """Write one conversation record to a timestamped JSON file.

    The file is created under ``conversations/`` and named from the user id
    and the current local time (second resolution).

    Args:
        user_id: Identifier embedded in the file name and the log line.
        conversation: JSON-serializable object to store.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    target = f"conversations/{user_id}_{stamp}.json"

    # Make sure the output directory exists before opening the file.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, 'w') as out:
        json.dump(conversation, out)

    logger.info(f"Saved conversation for user {user_id}")
|
|
|
|
|
def respond(message, chat_history, user_id):
    """Stream the assistant's reply to ``message`` as a series of chat histories.

    This is a generator: every value it yields is the full history list
    (the existing ``[user, assistant]`` pairs plus the turn in progress), so
    the caller can redraw the chat as the reply grows.

    Args:
        message: Text the user submitted. ``None`` or blank text is answered
            with a canned prompt and is never sent to the model.
        chat_history: Earlier ``[user_message, assistant_message]`` pairs, or
            ``None`` for a new conversation. It is not modified.
        user_id: Session identifier used in log lines and the saved record.

    Yields:
        list: ``chat_history`` followed by ``[message, reply_so_far]``. If the
        model call fails, the reply is an ``"An error occurred: ..."`` text.
    """
    history = list(chat_history or [])

    # Blank input is answered locally. The reply has to be *yielded*: the
    # value of a ``return`` inside a generator is never delivered to whoever
    # iterates it, so returning the history would show nothing.
    if not message or not message.strip():
        yield history + [[message, "I'm sorry, I didn't receive any input. How can I help you today?"]]
        return

    logger.info(f"User {user_id} sent message - Length: {len(message)}")

    try:
        # Rebuild the whole conversation in role/content form, system prompt first.
        messages = [{"role": "system", "content": config_state.system_message}]
        for user_msg, assistant_msg in history:
            messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})
        messages.append({"role": "user", "content": message})

        start_time = datetime.now()
        full_response = ""

        for chunk in client.chat_completion(
            messages,
            max_tokens=config_state.max_tokens,
            temperature=config_state.temperature,
            top_p=config_state.top_p,
            stream=True,
        ):
            # Skip chunks that carry no choice or no text.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                full_response += piece
                yield history + [[message, full_response]]

        time_taken = (datetime.now() - start_time).total_seconds()
        logger.info(
            f"Response generated for user {user_id} in {time_taken:.2f}s - Length: {len(full_response)}"
        )
    except Exception as e:
        # UI boundary: show the failure in the chat instead of raising.
        error_msg = f"An error occurred: {str(e)}"
        logger.exception(f"Error generating response for user {user_id}: {str(e)}")
        yield history + [[message, error_msg]]
        return

    conversation_data = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "messages": messages,
        "response": full_response,
        "parameters": {
            "max_tokens": config_state.max_tokens,
            "temperature": config_state.temperature,
            "top_p": config_state.top_p,
        },
        "time_taken": time_taken,
    }

    # Persistence is best-effort: the reply is already on screen, so a failed
    # write is logged rather than replacing the answer with an error message.
    try:
        save_conversation(user_id, conversation_data)
    except (OSError, TypeError, ValueError):
        logger.exception(f"Failed to save conversation for user {user_id}")
|
|
|
|
|
def update_config(system_msg, max_tok, temp, tp, role):
    """Store new generation settings in the shared config, for admins only.

    Args:
        system_msg: New system prompt.
        max_tok: New ``max_tokens`` value.
        temp: New ``temperature`` value.
        tp: New ``top_p`` value.
        role: Role of the caller; only ``"admin"`` may change settings.

    Returns:
        str: A status message describing whether the update was applied.
    """
    # Guard clause: every other role leaves the configuration untouched.
    if role != "admin":
        return "Only administrators can update configuration"

    new_settings = {
        "system_message": system_msg,
        "max_tokens": max_tok,
        "temperature": temp,
        "top_p": tp,
    }
    for attribute, value in new_settings.items():
        setattr(config_state, attribute, value)

    logger.info("Configuration updated by admin")
    return "Configuration updated successfully"
|
|
|
|
|
def authenticate(username, password):
    """Check a username/password pair against the known accounts.

    The two accounts are ``admin`` and ``user``. Their passwords can be set
    with the ``CHATBOT_ADMIN_PASSWORD`` and ``CHATBOT_USER_PASSWORD``
    environment variables (read on every call); when a variable is unset or
    empty the built-in default is used.

    Args:
        username: Account name supplied by the caller.
        password: Password supplied by the caller.

    Returns:
        tuple: ``(True, session_id, role)`` on success, where ``session_id``
        is a fresh random UUID string; ``(False, None, None)`` otherwise.
    """
    # SECURITY NOTE(review): the fallback passwords are hard-coded and kept
    # only for backward compatibility. Set the environment variables in any
    # real deployment.
    accounts = {
        "admin": {
            "password": os.environ.get("CHATBOT_ADMIN_PASSWORD") or "admin123",
            "role": "admin",
        },
        "user": {
            "password": os.environ.get("CHATBOT_USER_PASSWORD") or "user123",
            "role": "user",
        },
    }

    # Reject anything that is not text instead of failing on lookup/encode.
    if not isinstance(username, str) or not isinstance(password, str):
        return False, None, None

    account = accounts.get(username)
    if account is None:
        return False, None, None

    # Constant-time comparison; bytes are used because compare_digest only
    # accepts str arguments when they are pure ASCII.
    supplied = password.encode("utf-8")
    expected = account["password"].encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        return False, None, None

    return True, str(uuid.uuid4()), account["role"]
|
|
|
|
|
def login(username, password):
    """Handle a login attempt and describe how the UI should change.

    Args:
        username: Text from the username field.
        password: Text from the password field.

    Returns:
        tuple: Five values in this order: login-group update, chat-group
        update, user id, role, and error-message update. On failure the ids
        are ``None`` and the error message is made visible.
    """

    def rejected(reason):
        # Keep the login form up, leave the session empty, and show *reason*.
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            gr.update(visible=True, value=reason),
        )

    # Missing fields are rejected immediately, before the pause below.
    if not (username and password):
        return rejected("Please enter both username and password")

    # Fixed half-second pause before every credential check
    # (presumably to slow down repeated guessing -- confirm intent).
    time.sleep(0.5)

    success, user_id, role = authenticate(username, password)
    if not success:
        return rejected("Invalid username or password")

    # Success: swap the login form for the chat view and hide any old error.
    return (
        gr.update(visible=False),
        gr.update(visible=True),
        user_id,
        role,
        gr.update(visible=False),
    )
|
|
|
|
|
# Stylesheet handed to gr.Blocks(css=...). The class names match the
# elem_classes values and the inline HTML class attributes used in the UI.
# Stray "|" characters that had crept into the literal were removed: they sat
# between and inside the rules and made the stylesheet invalid CSS.
css = """
body {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    background-color: #f9f9f9;
}

.container {
    max-width: 1400px !important;
    margin: auto;
}

.setting-panel {
    background-color: #f0f4f8;
    border-radius: 10px;
    padding: 15px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
}

.chat-container {
    border-radius: 10px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
    background-color: white;
}

.company-header {
    background-color: #2c3e50;
    color: white;
    padding: 15px;
    border-radius: 10px 10px 0 0;
    margin-bottom: 15px;
}

.footer {
    text-align: center;
    margin-top: 20px;
    color: #666;
    font-size: 0.8em;
}

.message-user {
    background-color: #e6f7ff !important;
    border-radius: 15px 15px 0 15px !important;
}

.message-bot {
    background-color: #f0f0f0 !important;
    border-radius: 15px 15px 15px 0 !important;
}

.login-container {
    max-width: 500px;
    margin: 50px auto;
    padding: 30px;
    background-color: white;
    border-radius: 10px;
    box-shadow: 0 4px 10px rgba(0,0,0,0.1);
}

.login-header {
    text-align: center;
    margin-bottom: 30px;
}

.error-message {
    color: #e74c3c;
    background-color: #fdedeb;
    padding: 10px;
    border-radius: 5px;
    margin-bottom: 15px;
    font-size: 14px;
}

.role-badge {
    font-size: 12px;
    padding: 3px 8px;
    border-radius: 10px;
    margin-left: 10px;
}

.admin-badge {
    background-color: #e74c3c;
    color: white;
}

.user-badge {
    background-color: #3498db;
    color: white;
}

.setting-disabled {
    opacity: 0.5;
    pointer-events: none;
}
"""
|
|
|
|
|
# Build the whole UI: a login form, and a chat view revealed after login.
# NOTE(review): the nesting below was reconstructed from a copy of the file
# whose indentation was lost -- confirm the layout against the running app.
with gr.Blocks(css=css, title=f"{COMPANY_NAME} AI Assistant") as demo:
    # Session values written by login() and reset by logout().
    user_id = gr.State(None)
    user_role = gr.State(None)

    with gr.Row():
        gr.Markdown(f"<div class='company-header'><h1>{COMPANY_NAME} AI Assistant</h1></div>")

    # ---- Login view (visible at start) ----
    with gr.Group(visible=True) as login_group:
        with gr.Column(elem_classes=["login-container"]):
            gr.Markdown(f"<div class='login-header'><h2>Welcome to {COMPANY_NAME}</h2><p>Please log in to continue</p></div>")
            error_message = gr.Markdown(visible=False, value="", elem_classes=["error-message"])
            username = gr.Textbox(label="Username", placeholder="Enter your username")
            password = gr.Textbox(label="Password", type="password", placeholder="Enter your password")
            login_button = gr.Button("Login", variant="primary", size="lg")

    # ---- Chat view (hidden until login succeeds) ----
    with gr.Group(visible=False) as chat_group:
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["setting-panel"]):
                role_indicator = gr.Markdown("", elem_id="role-indicator")
                gr.Markdown("### Configuration")
                # Settings start read-only; handle_role_permissions() unlocks
                # them for admins after login.
                with gr.Group() as config_group:
                    system_message = gr.Textbox(value=DEFAULT_SYSTEM_PROMPT, label="System Instructions", lines=4, interactive=False)
                    max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max Response Length", interactive=False)
                    temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature", interactive=False)
                    top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p", interactive=False)
                    update_config_btn = gr.Button("Update Configuration", visible=False)
                    config_status = gr.Markdown("")
                gr.Markdown("### Chat Actions")
                with gr.Row():
                    clear_btn = gr.Button("Clear Chat", variant="secondary")
                    export_btn = gr.Button("Export Conversation", variant="secondary")
                # Export feedback lives in the chat view. The login form's
                # error_message is hidden while the user is chatting, so a
                # confirmation written there would never be seen.
                export_status = gr.Markdown("")
                logout_btn = gr.Button("Logout", variant="stop")

            with gr.Column(scale=2, elem_classes=["chat-container"]):
                chatbot = gr.Chatbot(elem_classes=["chatbox"])
                with gr.Row():
                    msg = gr.Textbox(show_label=False, placeholder="Type your message here...", container=False)
                    submit_btn = gr.Button("Send", variant="primary")

    def update_role_display(role):
        """Return the HTML badge for *role*; anything but "admin" shows as a standard user."""
        badge_class = "admin-badge" if role == "admin" else "user-badge"
        role_display = "Administrator" if role == "admin" else "Standard User"
        return f"<h3>Role: <span class='role-badge {badge_class}'>{role_display}</span></h3>"

    def handle_role_permissions(role):
        """Return updates that unlock the four settings and show the update button for admins only."""
        is_admin = role == "admin"
        return [
            gr.update(interactive=is_admin),  # system_message
            gr.update(interactive=is_admin),  # max_tokens
            gr.update(interactive=is_admin),  # temperature
            gr.update(interactive=is_admin),  # top_p
            gr.update(visible=is_admin),      # update_config_btn
        ]

    # Login, then refresh the role badge and the settings' permissions.
    login_button.click(login, inputs=[username, password], outputs=[login_group, chat_group, user_id, user_role, error_message]) \
        .then(update_role_display, inputs=[user_role], outputs=[role_indicator]) \
        .then(handle_role_permissions, inputs=[user_role], outputs=[system_message, max_tokens, temperature, top_p, update_config_btn])

    update_config_btn.click(update_config, inputs=[system_message, max_tokens, temperature, top_p, user_role], outputs=[config_status])

    # Enter key and Send button both stream a reply, then empty the input box.
    msg.submit(respond, inputs=[msg, chatbot, user_id], outputs=[chatbot]).then(lambda: "", None, [msg])
    submit_btn.click(respond, inputs=[msg, chatbot, user_id], outputs=[chatbot]).then(lambda: "", None, [msg])

    clear_btn.click(lambda: [], None, chatbot, queue=False)

    def export_conversation(chat_history, uid):
        """Write the visible chat history to a timestamped JSON file and report where it went."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"conversations/export_{uid}_{timestamp}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(chat_history, f)
        logger.info(f"Exported conversation for user {uid}")
        return gr.update(value=f"Conversation exported to {filename}", visible=True)

    export_btn.click(export_conversation, inputs=[chatbot, user_id], outputs=[export_status])

    def logout():
        """Return to the login form and wipe everything tied to the finished session."""
        return (
            gr.update(visible=True),   # login_group
            gr.update(visible=False),  # chat_group
            None,                      # user_id
            None,                      # user_role
            [],                        # chatbot: next user must not see this chat
            "",                        # password: do not leave it in the form
            "",                        # config_status
            "",                        # export_status
        )

    logout_btn.click(
        logout,
        outputs=[login_group, chat_group, user_id, user_role, chatbot, password, config_status, export_status],
    )
|
|
|
# Start the web server only when this file is run as a script, not on import.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",  # bind to every network interface
        "server_port": 7860,
        "share": True,  # NOTE(review): requests a public Gradio share link -- confirm this is intended
        "show_error": True,
    }
    demo.launch(**launch_options)