|
import hmac
import json
import logging
import os
import time
import uuid
from datetime import datetime

import gradio as gr
from huggingface_hub import InferenceClient
|
|
|
|
|
# Configure root logging once at import time: every record goes both to a
# local log file and to the console (stderr).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit encoding so non-ASCII log text is written the same way
        # regardless of the platform's default locale.
        logging.FileHandler("chatbot_logs.log", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

# Application-wide logger used by every function in this module.
logger = logging.getLogger("CompanyChatbot")
|
|
|
|
|
# Runtime configuration, overridable through environment variables.

# Model identifier handed to the Hugging Face inference client.
HF_MODEL = os.environ.get("HF_MODEL", "HuggingFaceH4/zephyr-7b-beta")

# API token for the inference client. An unset OR empty variable becomes None
# so a blank value is never forwarded as if it were a real token.
HF_API_TOKEN = os.environ.get("HF_API_TOKEN") or None

# Display name used in the page title and in the default system prompt.
COMPANY_NAME = os.environ.get("COMPANY_NAME", "Your Company")

# System prompt used unless an admin edits it in the UI.
DEFAULT_SYSTEM_PROMPT = os.environ.get(
    "DEFAULT_SYSTEM_PROMPT",
    f"You are {COMPANY_NAME}'s professional AI assistant. Be helpful, accurate, and concise.",
)
|
|
|
|
|
# Build the shared inference client once at import time. A failure here is
# fatal: it is logged and re-raised as RuntimeError so the app does not start
# with an unusable client.
try:
    client = InferenceClient(HF_MODEL, token=HF_API_TOKEN)
except Exception as e:
    logger.error(f"Failed to initialize InferenceClient: {str(e)}")
    # `from e` keeps the original exception attached as the explicit cause.
    raise RuntimeError(
        f"Failed to initialize the model. Please check your configuration: {str(e)}"
    ) from e
else:
    logger.info(f"Successfully initialized InferenceClient with model: {HF_MODEL}")
|
|
|
|
|
def save_conversation(user_id, conversation):
    """Write one conversation snapshot to ``conversations/`` as a JSON file.

    Args:
        user_id: Identifier embedded in the file name and in the log line.
        conversation: JSON-serialisable object to store.

    Raises:
        OSError: If the directory or file cannot be created or written.
        TypeError: If ``conversation`` is not JSON-serialisable.
    """
    # Microseconds are part of the stamp so two saves for the same user within
    # one second get distinct files instead of overwriting each other.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    filename = f"conversations/{user_id}_{stamp}.json"

    os.makedirs(os.path.dirname(filename), exist_ok=True)

    # Explicit UTF-8 so the output does not depend on the platform locale.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(conversation, f, ensure_ascii=False)

    logger.info(f"Saved conversation for user {user_id}")
|
|
|
def generate_response(message, history, system_message, max_tokens, temperature, top_p, user_id):
    """Ask the model for a reply to ``message`` given the conversation so far.

    The system prompt, every previous turn and the new user message are sent
    together as a chat transcript, so the model actually sees the context.
    (Previously the transcript was built but only the bare ``message`` was
    sent as a raw text-generation prompt.)

    Args:
        message: The new user message.
        history: Sequence of ``[user_text, assistant_text]`` pairs, or None.
        system_message: System prompt placed first in the transcript.
        max_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        user_id: Accepted for interface compatibility; not used here.

    Returns:
        The assistant's reply text, or an ``"An error occurred: ..."`` string
        if the inference call fails (errors are logged, not raised).
    """
    messages = [{"role": "system", "content": system_message}]

    for user_text, assistant_text in history or []:
        # Skip missing halves of a turn so no empty message is sent.
        if user_text:
            messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": assistant_text})

    messages.append({"role": "user", "content": message})

    try:
        completion = client.chat_completion(
            messages=messages,
            # UI sliders may deliver the value as a float; the API expects an int.
            max_tokens=int(max_tokens),
            temperature=temperature,
            top_p=top_p,
        )
        return completion.choices[0].message.content
    except Exception as e:
        logger.error(f"Error generating response: {str(e)}")
        return f"An error occurred: {str(e)}"
|
|
|
def chat(message, history, system_message, max_tokens, temperature, top_p, user_id):
    """Handle one chat turn: generate a reply, extend the history, persist it.

    Args:
        message: The user's new message.
        history: Existing list of ``[user_text, assistant_text]`` pairs, or None.
        system_message: System prompt forwarded to ``generate_response``.
        max_tokens: Forwarded to ``generate_response``.
        temperature: Forwarded to ``generate_response``.
        top_p: Forwarded to ``generate_response``.
        user_id: Identifier used for logging and for the saved conversation.

    Returns:
        ``history`` unchanged when the message is empty or whitespace-only;
        otherwise a new list with the ``[message, reply]`` pair appended. On a
        generation error the reply slot carries an error description instead.
    """
    if not message or not message.strip():
        return history

    logger.info(f"User {user_id} sent message - Length: {len(message)}")

    # Work on a copy so the caller's list is never mutated.
    new_history = list(history or [])

    try:
        response = str(
            generate_response(
                message,
                history,
                system_message,
                max_tokens,
                temperature,
                top_p,
                user_id,
            )
        )
    except Exception as e:
        logger.error(f"Error in chat function: {str(e)}")
        new_history.append([message, f"An error occurred: {str(e)}"])
        return new_history

    new_history.append([message, response])

    conversation_data = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "message": message,
        "response": response,
        "history": new_history,
    }

    # Persistence is best-effort: a disk or serialisation problem must not
    # throw away a reply that was already generated successfully.
    try:
        save_conversation(user_id, conversation_data)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Failed to save conversation for user {user_id}: {str(e)}")

    return new_history
