|
import os |
|
import uuid |
|
|
|
import google.generativeai as genai |
|
|
|
from langgraph.graph import START, MessagesState, StateGraph |
|
from langgraph.checkpoint.memory import MemorySaver |
|
|
|
from langchain_core.messages import HumanMessage, AIMessage |
|
from langchain_core.prompts.chat import (ChatPromptTemplate, SystemMessagePromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate,) |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
|
import gradio as gr |
|
|
|
import time |
|
import json |
|
|
|
import logging |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Configure logging once at import time so every record carries a timestamp
# and a level name.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Load environment variables (python-dotenv) before the API key is read below.
load_dotenv()

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    # Fail fast: nothing below can work without a key.
    raise ValueError("Missing GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

# JSON file (relative to the working directory) in which chat sessions are
# persisted between runs.
HISTORY_FILE = "chat_history.json"
|
|
|
|
|
def load_all_sessions(path=None):
    """Load every persisted chat session from the JSON history file.

    Args:
        path: Optional location of the history file. When omitted, the
            module-level ``HISTORY_FILE`` is used.

    Returns:
        dict: The decoded top-level JSON object. An empty dict is returned
        when the file does not exist, is empty or is not valid JSON, or when
        its top-level value is not a JSON object.

    Raises:
        OSError: For I/O failures other than a missing file (for example a
            permission error).
    """
    if path is None:
        path = HISTORY_FILE

    log = logging.getLogger(__name__)
    try:
        # Open directly instead of checking os.path.exists() first: this
        # avoids a race between the check and the open.
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. A damaged history file must not stop the app from
        # starting, so fall back to an empty history.
        log.warning("Ignoring unreadable history file %s: %s", path, exc)
        return {}

    if not isinstance(data, dict):
        # Callers index the result by session id, so anything but an object
        # is unusable.
        log.warning("Ignoring history file %s: top-level value is not an object", path)
        return {}
    return data
|
|
|
def save_all_sessions(sessions, path=None):
    """Persist all chat sessions to the JSON history file.

    The data is serialised before any file is opened and is written to a
    sibling temporary file that then replaces the target, so a failure at
    any point leaves the previous history file intact.

    Args:
        sessions: JSON-serialisable mapping of session id to chat history.
        path: Optional location of the history file. When omitted, the
            module-level ``HISTORY_FILE`` is used.

    Raises:
        TypeError: If ``sessions`` contains a value ``json`` cannot encode.
        OSError: If the file cannot be written or replaced.
    """
    if path is None:
        path = HISTORY_FILE

    # Serialise first: an encoding error must surface before the existing
    # file is touched.
    payload = json.dumps(sessions, indent=2)

    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, path)
    except OSError:
        # Best-effort cleanup of the partial temporary file; the original
        # error is what the caller needs to see.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
