|
import json
import logging
import os
import tempfile
import time
import uuid

import gradio as gr
from dotenv import load_dotenv

# Load variables from a local .env file before the remaining imports and the
# configuration code below run (kept in its original position on purpose).
load_dotenv()

import google.generativeai as genai
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
)
from langchain_google_genai import ChatGoogleGenerativeAI
|
|
|
|
|
# Configure logging once at import time; every message gets a timestamp and level.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Read the Gemini key from the environment (load_dotenv() has already run).
# os.getenv is a function and must be *called*; subscripting it raises
# TypeError at import time.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    # Fail fast with an actionable message instead of an opaque TypeError from
    # assigning None into os.environ a few lines below.
    raise RuntimeError(
        "GEMINI_API_KEY is not set. Add it to the environment or to a .env file."
    )

# Mirror the key under GOOGLE_API_KEY as well — presumably for libraries that
# look the key up under that name (TODO confirm which ones rely on it).
os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY

genai.configure(api_key=GEMINI_API_KEY)

# JSON file (relative to the working directory) holding every chat session.
HISTORY_FILE = "chat_history.json"
|
|
|
def load_all_sessions(path=None):
    """Load every persisted chat session from the JSON history file.

    Args:
        path: Optional file to read. When omitted, ``HISTORY_FILE`` is used
            (looked up at call time).

    Returns:
        The decoded JSON object as a dict. An empty dict is returned when the
        file does not exist, is empty/invalid JSON, or does not hold a JSON
        object at the top level.
    """
    history_path = HISTORY_FILE if path is None else path

    try:
        with open(history_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        problem = f"not valid JSON ({exc})"
    else:
        if isinstance(data, dict):
            return data
        problem = f"expected a JSON object, found {type(data).__name__}"

    # The file exists but is unusable. Move it aside so the next save does not
    # overwrite data that might still be recoverable by hand, then start empty
    # rather than crashing the whole app at import time.
    log = logging.getLogger(__name__)
    backup_path = os.fspath(history_path) + ".corrupt"
    try:
        os.replace(history_path, backup_path)
    except OSError:
        log.warning("Ignoring unusable history file %s: %s", history_path, problem)
    else:
        log.warning(
            "Moved unusable history file %s to %s: %s",
            history_path, backup_path, problem,
        )
    return {}
|
|
|
def save_all_sessions(sessions, path=None):
    """Persist all chat sessions to the JSON history file atomically.

    The data is serialised into a temporary file in the same directory and
    then moved over the target with ``os.replace``. If serialisation or the
    write fails, the previous history file is left untouched instead of being
    truncated.

    Args:
        sessions: JSON-serialisable mapping of session id to conversation turns.
        path: Optional destination file. When omitted, ``HISTORY_FILE`` is used
            (looked up at call time).

    Raises:
        TypeError, ValueError: if ``sessions`` cannot be serialised to JSON.
        OSError: if the file cannot be written.
    """
    target = HISTORY_FILE if path is None else path
    directory = os.path.dirname(os.path.abspath(target))

    # The temp file must live next to the target so the final rename stays on
    # one filesystem. NOTE: mkstemp creates the file readable by the owner only.
    fd, tmp_name = tempfile.mkstemp(dir=directory, prefix=".chat_history-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(sessions, f, indent=2)
        os.replace(tmp_name, target)
    except BaseException:
        # Do not leave a half-written temp file behind; re-raise the real error.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
|
|
|
# In-memory copy of all chat sessions, loaded once at import time. The UI
# callbacks mutate this dict and write it back with save_all_sessions().
sessions = load_all_sessions()
|
|
|
|
|
class GeminiChatbot:
    """Gemini chat model wrapped in a one-node LangGraph workflow.

    The compiled graph (``self.app``) uses an in-memory checkpointer
    (``self.memory``), so each ``thread_id`` keeps its own message list for the
    lifetime of the process.
    """

    # Pause between simulated streaming updates, in seconds.
    STREAM_DELAY_SECONDS = 0.01

    def __init__(self):
        self.setup_model()

    def setup_model(self):
        """Build the prompt template, the Gemini chat model and the compiled graph."""
        system_template = """
        You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe.
        Your answers should be informative, engaging, and accurate. If a question doesn't make any sense, or isn't factually coherent, explain why.
        If you don't know the answer to a question, please don't share false information.
        """

        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(system_template),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{input}"),
        ])

        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0.7,
            top_p=0.95,
            # Read the key from the environment here so this class does not
            # depend on a module-level constant being defined elsewhere.
            google_api_key=os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"),
            convert_system_message_to_human=True,
        )

        def call_model(state: MessagesState):
            # The last message is the new user input; everything before it is
            # the conversation so far.
            chat_history = state["messages"][:-1]
            user_input = state["messages"][-1].content
            formatted_messages = self.prompt.format_messages(
                chat_history=chat_history, input=user_input
            )
            response = self.model.invoke(formatted_messages)
            return {"messages": response}

        workflow = StateGraph(state_schema=MessagesState)
        workflow.add_node("model", call_model)
        workflow.add_edge(START, "model")

        self.memory = MemorySaver()
        self.app = workflow.compile(checkpointer=self.memory)

    @staticmethod
    def _to_langchain_messages(history):
        """Convert (user, bot) pairs into alternating Human/AI messages, skipping empty halves."""
        messages = []
        for user, bot in history or []:
            if user:
                messages.append(HumanMessage(content=user))
            if bot:
                messages.append(AIMessage(content=bot))
        return messages

    def get_response(self, user_message, history, thread_id):
        """Yield the assistant's reply as progressively longer prefixes.

        Args:
            user_message: The new user input.
            history: Earlier turns as (user, bot) pairs. Only used to seed a
                thread the checkpointer has no messages for yet (for example
                after a restart, when history comes from disk).
            thread_id: Identifier of the conversation thread in the checkpointer.

        Yields:
            The reply text, one character longer on each step (simulated
            streaming). On failure, a single error string is yielded instead
            of raising.
        """
        try:
            config = {"configurable": {"thread_id": thread_id}}
            input_msg = HumanMessage(content=user_message)

            # The checkpointer already accumulates every message of a thread.
            # Re-sending the full history each turn would append it again and
            # the model would see duplicated turns, so send only the new
            # message once the thread holds state.
            snapshot = self.app.get_state(config)
            if snapshot.values.get("messages"):
                outgoing = [input_msg]
            else:
                outgoing = self._to_langchain_messages(history) + [input_msg]

            response = self.app.invoke({"messages": outgoing}, config)
            full_text = response["messages"][-1].content
            if not isinstance(full_text, str):
                # Message content is not guaranteed to be a plain string.
                full_text = str(full_text)

            for end in range(1, len(full_text) + 1):
                yield full_text[:end]
                time.sleep(self.STREAM_DELAY_SECONDS)

        except Exception as e:
            # Top-level boundary for the UI: log with traceback, show the error
            # in the chat instead of crashing the event handler.
            logger.exception("Response error: %s", e)
            yield f"⚠ Error: {type(e).__name__} — {str(e)}"
