|
import json
import logging
import os
import tempfile
import time
import uuid

import google.generativeai as genai
import gradio as gr
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
)
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
|
|
|
|
|
# --- Logging -----------------------------------------------------------------
# One process-wide configuration; every message carries a timestamp and level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Gemini credentials ------------------------------------------------------
# The key is read from the environment; a missing or empty key aborts the
# import immediately rather than failing on the first request.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY not found in environment variables. Please set it in Hugging Face secrets.")

try:
    genai.configure(api_key=GEMINI_API_KEY)
except Exception as config_error:
    # Log for the operator, then let the original exception propagate.
    logger.error(f"Failed to configure Gemini API: {config_error}")
    raise

# --- Persistence -------------------------------------------------------------
# JSON file (relative to the working directory) holding every chat session.
HISTORY_FILE = "chat_history.json"
|
|
|
def load_all_sessions(path=None):
    """Load every stored chat session from the JSON history file.

    Args:
        path: Optional file to read instead of ``HISTORY_FILE``.

    Returns:
        dict: The decoded JSON object. An empty dict is returned when the
        file does not exist, cannot be read or parsed, or does not contain a
        JSON object at the top level.
    """
    history_path = HISTORY_FILE if path is None else path
    try:
        with open(history_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: no history yet. Not an error, so nothing is logged.
        return {}
    except Exception as e:
        # Deliberately best-effort: a damaged history file must not stop the
        # application from starting.
        logger.error(f"Error loading sessions: {e}")
        return {}

    if not isinstance(data, dict):
        # Callers index the result by session id (``sessions[new_id] = []``),
        # so any other JSON top-level type would fail later with a confusing
        # TypeError. Treat it as unusable history instead.
        logger.error(
            f"Error loading sessions: expected a JSON object, got {type(data).__name__}"
        )
        return {}
    return data
|
|
|
def save_all_sessions(sessions, path=None):
    """Persist all chat sessions to the JSON history file.

    The data is first written to a temporary file in the same directory and
    then moved over the target with ``os.replace``. Writing in place would
    truncate the existing history before the new content is known to be
    serialisable, so a failed dump would destroy every saved chat.

    Args:
        sessions: JSON-serialisable mapping of session id to chat history.
        path: Optional file to write instead of ``HISTORY_FILE``.

    Returns:
        None. Failures are logged, not raised (saving is best-effort so that
        a disk problem does not break the chat UI).
    """
    target = HISTORY_FILE if path is None else path
    tmp_path = None
    try:
        # Same directory as the target so os.replace stays on one filesystem.
        target_dir = os.path.dirname(os.path.abspath(target))
        fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(sessions, f, indent=2)
        os.replace(tmp_path, target)
    except Exception as e:
        logger.error(f"Error saving sessions: {e}")
        if tmp_path is not None:
            # Do not leave a half-written temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
|
|
|
|
|
# Process-wide session store, read from disk once at import time. The UI code
# keys it by session id and stores each chat as a list of (user, bot) pairs.
sessions = load_all_sessions()
|
|
|
|
|
class GeminiChatbot:
    """Gemini chat model wrapped in a one-node LangGraph workflow.

    The caller passes the complete conversation on every request (see
    ``get_response``), so the graph itself is compiled without a checkpointer
    and keeps no state between calls.
    """

    def __init__(self):
        self.setup_model()

    def setup_model(self):
        """Build the prompt template, the Gemini model and the compiled graph.

        Sets ``self.prompt``, ``self.model``, ``self.memory`` and ``self.app``.

        Raises:
            Exception: Any error raised while constructing
                ``ChatGoogleGenerativeAI`` is logged and re-raised.
        """
        system_template = """
        You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe.
        Your answers should be informative, engaging, and accurate. If a question doesn't make any sense, or isn't factually coherent, explain why.
        If you don't know the answer to a question, please don't share false information.
        """

        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(system_template),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{input}"),
        ])

        try:
            self.model = ChatGoogleGenerativeAI(
                model="gemini-1.5-flash",
                temperature=0.7,
                top_p=0.95,
                google_api_key=GEMINI_API_KEY,
                convert_system_message_to_human=True,
            )
        except Exception as e:
            logger.error(f"Failed to initialize Gemini model: {e}")
            raise

        def call_model(state: MessagesState):
            """Graph node: prompt the model with history plus the newest message."""
            try:
                # Everything but the last message is prior context; the last
                # message is the user's new input.
                chat_history = state["messages"][:-1]
                user_input = state["messages"][-1].content
                formatted_messages = self.prompt.format_messages(
                    chat_history=chat_history,
                    input=user_input,
                )
                response = self.model.invoke(formatted_messages)
                return {"messages": response}
            except Exception as e:
                logger.error(f"Model invocation error: {e}")
                raise

        workflow = StateGraph(state_schema=MessagesState)
        workflow.add_node("model", call_model)
        workflow.add_edge(START, "model")

        # Attribute kept so existing references to ``chatbot.memory`` keep
        # working, but it is intentionally NOT attached to the graph.
        # get_response() sends the full history on every call, and the
        # MessagesState reducer appends messages whose ids it has not seen.
        # With a checkpointer on the same thread_id, each turn would therefore
        # append another copy of the whole conversation to the stored state,
        # and clearing a chat in the UI would never clear the model's context.
        self.memory = MemorySaver()
        self.app = workflow.compile()

    def get_response(self, user_message, history, thread_id):
        """Generate a reply and yield it as a growing string (typing effect).

        Args:
            user_message: The new user input.
            history: Previous turns as an iterable of ``(user, bot)`` pairs.
            thread_id: Conversation identifier, forwarded in the run config.

        Yields:
            str: Successively longer prefixes of the reply, one character at
            a time. If anything fails, a single ``"⚠ Error: ..."`` string is
            yielded instead of raising.
        """
        try:
            langchain_history = []
            for user, bot in history:
                # Skip empty/None halves of a turn: message objects need
                # content, and an empty message adds nothing to the context.
                if user:
                    langchain_history.append(HumanMessage(content=user))
                if bot:
                    langchain_history.append(AIMessage(content=bot))

            input_msg = HumanMessage(content=user_message)
            full_history = langchain_history + [input_msg]
            config = {"configurable": {"thread_id": thread_id}}

            response = self.app.invoke({"messages": full_history}, config)
            full_text = response["messages"][-1].content

            # The model call above is not streamed; the reply is revealed one
            # character at a time to simulate typing.
            full_response = ""
            for char in full_text:
                full_response += char
                yield full_response
                time.sleep(0.01)

        except Exception as e:
            logger.error(f"Response error: {e}")
            yield f"⚠ Error: {type(e).__name__} — {str(e)}"