|
|
|
|
|
def authenticate(username, password):
    """Check a username/password pair against the built-in account table.

    Args:
        username: Login name supplied by the user.
        password: Password supplied by the user.

    Returns:
        ``(True, session_id, role)`` on success, where ``session_id`` is a
        freshly generated UUID4 string; ``(False, None, None)`` otherwise,
        including when either argument is not a string.
    """
    # SECURITY NOTE(review): credentials are hard-coded in plain text. They are
    # kept unchanged for backward compatibility, but should be moved to a
    # secret store and hashed before this is exposed beyond a demo.
    valid_credentials = {
        "admin": {"password": "admin123", "role": "admin"},
        "user": {"password": "user123", "role": "user"},
    }

    if not isinstance(username, str) or not isinstance(password, str):
        return False, None, None

    account = valid_credentials.get(username)

    # Always run the comparison (against an empty string for unknown users)
    # and use a constant-time compare so response timing reveals less about
    # which part of the credentials was wrong. Encoding to bytes lets
    # compare_digest accept non-ASCII input.
    expected = account["password"] if account is not None else ""
    password_ok = hmac.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    )

    if account is not None and password_ok:
        return True, str(uuid.uuid4()), account["role"]
    return False, None, None
|
|
|
|
|
def login(username, password):
    """Validate the login form and compute the resulting UI updates.

    Args:
        username: Text from the username field.
        password: Text from the password field.

    Returns:
        A 5-tuple, in the order the login button's outputs are wired:
        (login group update, chat group update, user id, role,
        error-message update). On success the login group is hidden, the chat
        group shown and the error hidden; on failure the login group stays
        visible, user id and role are None, and the error text is shown.
    """

    def _rejected(reason):
        # Keep the login form on screen and surface the reason to the user.
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            gr.update(visible=True, value=reason),
        )

    if not username or not password:
        return _rejected("Please enter both username and password")

    # Fixed pause before every credential check.
    # NOTE(review): presumably meant to slow down password guessing - confirm.
    time.sleep(0.5)
    success, user_id, role = authenticate(username, password)

    if not success:
        return _rejected("Invalid username or password")

    return (
        gr.update(visible=False),
        gr.update(visible=True),
        user_id,
        role,
        gr.update(visible=False),
    )