|
|
|
|
|
# Single module-level chatbot instance; the UI's respond callback calls its
# get_response() method.
chatbot = GeminiChatbot()
|
|
|
|
|
def launch_interface():
    """Build the Gradio chat UI and return the (not yet launched) Blocks app.

    The UI offers a session dropdown, a "New Chat" button, the conversation
    view, a message box with a send button, and a "Clear Current Chat" button.
    Sessions live in the module-level ``sessions`` dict and are written to
    disk with ``save_all_sessions`` after every change.

    Returns:
        The ``gr.Blocks`` instance; the caller is responsible for ``launch()``.
    """

    def label_for(session_id):
        """Dropdown label for a session: PREVIOUS if it has turns, NEW otherwise."""
        prefix = "PREVIOUS" if sessions.get(session_id) else "NEW"
        return f"{prefix}: {session_id}"

    def dropdown_choices(current_id):
        """Labels of all non-empty sessions plus the current one, without duplicates."""
        labels = [f"PREVIOUS: {sid}" for sid, turns in sessions.items() if turns]
        current_label = label_for(current_id)
        if current_label not in labels:
            labels.append(current_label)
        return labels

    # Pick the session shown on startup: the first stored one, or a fresh one
    # when nothing has been saved yet.
    if sessions:
        initial_id = next(iter(sessions))
    else:
        initial_id = str(uuid.uuid4())
        sessions[initial_id] = []
        save_all_sessions(sessions)

    with gr.Blocks(
        title="LangChain Powered ChatBot",
        theme=gr.themes.Base(),
        css="""
        body { background-color: black; }
        .gr-textbox textarea { background-color: #2f2f2f; color: white; }
        .gr-chatbot { background-color: #2f2f2f; color: white; }
        .gr-button, .gr-dropdown {
            margin: 5px auto;
            display: block;
            width: 50%;
        }
        .gr-markdown h2 { text-align: center; color: white; }
        """,
    ) as demo:
        gr.Markdown("## LangChain Powered ChatBot")

        # Per-browser-session id of the conversation currently shown.
        current_thread_id = gr.State(initial_id)

        new_chat_btn = gr.Button("New Chat", variant="primary")
        session_selector = gr.Dropdown(
            label="Chats",
            choices=dropdown_choices(initial_id),
            value=label_for(initial_id),
            interactive=True,
        )

        # Show the stored turns of the initial session. Starting with an empty
        # view would make the first new message overwrite that session's
        # saved history.
        chatbot_ui = gr.Chatbot(
            label="Conversation",
            height=350,
            value=[list(turn) for turn in sessions.get(initial_id, [])],
        )

        with gr.Row():
            msg = gr.Textbox(placeholder="Ask a question...", container=False, scale=9)
            send = gr.Button("Send", variant="primary", scale=1)

        clear = gr.Button("Clear Current Chat")

        def start_new_chat():
            """Create and persist an empty session, then select it in the UI."""
            new_id = str(uuid.uuid4())
            sessions[new_id] = []
            save_all_sessions(sessions)
            selector_update = gr.update(
                choices=dropdown_choices(new_id), value=label_for(new_id)
            )
            return new_id, [], selector_update

        def switch_chat(display_id, thread_id, shown_history):
            """Switch to the session named by the dropdown label."""
            if not display_id:
                # Nothing selected: keep whatever is currently shown.
                return thread_id, shown_history
            # Labels look like "PREVIOUS: <id>" or "NEW: <id>".
            true_id = display_id.split(": ", 1)[-1]
            return true_id, sessions.get(true_id, [])

        def respond(message, history, thread_id):
            """Stream the bot reply into the chat view, then persist the session."""
            history = list(history or [])
            if not message or not message.strip():
                yield history
                return

            history.append((message, ""))
            yield history

            for chunk in chatbot.get_response(message, history[:-1], thread_id):
                history[-1] = (message, chunk)
                yield history

            sessions[thread_id] = history
            save_all_sessions(sessions)

        def clear_chat(thread_id):
            """Erase the current session's turns on disk, in the UI and in model memory."""
            sessions[thread_id] = []
            save_all_sessions(sessions)
            # Also drop the checkpointer's copy of this thread; otherwise the
            # model keeps answering with the "cleared" context. delete_thread
            # is not available on every LangGraph version, hence the lookup.
            delete_thread = getattr(chatbot.memory, "delete_thread", None)
            if callable(delete_thread):
                delete_thread(thread_id)
            return []

        # One output component per returned value; the dropdown is not written
        # to from its own change handler.
        new_chat_btn.click(
            start_new_chat,
            outputs=[current_thread_id, chatbot_ui, session_selector],
        )
        session_selector.change(
            switch_chat,
            inputs=[session_selector, current_thread_id, chatbot_ui],
            outputs=[current_thread_id, chatbot_ui],
        )
        send.click(respond, [msg, chatbot_ui, current_thread_id], [chatbot_ui]).then(lambda: "", None, msg)
        msg.submit(respond, [msg, chatbot_ui, current_thread_id], [chatbot_ui]).then(lambda: "", None, msg)
        clear.click(clear_chat, inputs=[current_thread_id], outputs=[chatbot_ui])

    return demo
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it.
    try:
        demo = launch_interface()
        demo.launch()
    except Exception as e:
        # Log the full traceback and exit non-zero so a supervisor or shell
        # can tell that startup failed.
        logger.critical("App failed: %s", e, exc_info=True)
        raise SystemExit(1) from e
|
|