class GeminiChatbot:
    """Gemini chat model wrapped in a single-node LangGraph workflow.

    Attributes set by ``setup_model``:
        prompt: Chat prompt made of a system message, a ``chat_history``
            placeholder and the ``{input}`` human message.
        model: The ``ChatGoogleGenerativeAI`` instance.
        memory: In-memory LangGraph checkpointer (``MemorySaver``).
        app: The compiled graph, checkpointed per ``thread_id``.
    """

    def __init__(self):
        self.setup_model()

    def setup_model(self):
        """Build the prompt, the chat model and the compiled graph."""
        system_template = """
        You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe.
        Your answers should be informative, engaging, and accurate. If a question doesn't make any sense, or isn't factually coherent, explain why instead of answering something not correct.
        If you don't know the answer to a question, please don't share false information.
        """

        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(system_template),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{input}"),
        ])

        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0.7,
            top_p=0.95,
            google_api_key=GEMINI_API_KEY,
            convert_system_message_to_human=True,
        )

        def call_model(state: MessagesState):
            # The last message in the state is the new user turn; everything
            # before it is passed to the prompt as prior context.
            messages = state["messages"]
            formatted_messages = self.prompt.format_messages(
                chat_history=messages[:-1],
                input=messages[-1].content,
            )
            response = self.model.invoke(formatted_messages)
            return {"messages": response}

        workflow = StateGraph(state_schema=MessagesState)
        workflow.add_node("model", call_model)
        workflow.add_edge(START, "model")

        self.memory = MemorySaver()
        self.app = workflow.compile(checkpointer=self.memory)

    def get_response(self, user_message, history, thread_id):
        """Yield the model's reply to ``user_message`` as a growing string.

        The reply is obtained in one ``invoke`` call and then replayed one
        character at a time to give a typing effect.

        Args:
            user_message: Text of the new user turn.
            history: Earlier turns as ``(user_text, bot_text)`` pairs. Used
                only to seed a thread that has no checkpointed messages yet.
            thread_id: Identifier of the checkpointed conversation thread.

        Yields:
            str: Successively longer prefixes of the reply. If anything
            raises, a single ``"Error: ..."`` string is yielded instead.
        """
        try:
            config = {"configurable": {"thread_id": thread_id}}
            input_message = HumanMessage(content=user_message)

            # The checkpointer already stores this thread's messages and the
            # MessagesState reducer appends whatever is passed in. Re-sending
            # the whole UI history on every turn would therefore duplicate
            # the context, so the history is replayed only when the thread
            # has no checkpoint yet (e.g. a session restored from disk).
            # NOTE(review): a thread's checkpoint outlives a UI-side "clear";
            # nothing in this class resets it.
            checkpointed = self.app.get_state(config).values.get("messages")
            if checkpointed:
                new_messages = [input_message]
            else:
                new_messages = []
                for user, bot in history or []:
                    # Skip empty halves so no empty message is built from them.
                    if user:
                        new_messages.append(HumanMessage(content=user))
                    if bot:
                        new_messages.append(AIMessage(content=bot))
                new_messages.append(input_message)

            response = self.app.invoke({"messages": new_messages}, config)
            complete_response = response["messages"][-1].content

            full_response = ""
            for char in complete_response:
                full_response += char
                yield full_response
                # Small pause between characters for the typing effect.
                time.sleep(0.01)

        except Exception as e:
            # Deliberately broad: the UI shows the failure as the bot's reply
            # instead of breaking the stream. Log with the traceback.
            logger.exception("LangGraph Error: %s", e)
            yield f"Error: {type(e).__name__} — {str(e)}"