|
|
|
|
|
# Custom stylesheet passed to gr.Blocks. The `.setting-disabled` rules grey out
# and block pointer interaction on controls inside an element with that class;
# this is a visual lock only, not an access control.
css = """
.container { max-width: 1400px !important; margin: auto; }
.setting-panel {
    background-color: #f0f4f8;
    border-radius: 10px;
    padding: 15px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
}
.chat-container {
    border-radius: 10px;
    box-shadow: 0 2px 6px rgba(0,0,0,0.1);
    background-color: white;
}
.setting-disabled input,
.setting-disabled select,
.setting-disabled textarea,
.setting-disabled button {
    pointer-events: none;
    opacity: 0.6;
}
"""
|
|
|
|
|
# Build the whole UI: a login form, and a chat view that is revealed after a
# successful login.
with gr.Blocks(css=css, title=f"{COMPANY_NAME} AI Assistant") as demo:
    # Per-session state filled in by `login`.
    user_id = gr.State(None)
    user_role = gr.State(None)

    # --- Login view -------------------------------------------------------
    with gr.Group(visible=True) as login_group:
        username = gr.Textbox(label="Username")
        password = gr.Textbox(label="Password", type="password")
        login_button = gr.Button("Login")
        error_message = gr.Markdown(visible=False)

    # --- Chat view (hidden until login succeeds) --------------------------
    with gr.Group(visible=False) as chat_group:
        with gr.Row():
            # Left column: role banner, generation settings, session buttons.
            with gr.Column(scale=1):
                role_display = gr.Markdown("")
                system_message = gr.Textbox(
                    label="System Message",
                    value=DEFAULT_SYSTEM_PROMPT,
                    lines=3,
                )

                with gr.Group() as settings_group:
                    max_tokens = gr.Slider(
                        minimum=1,
                        maximum=2048,
                        value=512,
                        step=1,
                        label="Max Tokens",
                    )
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature",
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.95,
                        step=0.05,
                        label="Top P",
                    )

                clear_button = gr.Button("Clear Chat")
                logout_button = gr.Button("Logout")

            # Right column: the conversation itself.
            with gr.Column(scale=2):
                chatbot = gr.Chatbot()
                msg = gr.Textbox(
                    show_label=False,
                    placeholder="Type your message here...",
                )
                submit_button = gr.Button("Send")

    def update_interface(role):
        """Return the role banner text and the CSS classes for the settings group.

        Runs after every login attempt. ``role`` is None when the attempt
        failed, in which case the banner is blank and the settings stay locked.
        """
        if not role:
            return "", gr.update(elem_classes=["setting-disabled"])

        settings_class = [] if role == "admin" else ["setting-disabled"]
        return (
            f"Current Role: {role.title()}",
            gr.update(elem_classes=settings_class),
        )

    def chat_wrapper(message, history, uid, role, system_msg, max_tok, temp, tp):
        """Apply role-based generation settings and delegate to ``chat``.

        The setting values arrive as event inputs, i.e. the *current* values
        of the widgets. Reading ``component.value`` inside the handler would
        only ever yield the defaults the components were constructed with.
        """
        if not message or not message.strip():
            return history

        # Hiding the chat view is only cosmetic, so refuse to serve a request
        # that carries no logged-in session.
        if uid is None:
            return history

        # Only admins may override the defaults; everyone else gets the fixed
        # values no matter what the (visually locked) widgets contain.
        if role != "admin":
            system_msg, max_tok, temp, tp = DEFAULT_SYSTEM_PROMPT, 512, 0.7, 0.95

        return chat(message, history, system_msg, int(max_tok), temp, tp, uid)

    # Login: switch views on success, then refresh the role-dependent widgets.
    login_button.click(
        login,
        inputs=[username, password],
        outputs=[login_group, chat_group, user_id, user_role, error_message],
    ).then(
        update_interface,
        inputs=[user_role],
        outputs=[role_display, settings_group],
    )

    # Sending a message works both via the button and via Enter in the textbox;
    # either way the input box is emptied afterwards.
    chat_inputs = [
        msg,
        chatbot,
        user_id,
        user_role,
        system_message,
        max_tokens,
        temperature,
        top_p,
    ]
    for send_event in (submit_button.click, msg.submit):
        send_event(
            chat_wrapper,
            inputs=chat_inputs,
            outputs=[chatbot],
        ).then(lambda: "", None, msg)

    clear_button.click(lambda: None, None, chatbot)

    # Logout: back to the login view, drop the session state, and also wipe the
    # transcript and the password field so the next person at this browser
    # does not see the previous user's data.
    logout_button.click(
        lambda: (
            gr.update(visible=True),
            gr.update(visible=False),
            None,
            None,
            None,
            "",
        ),
        outputs=[login_group, chat_group, user_id, user_role, chatbot, password],
    )
|
|
|
if __name__ == "__main__":
    # A public share link is still created by default, exactly as before.
    # Set CHATBOT_SHARE to 0/false/no/off to serve on the local network only.
    share_enabled = os.environ.get("CHATBOT_SHARE", "true").strip().lower() not in {
        "0",
        "false",
        "no",
        "off",
    }
    demo.launch(share=share_enabled)