|
import hmac
import json
import logging
import os
import time
import uuid
from datetime import datetime

import gradio as gr
from huggingface_hub import InferenceClient
|
|
|
|
|
# Root logging: every record goes both to a log file and to the console.
_log_file_handler = logging.FileHandler("chatbot_logs.log")
_log_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[_log_file_handler, _log_console_handler],
)

# Application-wide logger used by the rest of this module.
logger = logging.getLogger("CompanyChatbot")
|
|
|
|
|
# Runtime configuration, overridable through environment variables.
HF_MODEL = os.getenv("HF_MODEL", "HuggingFaceH4/zephyr-7b-beta")
HF_API_TOKEN = os.getenv("HF_API_TOKEN")  # None when the variable is unset
COMPANY_NAME = os.getenv("COMPANY_NAME", "Your Company")

# The fallback prompt embeds the company name resolved just above.
_fallback_system_prompt = (
    f"You are {COMPANY_NAME}'s professional AI assistant. Be helpful, accurate, and concise."
)
DEFAULT_SYSTEM_PROMPT = os.getenv("DEFAULT_SYSTEM_PROMPT", _fallback_system_prompt)
|
|
|
|
|
# Create the shared inference client once at import time. A failure here is
# fatal for the app, so it is logged with its traceback and re-raised as a
# RuntimeError that keeps the original exception attached as its cause.
try:
    client = InferenceClient(HF_MODEL, token=HF_API_TOKEN)
    logger.info(f"Successfully initialized InferenceClient with model: {HF_MODEL}")
except Exception as e:
    logger.exception(f"Failed to initialize InferenceClient: {str(e)}")
    raise RuntimeError(
        f"Failed to initialize the model. Please check your configuration: {str(e)}"
    ) from e