|
|
|
|
|
# Module-level objects read by the UI callbacks defined in launch_interface():
# the chatbot instance and the dict of persisted sessions loaded at import.
chatbot = GeminiChatbot()
sessions = load_all_sessions()
|
|
|
|
|
def launch_interface():
    """Build the Gradio UI for the chatbot.

    The page has a "New Chat" button, a dropdown for switching between
    stored sessions, the conversation pane, a message box with a "Send"
    button, and a "Clear Current Chat" button. Sessions are kept in the
    module-level ``sessions`` dict and written to disk through
    ``save_all_sessions``.

    Returns:
        gr.Blocks: The assembled app; the caller is responsible for
        launching it.
    """
    custom_css = """
    body {
        background-color: black;
    }
    .gr-block.gr-textbox textarea {
        background-color: #2f2f2f;
        color: white;
    }
    .gr-chatbot {
        background-color: #2f2f2f;
        color: white;
    }
    .gr-button, .gr-dropdown {
        margin: 5px auto;
        display: block;
        width: 50%;
    }
    .gr-markdown h2 {
        text-align: center;
        color: white;
    }
    """

    def dropdown_choices(active_id):
        """Labels for sessions that have history, plus the active new one."""
        labels = [f"PREVIOUS: {sid}" for sid, turns in sessions.items() if turns]
        labels.append(f"NEW: {active_id}")
        return labels

    # Always start on an EMPTY session. Starting on a stored session while
    # the conversation pane is empty would make the first message overwrite
    # that session's saved history. Reuse a stored empty session if there is
    # one, otherwise create and persist a new one.
    initial_id = next((sid for sid, turns in sessions.items() if not turns), None)
    if initial_id is None:
        initial_id = str(uuid.uuid4())
        sessions[initial_id] = []
        save_all_sessions(sessions)

    with gr.Blocks(
        theme=gr.themes.Base(),
        css=custom_css,
        title="LangChain Powered ChatBot",
    ) as demo:
        gr.Markdown("## LangChain Powered ChatBot")

        # Per-user state: id of the session currently shown in the pane.
        current_thread_id = gr.State(initial_id)

        with gr.Column():
            new_chat_btn = gr.Button("New Chat", variant="primary")
            session_selector = gr.Dropdown(
                label="Chats",
                choices=dropdown_choices(initial_id),
                value=f"NEW: {initial_id}",
                interactive=True,
            )

        chatbot_ui = gr.Chatbot(label="Conversation", height=320)

        with gr.Row():
            msg = gr.Textbox(placeholder="Ask a question...", container=False, scale=9)
            send = gr.Button("Send", variant="primary", scale=1)

        clear = gr.Button("Clear Current Chat")

        def start_new_chat():
            """Create and persist an empty session and make it active."""
            new_id = str(uuid.uuid4())
            sessions[new_id] = []
            save_all_sessions(sessions)
            selector_update = gr.update(
                choices=dropdown_choices(new_id),
                value=f"NEW: {new_id}",
            )
            return new_id, [], selector_update

        def switch_chat(selected_display_id, active_id):
            """Activate the session named by the selected dropdown label."""
            if not selected_display_id:
                # Nothing selected: keep the active session and leave the
                # conversation pane as it is.
                return active_id, gr.update()
            # Labels look like "<PREFIX>: <session id>".
            true_id = selected_display_id.split(": ", 1)[-1]
            return true_id, sessions.get(true_id, [])

        def respond(message, history, thread_id):
            """Stream the bot's reply into the pane, then persist the session."""
            # Work on a copy so the list handed in by Gradio is not mutated.
            history = list(history or [])
            if not message or not message.strip():
                yield history
                return

            # Show the user's message immediately with an empty reply.
            history.append((message, ""))
            yield history

            for partial in chatbot.get_response(message, history[:-1], thread_id):
                history[-1] = (message, partial)
                yield history

            sessions[thread_id] = history
            save_all_sessions(sessions)

        def clear_current(thread_id):
            """Erase the stored history of the active session."""
            sessions[thread_id] = []
            save_all_sessions(sessions)
            return []

        new_chat_btn.click(
            start_new_chat,
            outputs=[current_thread_id, chatbot_ui, session_selector],
        )

        # The dropdown is not an output of its own change handler: the
        # handler never needs to alter the selection.
        session_selector.change(
            switch_chat,
            inputs=[session_selector, current_thread_id],
            outputs=[current_thread_id, chatbot_ui],
        )

        # "Send" and pressing Enter in the textbox do the same thing:
        # stream the reply, then empty the textbox.
        for trigger in (send.click, msg.submit):
            trigger(
                respond,
                inputs=[msg, chatbot_ui, current_thread_id],
                outputs=[chatbot_ui],
            ).then(
                lambda: "", None, msg
            )

        clear.click(
            clear_current,
            inputs=[current_thread_id],
            outputs=[chatbot_ui],
        )

    return demo
|
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        demo = launch_interface()
        # NOTE(review): share=True asks Gradio for a public share link;
        # confirm that exposing this app publicly is intended.
        demo.launch(share=True)
    except Exception as e:
        # Log with the traceback and exit non-zero so a failed start is
        # visible to whatever launched the script.
        logger.critical("App failed: %s", e, exc_info=True)
        raise SystemExit(1) from e
|
|