"""Gradio chat UI backed by Google Gemini through LangChain and LangGraph.

Chat sessions are kept in a module-level dict keyed by session id and are
persisted to ``chat_history.json`` in the working directory.
"""
import os
import uuid
import json
import time
import gradio as gr
import logging

# Load local .env only if it exists. This runs before the remaining imports,
# matching the original statement order, so the variables it defines are
# already in the environment when those packages are imported.
from dotenv import load_dotenv

load_dotenv()

import google.generativeai as genai
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.checkpoint.memory import MemorySaver  # noqa: F401  (import kept; no longer used below)
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    MessagesPlaceholder,
    HumanMessagePromptTemplate,
)
from langchain_google_genai import ChatGoogleGenerativeAI

# === Logging ===
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# === Load API Key ===
# os.getenv is a function: it must be called, not subscripted.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY is missing. Set it as an environment variable or Hugging Face Secret.")

os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY
genai.configure(api_key=GEMINI_API_KEY)

# === Chat Storage ===
HISTORY_FILE = "chat_history.json"


def load_all_sessions():
    """Return the sessions stored in ``HISTORY_FILE``.

    Returns:
        The decoded JSON content of the file, or an empty dict when the file
        does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON. This
            is deliberately not swallowed, so a damaged history file is never
            silently replaced by an empty one on the next save.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def save_all_sessions(sessions):
    """Write ``sessions`` to ``HISTORY_FILE`` as indented JSON, replacing its content."""
    with open(HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump(sessions, f, indent=2)


sessions = load_all_sessions()


def _previous_session_labels():
    """Return a ``PREVIOUS: <id>`` dropdown label for every non-empty session."""
    return [f"PREVIOUS: {session_id}" for session_id, turns in sessions.items() if turns]


# === Gemini LLM Chatbot ===
class GeminiChatbot:
    """Wraps a Gemini chat model behind a single-node LangGraph workflow."""

    def __init__(self):
        self.setup_model()

    def setup_model(self):
        """Build the prompt template, the chat model and the compiled graph.

        Sets ``self.prompt``, ``self.model`` and ``self.app``.
        """
        system_template = """
        You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe.
        Your answers should be informative, engaging, and accurate. If a question doesn't make any sense, or isn't factually coherent, explain why.
        If you don't know the answer to a question, please don't share false information.
        """

        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(system_template),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{input}"),
        ])

        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0.7,
            top_p=0.95,
            google_api_key=GEMINI_API_KEY,
            convert_system_message_to_human=True,
        )

        def call_model(state: MessagesState):
            # The last message is the new user input; everything before it is
            # prior conversation fed into the "chat_history" placeholder.
            chat_history = state["messages"][:-1]
            user_input = state["messages"][-1].content
            formatted_messages = self.prompt.format_messages(chat_history=chat_history, input=user_input)
            response = self.model.invoke(formatted_messages)
            return {"messages": response}

        workflow = StateGraph(state_schema=MessagesState)
        workflow.add_node("model", call_model)
        workflow.add_edge(START, "model")

        # Compiled WITHOUT a checkpointer on purpose: get_response() sends the
        # complete conversation on every call. With a checkpointer the same
        # turns would also be restored from the thread's checkpoint and then
        # appended again, so the prompt would contain each earlier turn
        # several times (and would survive "Clear Current Chat").
        self.app = workflow.compile()

    def get_response(self, user_message, history, thread_id):
        """Yield the model's reply to ``user_message`` as growing prefixes.

        Args:
            user_message: Text of the new user turn.
            history: Iterable of ``(user_text, bot_text)`` pairs for the
                earlier turns of the conversation.
            thread_id: Session id, forwarded to the graph in the run config.

        Yields:
            The reply one character longer each time, with a 10 ms pause
            between yields. If anything fails, a single
            ``"⚠ Error: ..."`` string is yielded instead of raising.
        """
        try:
            # Format chat history for LangChain
            langchain_history = []
            for user, bot in history:
                langchain_history.append(HumanMessage(content=user))
                langchain_history.append(AIMessage(content=bot))

            full_history = [*langchain_history, HumanMessage(content=user_message)]
            config = {"configurable": {"thread_id": thread_id}}

            # Get final response
            response = self.app.invoke({"messages": full_history}, config)
            full_text = response["messages"][-1].content

            # Simulated streaming of the already complete reply.
            full_response = ""
            for char in full_text:
                full_response += char
                yield full_response
                time.sleep(0.01)

        except Exception as e:
            logger.exception("Response error: %s", e)
            yield f"⚠ Error: {type(e).__name__} — {str(e)}"


chatbot = GeminiChatbot()

# === Gradio UI ===
CUSTOM_CSS = """
body { background-color: black; }
.gr-textbox textarea { background-color: #2f2f2f; color: white; }
.gr-chatbot { background-color: #2f2f2f; color: white; }
.gr-button, .gr-dropdown {
    margin: 5px auto;
    display: block;
    width: 50%;
}
.gr-markdown h2 { text-align: center; color: white; }
"""


def launch_interface():
    """Build the Gradio Blocks app and return it (the caller launches it)."""
    # Always start on an EMPTY session. Starting on a session that already
    # has turns while showing an empty chat window would overwrite its stored
    # history on the first message.
    initial_id = next((sid for sid, turns in sessions.items() if not turns), None)
    if initial_id is None:
        initial_id = str(uuid.uuid4())
        sessions[initial_id] = []
        save_all_sessions(sessions)
    initial_label = f"NEW: {initial_id}"

    with gr.Blocks(
        title="LangChain Powered ChatBot",
        theme=gr.themes.Base(),
        css=CUSTOM_CSS,
    ) as demo:
        gr.Markdown("## LangChain Powered ChatBot")

        current_thread_id = gr.State(initial_id)

        # UI
        new_chat_btn = gr.Button("New Chat", variant="primary")
        session_selector = gr.Dropdown(
            label="Chats",
            choices=_previous_session_labels() + [initial_label],
            value=initial_label,
            interactive=True,
        )

        chatbot_ui = gr.Chatbot(label="Conversation", height=350)

        with gr.Row():
            msg = gr.Textbox(placeholder="Ask a question...", container=False, scale=9)
            send = gr.Button("Send", variant="primary", scale=1)

        clear = gr.Button("Clear Current Chat")

        # === Event Functions ===
        def start_new_chat():
            """Create and persist an empty session, then select it."""
            new_id = str(uuid.uuid4())
            sessions[new_id] = []
            save_all_sessions(sessions)
            display = f"NEW: {new_id}"
            updated = _previous_session_labels() + [display]
            # One value per output component: thread id, chat window, dropdown.
            return new_id, [], gr.update(choices=updated, value=display)

        def switch_chat(display_id, thread_id=None):
            """Load the session named by the dropdown label ``display_id``."""
            if not display_id:
                # Nothing selected: stay on the current session.
                return thread_id, sessions.get(thread_id, [])
            # Labels look like "PREVIOUS: <id>" or "NEW: <id>".
            true_id = display_id.split(": ", 1)[-1]
            return true_id, sessions.get(true_id, [])

        def respond(message, history, thread_id):
            """Stream the reply into the chat window, then persist the session."""
            # Work on a copy so the component's own value is never mutated.
            history = list(history or [])
            if not message or not message.strip():
                yield history
                return

            history.append((message, ""))
            yield history

            for chunk in chatbot.get_response(message, history[:-1], thread_id):
                history[-1] = (message, chunk)
                yield history

            sessions[thread_id] = history
            save_all_sessions(sessions)

        def clear_chat(thread_id):
            """Erase the stored turns of ``thread_id`` and empty the chat window."""
            sessions[thread_id] = []
            save_all_sessions(sessions)
            return []

        # === Bind Events ===
        new_chat_btn.click(
            start_new_chat,
            outputs=[current_thread_id, chatbot_ui, session_selector],
        )
        session_selector.change(
            switch_chat,
            inputs=[session_selector, current_thread_id],
            outputs=[current_thread_id, chatbot_ui],
        )
        send.click(respond, [msg, chatbot_ui, current_thread_id], [chatbot_ui]).then(lambda: "", None, msg)
        msg.submit(respond, [msg, chatbot_ui, current_thread_id], [chatbot_ui]).then(lambda: "", None, msg)
        clear.click(clear_chat, inputs=[current_thread_id], outputs=[chatbot_ui])

    return demo


# === Run App ===
if __name__ == "__main__":
    try:
        demo = launch_interface()
        demo.launch()
    except Exception as e:
        # Log with traceback and exit non-zero so a failed start is visible
        # to whatever launched the process.
        logger.critical(f"App failed: {e}", exc_info=True)
        raise SystemExit(1) from e