|
|
|
|
|
def save_conversation(user_id, conversation):
    """Write one conversation record to a timestamped JSON file.

    The file is created under ``conversations/`` and named from ``user_id``
    plus the current local time (second resolution). The directory is created
    on demand.

    Args:
        user_id: Identifier embedded in the file name and the log line.
        conversation: JSON-serialisable object to persist.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    target = f"conversations/{user_id}_{stamp}.json"

    parent_dir = os.path.dirname(target)
    os.makedirs(parent_dir, exist_ok=True)

    with open(target, 'w') as handle:
        json.dump(conversation, handle)

    logger.info(f"Saved conversation for user {user_id}")
|
|
|
|
|
def _build_chat_messages(system_message, history, message):
    """Flatten the system prompt, prior turns and the new message into chat-API form."""
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": message})
    return messages


def respond(
    message,
    history,
    system_message,
    max_tokens,
    temperature,
    top_p,
    user_id
):
    """Stream an assistant reply for ``message`` as successive chat histories.

    This is a generator: every state the UI should display is *yielded*.
    (A value attached to ``return`` inside a generator is never seen by a
    caller that simply iterates, which is why no result is returned here.)

    Args:
        message: The new user message; ``None`` or blank input is answered
            with a canned prompt instead of calling the model.
        history: Previous turns as ``[user, assistant]`` pairs; ``None`` is
            treated as an empty history.
        system_message: System prompt placed first in the request.
        max_tokens: Passed through to ``client.chat_completion``.
        temperature: Passed through to ``client.chat_completion``.
        top_p: Passed through to ``client.chat_completion``.
        user_id: Identifier used in log lines and in the saved record.

    Yields:
        ``history`` extended with ``[message, partial_or_final_reply]``; on
        failure the reply slot carries ``"An error occurred: ..."``.
    """
    history = history or []

    if not message or not message.strip():
        yield history + [[message, "I'm sorry, I didn't receive any input. How can I help you today?"]]
        return

    logger.info(f"User {user_id} sent message - Length: {len(message)}")

    full_response = ""
    try:
        messages = _build_chat_messages(system_message, history, message)
        start_time = datetime.now()

        for message_chunk in client.chat_completion(
            messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
        ):
            # Defensive: skip any chunk that carries no choices rather than
            # failing the whole reply with an IndexError.
            if not message_chunk.choices:
                continue
            token = message_chunk.choices[0].delta.content
            if token:
                full_response += token
                yield history + [[message, full_response]]

        time_taken = (datetime.now() - start_time).total_seconds()
        logger.info(f"Response generated for user {user_id} in {time_taken:.2f}s - Length: {len(full_response)}")
    except Exception as e:
        error_msg = f"An error occurred: {str(e)}"
        logger.error(f"Error generating response for user {user_id}: {str(e)}")
        yield history + [[message, error_msg]]
        return

    if not full_response:
        # The model produced no text: leave the visible history unchanged.
        yield history

    conversation_data = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "messages": messages,
        "response": full_response,
        "parameters": {
            "max_tokens": max_tokens,
            "temperature": temperature,
            "top_p": top_p
        },
        "time_taken": time_taken
    }
    # Persistence is best-effort: a disk or serialisation problem must not
    # replace an answer the user has already received with an error message.
    try:
        save_conversation(user_id, conversation_data)
    except (OSError, TypeError, ValueError):
        logger.exception(f"Failed to save conversation for user {user_id}")
|
|
|
|
|
def authenticate(username, password):
    """Check a username/password pair against the built-in account table.

    Args:
        username: Account name to look up.
        password: Candidate password; anything that is not a ``str`` is rejected.

    Returns:
        ``(True, session_id, role)`` on success, where ``session_id`` is a
        freshly generated UUID4 string; ``(False, None, None)`` otherwise.
    """
    # SECURITY NOTE(review): these demo credentials are hard-coded in source
    # and stored in plain text. They should be replaced by a real credential
    # store with hashed passwords before this is exposed to real users.
    valid_credentials = {
        "admin": {"password": "admin123", "role": "admin"},
        "user": {"password": "user123", "role": "user"}
    }

    account = valid_credentials.get(username)
    if account is None or not isinstance(password, str):
        return False, None, None

    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    expected = account["password"].encode("utf-8")
    if hmac.compare_digest(expected, password.encode("utf-8")):
        return True, str(uuid.uuid4()), account["role"]
    return False, None, None
|
|
|
|
|
def login(username, password):
    """Handle a login attempt and produce the five UI outputs.

    Returns a 5-tuple: (login-group update, chat-group update, user id,
    role, error-message update). On success the login group is hidden and
    the chat group shown; on any failure the login group stays visible and
    the error message is displayed.
    """
    def _rejected(reason):
        # Shared shape of every failed attempt: stay on the login view.
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            gr.update(visible=True, value=reason),
        )

    if not (username and password):
        return _rejected("Please enter both username and password")

    # Fixed pause before each credential check.
    time.sleep(0.5)

    success, user_id, role = authenticate(username, password)
    if not success:
        return _rejected("Invalid username or password")

    return (
        gr.update(visible=False),
        gr.update(visible=True),
        user_id,
        role,
        gr.update(visible=False),
    )
|
|
|
|
|
# Stylesheet handed to gr.Blocks(css=...) and also written to styles.css.
# Class names here are referenced through elem_classes / inline HTML in the UI.
css = """
body {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    background-color: #f9f9f9;
}
.container {
    max-width: 1400px !important;
    margin: auto;
}
.setting-panel {
    background-color: #f0f4f8;
    border-radius: 10px;
    padding: 15px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
}
.chat-container {
    border-radius: 10px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
    background-color: white;
}
.company-header {
    background-color: #2c3e50;
    color: white;
    padding: 15px;
    border-radius: 10px 10px 0 0;
    margin-bottom: 15px;
}
.footer {
    text-align: center;
    margin-top: 20px;
    color: #666;
    font-size: 0.8em;
}
.message-user {
    background-color: #e6f7ff !important;
    border-radius: 15px 15px 0 15px !important;
}
.message-bot {
    background-color: #f0f0f0 !important;
    border-radius: 15px 15px 15px 0 !important;
}
.login-container {
    max-width: 500px;
    margin: 50px auto;
    padding: 30px;
    background-color: white;
    border-radius: 10px;
    box-shadow: 0 4px 10px rgba(0,0,0,0.1);
}
.login-header {
    text-align: center;
    margin-bottom: 30px;
}
.error-message {
    color: #e74c3c;
    background-color: #fdedeb;
    padding: 10px;
    border-radius: 5px;
    margin-bottom: 15px;
    font-size: 14px;
}
.role-badge {
    font-size: 12px;
    padding: 3px 8px;
    border-radius: 10px;
    margin-left: 10px;
}
.admin-badge {
    background-color: #e74c3c;
    color: white;
}
.user-badge {
    background-color: #3498db;
    color: white;
}
.setting-disabled {
    opacity: 0.5;
    pointer-events: none;
}
"""
|
|
|
|
|
# Full UI definition: a login view and a chat view that are toggled by the
# login/logout handlers, plus the event wiring between components.
with gr.Blocks(css=css, title=f"{COMPANY_NAME} AI Assistant") as demo:
    # Per-session state filled in by login() and cleared by logout().
    user_id = gr.State(None)
    user_role = gr.State(None)

    with gr.Row():
        gr.Markdown(f"<div class='company-header'><h1>{COMPANY_NAME} AI Assistant</h1></div>", elem_classes=["company-header"])

    # ---- Login view -------------------------------------------------------
    with gr.Group(visible=True) as login_group:
        with gr.Column(elem_classes=["login-container"]):
            gr.Markdown(f"<div class='login-header'><h2>Welcome to {COMPANY_NAME}</h2><p>Please log in to continue</p></div>")

            error_message = gr.Markdown(visible=False, value="", elem_classes=["error-message"])

            username = gr.Textbox(label="Username", placeholder="Enter your username")
            password = gr.Textbox(label="Password", type="password", placeholder="Enter your password")

            with gr.Row():
                login_button = gr.Button("Login", variant="primary", size="lg")

    # ---- Chat view --------------------------------------------------------
    with gr.Group(visible=False) as chat_group:
        with gr.Row():
            with gr.Column(scale=1, elem_classes=["setting-panel"]):
                role_indicator = gr.Markdown("", elem_id="role-indicator")

                gr.Markdown("### Configuration")
                system_message = gr.Textbox(
                    value=DEFAULT_SYSTEM_PROMPT,
                    label="System Instructions",
                    lines=4
                )

                with gr.Group(elem_id="admin-settings") as admin_settings:
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=2048,
                        value=512,
                        step=1,
                        label="Max Response Length"
                    )
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature (Creativity)"
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.95,
                        step=0.05,
                        label="Top-p (Variation)"
                    )

                gr.Markdown("---")

                gr.Markdown("### Chat Actions")
                with gr.Row():
                    clear_btn = gr.Button("Clear Chat", variant="secondary")
                    export_btn = gr.Button("Export Conversation", variant="secondary")

                # Export feedback lives in the chat view; the login view's
                # error_message is hidden while the user is chatting.
                export_status = gr.Markdown(visible=False, value="")

                logout_btn = gr.Button("Logout", variant="stop")

                gr.Markdown(f"<div class='footer'>© {datetime.now().year} {COMPANY_NAME}. All rights reserved.</div>")

            with gr.Column(scale=2, elem_classes=["chat-container"]):
                chatbot = gr.Chatbot(elem_classes=["chatbox"])
                with gr.Row():
                    msg = gr.Textbox(
                        show_label=False,
                        placeholder="Type your message here...",
                        container=False
                    )
                    submit_btn = gr.Button("Send", variant="primary")

    # ---- Role handling ----------------------------------------------------
    def update_role_display(role):
        """Return the HTML badge shown for ``role`` ("admin" or anything else)."""
        if role == "admin":
            return "<h3>Role: <span class='role-badge admin-badge'>Administrator</span></h3>"
        return "<h3>Role: <span class='role-badge user-badge'>Standard User</span></h3>"

    def handle_role_permissions(role):
        """Enable the generation settings for admins, grey them out otherwise."""
        if role == "admin":
            # Explicitly clear the class so an admin logging in after a
            # standard user in the same session gets working controls back.
            return gr.update(visible=True, elem_classes=[])
        return gr.update(visible=True, elem_classes=["setting-disabled"])

    login_button.click(
        login,
        inputs=[username, password],
        outputs=[login_group, chat_group, user_id, user_role, error_message]
    ).then(
        update_role_display,
        inputs=[user_role],
        outputs=[role_indicator]
    ).then(
        handle_role_permissions,
        inputs=[user_role],
        outputs=[admin_settings]
    )

    # ---- Chat -------------------------------------------------------------
    def chat_with_saved_params(message, history, system_msg, tokens, temp, topp, uid, role):
        """Stream a reply using the settings currently shown in the UI.

        The live component values arrive as event inputs; reading
        ``component.value`` inside a handler would only give the value the
        component was constructed with. The function is itself a generator
        (``yield from``) so each partial reply from ``respond`` is streamed.
        """
        if role != "admin":
            # The CSS "disabled" look is cosmetic only, so pin non-admin
            # requests to the configured defaults on the server side.
            tokens, temp, topp = max_tokens.value, temperature.value, top_p.value
        # NOTE(review): nothing here rejects a call with uid=None; hiding the
        # chat view does not by itself stop direct calls to this event.
        yield from respond(message, history, system_msg, tokens, temp, topp, uid)

    chat_inputs = [msg, chatbot, system_message, max_tokens, temperature, top_p, user_id, user_role]

    msg_and_submit = msg.submit(
        chat_with_saved_params,
        inputs=chat_inputs,
        outputs=[chatbot],
        show_progress=True
    )

    submit_click = submit_btn.click(
        chat_with_saved_params,
        inputs=chat_inputs,
        outputs=[chatbot],
        show_progress=True
    )

    # Empty the input box once a reply has finished.
    msg_and_submit.then(lambda: gr.update(value=""), None, [msg])
    submit_click.then(lambda: gr.update(value=""), None, [msg])

    clear_btn.click(lambda: [], None, chatbot, queue=False)

    # ---- Export -----------------------------------------------------------
    def export_conversation(chat_history, uid):
        """Dump the visible chat history to a timestamped JSON file.

        Returns an update that shows the path of the written file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"conversations/export_{uid}_{timestamp}.json"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(chat_history, f)
        logger.info(f"Exported conversation for user {uid}")
        return gr.update(value=f"Conversation exported to {filename}", visible=True)

    export_btn.click(
        export_conversation,
        inputs=[chatbot, user_id],
        outputs=[export_status]
    )

    # ---- Logout -----------------------------------------------------------
    def logout():
        """Show the login view again and clear the session's user id and role."""
        return gr.update(visible=True), gr.update(visible=False), None, None

    logout_btn.click(
        logout,
        outputs=[login_group, chat_group, user_id, user_role]
    )
|
|
|
|
|
# Write the stylesheet next to the app as styles.css (happens on import).
with open("styles.css", "w") as stylesheet:
    stylesheet.write(css)
|
|
|
if __name__ == "__main__":
    # PRODUCTION=true selects the fixed-host, no-share configuration;
    # anything else launches with a share link.
    running_in_production = os.environ.get("PRODUCTION", "false").lower() == "true"

    if not running_in_production:
        demo.launch(share=True)
    else:
        listen_port = int(os.environ.get("PORT", 7860))
        demo.launch(
            server_name="0.0.0.0",
            server_port=listen_port,
            share=False,
            show_error=False,
            auth=None,
        )