|
|
|
|
|
# Single shared chatbot instance, created at import time. If construction
# fails the error is logged at CRITICAL level and the import is aborted.
try:
    chatbot = GeminiChatbot()
except Exception as init_error:
    logger.critical(f"Failed to initialize chatbot: {init_error}")
    raise
|
|
|
|
|
def launch_interface():
    """Build the Gradio Blocks UI for the chatbot (without launching it).

    The UI has a "+ New Chat" button, a dropdown of stored chats, the
    conversation view, a message box with a Send button, and a button that
    clears the current chat. Sessions live in the module-level ``sessions``
    dict and are written to disk through ``save_all_sessions``.

    Returns:
        gr.Blocks: The assembled app; the caller is expected to call
        ``launch()`` on it.
    """
    # Make sure there is always at least one session to open.
    if sessions:
        initial_thread_id = next(iter(sessions))
    else:
        initial_thread_id = str(uuid.uuid4())
        sessions[initial_thread_id] = []
        save_all_sessions(sessions)

    def get_dropdown_choices(active_id):
        """Return ``(label, session_id)`` pairs for the chat selector.

        Sessions with messages are labelled by the start of their first user
        message. The active session is listed as "+ New Chat" only while it
        is still empty, so no session id appears twice.
        """
        choices = []
        for session_id, turns in sessions.items():
            if turns:
                first_msg = (turns[0][0] or "")[:20]
                choices.append((f"Chat: {first_msg}...", session_id))
        if active_id is not None and not sessions.get(active_id):
            choices.append(("+ New Chat", active_id))
        return choices

    with gr.Blocks(
        theme=gr.themes.Base(),
        css="""
        body { background-color: #f0f2f6; }
        .gr-block { background-color: white; }
        .gr-textbox textarea { background-color: white; }
        .gr-chatbot { background-color: white; border-radius: 10px; }
        .gr-button {
            margin: 5px;
            border-radius: 5px;
        }
        .gr-markdown h2 { text-align: center; }
        """,
    ) as demo:
        demo.title = "LangChain Powered ChatBot"
        gr.Markdown("## LangChain Powered ChatBot")

        # Active session id. Handlers receive it as an input and change it
        # only by returning a new value through their outputs.
        current_thread_id = gr.State(initial_thread_id)

        with gr.Row():
            new_chat_btn = gr.Button("+ New Chat", variant="primary")
            session_selector = gr.Dropdown(
                label="Your Chats",
                choices=get_dropdown_choices(initial_thread_id),
                value=initial_thread_id,
                interactive=True,
            )

        chatbot_ui = gr.Chatbot(label="Conversation", height=400)

        with gr.Row():
            msg = gr.Textbox(placeholder="Type your message...", container=False, scale=9)
            send_btn = gr.Button("Send", variant="primary", scale=1)

        clear_btn = gr.Button("Clear Current Chat")

        def refresh_on_load(thread_id):
            """Show the stored history of the active session on page load.

            Without this the conversation view starts empty even when the
            active session has saved messages, and the next message sent
            would overwrite that saved history.
            """
            return (
                sessions.get(thread_id, []),
                gr.update(choices=get_dropdown_choices(thread_id)),
            )

        def start_new_chat():
            """Create and persist an empty session, then make it active."""
            new_id = str(uuid.uuid4())
            sessions[new_id] = []
            save_all_sessions(sessions)
            return (
                new_id,
                [],
                gr.update(choices=get_dropdown_choices(new_id), value=new_id),
            )

        def switch_chat(session_id, active_id):
            """Activate the selected session and return its stored history."""
            if session_id not in sessions:
                # Dropdown cleared or stale value: stay on the current chat
                # instead of storing None/unknown as the thread id.
                return active_id, sessions.get(active_id, [])
            return session_id, sessions[session_id]

        def respond(message, chat_history, thread_id):
            """Stream the bot's reply into the chat and persist the session.

            Every yield is a ``(chat_history, dropdown_update)`` pair because
            the event declares two outputs; ``gr.update()`` with no arguments
            leaves the dropdown unchanged.
            """
            chat_history = list(chat_history or [])
            if not message or not message.strip():
                yield chat_history, gr.update()
                return

            is_first_turn = not chat_history
            chat_history.append((message, ""))
            yield chat_history, gr.update()

            for partial in chatbot.get_response(message, chat_history[:-1], thread_id):
                chat_history[-1] = (message, partial)
                yield chat_history, gr.update()

            sessions[thread_id] = chat_history
            save_all_sessions(sessions)

            if is_first_turn:
                # The session now has a first message, so relabel it from
                # "+ New Chat" to "Chat: <first message>...".
                yield chat_history, gr.update(choices=get_dropdown_choices(thread_id))

        def clear_chat(thread_id):
            """Empty the active session on disk and in the UI."""
            sessions[thread_id] = []
            save_all_sessions(sessions)
            return [], gr.update(choices=get_dropdown_choices(thread_id))

        demo.load(
            refresh_on_load,
            inputs=[current_thread_id],
            outputs=[chatbot_ui, session_selector],
        )

        new_chat_btn.click(
            start_new_chat,
            outputs=[current_thread_id, chatbot_ui, session_selector],
        )

        session_selector.change(
            switch_chat,
            inputs=[session_selector, current_thread_id],
            outputs=[current_thread_id, chatbot_ui],
        )

        send_btn.click(
            respond,
            inputs=[msg, chatbot_ui, current_thread_id],
            outputs=[chatbot_ui, session_selector],
        ).then(lambda: "", None, msg)

        msg.submit(
            respond,
            inputs=[msg, chatbot_ui, current_thread_id],
            outputs=[chatbot_ui, session_selector],
        ).then(lambda: "", None, msg)

        clear_btn.click(
            clear_chat,
            inputs=[current_thread_id],
            outputs=[chatbot_ui, session_selector],
        )

    return demo
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces,
    # port 7860.
    try:
        demo = launch_interface()
        demo.launch(server_name="0.0.0.0", server_port=7860)
    except Exception as e:
        # Keep the traceback in the log and exit non-zero so a supervisor or
        # hosting platform can tell that startup failed.
        logger.critical(f"Application failed to start: {e}", exc_info=True)
        raise SystemExit(1) from e