diff --git "a/src/ui/gradio_app.py" "b/src/ui/gradio_app.py"
new file mode 100644
--- /dev/null
+++ "b/src/ui/gradio_app.py"
@@ -0,0 +1,3742 @@
+"""
+Gradio UI Module
+
+This module provides an intuitive interface for document upload,
+URL input, and querying using Gradio.
+
+Technology: Gradio
+"""
+
+import logging
+import os
+import sys
+import tempfile
+import json
+import time
+from typing import Dict, List, Any, Optional, Tuple
+from datetime import datetime
+from pathlib import Path
+
+try:
+ import gradio as gr
+except ImportError:
+ logging.warning("Gradio not available.")
+
+
+class GradioApp:
+ """
+ Provides a comprehensive Gradio-based user interface for the RAG system.
+
+ Features:
+ - Document upload with progress tracking
+ - URL processing with status updates
+ - Interactive Q&A interface with source display
+ - Knowledge base management
+ - System status and health monitoring
+ - Analytics dashboard
+ """
+
+    def __init__(self, rag_system, config: Optional[Dict[str, Any]] = None):
+        """
+        Initialize the GradioApp with the RAG system.
+
+        The full Gradio interface is built immediately during construction
+        (see ``_create_interface``), so instantiation may take a moment.
+
+        Args:
+            rag_system: Instance of the complete RAG system. Accessed only via
+                ``getattr``/``hasattr`` throughout this class, so any object
+                exposing the expected attributes works.
+            config: Configuration dictionary with UI parameters. Recognized
+                keys: ``title``, ``description``, ``theme``, ``share``, and a
+                ``features`` sub-dict of boolean toggles.
+        """
+        self.rag_system = rag_system
+        self.config = config or {}
+        self.logger = self._setup_unicode_logger()
+
+        # Settings manager (deferred import keeps module import side-effect free)
+        from utils.settings_manager import SettingsManager
+
+        config_manager = getattr(rag_system, "config_manager", None)
+        self.settings_manager = SettingsManager(config_manager)
+
+        # UI configuration with sensible defaults
+        self.title = self.config.get("title", "AI Embedded Knowledge Agent")
+        self.description = self.config.get(
+            "description",
+            "Upload documents or provide URLs to build your knowledge base, then ask questions!",
+        )
+        self.theme = self.config.get("theme", "default")
+        self.share = self.config.get("share", False)
+
+        # Feature toggles: each major tab can be disabled via config["features"]
+        self.features = self.config.get("features", {})
+        self.enable_file_upload = self.features.get("file_upload", True)
+        self.enable_url_input = self.features.get("url_input", True)
+        self.enable_query_interface = self.features.get("query_interface", True)
+        self.enable_source_display = self.features.get("source_display", True)
+        self.enable_confidence_display = self.features.get("confidence_display", True)
+
+        # Simple UI counters surfaced in the status/stats bars and analytics
+        self.processing_status = "Ready"
+        self.total_documents = 0
+        self.total_chunks = 0
+        self.query_count = 0
+
+        # Build the Gradio Blocks interface; stored on self.interface
+        self.interface = None
+        self._create_interface()
+
+        self._log_safe("GradioApp initialized with advanced features")
+
+ def _setup_unicode_logger(self):
+ """๐ง Setup Unicode-safe logger for cross-platform compatibility."""
+ logger = logging.getLogger(__name__)
+
+ # โ
Configure handler with UTF-8 encoding for Windows compatibility
+ if not logger.handlers:
+ handler = logging.StreamHandler(sys.stdout)
+
+ # ๐ Force UTF-8 encoding on Windows to handle emojis
+ if sys.platform.startswith("win"):
+ try:
+ # โก Try to reconfigure stdout with UTF-8 encoding
+ handler.stream = open(
+ sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1
+ )
+ except Exception:
+ # ๐ Fallback to default if reconfiguration fails
+ pass
+
+ formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.setLevel(logging.INFO)
+
+ return logger
+
+ def _log_safe(self, message: str, level: str = "info"):
+ """๐ก๏ธ Unicode-safe logging that handles emojis on Windows."""
+ try:
+ # โ
Pre-process message to be safe for Windows cp1252 encoding
+ safe_message = self._make_message_safe(message)
+ getattr(self.logger, level)(safe_message)
+ except UnicodeEncodeError:
+ # ๐ Additional fallback: Remove all non-ASCII characters
+ ascii_message = message.encode("ascii", "ignore").decode("ascii")
+ getattr(self.logger, level)(f"[ENCODING_SAFE] {ascii_message}")
+ except Exception as e:
+ # ๐จ Last resort: Basic logging without special characters
+ basic_message = (
+ str(message).replace("๐", "[LIVE]").replace("๐", "[LOCAL]")
+ )
+ try:
+ getattr(self.logger, level)(f"[SAFE] {basic_message}")
+ except:
+ print(f"[FALLBACK] {basic_message}") # Direct print as last resort
+
+ def _make_message_safe(self, message: str) -> str:
+ """๐ Convert emoji characters to safe text equivalents."""
+ emoji_map = {
+ "๐": "[SEARCH]",
+ "โ
": "[SUCCESS]",
+ "โ": "[ERROR]",
+ "๐": "[ROCKET]",
+ "๐": "[DOC]",
+ "๐": "[LINK]",
+ "โก": "[FAST]",
+ "๐ฏ": "[TARGET]",
+ "๐ข": "[GREEN]",
+ "๐ก": "[YELLOW]",
+ "๐ด": "[RED]",
+ "๐": "[CHART]",
+ "๐ท๏ธ": "[SPIDER]",
+ "๐ก": "[IDEA]",
+ "๐": "[REFRESH]",
+ "๐": "[BOOKS]",
+ "๐ฉบ": "[HEALTH]",
+ "๐": "[ANALYTICS]",
+ "๐": "[LIVE]",
+ "๐": "[WORLD]",
+ "๐ง": "[TOOL]",
+ "๐ก๏ธ": "[SHIELD]",
+ "๐จ": "[DESIGN]",
+ "๐": "[NOTE]",
+ "๐๏ธ": "[DELETE]",
+ "๐พ": "[SAVE]",
+ "๐": "[FOLDER]",
+ "๐": "[BELL]",
+ "โ๏ธ": "[SETTINGS]",
+ "๐งช": "[TEST]",
+ "๐ค": "[EXPORT]",
+ "๐": "[PORT]",
+ "๐ฒ": "[TREE]",
+ "๐ฅ": "[FIRE]",
+ "๐": "[KEY]",
+ "๐ ๏ธ": "[WRENCH]",
+ "๐ป": "[COMPUTER]",
+ "๐๏ธ": "[BUILDING]",
+ "โ": "[QUESTION]",
+ "๐ชฒ": "[BUG]",
+ "๐ช": "[BOOMERANG]",
+ "๐": "[BOOK]",
+ "๐งน": "[BROOM]",
+ "๐ฌ": "[MICROSCOPE]",
+ "๐ค": "[ROBOT]", # Added for Auto mode
+ "๐": "[HYBRID]", # Added for Hybrid mode
+ }
+
+ safe_message = message
+ for emoji, replacement in emoji_map.items():
+ safe_message = safe_message.replace(emoji, replacement)
+
+ return safe_message
+
+    def _create_interface(self):
+        """
+        Build the full Gradio Blocks interface and store it on ``self.interface``.
+
+        Creates the themed layout, header and status bars, then one tab per
+        enabled feature flag. Each ``_create_*_tab`` helper builds its own
+        components and wires its own event handlers.
+
+        NOTE(review): the embedded HTML strings below were corrupted during
+        extraction (tags stripped, emoji garbled) — restore from the pristine
+        source before shipping.
+        """
+        # Soft theme with custom fonts plus gradient/radius overrides
+        theme = gr.themes.Soft(
+            primary_hue="blue",
+            secondary_hue="purple",
+            neutral_hue="slate",
+            font=gr.themes.GoogleFont("Inter"),
+            font_mono=gr.themes.GoogleFont("JetBrains Mono"),
+        ).set(
+            body_background_fill="*neutral_50",
+            body_text_color="*neutral_800",
+            button_primary_background_fill="linear-gradient(135deg, #667eea 0%, #764ba2 100%)",
+            button_primary_background_fill_hover="linear-gradient(135deg, #5a67d8 0%, #6b46c1 100%)",
+            button_primary_text_color="white",
+            input_background_fill="*neutral_50",
+            block_background_fill="white",
+            block_border_width="1px",
+            block_border_color="*neutral_200",
+            block_radius="12px",
+            container_radius="20px",
+        )
+
+        with gr.Blocks(
+            title=self.title,
+            theme=theme,
+            css=self._get_custom_css(),
+            head="""
+
+
+
+
+            """,
+        ) as interface:
+
+            # Header row with gradient background (styled via app-header CSS)
+            with gr.Row(elem_classes="app-header"):
+                with gr.Column():
+                    gr.HTML(
+                        f"""
+
๐ {self.title}
+ {self.description}
+ """
+                    )
+
+            # Status bar: live system status (left) and counters (right)
+            with gr.Row(elem_classes="status-bar"):
+                with gr.Column():
+                    status_display = gr.HTML(
+                        value="""
+
+ ๐ข
+ System Status: Ready
+
+ """,
+                        elem_classes="status-display",
+                    )
+                with gr.Column():
+                    stats_display = gr.HTML(
+                        value="""
+
+ ๐
+ Stats: Documents: 0 | Chunks: 0 | Queries: 0
+
+ """,
+                        elem_classes="stats-display",
+                    )
+
+            # Stored early so the tab builders can wire these as event outputs
+            self.status_display = status_display
+            self.stats_display = stats_display
+
+            # Feature tabs; the first three can be disabled via config flags
+            with gr.Tabs(elem_classes="tab-nav") as tabs:
+                # Document upload tab
+                if self.enable_file_upload:
+                    with gr.TabItem(
+                        "๐ Upload Documents", id="upload_tab", elem_classes="tab-item"
+                    ):
+                        with gr.Column(elem_classes="feature-card fade-in"):
+                            upload_components = self._create_upload_tab()
+
+                # URL processing tab
+                if self.enable_url_input:
+                    with gr.TabItem(
+                        "๐ Add URLs", id="url_tab", elem_classes="tab-item"
+                    ):
+                        with gr.Column(elem_classes="feature-card fade-in"):
+                            url_components = self._create_url_tab()
+
+                # Query interface tab (primary tab)
+                if self.enable_query_interface:
+                    with gr.TabItem(
+                        "โ Ask Questions", id="query_tab", elem_classes="tab-item"
+                    ):
+                        with gr.Column(elem_classes="feature-card fade-in"):
+                            query_components = self._create_query_tab()
+
+                # Knowledge base management tab (always present)
+                with gr.TabItem(
+                    "๐ Knowledge Base", id="kb_tab", elem_classes="tab-item"
+                ):
+                    with gr.Column(elem_classes="feature-card fade-in"):
+                        kb_components = self._create_knowledge_base_tab()
+
+                # Analytics dashboard tab
+                with gr.TabItem(
+                    "๐ Analytics", id="analytics_tab", elem_classes="tab-item"
+                ):
+                    with gr.Column(elem_classes="feature-card fade-in"):
+                        analytics_components = self._create_analytics_tab()
+
+                # System health tab
+                with gr.TabItem(
+                    "๐ฉบ System Health", id="health_tab", elem_classes="tab-item"
+                ):
+                    with gr.Column(elem_classes="feature-card fade-in"):
+                        health_components = self._create_health_tab()
+
+                # Settings tab
+                with gr.TabItem(
+                    "โ๏ธ Settings", id="settings_tab", elem_classes="tab-item"
+                ):
+                    with gr.Column(elem_classes="feature-card fade-in"):
+                        settings_components = self._create_settings_tab()
+
+        self.interface = interface
+
+    def _create_upload_tab(self):
+        """
+        Build the document-upload tab and wire its event handlers.
+
+        Returns:
+            dict: The key interactive components (``file_upload``,
+            ``upload_btn``, ``upload_output``) for callers that need them.
+        """
+        # Static stat cards (HTML content corrupted in this copy — see source)
+        with gr.Row(elem_classes="analytics-grid"):
+            with gr.Column(elem_classes="stat-card accent-blue"):
+                gr.HTML(
+                    """
+ 7+
+ Supported Formats
+ """
+                )
+            with gr.Column(elem_classes="stat-card accent-green"):
+                gr.HTML(
+                    """
+ โ
+ File Size Limit
+ """
+                )
+            with gr.Column(elem_classes="stat-card accent-purple"):
+                gr.HTML(
+                    """
+ โก
+ Fast Processing
+ """
+                )
+
+        # Main upload interface: input column (left), results column (right)
+        with gr.Row(elem_classes="grid-2"):
+            with gr.Column(elem_classes="metric-card"):
+                gr.HTML(
+                    """
+
+ ๐ Upload Documents
+
+
+ Drag & drop files or click to browse. Multiple files supported.
+
+ """
+                )
+
+                # Supported-formats banner
+                gr.HTML(
+                    """
+
+ โ
 Supported Formats:
+ ๐ PDF โข ๐ DOCX โข ๐ CSV โข ๐ XLSX โข ๐ฏ PPTX โข ๐ TXT โข ๐ MD
+
+ """
+                )
+
+                # Multi-file picker restricted to the supported extensions
+                file_upload = gr.File(
+                    label="๐ Select Files",
+                    file_count="multiple",
+                    file_types=[
+                        ".pdf",
+                        ".docx",
+                        ".csv",
+                        ".xlsx",
+                        ".pptx",
+                        ".txt",
+                        ".md",
+                    ],
+                    height=250,
+                    elem_classes="input-field",
+                )
+
+                # Action buttons
+                with gr.Row():
+                    upload_btn = gr.Button(
+                        "๐ Process Documents",
+                        variant="primary",
+                        size="lg",
+                        elem_classes="btn-primary",
+                    )
+                    clear_upload_btn = gr.Button(
+                        "๐๏ธ Clear", variant="secondary", elem_classes="btn-secondary"
+                    )
+
+            with gr.Column(elem_classes="metric-card"):
+                gr.HTML(
+                    """
+
+ ๐ Processing Results
+
+
+ Real-time processing status and detailed results will appear here.
+
+ """
+                )
+
+                # Read-only log of document-processing results
+                upload_output = gr.Textbox(
+                    label="๐ Processing Log",
+                    lines=18,
+                    interactive=False,
+                    placeholder="๐ Upload results will appear here...\n\n๐ก Tips:\nโข Multiple files can be processed simultaneously\nโข Processing time depends on file size and complexity\nโข Check the status bar for real-time updates",
+                    elem_classes="input-field",
+                )
+
+        # Collapsed tips accordion (HTML content corrupted in this copy)
+        with gr.Accordion("๐ก Processing Tips & Best Practices", open=False):
+            gr.HTML(
+                """
+
+
+
๐ File Preparation
+
+ Ensure text is readable and not scanned images
+ Remove password protection from PDFs
+ Use descriptive filenames
+
+
+
+
โก Performance Tips
+
+ Smaller files process faster
+ Batch upload related documents
+ Monitor system resources
+
+
+
+
๐ฏ Quality Guidelines
+
+ High-quality text improves search accuracy
+ Structured documents work better
+ Remove unnecessary formatting
+
+
+
+ """
+            )
+
+        # Wire processing: updates the shared status/stats bars as well
+        upload_btn.click(
+            fn=self._process_documents,
+            inputs=[file_upload],
+            outputs=[upload_output, self.status_display, self.stats_display],
+        )
+
+        # NOTE(review): status_display is a gr.HTML component, but this clear
+        # handler feeds it plain text ("Ready ") — confirm the intended markup.
+        clear_upload_btn.click(
+            fn=lambda: ("", "Ready "), outputs=[upload_output, self.status_display]
+        )
+
+        return {
+            "file_upload": file_upload,
+            "upload_btn": upload_btn,
+            "upload_output": upload_output,
+        }
+
+    def _create_url_tab(self):
+        """
+        Build the URL-processing tab and wire its event handlers.
+
+        Returns:
+            dict: Key interactive components (``url_input``, ``url_btn``,
+            ``url_output``).
+        """
+        with gr.Row():
+            # Left column: URL input and crawl options
+            with gr.Column(scale=1):
+                gr.Markdown("### Add URLs")
+                gr.Markdown("Enter URLs to extract content from web pages")
+
+                # One URL per line; parsed by self._process_urls
+                url_input = gr.Textbox(
+                    label="URLs (one per line)",
+                    lines=8,
+                    placeholder="https://example.com\nhttps://another-site.com\n...",
+                )
+
+                with gr.Accordion("โ๏ธ Advanced Crawling Options", open=False):
+                    gr.Markdown("๐ท๏ธ **Crawl Configuration**")
+
+                    # Link-following depth: 1 means just the given page
+                    max_depth = gr.Slider(
+                        label="๐ Crawl Depth (How deep to follow links)",
+                        minimum=1,
+                        maximum=5,
+                        value=1,
+                        step=1,
+                        info="Higher depth = more pages but slower processing",
+                    )
+
+                    follow_links = gr.Checkbox(
+                        label="๐ Follow Internal Links",
+                        value=True,
+                        info="Automatically discover and process linked pages",
+                    )
+
+                    gr.Markdown("โก **Performance Tips:**")
+                    gr.Markdown("โข Depth 1: Single page only")
+                    gr.Markdown("โข Depth 2-3: Good for small sites")
+                    gr.Markdown("โข Depth 4-5: Use carefully, can be slow")
+
+                with gr.Row():
+                    url_btn = gr.Button("๐ Process URLs", variant="primary", size="lg")
+                    clear_url_btn = gr.Button("๐๏ธ Clear", variant="secondary")
+
+                # Live status line updated by the processing handler
+                with gr.Row():
+                    progress_info = gr.Textbox(
+                        label="๐ Processing Status",
+                        value="Ready to process URLs...",
+                        interactive=False,
+                        visible=True,
+                    )
+
+            # Right column: processing results log
+            with gr.Column(scale=1):
+                gr.Markdown("### Processing Results")
+                url_output = gr.Textbox(
+                    label="Results",
+                    lines=15,
+                    interactive=False,
+                    placeholder="URL processing results will appear here...",
+                )
+
+        # Wire processing; also refreshes the shared status/stats bars
+        url_btn.click(
+            fn=self._process_urls,
+            inputs=[url_input, max_depth, follow_links],
+            outputs=[
+                url_output,
+                self.status_display,
+                self.stats_display,
+                progress_info,
+            ],
+        )
+
+        clear_url_btn.click(
+            fn=lambda: ("", "Ready ๐ข", "Ready to process URLs..."),
+            outputs=[url_output, self.status_display, progress_info],
+        )
+
+        return {
+            "url_input": url_input,
+            "url_btn": url_btn,
+            "url_output": url_output,
+        }
+
+    def _create_query_tab(self):
+        """
+        Build the Q&A tab (search input, mode selection, response panel)
+        and wire its event handlers.
+
+        Returns:
+            dict: Key interactive components (query input/button, response,
+            sources, and the search-mode controls).
+        """
+        # Static capability cards (HTML content corrupted in this copy)
+        with gr.Row(elem_classes="analytics-grid"):
+            with gr.Column(elem_classes="stat-card accent-blue"):
+                gr.HTML(
+                    """
+ ๐ค
+ AI-Powered Search
+ """
+                )
+            with gr.Column(elem_classes="stat-card accent-green"):
+                gr.HTML(
+                    """
+ ๐
+ Live Web Search
+ """
+                )
+            with gr.Column(elem_classes="stat-card accent-purple"):
+                gr.HTML(
+                    """
+ ๐
+ Local Knowledge
+ """
+                )
+            with gr.Column(elem_classes="stat-card accent-orange"):
+                gr.HTML(
+                    """
+ โก
+ Instant Results
+ """
+                )
+
+        # Main query interface: question column (left), answer column (right)
+        with gr.Row(elem_classes="grid-2"):
+            with gr.Column(elem_classes="metric-card"):
+                gr.HTML(
+                    """
+
+ โ Ask Your Question
+
+
+ Ask anything about your documents or get real-time information from the web.
+
+ """
+                )
+
+                # Free-text question input
+                with gr.Column(elem_classes="search-container"):
+                    query_input = gr.Textbox(
+                        label="๐ Your Question",
+                        lines=4,
+                        placeholder="๐ก Try asking:\nโข 'What are the main points in the uploaded document?'\nโข 'Latest news about AI developments'\nโข 'Summarize the key findings from my research papers'",
+                        elem_classes="search-input",
+                    )
+
+                    # Decorative quick-suggestion chips (not wired to events)
+                    gr.HTML(
+                        """
+
+
๐ก Quick Suggestions:
+
+ ๐ Summarize
+ ๐ Key Findings
+ ๐ Latest News
+
+
+ """
+                    )
+
+                # Advanced query options (collapsed by default)
+                with gr.Accordion("โ๏ธ Advanced Query Options", open=False):
+                    with gr.Row():
+                        include_sources = gr.Checkbox(
+                            label="๐ Include Sources",
+                            value=True,
+                            info="Show source documents and references",
+                        )
+                        max_results = gr.Slider(
+                            label="๐ Max Results",
+                            minimum=1,
+                            maximum=10,
+                            value=5,
+                            step=1,
+                            info="Maximum number of results to return",
+                        )
+
+                    # Search-mode selection: auto / local / live / hybrid
+                    with gr.Group():
+                        gr.HTML(
+                            """
+ ๐ Search Mode & Options
+ """
+                        )
+
+                        search_mode = gr.Dropdown(
+                            label="๐ฏ Search Mode",
+                            choices=[
+                                ("๐ค Auto (Smart Routing)", "auto"),
+                                ("๐ Local Only (Stored Documents)", "local_only"),
+                                ("๐ Live Only (Web Search)", "live_only"),
+                                ("๐ Hybrid (Local + Live)", "hybrid"),
+                            ],
+                            value="auto",
+                            info="Choose how to search for information",
+                        )
+
+                        use_live_search = gr.Checkbox(
+                            label="๐ Enable Live Web Search",
+                            value=False,
+                            info="Enable web search (will use hybrid mode by default)",
+                        )
+
+                        # Live-search-only options; hidden until enabled below
+                        with gr.Row():
+                            search_depth = gr.Dropdown(
+                                label="๐ท๏ธ Search Depth",
+                                choices=["basic", "advanced"],
+                                value="basic",
+                                info="Basic: faster, Advanced: more comprehensive",
+                                visible=False,
+                            )
+                            time_range = gr.Dropdown(
+                                label="โฐ Time Range",
+                                choices=["day", "week", "month", "year"],
+                                value="month",
+                                info="How recent should the web results be",
+                                visible=False,
+                            )
+
+                        # Toggling live search reveals its options and flips
+                        # the mode between hybrid and auto
+                        use_live_search.change(
+                            fn=lambda enabled: (
+                                gr.update(visible=enabled),
+                                gr.update(visible=enabled),
+                                gr.update(value="hybrid" if enabled else "auto"),
+                            ),
+                            inputs=[use_live_search],
+                            outputs=[search_depth, time_range, search_mode],
+                        )
+
+                    # Static explanation of each search mode
+                    with gr.Accordion("โน๏ธ Search Mode Guide", open=False):
+                        gr.HTML(
+                            """
+
+
+
๐ค Auto Mode
+
Intelligently chooses the best search method based on your query
+
+ Time-sensitive queries โ Live search
+ Conceptual questions โ Local documents
+ Factual queries โ Hybrid approach
+
+
+
+
๐ Local Only
+
Search only in your uploaded documents
+
+ Fastest response time
+ Uses your knowledge base
+ No internet required
+
+
+
+
๐ Live Only
+
Search only the web for real-time information
+
+ Latest information
+ Current events and news
+ Requires Tavily API key
+
+
+
+
๐ Hybrid
+
Combines both local documents and live web search
+
+ Best of both worlds
+ Comprehensive results
+ Balanced approach (recommended)
+
+
+
+ """
+                        )
+
+                # Action buttons
+                with gr.Row():
+                    query_btn = gr.Button(
+                        "๐ Get Answer",
+                        variant="primary",
+                        size="lg",
+                        elem_classes="btn-primary",
+                    )
+                    clear_query_btn = gr.Button(
+                        "๐๏ธ Clear", variant="secondary", elem_classes="btn-secondary"
+                    )
+
+            with gr.Column(elem_classes="metric-card"):
+                gr.HTML(
+                    """
+
+ ๐ฌ AI Response
+
+
+ Intelligent answers with source citations and confidence scoring.
+
+ """
+                )
+
+                # Markdown answer panel with placeholder usage tips
+                response_output = gr.Markdown(
+                    label="๐ค AI Response",
+                    value="๐ฎ **Your intelligent answer will appear here...**\n\n๐ก **Tips for better results:**\n- Be specific in your questions\n- Use natural language\n- Ask follow-up questions for clarification\n- Check the confidence score and sources",
+                    height=450,
+                    elem_classes="input-field",
+                )
+
+                # Confidence readout; visibility controlled by config flag
+                with gr.Row():
+                    confidence_display = gr.Textbox(
+                        label="๐ฏ Confidence & Performance",
+                        interactive=False,
+                        visible=self.enable_confidence_display,
+                        elem_classes="input-field",
+                    )
+
+                # Raw sources/references; visibility controlled by config flag
+                sources_output = gr.JSON(
+                    label="๐ Sources & References",
+                    visible=self.enable_source_display,
+                    elem_classes="input-field",
+                )
+
+        # Static query-optimization tips (collapsed)
+        with gr.Accordion("๐ฏ Query Optimization Tips", open=False):
+            gr.HTML(
+                """
+
+
+
๐ฏ Question Formulation
+
+ Be specific and clear
+ Use natural language
+ Include context when needed
+ Ask one question at a time
+
+
+
+
๐ Search Strategy
+
+ Use Auto mode for best results
+ Enable live search for current info
+ Adjust max results based on need
+ Check confidence scores
+
+
+
+
๐ Source Utilization
+
+ Review source citations
+ Cross-reference multiple sources
+ Verify critical information
+ Use sources for deeper research
+
+
+
+ """
+            )
+
+        # Wire the main query flow; also refreshes the shared status bars
+        query_btn.click(
+            fn=self._process_query,
+            inputs=[
+                query_input,
+                include_sources,
+                max_results,
+                use_live_search,
+                search_depth,
+                time_range,
+                search_mode,
+            ],
+            outputs=[
+                response_output,
+                confidence_display,
+                sources_output,
+                self.status_display,
+                self.stats_display,
+            ],
+        )
+
+        clear_query_btn.click(
+            fn=lambda: ("", "", {}, "Ready ๐ข"),
+            outputs=[
+                response_output,
+                confidence_display,
+                sources_output,
+                self.status_display,
+            ],
+        )
+
+        return {
+            "query_input": query_input,
+            "query_btn": query_btn,
+            "response_output": response_output,
+            "sources_output": sources_output,
+            "use_live_search": use_live_search,
+            "search_depth": search_depth,
+            "time_range": time_range,
+            "search_mode": search_mode,
+        }
+
+    def _create_knowledge_base_tab(self):
+        """
+        Build the knowledge-base management tab.
+
+        Returns:
+            dict: Key display components (``kb_stats``,
+            ``embedding_model_status``, ``document_list``).
+
+        NOTE(review): only ``refresh_btn`` is wired here; ``export_btn`` and
+        ``clear_kb_btn`` have no click handlers in this block — confirm they
+        are wired elsewhere or still TODO.
+        """
+        with gr.Column():
+            gr.Markdown("### ๐ Knowledge Base Management")
+
+            with gr.Row():
+                refresh_btn = gr.Button("Refresh", variant="secondary")
+                export_btn = gr.Button("๐ค Export", variant="secondary")
+                clear_kb_btn = gr.Button("Clear All", variant="stop")
+
+            # Aggregate KB statistics, including embedding/vector-DB health
+            kb_stats = gr.JSON(
+                label="๐ Knowledge Base Statistics",
+                value={
+                    "total_documents": 0,
+                    "total_chunks": 0,
+                    "storage_size": "0 MB",
+                    "embedding_model": "Loading...",
+                    "embedding_status": "Checking...",
+                    "vector_db_status": "Checking...",
+                },
+            )
+
+            # Detailed embedding-model information panel
+            embedding_model_status = gr.JSON(
+                label="๐ค Embedding Model Information",
+                value={
+                    "model_name": "Loading...",
+                    "provider": "Checking...",
+                    "status": "Initializing...",
+                    "api_status": "Checking connection...",
+                    "dimension": "Unknown",
+                    "performance": "Gathering stats...",
+                },
+            )
+
+            # Read-only table of documents currently stored in the KB
+            document_list = gr.Dataframe(
+                headers=["Source", "Type", "Chunks", "Added"],
+                datatype=["str", "str", "number", "str"],
+                label="๐ Documents in Knowledge Base",
+                interactive=False,
+            )
+
+        # Refresh pulls live stats from the RAG system
+        refresh_btn.click(
+            fn=self._refresh_knowledge_base,
+            outputs=[kb_stats, embedding_model_status, document_list],
+        )
+
+        return {
+            "kb_stats": kb_stats,
+            "embedding_model_status": embedding_model_status,
+            "document_list": document_list,
+        }
+
+    def _create_analytics_tab(self):
+        """
+        Build the analytics dashboard tab with initial placeholder data.
+
+        Returns:
+            dict: The five analytics display components.
+
+        NOTE(review): ``export_analytics_btn`` has no click handler in this
+        block — confirm it is wired elsewhere or still TODO.
+        """
+        with gr.Column():
+            gr.Markdown("### ๐ Analytics Dashboard")
+            gr.Markdown("Real-time insights into your RAG system performance")
+
+            with gr.Row():
+                refresh_analytics_btn = gr.Button(
+                    "๐ Refresh Analytics", variant="secondary"
+                )
+                export_analytics_btn = gr.Button(
+                    "๐ Export Report", variant="secondary"
+                )
+
+            # Top row: query analytics and system metrics
+            with gr.Row():
+                with gr.Column():
+                    query_analytics = gr.JSON(
+                        label="๐ Query Analytics",
+                        value=self._get_initial_query_analytics(),
+                    )
+
+                with gr.Column():
+                    system_metrics = gr.JSON(
+                        label="โก System Metrics",
+                        value=self._get_initial_system_metrics(),
+                    )
+
+            # Bottom row: performance metrics and usage statistics
+            with gr.Row():
+                with gr.Column():
+                    performance_metrics = gr.JSON(
+                        label="๐ Performance Metrics",
+                        value=self._get_initial_performance_metrics(),
+                    )
+
+                with gr.Column():
+                    usage_stats = gr.JSON(
+                        label="๐ Usage Statistics",
+                        value=self._get_initial_usage_stats(),
+                    )
+
+            # Recent query history table (placeholder rows until first refresh)
+            query_history = gr.Dataframe(
+                headers=[
+                    "Query",
+                    "Results",
+                    "Confidence",
+                    "Processing Time",
+                    "Timestamp",
+                ],
+                datatype=["str", "number", "number", "str", "str"],
+                label="๐ Recent Query History",
+                interactive=False,
+                value=self._get_initial_query_history(),
+            )
+
+        # Refresh repopulates all five panels from live system data
+        refresh_analytics_btn.click(
+            fn=self._refresh_analytics,
+            outputs=[
+                query_analytics,
+                system_metrics,
+                performance_metrics,
+                usage_stats,
+                query_history,
+            ],
+        )
+
+        return {
+            "query_analytics": query_analytics,
+            "system_metrics": system_metrics,
+            "performance_metrics": performance_metrics,
+            "usage_stats": usage_stats,
+            "query_history": query_history,
+        }
+
+ def _get_initial_query_analytics(self) -> Dict[str, Any]:
+ """Get initial query analytics data."""
+ return {
+ "total_queries": self.query_count,
+ "average_confidence": "N/A",
+ "most_common_topics": [],
+ "query_success_rate": "100%",
+ "cache_hit_rate": "0%",
+ "status": "๐ Ready to track queries",
+ }
+
+ def _get_initial_system_metrics(self) -> Dict[str, Any]:
+ """Get initial system metrics."""
+ # Get real embedding model info
+ embedding_info = self._get_embedding_model_info()
+
+ return {
+ "documents_processed": self.total_documents,
+ "chunks_stored": self.total_chunks,
+ "embedding_model": embedding_info.get("model_name", "Gemini"),
+ "embedding_status": embedding_info.get("status", "Checking..."),
+ "embedding_provider": embedding_info.get("provider", "Google"),
+ "vector_db": "Pinecone",
+ "uptime": "Just started",
+ "status": "๐ข System operational",
+ }
+
+ def _get_initial_performance_metrics(self) -> Dict[str, Any]:
+ """Get initial performance metrics."""
+ return {
+ "avg_query_time": "N/A",
+ "avg_embedding_time": "N/A",
+ "avg_retrieval_time": "N/A",
+ "memory_usage": "Normal",
+ "throughput": "N/A queries/min",
+ "status": "โก Performance tracking active",
+ }
+
+ def _get_initial_usage_stats(self) -> Dict[str, Any]:
+ """Get initial usage statistics."""
+ return {
+ "documents_uploaded": 0,
+ "urls_processed": 0,
+ "successful_queries": 0,
+ "failed_queries": 0,
+ "peak_usage_time": "N/A",
+ "status": "๐ Usage tracking enabled",
+ }
+
+ def _get_initial_query_history(self) -> List[List[str]]:
+ """Get initial query history."""
+ return [
+ ["No queries yet", "0", "0.0", "0.0s", "Start asking questions!"],
+ ["Upload documents first", "0", "0.0", "0.0s", "Build your knowledge base"],
+ [
+ "Try the examples above",
+ "0",
+ "0.0",
+ "0.0s",
+ "Get started with sample queries",
+ ],
+ ]
+
+ def _refresh_analytics(
+ self,
+ ) -> Tuple[
+ Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any], List[List[str]]
+ ]:
+ """Refresh all analytics data."""
+ try:
+ # Get real analytics from query processor if available
+ query_analytics = self._get_real_query_analytics()
+ system_metrics = self._get_real_system_metrics()
+ performance_metrics = self._get_real_performance_metrics()
+ usage_stats = self._get_real_usage_stats()
+ query_history = self._get_real_query_history()
+
+ return (
+ query_analytics,
+ system_metrics,
+ performance_metrics,
+ usage_stats,
+ query_history,
+ )
+
+ except Exception as e:
+ self._log_safe(f"โ Error refreshing analytics: {e}", "error")
+ return (
+ {"error": str(e)},
+ {"error": str(e)},
+ {"error": str(e)},
+ {"error": str(e)},
+ [["Error loading history", "0", "0.0", "0.0s", str(e)]],
+ )
+
+    def _get_real_query_analytics(self) -> Dict[str, Any]:
+        """
+        Assemble live query analytics from UI counters and, when available,
+        the query processor's own analytics.
+
+        Returns:
+            Dict[str, Any]: Analytics payload, or an "error" dict on failure.
+        """
+        try:
+            analytics = {
+                "total_queries": self.query_count,
+                "documents_in_kb": self.total_documents,
+                "chunks_available": self.total_chunks,
+                "last_updated": datetime.now().strftime("%H:%M:%S"),
+            }
+
+            # Merge processor-side analytics when the duck-typed API exists.
+            # NOTE: processor keys overwrite the UI counters above on clash.
+            if hasattr(self.rag_system, "query_processor") and hasattr(
+                self.rag_system.query_processor, "get_query_analytics"
+            ):
+                processor_analytics = (
+                    self.rag_system.query_processor.get_query_analytics()
+                )
+                analytics.update(processor_analytics)
+
+            # Derived metrics; guarded division even though query_count > 0 here
+            if self.query_count > 0:
+                analytics["avg_results_per_query"] = round(
+                    self.total_chunks / max(self.query_count, 1), 2
+                )
+                analytics["system_utilization"] = (
+                    "Active" if self.query_count > 5 else "Light"
+                )
+            else:
+                analytics["avg_results_per_query"] = 0
+                analytics["system_utilization"] = "Idle"
+
+            analytics["status"] = "๐ข Analytics active"
+            return analytics
+
+        except Exception as e:
+            return {"error": f"Analytics unavailable: {str(e)}", "status": "โ Error"}
+
+    def _get_real_system_metrics(self) -> Dict[str, Any]:
+        """
+        Assemble live system metrics, including embedding-model details and
+        (when available) overall system health from the RAG system.
+
+        Returns:
+            Dict[str, Any]: Metrics payload, or an "error" dict on failure.
+        """
+        try:
+            # Embedding model details from the helper (defined elsewhere)
+            embedding_info = self._get_embedding_model_info()
+
+            metrics = {
+                "documents_processed": self.total_documents,
+                "chunks_stored": self.total_chunks,
+                "queries_processed": self.query_count,
+                "last_updated": datetime.now().strftime("%H:%M:%S"),
+                "embedding_model": embedding_info.get("model_name", "Unknown"),
+                "embedding_status": embedding_info.get("status", "Unknown"),
+                "embedding_provider": embedding_info.get("provider", "Unknown"),
+                "embedding_dimension": embedding_info.get("dimension", "Unknown"),
+            }
+
+            # Merge overall health when the RAG system exposes it.
+            # NOTE(review): summing components.values() assumes those values
+            # are booleans/numbers — TODO confirm against get_system_status().
+            if hasattr(self.rag_system, "get_system_status"):
+                system_status = self.rag_system.get_system_status()
+                metrics.update(
+                    {
+                        "overall_health": system_status.get(
+                            "overall_status", "unknown"
+                        ),
+                        "components_healthy": sum(
+                            system_status.get("components", {}).values()
+                        ),
+                        "total_components": len(system_status.get("components", {})),
+                    }
+                )
+
+            # List which major components exist on the RAG system object
+            components = []
+            if hasattr(self.rag_system, "embedding_generator"):
+                components.append(
+                    f"Embedding Generator ({embedding_info.get('model_name', 'Unknown')})"
+                )
+            if hasattr(self.rag_system, "vector_db"):
+                components.append("Vector Database")
+            if hasattr(self.rag_system, "query_processor"):
+                components.append("Query Processor")
+
+            metrics["active_components"] = components
+            metrics["status"] = "๐ข System healthy"
+            return metrics
+
+        except Exception as e:
+            return {
+                "error": f"System metrics unavailable: {str(e)}",
+                "status": "โ Error",
+            }
+
+    def _get_real_performance_metrics(self) -> Dict[str, Any]:
+        """
+        Assemble performance metrics, averaging per-query processing times
+        from the query processor's history when available.
+
+        Returns:
+            Dict[str, Any]: Metrics payload, or an "error" dict on failure.
+        """
+        try:
+            # Defaults shown until real timing data exists
+            metrics = {
+                "total_processing_time": "N/A",
+                "avg_query_response": "N/A",
+                "system_load": "Normal",
+                "last_updated": datetime.now().strftime("%H:%M:%S"),
+            }
+
+            # Average processing time from the processor's history, if kept
+            if hasattr(self.rag_system, "query_processor") and hasattr(
+                self.rag_system.query_processor, "query_history"
+            ):
+                history = self.rag_system.query_processor.query_history
+                if history:
+                    processing_times = [
+                        q.get("processing_time", 0)
+                        for q in history
+                        if "processing_time" in q
+                    ]
+                    if processing_times:
+                        avg_time = sum(processing_times) / len(processing_times)
+                        metrics["avg_query_response"] = f"{avg_time:.2f}s"
+
+            # NOTE(review): max(1, 1) is always 1, so this just echoes the
+            # total query count — a real rate needs an elapsed-time baseline
+            # (none is tracked in this class). TODO: divide by uptime minutes.
+            metrics["queries_per_minute"] = (
+                f"{self.query_count / max(1, 1):.1f}"  # Rough estimate
+            )
+            metrics["throughput"] = "Good" if self.query_count > 0 else "Idle"
+            metrics["status"] = "โก Performance tracking active"
+            return metrics
+
+        except Exception as e:
+            return {
+                "error": f"Performance metrics unavailable: {str(e)}",
+                "status": "โ Error",
+            }
+
+ def _get_real_usage_stats(self) -> Dict[str, Any]:
+ """Get real usage statistics."""
+ try:
+ stats = {
+ "documents_uploaded": self.total_documents,
+ "urls_processed": 0, # Would need to track this separately
+ "successful_queries": self.query_count, # Assuming all successful for now
+ "failed_queries": 0, # Would need error tracking
+ "total_chunks_created": self.total_chunks,
+ "last_updated": datetime.now().strftime("%H:%M:%S"),
+ }
+
+ # Calculate usage patterns
+ if self.query_count > 0:
+ stats["most_active_feature"] = "Query Processing"
+ stats["usage_trend"] = "Growing" if self.query_count > 5 else "Starting"
+ else:
+ stats["most_active_feature"] = "Document Upload"
+ stats["usage_trend"] = "Initial Setup"
+
+ stats["status"] = "๐ Usage tracking active"
+ return stats
+
+ except Exception as e:
+ return {"error": f"Usage stats unavailable: {str(e)}", "status": "โ Error"}
+
+    def _get_real_query_history(self) -> List[List[str]]:
+        """
+        Build up to 10 most-recent query-history rows for the Dataframe.
+
+        Returns:
+            List[List[str]]: Rows of [query, results, confidence, time,
+            timestamp]; placeholder rows when no history exists, and a single
+            error row on failure.
+        """
+        try:
+            history_data = []
+
+            # Pull history from the query processor's duck-typed attribute
+            if hasattr(self.rag_system, "query_processor") and hasattr(
+                self.rag_system.query_processor, "query_history"
+            ):
+                history = self.rag_system.query_processor.query_history[
+                    -10:
+                ]  # Last 10 queries
+
+                for query_item in history:
+                    # Truncate long queries for display
+                    query_text = (
+                        query_item.get("query", "Unknown")[:50] + "..."
+                        if len(query_item.get("query", "")) > 50
+                        else query_item.get("query", "Unknown")
+                    )
+                    result_count = query_item.get("result_count", 0)
+                    confidence = "N/A"  # Would need to store this
+                    processing_time = (
+                        f"{query_item.get('processing_time', 0):.2f}s"
+                        if "processing_time" in query_item
+                        else "N/A"
+                    )
+                    # NOTE(review): assumes stored timestamps are datetime
+                    # objects (strftime is called on them) — a string
+                    # timestamp would raise here. TODO confirm producer side.
+                    timestamp = (
+                        query_item.get("timestamp", datetime.now()).strftime("%H:%M:%S")
+                        if "timestamp" in query_item
+                        else "Unknown"
+                    )
+
+                    history_data.append(
+                        [
+                            query_text,
+                            str(result_count),
+                            confidence,
+                            processing_time,
+                            timestamp,
+                        ]
+                    )
+
+            # If no real history, show helpful placeholder rows instead
+            if not history_data:
+                history_data = [
+                    ["No queries yet", "0", "0.0", "0.0s", "Ask your first question!"],
+                    [
+                        "Upload documents to get started",
+                        "0",
+                        "0.0",
+                        "0.0s",
+                        "Build your knowledge base",
+                    ],
+                    [
+                        "Try asking about your documents",
+                        "0",
+                        "0.0",
+                        "0.0s",
+                        "Get intelligent answers",
+                    ],
+                ]
+
+            return history_data
+
+        except Exception as e:
+            return [["Error loading history", "0", "0.0", "0.0s", str(e)]]
+
    def _create_settings_tab(self):
        """Create the comprehensive settings management tab.

        Builds three sub-sections (API keys, system settings, storage/export),
        the shared action buttons (refresh / load .env / clear cache / export)
        and a JSON overview panel, then wires the button events.

        Returns:
            dict: All interactive components of this tab, merged from the
            sub-section dicts and keyed by component name.
        """
        with gr.Column():
            gr.Markdown("### โ๏ธ Environment Variables Settings")
            gr.Markdown(
                "Configure API keys and system settings with secure storage options"
            )

            # ๐ Refresh and action buttons
            with gr.Row():
                refresh_settings_btn = gr.Button("๐ Refresh", variant="secondary")
                load_env_btn = gr.Button("๐ Load from .env", variant="secondary")
                clear_cache_btn = gr.Button("๐๏ธ Clear Cache", variant="secondary")
                export_btn = gr.Button("๐ค Export Settings", variant="secondary")

            # ๐ Settings status display
            settings_status = gr.Textbox(
                label="๐ Status",
                value="Ready to configure settings",
                interactive=False,
                container=False,
            )

            # ๐ง Main settings interface
            with gr.Tabs():
                # API Keys Tab
                with gr.TabItem("๐ API Keys"):
                    api_keys_components = self._create_api_keys_section()

                # System Settings Tab
                with gr.TabItem("๐ ๏ธ System Settings"):
                    system_settings_components = self._create_system_settings_section()

                # Storage Options Tab
                with gr.TabItem("๐พ Storage & Export"):
                    storage_components = self._create_storage_section()

            # ๐ Settings overview
            with gr.Accordion("๐ Current Settings Overview", open=False):
                settings_overview = gr.JSON(
                    label="Environment Variables Status", value={}
                )

        # Event handlers for main buttons.
        # NOTE: the splat order below (status, overview, api-key components,
        # system components) must stay in sync with the 15-tuple returned by
        # _refresh_all_settings — both rely on dict insertion order of the
        # sub-section component dicts.
        refresh_settings_btn.click(
            fn=self._refresh_all_settings,
            outputs=[
                settings_status,
                settings_overview,
                *api_keys_components.values(),
                *system_settings_components.values(),
            ],
        )

        load_env_btn.click(
            fn=self._load_from_env_file,
            outputs=[settings_status, settings_overview],
        )

        clear_cache_btn.click(
            fn=self._clear_settings_cache,
            outputs=[settings_status, settings_overview],
        )

        export_btn.click(fn=self._export_settings, outputs=[settings_status])

        return {
            "settings_status": settings_status,
            "settings_overview": settings_overview,
            **api_keys_components,
            **system_settings_components,
            **storage_components,
        }
+
+ def _create_api_keys_section(self):
+ """Create the API keys configuration section."""
+ components = {}
+
+ with gr.Column():
+ gr.Markdown("#### ๐ API Keys Configuration")
+ gr.Markdown(
+ "Configure your API keys for AI services. Keys are masked for security."
+ )
+
+ # Gemini API Key
+ with gr.Group():
+ gr.Markdown("**๐ค Google Gemini API** (Required)")
+ with gr.Row():
+ gemini_key = gr.Textbox(
+ label="GEMINI_API_KEY",
+ placeholder="AIzaSy...",
+ type="password",
+ info="Required for embeddings and LLM functionality",
+ )
+ gemini_test_btn = gr.Button(
+ "๐งช Test", variant="secondary", size="sm"
+ )
+
+ gemini_status = gr.Textbox(
+ label="Status",
+ value="Not configured",
+ interactive=False,
+ container=False,
+ )
+
+ with gr.Row():
+ gemini_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ gemini_env_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ gr.Markdown(
+ "๐ก [Get your Gemini API key](https://aistudio.google.com/)"
+ )
+
+ # Pinecone API Key
+ with gr.Group():
+ gr.Markdown("**๐ฒ Pinecone API (Required)**")
+ with gr.Row():
+ pinecone_key = gr.Textbox(
+ label="PINECONE_API_KEY",
+ placeholder="pc-...",
+ type="password",
+ info="For vector database storage",
+ )
+ pinecone_test_btn = gr.Button(
+ "๐งช Test", variant="secondary", size="sm"
+ )
+
+ pinecone_status = gr.Textbox(
+ label="Status",
+ value="Not configured",
+ interactive=False,
+ container=False,
+ )
+
+ with gr.Row():
+ pinecone_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ pinecone_env_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ gr.Markdown("๐ก [Get your Pinecone API key](https://www.pinecone.io/)")
+
+ # OpenAI API Key
+ with gr.Group():
+ gr.Markdown("**๐ฅ OpenAI API** (Optional)")
+ with gr.Row():
+ openai_key = gr.Textbox(
+ label="OPENAI_API_KEY",
+ placeholder="sk-...",
+ type="password",
+ info="For alternative LLM functionality",
+ )
+ openai_test_btn = gr.Button(
+ "๐งช Test", variant="secondary", size="sm"
+ )
+
+ openai_status = gr.Textbox(
+ label="Status",
+ value="Not configured",
+ interactive=False,
+ container=False,
+ )
+
+ with gr.Row():
+ openai_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ openai_env_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ gr.Markdown(
+ "๐ก [Get your OpenAI API key](https://platform.openai.com/api-keys)"
+ )
+
+ # Tavily API Key
+ with gr.Group():
+ gr.Markdown("**๐ Tavily API** (Optional - for Live Search)")
+ with gr.Row():
+ tavily_key = gr.Textbox(
+ label="TAVILY_API_KEY",
+ placeholder="tvly-...",
+ type="password",
+ info="For real-time web search functionality",
+ )
+ tavily_test_btn = gr.Button(
+ "๐งช Test", variant="secondary", size="sm"
+ )
+
+ tavily_status = gr.Textbox(
+ label="Status",
+ value="Not configured",
+ interactive=False,
+ container=False,
+ )
+
+ with gr.Row():
+ tavily_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ tavily_env_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ gr.Markdown(
+ "๐ก [Get your Tavily API key](https://app.tavily.com/sign-in)"
+ )
+
+ # Store components for event handling
+ components.update(
+ {
+ "gemini_key": gemini_key,
+ "gemini_status": gemini_status,
+ "pinecone_key": pinecone_key,
+ "pinecone_status": pinecone_status,
+ "openai_key": openai_key,
+ "openai_status": openai_status,
+ "tavily_key": tavily_key,
+ "tavily_status": tavily_status,
+ }
+ )
+
+ # Event handlers for API keys
+ gemini_test_btn.click(
+ fn=lambda: self._test_api_connection("GEMINI_API_KEY"),
+ outputs=[gemini_status],
+ )
+
+ gemini_cache_btn.click(
+ fn=lambda key: self._save_setting("GEMINI_API_KEY", key, "cache"),
+ inputs=[gemini_key],
+ outputs=[gemini_status],
+ )
+
+ gemini_env_btn.click(
+ fn=lambda key: self._save_setting("GEMINI_API_KEY", key, "env_file"),
+ inputs=[gemini_key],
+ outputs=[gemini_status],
+ )
+
+ pinecone_test_btn.click(
+ fn=lambda: self._test_api_connection("PINECONE_API_KEY"),
+ outputs=[pinecone_status],
+ )
+
+ pinecone_cache_btn.click(
+ fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "cache"),
+ inputs=[pinecone_key],
+ outputs=[pinecone_status],
+ )
+
+ pinecone_env_btn.click(
+ fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "env_file"),
+ inputs=[pinecone_key],
+ outputs=[pinecone_status],
+ )
+
+ openai_test_btn.click(
+ fn=lambda: self._test_api_connection("OPENAI_API_KEY"),
+ outputs=[openai_status],
+ )
+
+ openai_cache_btn.click(
+ fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "cache"),
+ inputs=[openai_key],
+ outputs=[openai_status],
+ )
+
+ openai_env_btn.click(
+ fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "env_file"),
+ inputs=[openai_key],
+ outputs=[openai_status],
+ )
+
+ tavily_test_btn.click(
+ fn=lambda: self._test_api_connection("TAVILY_API_KEY"),
+ outputs=[tavily_status],
+ )
+
+ tavily_cache_btn.click(
+ fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "cache"),
+ inputs=[tavily_key],
+ outputs=[tavily_status],
+ )
+
+ tavily_env_btn.click(
+ fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "env_file"),
+ inputs=[tavily_key],
+ outputs=[tavily_status],
+ )
+
+ return components
+
+ def _create_system_settings_section(self):
+ """Create the system settings configuration section."""
+ components = {}
+
+ with gr.Column():
+ gr.Markdown("#### ๐ ๏ธ System Configuration")
+ gr.Markdown("Configure system-level settings and preferences")
+
+ # Pinecone Environment
+ with gr.Group():
+ gr.Markdown("**๐ Pinecone Environment**")
+ pinecone_env = gr.Dropdown(
+ label="PINECONE_ENVIRONMENT",
+ choices=[
+ "us-east-1",
+ "us-west1-gcp",
+ "eu-west1-gcp",
+ "asia-southeast1-gcp",
+ ],
+ value="us-east-1",
+ info="Pinecone server region",
+ )
+
+ with gr.Row():
+ pinecone_env_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ pinecone_env_file_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ # Pinecone Index Name
+ with gr.Group():
+ gr.Markdown("**๐ Pinecone Index Name**")
+ pinecone_index = gr.Textbox(
+ label="PINECONE_INDEX_NAME",
+ value="rag-ai-index",
+ placeholder="rag-ai-index",
+ info="Name of your Pinecone index",
+ )
+
+ with gr.Row():
+ pinecone_index_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ pinecone_index_file_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ # Gradio Share
+ with gr.Group():
+ gr.Markdown("**๐ Gradio Public Sharing**")
+ gradio_share = gr.Dropdown(
+ label="GRADIO_SHARE",
+ choices=["false", "true"],
+ value="false",
+ info="Enable public sharing of the interface",
+ )
+
+ with gr.Row():
+ gradio_share_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ gradio_share_file_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ # Port Configuration
+ with gr.Group():
+ gr.Markdown("**๐ Server Port**")
+ port_setting = gr.Number(
+ label="PORT",
+ value=7860,
+ minimum=1000,
+ maximum=65535,
+ info="Server port number (requires restart)",
+ )
+
+ with gr.Row():
+ port_cache_btn = gr.Button(
+ "๐พ Save to Cache", variant="primary", size="sm"
+ )
+ port_file_btn = gr.Button(
+ "๐ Save to .env", variant="primary", size="sm"
+ )
+
+ # System settings status
+ system_status = gr.Textbox(
+ label="System Settings Status",
+ value="Ready",
+ interactive=False,
+ container=False,
+ )
+
+ components.update(
+ {
+ "pinecone_env": pinecone_env,
+ "pinecone_index": pinecone_index,
+ "gradio_share": gradio_share,
+ "port_setting": port_setting,
+ "system_status": system_status,
+ }
+ )
+
+ # Event handlers for system settings
+ pinecone_env_cache_btn.click(
+ fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "cache"),
+ inputs=[pinecone_env],
+ outputs=[system_status],
+ )
+
+ pinecone_env_file_btn.click(
+ fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "env_file"),
+ inputs=[pinecone_env],
+ outputs=[system_status],
+ )
+
+ pinecone_index_cache_btn.click(
+ fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "cache"),
+ inputs=[pinecone_index],
+ outputs=[system_status],
+ )
+
+ pinecone_index_file_btn.click(
+ fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "env_file"),
+ inputs=[pinecone_index],
+ outputs=[system_status],
+ )
+
+ gradio_share_cache_btn.click(
+ fn=lambda val: self._save_setting("GRADIO_SHARE", val, "cache"),
+ inputs=[gradio_share],
+ outputs=[system_status],
+ )
+
+ gradio_share_file_btn.click(
+ fn=lambda val: self._save_setting("GRADIO_SHARE", val, "env_file"),
+ inputs=[gradio_share],
+ outputs=[system_status],
+ )
+
+ port_cache_btn.click(
+ fn=lambda val: self._save_setting("PORT", str(int(val)), "cache"),
+ inputs=[port_setting],
+ outputs=[system_status],
+ )
+
+ port_file_btn.click(
+ fn=lambda val: self._save_setting("PORT", str(int(val)), "env_file"),
+ inputs=[port_setting],
+ outputs=[system_status],
+ )
+
+ return components
+
+ def _create_storage_section(self):
+ """Create the storage and export section."""
+ components = {}
+
+ with gr.Column():
+ gr.Markdown("#### ๐พ Storage & Export Options")
+ gr.Markdown("Manage how your settings are stored and exported")
+
+ with gr.Row():
+ with gr.Column():
+ gr.Markdown("**๐พ Cache Storage**")
+ gr.Markdown("โข Temporary storage in memory")
+ gr.Markdown("โข Lost when application restarts")
+ gr.Markdown("โข Good for testing configurations")
+
+ with gr.Column():
+ gr.Markdown("**๐ .env File Storage**")
+ gr.Markdown("โข Persistent storage in .env file")
+ gr.Markdown("โข Survives application restarts")
+ gr.Markdown("โข Recommended for production use")
+
+ # Export options
+ with gr.Group():
+ gr.Markdown("**๐ค Export Settings**")
+
+ with gr.Row():
+ include_sensitive = gr.Checkbox(
+ label="Include API Keys (masked)",
+ value=False,
+ info="Include API keys in export (they will be masked)",
+ )
+ export_format = gr.Dropdown(
+ label="Export Format",
+ choices=["JSON", "ENV"],
+ value="JSON",
+ info="Choose export format",
+ )
+
+ export_output = gr.Textbox(
+ label="Export Output",
+ lines=10,
+ interactive=False,
+ placeholder="Exported settings will appear here...",
+ )
+
+ export_settings_btn = gr.Button("๐ค Generate Export", variant="primary")
+
+ # Storage status
+ storage_status = gr.Textbox(
+ label="Storage Status",
+ value="Ready",
+ interactive=False,
+ container=False,
+ )
+
+ components.update(
+ {
+ "include_sensitive": include_sensitive,
+ "export_format": export_format,
+ "export_output": export_output,
+ "storage_status": storage_status,
+ }
+ )
+
+ # Export event handler
+ export_settings_btn.click(
+ fn=self._generate_export,
+ inputs=[include_sensitive, export_format],
+ outputs=[export_output, storage_status],
+ )
+
+ return components
+
+ def _create_health_tab(self):
+ """Create the system health monitoring tab."""
+ with gr.Column():
+ gr.Markdown("### System Health")
+
+ with gr.Row():
+ health_check_btn = gr.Button("Run Health Check", variant="primary")
+ restart_btn = gr.Button("Restart Services", variant="secondary")
+
+ # System status
+ system_status = gr.JSON(
+ label="System Status",
+ value={},
+ )
+
+ # Component status
+ component_status = gr.Dataframe(
+ headers=["Component", "Status", "Details"],
+ datatype=["str", "str", "str"],
+ label="Component Status",
+ interactive=False,
+ )
+
+ # Logs
+ system_logs = gr.Textbox(
+ label=" System Logs",
+ lines=10,
+ interactive=False,
+ placeholder="System logs will appear here...",
+ )
+
+ # Event handlers
+ health_check_btn.click(
+ fn=self._run_health_check,
+ outputs=[system_status, component_status, system_logs],
+ )
+
+ return {
+ "system_status": system_status,
+ "component_status": component_status,
+ "system_logs": system_logs,
+ }
+
+ def _get_custom_css(self) -> str:
+ """๐จ Get modern full-width custom CSS for the interface."""
+ return """
+ /* ๐ Global Container - Full Width */
+ .gradio-container {
+ max-width: 100% !important;
+ width: 100% !important;
+ margin: 0 !important;
+ padding: 0 20px !important;
+ }
+
+ /* ๐จ Modern Color Scheme */
+ :root {
+ --primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ --secondary-gradient: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
+ --success-gradient: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%);
+ --warning-gradient: linear-gradient(135deg, #43e97b 0%, #38f9d7 100%);
+ --dark-bg: #1a1a2e;
+ --dark-card: #16213e;
+ --light-bg: #f8fafc;
+ --light-card: #ffffff;
+ --text-primary: #2d3748;
+ --text-secondary: #718096;
+ --border-color: #e2e8f0;
+ --red: #f55b75;
+ --shadow-sm: 0 1px 3px 0 rgba(0, 0, 0, 0.1);
+ --shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
+ --shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1);
+ }
+
+ /* ๐ Dark Theme Support */
+ .dark {
+ --text-primary: #f7fafc;
+ --text-secondary: #cbd5e0;
+ --border-color: #4a5568;
+ }
+
+ /* ๐ฑ Full Width Layout */
+ .main-container {
+ width: 100% !important;
+ max-width: 100% !important;
+ }
+
+ /* ๐ฏ Header Styling */
+ .app-header {
+ background: var(--primary-gradient);
+ color: white;
+ padding: 2rem;
+ border-radius: 0 0 20px 20px;
+ margin-bottom: 2rem;
+ box-shadow: var(--shadow-lg);
+ }
+
+ .app-title {
+ font-size: 2.5rem;
+ font-weight: 700;
+ margin-bottom: 0.5rem;
+ text-shadow: 0 2px 4px rgba(0,0,0,0.3);
+ }
+
+ .app-description {
+ font-size: 1.1rem;
+ opacity: 0.9;
+ margin-bottom: 0;
+ }
+
+ /* ๐ Status Bar Enhancement */
+ .status-bar {
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ color: white;
+ padding: 1rem 2rem;
+ border-radius: 15px;
+ margin-bottom: 2rem;
+ box-shadow: var(--shadow-md);
+ display: grid;
+ grid-template-columns: 1fr 1fr;
+ gap: 2rem;
+ }
+
+ .status-item {
+ display: flex;
+ align-items: center;
+ gap: 0.5rem;
+ }
+
+ .status-icon {
+ font-size: 1.2rem;
+ }
+
+ /* ๐จ Tab Styling */
+ .tab-nav {
+ background: var(--dark-bg);
+ border-radius: 15px;
+ padding: 0.5rem;
+ margin-bottom: 2rem;
+ box-shadow: var(--shadow-sm);
+ border: 1px solid var(--border-color);
+ }
+
+ .tab-item {
+ border-radius: 10px !important;
+ padding: 1rem 1.5rem !important;
+ font-weight: 600 !important;
+ transition: all 0.3s ease !important;
+ border: none !important;
+ }
+
+
+ .tab-item.selected {
+ background: var(--primary-gradient) !important;
+ color: white !important;
+ box-shadow: var(--shadow-md);
+ }
+
+ /* ๐ฏ Card Components */
+ .metric-card {
+ background: var(--dark-bg);
+ border: 1px solid var(--border-color);
+ border-radius: 15px;
+ padding: 1.5rem;
+ margin: 1rem 0;
+ box-shadow: var(--shadow-sm);
+ transition: all 0.3s ease;
+ }
+
+ .metric-card:hover {
+ # transform: translateY(-5px);
+ box-shadow: var(--shadow-lg);
+ # border-color: #667eea;
+ }
+
+ .feature-card {
+ background: var(--dark-bg);
+ border: 1px solid var(--border-color);
+ border-radius: 20px;
+ padding: 2rem;
+ margin: 1rem 0;
+ box-shadow: var(--shadow-md);
+ transition: all 0.3s ease;
+ position: relative;
+ overflow: hidden;
+ }
+
+
+
+ .feature-card:hover {
+ transform: translateY(-8px);
+ box-shadow: var(--shadow-lg);
+ }
+
+ /* ๐จ Button Enhancements */
+ .btn-primary {
+ background: var(--primary-gradient) !important;
+ border: none !important;
+ border-radius: 12px !important;
+ padding: 0.75rem 2rem !important;
+ font-weight: 600 !important;
+ font-size: 1rem !important;
+ transition: all 0.3s ease !important;
+ box-shadow: var(--shadow-sm) !important;
+ }
+
+ .btn-primary:hover {
+ transform: translateY(-2px) !important;
+ box-shadow: var(--shadow-lg) !important;
+ }
+
+ .btn-secondary {
+ background: var(--red) !important;
+ border: none !important;
+ border-radius: 12px !important;
+ padding: 0.75rem 1.5rem !important;
+ font-weight: 600 !important;
+ transition: all 0.3s ease !important;
+ }
+
+ .btn-success {
+ background: var(--success-gradient) !important;
+ border: none !important;
+ border-radius: 12px !important;
+ padding: 0.75rem 1.5rem !important;
+ font-weight: 600 !important;
+ }
+
+ .btn-warning {
+ background: var(--warning-gradient) !important;
+ border: none !important;
+ border-radius: 12px !important;
+ padding: 0.75rem 1.5rem !important;
+ font-weight: 600 !important;
+ }
+
+ /* ๐ Input Field Styling */
+ .input-field {
+ border: 2px solid var(--border-color) !important;
+ border-radius: 12px !important;
+ padding: 1rem !important;
+ font-size: 1rem !important;
+ transition: all 0.3s ease !important;
+ background: var(--dark-bg) !important;
+ }
+
+ .input-field:focus {
+ border-color: #667eea !important;
+ box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important;
+ outline: none !important;
+ }
+
+ /* ๐ Progress Indicators */
+ .progress-bar {
+ background: var(--primary-gradient);
+ height: 8px;
+ border-radius: 4px;
+ transition: width 0.3s ease;
+ }
+
+ .progress-container {
+ background: var(--border-color);
+ height: 8px;
+ border-radius: 4px;
+ overflow: hidden;
+ }
+
+ /* ๐ฏ Grid Layouts */
+ .grid-2 {
+ display: grid;
+ grid-template-columns: 1fr 1fr;
+ gap: 2rem;
+ }
+
+ .grid-3 {
+ display: grid;
+ grid-template-columns: repeat(3, 1fr);
+ gap: 1.5rem;
+ }
+
+ .grid-4 {
+ display: grid;
+ grid-template-columns: repeat(4, 1fr);
+ gap: 1rem;
+ }
+
+ /* ๐ฑ Responsive Design */
+ @media (max-width: 1200px) {
+ .grid-4 { grid-template-columns: repeat(2, 1fr); }
+ .grid-3 { grid-template-columns: repeat(2, 1fr); }
+ }
+
+ @media (max-width: 768px) {
+ .gradio-container {
+ padding: 0 10px !important;
+ }
+
+ .grid-2, .grid-3, .grid-4 {
+ grid-template-columns: 1fr;
+ gap: 1rem;
+ }
+
+ .status-bar {
+ grid-template-columns: 1fr;
+ gap: 1rem;
+ padding: 1rem;
+ }
+
+ .app-title {
+ font-size: 2rem;
+ }
+
+ .feature-card {
+ padding: 1.5rem;
+ }
+ }
+
+ /* ๐ Animation Classes */
+ .fade-in {
+ animation: fadeIn 0.5s ease-in;
+ }
+
+ .slide-up {
+ animation: slideUp 0.6s ease-out;
+ }
+
+ @keyframes fadeIn {
+ from { opacity: 0; }
+ to { opacity: 1; }
+ }
+
+ @keyframes slideUp {
+ from {
+ opacity: 0;
+ transform: translateY(30px);
+ }
+ to {
+ opacity: 1;
+ transform: translateY(0);
+ }
+ }
+
+ /* ๐จ Accent Colors */
+ .accent-blue { border-left: 4px solid #3b82f6; }
+ .accent-green { border-left: 4px solid #10b981; }
+ .accent-purple { border-left: 4px solid #8b5cf6; }
+ .accent-orange { border-left: 4px solid #f59e0b; }
+ .accent-red { border-left: 4px solid #ef4444; }
+
+ /* ๐ Search Enhancement */
+ .search-container {
+ position: relative;
+ margin-bottom: 2rem;
+ }
+
+ .search-input {
+ width: 100%;
+ padding: 1rem 1rem 1rem 3rem;
+ border: 2px solid var(--border-color);
+ border-radius: 25px;
+ font-size: 1.1rem;
+ transition: all 0.3s ease;
+ }
+
+ .search-input:focus {
+ border-color: #667eea;
+ box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
+ }
+
+ .search-icon {
+ position: absolute;
+ left: 1rem;
+ top: 50%;
+ transform: translateY(-50%);
+ color: var(--text-secondary);
+ }
+
+ /* ๐ Analytics Dashboard */
+ .analytics-grid {
+ display: grid;
+ grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
+ gap: 1.5rem;
+ margin: 2rem 0;
+ }
+
+ .stat-card {
+ background: var(--dark-bg);
+ border-radius: 15px;
+ padding: 1.5rem;
+ box-shadow: var(--shadow-sm);
+ border: 1px solid var(--border-color);
+ transition: all 0.3s ease;
+ }
+
+ .stat-card:hover {
+ transform: translateY(-3px);
+ box-shadow: var(--shadow-md);
+ }
+
+ .stat-value {
+ font-size: 2rem;
+ font-weight: 700;
+ color: #667eea;
+ margin-bottom: 0.5rem;
+ }
+
+ .stat-label {
+ color: var(--text-secondary);
+ font-size: 0.9rem;
+ text-transform: uppercase;
+ letter-spacing: 0.5px;
+ }
+
+
+ /* ๐ Loading States */
+ .loading {
+ position: relative;
+ overflow: hidden;
+ }
+
+ .loading::after {
+ content: '';
+ position: absolute;
+ top: 0;
+ left: -100%;
+ width: 100%;
+ height: 100%;
+ background: linear-gradient(90deg, transparent, rgba(255,255,255,0.4), transparent);
+ animation: loading 1.5s infinite;
+ }
+
+ @keyframes loading {
+ 0% { left: -100%; }
+ 100% { left: 100%; }
+ }
+
+ /* ๐จ Custom Scrollbar */
+ ::-webkit-scrollbar {
+ width: 8px;
+ }
+
+ ::-webkit-scrollbar-track {
+ background: var(--light-bg);
+ }
+
+ ::-webkit-scrollbar-thumb {
+ background: var(--primary-gradient);
+ border-radius: 4px;
+ }
+
+ ::-webkit-scrollbar-thumb:hover {
+ background: #5a67d8;
+ }
+ """
+
+ # ๐ง Settings Management Methods
+
    def _refresh_all_settings(self):
        """Refresh all settings and return updated values.

        Returns:
            A 15-element tuple consumed positionally by the refresh button's
            ``outputs`` list in _create_settings_tab:
            (status message, overview dict,
             gemini key, gemini status, pinecone key, pinecone status,
             openai key, openai status, tavily key, tavily status,
             pinecone environment, pinecone index, gradio share, port,
             system-settings status).
            Changing the order here silently mis-maps UI fields.
        """
        try:
            settings = self.settings_manager.get_current_settings()

            # Build a compact per-variable summary for the JSON overview panel.
            overview = {}
            for var_name, config in settings.items():
                overview[var_name] = {
                    "value": config["value"] if config["is_set"] else "Not set",
                    "source": config["source"],
                    "status": (
                        "โ Valid"
                        if config["is_valid"]
                        else "โ Invalid" if config["is_set"] else "โ ๏ธ Not set"
                    ),
                    "required": config["is_required"],
                }

            # Return status and all component updates
            status_msg = "๐ Settings refreshed successfully"

            # Get current values for form fields.
            # NOTE(review): "raw_value" appears to be the editable value and
            # "value" the display/masked form — confirm in SettingsManager.
            gemini_val = settings.get("GEMINI_API_KEY", {}).get("raw_value", "")
            pinecone_val = settings.get("PINECONE_API_KEY", {}).get("raw_value", "")
            openai_val = settings.get("OPENAI_API_KEY", {}).get("raw_value", "")
            tavily_val = settings.get("TAVILY_API_KEY", {}).get("raw_value", "")

            pinecone_env_val = settings.get("PINECONE_ENVIRONMENT", {}).get(
                "raw_value", "us-east-1"
            )
            pinecone_index_val = settings.get("PINECONE_INDEX_NAME", {}).get(
                "raw_value", "rag-ai-index"
            )
            gradio_share_val = settings.get("GRADIO_SHARE", {}).get(
                "raw_value", "false"
            )
            port_val = int(settings.get("PORT", {}).get("raw_value", "7860"))

            return (
                status_msg,
                overview,
                gemini_val,
                settings.get("GEMINI_API_KEY", {}).get("value", "Not configured"),
                pinecone_val,
                settings.get("PINECONE_API_KEY", {}).get("value", "Not configured"),
                openai_val,
                settings.get("OPENAI_API_KEY", {}).get("value", "Not configured"),
                tavily_val,
                settings.get("TAVILY_API_KEY", {}).get("value", "Not configured"),
                pinecone_env_val,
                pinecone_index_val,
                gradio_share_val,
                port_val,
                "โ Settings loaded",
            )

        except Exception as e:
            self._log_safe(f" Error refreshing settings: {e}", "error")
            # Fall back to safe defaults so every UI field still receives a
            # value of the expected type.
            return (
                f" Error refreshing settings: {str(e)}",
                {},
                "",
                "Error loading",
                "",
                "Error loading",
                "",
                "Error loading",
                "",
                "Error loading",
                "us-east-1",
                "rag-ai-index",
                "false",
                7860,
                "โ Error loading",
            )
+
+ def _save_setting(self, var_name: str, value: str, storage_type: str) -> str:
+ """Save a setting with the specified storage type."""
+ try:
+ result = self.settings_manager.update_setting(var_name, value, storage_type)
+
+ if result["success"]:
+ self._log_safe(f" Saved {var_name} to {storage_type}")
+ return result["status"]
+ else:
+ self._log_safe(
+ f" Failed to save {var_name}: {result.get('error', 'Unknown error')}",
+ "error",
+ )
+ return result["status"]
+
+ except Exception as e:
+ self._log_safe(f" Error saving {var_name}: {e}", "error")
+ return f"โ Error: {str(e)}"
+
    def _test_api_connection(self, var_name: str) -> str:
        """Test API connection for the specified variable with optimized performance.

        Args:
            var_name: Environment variable naming the key to test
                (e.g. "GEMINI_API_KEY").

        Returns:
            Human-readable status string for the per-key status textbox.
        """
        try:
            # Show testing status immediately
            status_message = f"๐ Testing {var_name} connection..."
            self._log_safe(status_message)

            # For Gemini only: if the settings manager recorded a test within
            # the last 10 seconds, skip the live call and reuse that result.
            if var_name == "GEMINI_API_KEY" and hasattr(
                self.settings_manager, "_gemini_last_test_time"
            ):
                current_time = time.time()
                if (
                    self.settings_manager._gemini_last_test_time
                    and current_time - self.settings_manager._gemini_last_test_time < 10
                ):

                    self._log_safe(
                        f"โ Using cached {var_name} test result (tested recently)"
                    )
                    return "โ Gemini API connected (cached result)"

            # Perform the actual test
            result = self.settings_manager.test_connection(var_name)

            if result["success"]:
                self._log_safe(f"โ {var_name} connection test successful")
            else:
                self._log_safe(
                    f" {var_name} connection test failed: {result.get('error', 'Unknown error')}",
                    "warning",
                )

            return result["status"]

        except Exception as e:
            self._log_safe(f" Error testing {var_name}: {e}", "error")
            return f" Test error: {str(e)}"
+
    def _load_from_env_file(self) -> Tuple[str, Dict[str, Any]]:
        """Load settings from .env file.

        Returns:
            Tuple of (status message, overview dict) for the status textbox
            and the JSON overview panel. The overview is empty on failure.
        """
        try:
            result = self.settings_manager.load_from_env_file()

            if result["success"]:
                self._log_safe(
                    f" Loaded {result['loaded_count']} variables from .env file"
                )

                # Rebuild the per-variable summary shown in the overview panel.
                settings = self.settings_manager.get_current_settings()
                overview = {}
                for var_name, config in settings.items():
                    overview[var_name] = {
                        "value": config["value"] if config["is_set"] else "Not set",
                        "source": config["source"],
                        "status": (
                            "โ Valid"
                            if config["is_valid"]
                            else "โ Invalid" if config["is_set"] else "โ ๏ธ Not set"
                        ),
                        "required": config["is_required"],
                    }

                return result["status"], overview
            else:
                self._log_safe(
                    f" Failed to load from .env: {result.get('error', 'Unknown error')}",
                    "error",
                )
                return result["status"], {}

        except Exception as e:
            self._log_safe(f" Error loading from .env file: {e}", "error")
            return f" Error: {str(e)}", {}
+
    def _clear_settings_cache(self) -> Tuple[str, Dict[str, Any]]:
        """Clear settings cache.

        Returns:
            Tuple of (status message, overview dict) for the status textbox
            and the JSON overview panel. The overview is empty on failure.
        """
        try:
            result = self.settings_manager.clear_cache()

            if result["success"]:
                self._log_safe(f" Cleared {result['cleared_count']} cached variables")

                # Rebuild the per-variable summary shown in the overview panel.
                settings = self.settings_manager.get_current_settings()
                overview = {}
                for var_name, config in settings.items():
                    overview[var_name] = {
                        "value": config["value"] if config["is_set"] else "Not set",
                        "source": config["source"],
                        "status": (
                            "โ Valid"
                            if config["is_valid"]
                            else "โ Invalid" if config["is_set"] else "โ ๏ธ Not set"
                        ),
                        "required": config["is_required"],
                    }

                return result["status"], overview
            else:
                self._log_safe(
                    f" Failed to clear cache: {result.get('error', 'Unknown error')}",
                    "error",
                )
                return result["status"], {}

        except Exception as e:
            self._log_safe(f" Error clearing cache: {e}", "error")
            return f" Error: {str(e)}", {}
+
+ def _export_settings(self) -> str:
+ """Export settings (basic version for main button)."""
+ try:
+ result = self.settings_manager.export_settings(include_sensitive=False)
+
+ if result["success"]:
+ self._log_safe(" Settings exported successfully")
+ return " Settings exported (check Storage & Export tab for details)"
+ else:
+ self._log_safe(
+ f" Failed to export settings: {result.get('error', 'Unknown error')}",
+ "error",
+ )
+ return f" Export failed: {result.get('error', 'Unknown error')}"
+
+ except Exception as e:
+ self._log_safe(f" Error exporting settings: {e}", "error")
+ return f" Error: {str(e)}"
+
+ def _generate_export(
+ self, include_sensitive: bool, export_format: str
+ ) -> Tuple[str, str]:
+ """Generate detailed export output."""
+ try:
+ result = self.settings_manager.export_settings(
+ include_sensitive=include_sensitive
+ )
+
+ if not result["success"]:
+ return (
+ f" Export failed: {result.get('error', 'Unknown error')}",
+ " Export failed",
+ )
+
+ settings_data = result["settings"]
+
+ if export_format == "JSON":
+ import json
+
+ export_content = json.dumps(
+ {
+ "export_info": {
+ "timestamp": result["export_timestamp"],
+ "include_sensitive": include_sensitive,
+ "format": "JSON",
+ },
+ "settings": settings_data,
+ },
+ indent=2,
+ )
+
+ elif export_format == "ENV":
+ export_lines = [
+ "# Environment Variables Export",
+ f"# Generated on {result['export_timestamp']}",
+ f"# Include sensitive: {include_sensitive}",
+ "",
+ ]
+
+ for var_name, config in settings_data.items():
+ if config["is_set"]:
+ value = config["value"]
+ export_lines.append(f"# {config['description']}")
+ export_lines.append(f"{var_name}={value}")
+ export_lines.append("")
+
+ export_content = "\n".join(export_lines)
+
+ else:
+ return " Invalid export format", " Invalid format"
+
+ self._log_safe(
+ f" Generated {export_format} export with {len(settings_data)} variables"
+ )
+ return export_content, f" {export_format} export generated successfully"
+
+ except Exception as e:
+ self._log_safe(f" Error generating export: {e}", "error")
+ return f" Error: {str(e)}", " Export generation failed"
+
+ def _process_documents(self, files) -> Tuple[str, str, str]:
+ """
+ Process uploaded documents with progress tracking.
+
+ Args:
+ files: List of uploaded files
+
+ Returns:
+ Tuple of (processing results, status, stats)
+ """
+ if not files:
+ return "No files uploaded.", "Ready ", self._get_stats_string()
+
+ try:
+ self._log_safe(f"Processing {len(files)} uploaded files")
+
+ results = []
+ successful = 0
+
+ for i, file in enumerate(files):
+ try:
+ # Process each file
+ result = self.rag_system.process_document(file.name)
+
+ if result.get("status") == "success":
+ successful += 1
+ self.total_documents += 1
+ self.total_chunks += result.get("chunks_processed", 0)
+
+ results.append(
+ f"{os.path.basename(file.name)}: "
+ f"{result.get('chunks_processed', 0)} chunks processed"
+ )
+ else:
+ results.append(
+ f"โ {os.path.basename(file.name)}: "
+ f"{result.get('error', 'Processing failed')}"
+ )
+
+ except Exception as e:
+ results.append(f"โ {os.path.basename(file.name)}: {str(e)}")
+
+ # Summary
+ summary = (
+ f"\nSummary: {successful}/{len(files)} files processed successfully"
+ )
+ output = "\n".join(results) + summary
+
+ status = (
+ f"Processed {successful}/{len(files)} files "
+ if successful > 0
+ else "Processing failed โ"
+ )
+
+ return output, status, self._get_stats_string()
+
+ except Exception as e:
+ self._log_safe(f" Error processing documents: {str(e)}", "error")
+ return f" Error: {str(e)}", "Error ", self._get_stats_string()
+
+ def _process_urls(
+ self, urls_text: str, max_depth: int = 1, follow_links: bool = True
+ ) -> Tuple[str, str, str, str]:
+ """
+ Process URLs with advanced crawling options and progress tracking.
+
+ Args:
+ urls_text: Text containing URLs (one per line)
+ max_depth: Maximum crawling depth
+ follow_links: Whether to follow links
+
+ Returns:
+ Tuple of (processing results, status, stats, progress_info)
+ """
+ if not urls_text.strip():
+ return (
+ "No URLs provided.",
+ "Ready ๐ข",
+ self._get_stats_string(),
+ "Ready to process URLs...",
+ )
+
+ try:
+ urls = [url.strip() for url in urls_text.split("\n") if url.strip()]
+ self._log_safe(
+ f"Processing {len(urls)} URLs with depth={max_depth}, follow_links={follow_links}"
+ )
+
+ results = []
+ successful = 0
+ progress_msg = f"๐ Starting crawl of {len(urls)} URLs..."
+
+ for i, url in enumerate(urls):
+ progress_msg = f"๐ Processing URL {i+1}/{len(urls)}: {url[:50]}..."
+ try:
+ # Process each URL with advanced options
+ result = self.rag_system.process_url(
+ url, max_depth=max_depth, follow_links=follow_links
+ )
+
+ if result.get("status") == "success":
+ successful += 1
+ self.total_documents += 1
+ self.total_chunks += result.get("chunks_processed", 0)
+
+ # Enhanced result display with crawling info
+ chunks = result.get("chunks_processed", 0)
+ linked_docs = result.get("linked_documents_processed", 0)
+ depth = result.get("depth", 0)
+
+ result_text = f"โ
{url}:\n"
+ result_text += f" ๐ {chunks} chunks processed"
+ if linked_docs > 0:
+ result_text += f"\n ๐ {linked_docs} linked pages found"
+ if depth > 0:
+ result_text += f"\n ๐ท๏ธ Crawled to depth {depth}"
+
+ results.append(result_text)
+ else:
+ error_msg = result.get("error", "Processing failed")
+ results.append(f"โ {url}: {error_msg}")
+
+ # Add helpful hints for common crawling issues
+ if "depth" in error_msg.lower():
+ results.append(" ๐ก Try reducing crawl depth")
+ elif "timeout" in error_msg.lower():
+ results.append(
+ " ๐ก Site may be slow, try single page mode"
+ )
+ elif "robots" in error_msg.lower():
+ results.append(
+ " ๐ก Site blocks crawlers, try direct URL only"
+ )
+
+ except Exception as e:
+ results.append(f"โ {url}: {str(e)}")
+
+ # Enhanced Summary with crawling stats
+ total_linked = sum(
+ result.get("linked_documents_processed", 0)
+ for result in [
+ self.rag_system.process_url(url, max_depth, follow_links)
+ for url in urls
+ ]
+ if result.get("status") == "success"
+ )
+
+ summary = f"\n" + "=" * 50
+ summary += f"\n๐ **CRAWLING SUMMARY**"
+ summary += f"\nโ
URLs processed: {successful}/{len(urls)}"
+ if follow_links and max_depth > 1:
+ summary += f"\n๐ Linked pages discovered: {total_linked}"
+ summary += f"\n๐ท๏ธ Max crawl depth: {max_depth}"
+ summary += f"\n๐ Total chunks: {self.total_chunks}"
+ summary += "\n" + "=" * 50
+
+ output = "\n".join(results) + summary
+
+ status = (
+ f"Processed {successful}/{len(urls)} URLs "
+ if successful > 0
+ else "Processing failed "
+ )
+
+ final_progress = (
+ f"โ
Completed! Processed {successful}/{len(urls)} URLs successfully"
+ )
+ return output, status, self._get_stats_string(), final_progress
+
+ except Exception as e:
+ self._log_safe(f" Error processing URLs: {str(e)}", "error")
+ error_progress = f" Error occurred during processing"
+ return (
+ f" Error: {str(e)}",
+ "Error ",
+ self._get_stats_string(),
+ error_progress,
+ )
+
+ def _process_query(
+ self,
+ query: str,
+ include_sources: bool = True,
+ max_results: int = 5,
+ use_live_search: bool = False,
+ search_depth: str = "basic",
+ time_range: str = "month",
+ search_mode: str = "auto",
+ ) -> Tuple[str, str, Dict[str, Any], str, str]:
+ """
+ Process a user query with enhanced response formatting and live search options.
+
+ Args:
+ query: User query string
+ include_sources: Whether to include source information
+ max_results: Maximum number of results to return
+ use_live_search: Whether to use live web search
+ search_depth: Search depth for live search
+ time_range: Time range for live search
+
+ Returns:
+ Tuple of (response, confidence, sources, status, stats)
+ """
+ if not query.strip():
+ return (
+ "Please enter a question.",
+ "",
+ {},
+ "Ready ๐ข",
+ self._get_stats_string(),
+ )
+
+ try:
+ # โ
Enhanced search type detection
+ search_type_map = {
+ "auto": "๐ค Auto",
+ "local_only": "๐ Local Only",
+ "live_only": "๐ Live Only",
+ "hybrid": "๐ Hybrid",
+ }
+ search_type = search_type_map.get(search_mode, "๐ค Auto")
+
+ # ๐ Backward compatibility: if use_live_search is True but mode is auto, use hybrid
+ if use_live_search and search_mode == "auto":
+ search_mode = "hybrid"
+ search_type = "๐ Hybrid"
+
+ self._log_safe(
+ f" Processing query ({search_type}): {query[:100]}... "
+ f"(mode: {search_mode}, sources: {include_sources}, max_results: {max_results})"
+ )
+
+ # ๐ Route query based on search mode
+ if search_mode in ["live_only", "hybrid"] or use_live_search:
+ # Use enhanced RAG system with search mode
+ result = self.rag_system.query(
+ query,
+ max_results=max_results,
+ use_live_search=(
+ search_mode in ["live_only", "hybrid"] or use_live_search
+ ),
+ search_mode=search_mode,
+ )
+ else:
+ # Use traditional local RAG system
+ result = self.rag_system.query(
+ query, max_results=max_results, search_mode=search_mode
+ )
+
+ self.query_count += 1
+
+ response = result.get("response", "No response generated.")
+ confidence = result.get("confidence", 0.0)
+ sources = result.get("sources", [])
+
+ # ๐ฏ Format confidence display with search type indicator
+ confidence_text = f"๐ฏ Confidence: {confidence:.1%}"
+ if confidence >= 0.8:
+ confidence_text += " ๐ข High"
+ elif confidence >= 0.5:
+ confidence_text += " ๐ก Medium"
+ else:
+ confidence_text += " ๐ด Low"
+
+ # Add processing details with search type
+ context_items = result.get("context_items", 0)
+ processing_time = result.get("processing_time", 0)
+ search_indicator = "๐" if use_live_search else "๐"
+ confidence_text += f" | {search_indicator} {search_type} | โก {processing_time:.2f}s | ๐ {context_items} items"
+
+ # ๐ Format sources for display based on user preference
+ sources_display = {}
+ if include_sources and sources:
+ # Limit sources based on max_results
+ limited_sources = sources[:max_results]
+ sources_display = {
+ "confidence": f"{confidence:.3f}",
+ "total_sources": len(sources),
+ "showing": len(limited_sources),
+ "max_requested": max_results,
+ "sources": limited_sources,
+ "search_type": search_type,
+ "query_options": {
+ "include_sources": include_sources,
+ "max_results": max_results,
+ "use_live_search": use_live_search,
+ "search_depth": search_depth if use_live_search else None,
+ "time_range": time_range if use_live_search else None,
+ },
+ }
+
+ # ๐ Add live search specific metadata
+ if use_live_search:
+ sources_display.update(
+ {
+ "live_search_params": {
+ "search_depth": search_depth,
+ "time_range": time_range,
+ "routing_decision": result.get(
+ "routing_decision", "live_search"
+ ),
+ }
+ }
+ )
+
+ elif not include_sources:
+ sources_display = {
+ "message": "๐ Sources hidden by user preference",
+ "total_sources": len(sources),
+ "search_type": search_type,
+ "query_options": {
+ "include_sources": include_sources,
+ "max_results": max_results,
+ "use_live_search": use_live_search,
+ },
+ }
+
+ # ๐ Enhanced status with search type
+ status_icon = "๐" if use_live_search else "๐"
+ status = f"โ
{status_icon} Query processed (confidence: {confidence:.1%}, {len(sources)} sources)"
+
+ return (
+ response,
+ confidence_text,
+ sources_display,
+ status,
+ self._get_stats_string(),
+ )
+
+ except Exception as e:
+ self._log_safe(f" Error processing query: {str(e)}", "error")
+ return (
+ f" Error: {str(e)}",
+ "Error",
+ {},
+ "Error ",
+ self._get_stats_string(),
+ )
+
+ def _process_live_query(
+ self, query: str, max_results: int, search_depth: str, time_range: str
+ ) -> Dict[str, Any]:
+ """
+ Process query using live search via MCP Tavily integration.
+
+ Args:
+ query: User query
+ max_results: Maximum results to return
+ search_depth: Search depth parameter
+ time_range: Time range for search
+
+ Returns:
+ Dictionary with search results and metadata
+ """
+ try:
+ self._log_safe(f" Performing live search with Tavily API...")
+
+ # ๐ Use MCP Tavily tool for live search
+ # This will be the actual MCP integration point
+ search_results = self._call_tavily_mcp(
+ query, max_results, search_depth, time_range
+ )
+
+ # ๐ Process and format results for RAG response generation
+ if search_results and search_results.get("results"):
+ # Format for response generator
+ formatted_context = []
+ for result in search_results["results"]:
+ formatted_context.append(
+ {
+ "text": result.get("content", ""),
+ "source": result.get("url", "web_search"),
+ "title": result.get("title", "Web Result"),
+ "score": result.get("score", 0.0),
+ "metadata": {
+ "type": "web_result",
+ "search_engine": "tavily",
+ "url": result.get("url", ""),
+ "title": result.get("title", ""),
+ },
+ }
+ )
+
+ # ๐ง Generate response using the response generator with live context
+ if hasattr(self.rag_system, "response_generator"):
+ response_result = (
+ self.rag_system.response_generator.generate_response(
+ query, formatted_context
+ )
+ )
+
+ # ๐ Combine live search metadata with response
+ response_result.update(
+ {
+ "context_items": len(formatted_context),
+ "search_type": "live_web",
+ "routing_decision": "live_search",
+ "live_search_params": {
+ "search_depth": search_depth,
+ "time_range": time_range,
+ "total_web_results": len(search_results["results"]),
+ },
+ }
+ )
+
+ return response_result
+ else:
+ # ๐ Fallback: simple response formatting
+ combined_content = "\n\n".join(
+ [
+ f"**{result.get('title', 'Web Result')}**\n{result.get('content', '')}"
+ for result in search_results["results"][:3]
+ ]
+ )
+
+ return {
+ "response": f"Based on live web search:\n\n{combined_content}",
+ "sources": search_results["results"],
+ "confidence": 0.8,
+ "context_items": len(search_results["results"]),
+ "search_type": "live_web",
+ }
+ else:
+ return {
+ "response": "No live search results found. Please try a different query or check your internet connection.",
+ "sources": [],
+ "confidence": 0.0,
+ "context_items": 0,
+ "error": "No live search results",
+ }
+
+ except Exception as e:
+ self._log_safe(f" Live search error: {str(e)}", "error")
+ # ๐ Fallback to local search
+ self._log_safe(" Falling back to local search...", "warning")
+ return self.rag_system.query(query, max_results=max_results)
+
+ def _call_tavily_mcp(
+ self, query: str, max_results: int, search_depth: str, time_range: str
+ ) -> Dict[str, Any]:
+ """
+ Call Tavily API using the live search module.
+
+ Args:
+ query: Search query
+ max_results: Maximum results
+ search_depth: Search depth
+ time_range: Time range
+
+ Returns:
+ Tavily search results
+ """
+ try:
+ # ๐ Use the live search module with Tavily Python SDK
+ from src.rag.live_search import LiveSearchManager
+
+ self._log_safe(
+ f" Tavily API call: query='{query}', depth={search_depth}, range={time_range}"
+ )
+
+ # โ
Initialize live search manager
+ live_search = LiveSearchManager()
+
+ # ๐ Perform the search using Tavily Python SDK
+ search_results = live_search.search_web(
+ query=query,
+ max_results=max_results,
+ search_depth=search_depth,
+ time_range=time_range,
+ )
+
+ # ๐ Format results for UI consumption
+ if (
+ search_results
+ and search_results.get("results")
+ and not search_results.get("error")
+ ):
+ formatted_results = []
+ for result in search_results.get("results", []):
+ formatted_results.append(
+ {
+ "title": result.get("title", ""),
+ "content": result.get("content", ""),
+ "url": result.get("url", ""),
+ "score": result.get("score", 0.0),
+ "published_date": result.get("published_date", ""),
+ }
+ )
+
+ return {
+ "results": formatted_results,
+ "total_results": len(formatted_results),
+ "search_params": {
+ "query": query,
+ "max_results": max_results,
+ "search_depth": search_depth,
+ "time_range": time_range,
+ },
+ "status": "success",
+ "analytics": search_results.get("analytics", {}),
+ }
+ else:
+ # ๐จ Handle search failure
+ error_msg = search_results.get("error", "Unknown search error")
+ self._log_safe(f" Tavily search failed: {error_msg}", "warning")
+
+ return {
+ "results": [],
+ "total_results": 0,
+ "search_params": {
+ "query": query,
+ "max_results": max_results,
+ "search_depth": search_depth,
+ "time_range": time_range,
+ },
+ "status": "failed",
+ "error": error_msg,
+ }
+
+ except Exception as e:
+ self._log_safe(f" Tavily API call failed: {str(e)}", "error")
+ return {
+ "results": [],
+ "total_results": 0,
+ "error": str(e),
+ "status": "error",
+ }
+
    def _refresh_knowledge_base(
        self,
    ) -> Tuple[Dict[str, Any], Dict[str, Any], List[List[str]]]:
        """
        Refresh knowledge base information with real data from vector DB and embedding model.

        Falls back to the locally tracked counters (self.total_documents /
        self.total_chunks) whenever live stats are unavailable, and to
        placeholder rows when the knowledge base holds no documents.

        Returns:
            Tuple of (kb_stats, embedding_model_status, document_list)
        """
        try:
            # Get real knowledge base statistics (empty dict on failure).
            kb_info = self._get_real_kb_stats()

            # Get embedding model information (name, provider, perf counters).
            embedding_info = self._get_embedding_model_info()

            # ๐ Knowledge Base Stats
            kb_stats = {
                "total_documents": kb_info.get("total_documents", self.total_documents),
                "total_chunks": kb_info.get("total_chunks", self.total_chunks),
                # NOTE(review): 0.5 MB per chunk is a rough heuristic, not a
                # measured size - confirm against the actual store.
                "storage_size": f"{kb_info.get('total_chunks', self.total_chunks) * 0.5:.1f} MB",
                "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "vector_db_status": kb_info.get("vector_db_status", "Unknown"),
                "embedding_model": embedding_info.get("model_name", "Unknown"),
                "embedding_status": embedding_info.get("status", "Unknown"),
                "index_health": kb_info.get("index_health", "Unknown"),
            }

            # ๐ค Embedding Model Status
            embedding_status = {
                "model_name": embedding_info.get("model_name", "Unknown"),
                "provider": embedding_info.get("provider", "Unknown"),
                "status": embedding_info.get("status", "Unknown"),
                "api_status": embedding_info.get("api_status", "Unknown"),
                "dimension": embedding_info.get("dimension", "Unknown"),
                "performance": {
                    "total_requests": embedding_info.get("total_requests", 0),
                    "success_rate": embedding_info.get("success_rate", "0%"),
                    "cache_hit_rate": embedding_info.get("cache_hit_rate", "0%"),
                    "batch_size": embedding_info.get("batch_size", "Unknown"),
                    "max_text_length": embedding_info.get("max_text_length", "Unknown"),
                    "caching_enabled": embedding_info.get("caching_enabled", False),
                },
                "last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            # Get real document list from vector DB.
            documents = self._get_real_document_list()

            # If no real documents, show helpful onboarding rows instead of an
            # empty table.
            if not documents:
                documents = [
                    [
                        "๐ No documents yet",
                        "Info",
                        "0",
                        "Upload documents to get started",
                    ],
                    ["๐ Try adding URLs", "Info", "0", "Use the 'Add URLs' tab"],
                    [
                        "๐ Knowledge base empty",
                        "Info",
                        "0",
                        "Start building your knowledge base!",
                    ],
                ]

            return kb_stats, embedding_status, documents

        except Exception as e:
            self._log_safe(f" Error refreshing knowledge base: {e}", "error")
            # Fallback stats built only from local counters.
            fallback_kb_stats = {
                "total_documents": self.total_documents,
                "total_chunks": self.total_chunks,
                "storage_size": f"{self.total_chunks * 0.5:.1f} MB",
                "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error": str(e),
            }

            fallback_embedding_status = {
                "model_name": "Error",
                "provider": "Unknown",
                "status": "โ Error",
                "api_status": "โ Error",
                "dimension": "Unknown",
                "performance": {"error": str(e)},
                "last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return fallback_kb_stats, fallback_embedding_status, []
+
    def _get_real_kb_stats(self) -> Dict[str, Any]:
        """Get real knowledge base statistics from the RAG system.

        Combines vector-DB stats (when rag_system.vector_db exists and
        exposes get_stats) with embedding-model info. Returns an empty dict
        on failure so callers can fall back to local counters.
        """
        try:
            # ๐ Get embedding model info first - needed in both branches.
            embedding_model_info = self._get_embedding_model_info()

            if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db:
                # Try to get stats from vector DB; empty dict if unsupported.
                vector_stats = (
                    self.rag_system.vector_db.get_stats()
                    if hasattr(self.rag_system.vector_db, "get_stats")
                    else {}
                )

                return {
                    # NOTE(review): total_vectors feeds both document and chunk
                    # counts - these are vector counts, not document counts.
                    "total_documents": vector_stats.get(
                        "total_vectors", self.total_documents
                    ),
                    "total_chunks": vector_stats.get(
                        "total_vectors", self.total_chunks
                    ),
                    "vector_db_status": "โ
 Connected" if vector_stats else "โ ๏ธ Limited",
                    "embedding_model": embedding_model_info.get(
                        "model_name", "Unknown"
                    ),
                    "embedding_model_status": embedding_model_info.get(
                        "status", "Unknown"
                    ),
                    "embedding_dimension": embedding_model_info.get(
                        "dimension", "Unknown"
                    ),
                    "embedding_provider": embedding_model_info.get(
                        "provider", "Unknown"
                    ),
                    # Healthy only when at least one vector is stored.
                    "index_health": (
                        "โ
 Healthy"
                        if vector_stats.get("total_vectors", 0) > 0
                        else "โ ๏ธ Empty"
                    ),
                }
            else:
                # No vector DB attached: report local counters and flag the
                # database as unavailable.
                return {
                    "total_documents": self.total_documents,
                    "total_chunks": self.total_chunks,
                    "vector_db_status": "โ Not Connected",
                    "embedding_model": embedding_model_info.get(
                        "model_name", "Unknown"
                    ),
                    "embedding_model_status": embedding_model_info.get(
                        "status", "โ Not Available"
                    ),
                    "embedding_dimension": embedding_model_info.get(
                        "dimension", "Unknown"
                    ),
                    "embedding_provider": embedding_model_info.get(
                        "provider", "Unknown"
                    ),
                    "index_health": "โ Unavailable",
                }
        except Exception as e:
            self._log_safe(f"Could not get real KB stats: {e}", "warning")
            return {}
+
+ def _get_real_document_list(self) -> List[List[str]]:
+ """Get real document list from the RAG system."""
+ try:
+ documents = []
+
+ # Try to get document metadata from vector DB
+ if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db:
+ # Get unique sources from vector DB
+ if hasattr(self.rag_system.vector_db, "get_unique_sources"):
+ sources = self.rag_system.vector_db.get_unique_sources()
+ for source_info in sources:
+ source_name = source_info.get("source", "Unknown")
+ doc_type = self._get_document_type(source_name)
+ chunk_count = source_info.get("chunk_count", 0)
+ added_date = source_info.get("added_date", "Unknown")
+
+ documents.append(
+ [source_name, doc_type, str(chunk_count), added_date]
+ )
+
+ # If vector DB doesn't have get_unique_sources, try alternative approach
+ elif hasattr(self.rag_system.vector_db, "list_documents"):
+ doc_list = self.rag_system.vector_db.list_documents()
+ for doc in doc_list:
+ documents.append(
+ [
+ doc.get("name", "Unknown"),
+ self._get_document_type(doc.get("name", "")),
+ str(doc.get("chunks", 0)),
+ doc.get("date", "Unknown"),
+ ]
+ )
+
+ return documents
+
+ except Exception as e:
+ self._log_safe(f"Could not get real document list: {e}", "warning")
+ return []
+
+ def _get_document_type(self, filename: str) -> str:
+ """Determine document type from filename."""
+ if not filename:
+ return "Unknown"
+
+ filename_lower = filename.lower()
+ if filename_lower.endswith(".pdf"):
+ return "๐ PDF"
+ elif filename_lower.endswith((".doc", ".docx")):
+ return "๐ Word"
+ elif filename_lower.endswith((".xls", ".xlsx")):
+ return "๐ Excel"
+ elif filename_lower.endswith((".ppt", ".pptx")):
+ return "๐ PowerPoint"
+ elif filename_lower.endswith(".csv"):
+ return "๐ CSV"
+ elif filename_lower.endswith((".txt", ".md")):
+ return "๐ Text"
+ elif "http" in filename_lower:
+ return "๐ Web"
+ else:
+ return "๐ Document"
+
    def _get_embedding_model_info(self) -> Dict[str, Any]:
        """
        ๐ค Get comprehensive embedding model information.

        Inspects rag_system.embedding_generator for model name, API client
        status, configuration, and (if available) usage statistics, and runs
        a one-off test embedding to probe the API connection.

        Returns:
            Dictionary with embedding model details
        """
        try:
            # Pessimistic defaults, overwritten below when a generator exists.
            model_info = {
                "model_name": "Unknown",
                "status": "โ Not Available",
                "dimension": "Unknown",
                "provider": "Unknown",
                "api_status": "โ Not Connected",
            }

            # Check if embedding generator exists and is properly initialized.
            if (
                hasattr(self.rag_system, "embedding_generator")
                and self.rag_system.embedding_generator
            ):
                embedding_gen = self.rag_system.embedding_generator

                # Get model name - check multiple possible attributes.
                model_name = (
                    getattr(embedding_gen, "model", None)
                    or getattr(embedding_gen, "model_name", None)
                    or "gemini-embedding-exp-03-07"
                )  # Default Gemini model

                # Get API client status (client attribute present and set).
                api_connected = (
                    hasattr(embedding_gen, "client")
                    and embedding_gen.client is not None
                )

                # Get configuration details (empty dict if absent).
                config = getattr(embedding_gen, "config", {})

                model_info.update(
                    {
                        "model_name": model_name,
                        "status": "โ
 Available" if api_connected else "โ ๏ธ Limited",
                        "provider": (
                            "Google Gemini"
                            if "gemini" in model_name.lower()
                            else "Unknown"
                        ),
                        "api_status": (
                            "โ
 Connected" if api_connected else "โ Not Connected"
                        ),
                        "dimension": config.get("dimension", "3072"),  # Gemini default
                        "batch_size": config.get("batch_size", 5),
                        "max_text_length": config.get("max_text_length", 8192),
                        "caching_enabled": config.get("enable_caching", True),
                    }
                )

                # Get statistics if available; failures are logged, not raised.
                if hasattr(embedding_gen, "get_statistics"):
                    try:
                        stats = embedding_gen.get_statistics()
                        model_info.update(
                            {
                                "total_requests": stats.get("total_requests", 0),
                                "successful_requests": stats.get(
                                    "successful_requests", 0
                                ),
                                "cache_hits": stats.get("cache_hits", 0),
                                "cache_hit_rate": f"{stats.get('cache_hit_rate', 0):.1f}%",
                                "success_rate": f"{stats.get('success_rate', 0):.1f}%",
                            }
                        )
                    except Exception as e:
                        self._log_safe(f"Could not get embedding stats: {e}", "warning")

                # Test API connection if possible (quick test).
                # NOTE(review): this issues a real embedding request on every
                # call - may cost latency/quota when polled frequently.
                if api_connected:
                    try:
                        # Quick test to verify API is working.
                        test_embedding = embedding_gen.generate_query_embedding("test")
                        if test_embedding:
                            model_info["api_status"] = "โ
 Connected & Working"
                            model_info["status"] = "โ
 Fully Operational"
                        else:
                            model_info["api_status"] = "โ ๏ธ Connected but Limited"
                    except Exception as e:
                        model_info["api_status"] = f" Connection Error: {str(e)[:50]}"

            return model_info

        except Exception as e:
            self._log_safe(f"Error getting embedding model info: {e}", "error")
            return {
                "model_name": "Error",
                "status": " Error",
                "dimension": "Unknown",
                "provider": "Unknown",
                "api_status": f" Error: {str(e)[:50]}",
                "error": str(e),
            }
+
    def _run_health_check(self) -> Tuple[Dict[str, Any], List[List[str]], str]:
        """
        ๐ฉบ Run comprehensive real system health check.

        Gathers host metrics via psutil (imported lazily; an ImportError is
        absorbed by the outer except and reported as an error result), probes
        each RAG component through its _check_* helper, and derives an overall
        health grade from the fraction of healthy components.

        Returns:
            Tuple of (system status, component status, logs)
        """
        try:
            # psutil is third-party; imported lazily so a missing install only
            # degrades this feature instead of breaking module import.
            import psutil
            import time
            from datetime import timedelta

            # ๐ Real System Status
            start_time = time.time()

            # Get real system metrics.
            memory_info = psutil.virtual_memory()
            # interval=1 blocks for one second to sample CPU usage.
            cpu_percent = psutil.cpu_percent(interval=1)

            # Calculate uptime (approximate) - host uptime, not app uptime.
            boot_time = psutil.boot_time()
            uptime_seconds = time.time() - boot_time
            uptime = str(timedelta(seconds=int(uptime_seconds)))

            system_status = {
                "overall_health": "๐ข Healthy",
                "uptime": uptime,
                "memory_usage": f"{memory_info.percent:.1f}%",
                "memory_available": f"{memory_info.available / (1024**3):.1f} GB",
                "cpu_usage": f"{cpu_percent:.1f}%",
                "disk_usage": f"{psutil.disk_usage('/').percent:.1f}%",
                "last_check": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "system_load": "Normal" if cpu_percent < 80 else "High",
            }

            # ๐ Real Component Status Check
            components = []  # rows of [name, status, details]
            logs = []  # chronological human-readable log lines

            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            logs.append(f"[{current_time}] INFO - System health check initiated")

            # 1. ๐ค Embedding Generator Check
            embedding_info = self._get_embedding_model_info()
            embedding_status = embedding_info.get("status", "โ Unknown")
            embedding_details = f"{embedding_info.get('model_name', 'Unknown')} - {embedding_info.get('api_status', 'Unknown')}"
            components.append(
                ["๐ค Embedding Generator", embedding_status, embedding_details]
            )

            if "โ
" in embedding_status:
                logs.append(
                    f"[{current_time}] INFO - Embedding generator: {embedding_details}"
                )
            else:
                logs.append(
                    f"[{current_time}] WARN - Embedding generator: {embedding_details}"
                )

            # 2. ๐ฒ Vector Database Check
            vector_db_status, vector_db_details = self._check_vector_db_health()
            components.append(
                ["๐ฒ Vector Database", vector_db_status, vector_db_details]
            )
            logs.append(f"[{current_time}] INFO - Vector database: {vector_db_details}")

            # 3. ๐ Document Processor Check
            doc_processor_status, doc_processor_details = (
                self._check_document_processor_health()
            )
            components.append(
                ["๐ Document Processor", doc_processor_status, doc_processor_details]
            )
            logs.append(
                f"[{current_time}] INFO - Document processor: {doc_processor_details}"
            )

            # 4. ๐ง Response Generator Check
            response_gen_status, response_gen_details = (
                self._check_response_generator_health()
            )
            components.append(
                [" Response Generator", response_gen_status, response_gen_details]
            )
            logs.append(
                f"[{current_time}] INFO - Response generator: {response_gen_details}"
            )

            # 5. ๐ Web Interface Check - reaching this code implies Gradio is
            # serving, so it is reported healthy unconditionally.
            components.append(
                ["๐ Web Interface", "โ
 Healthy", "Gradio running successfully"]
            )
            logs.append(f"[{current_time}] INFO - Web interface: Running on port 7860")

            # 6. ๐ Live Search Check (if available)
            live_search_status, live_search_details = self._check_live_search_health()
            components.append(
                ["๐ Live Search", live_search_status, live_search_details]
            )
            logs.append(f"[{current_time}] INFO - Live search: {live_search_details}")

            # Calculate overall health from the share of healthy components.
            healthy_components = sum(1 for comp in components if "โ
" in comp[1])
            total_components = len(components)
            health_percentage = (healthy_components / total_components) * 100

            # Grade thresholds: >=80% healthy, >=60% degraded, else unhealthy.
            if health_percentage >= 80:
                system_status["overall_health"] = "๐ข Healthy"
                logs.append(
                    f"[{current_time}] INFO - Overall system health: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)"
                )
            elif health_percentage >= 60:
                system_status["overall_health"] = "๐ก Degraded"
                logs.append(
                    f"[{current_time}] WARN - System degraded: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)"
                )
            else:
                system_status["overall_health"] = "๐ด Unhealthy"
                logs.append(
                    f"[{current_time}] ERROR - System unhealthy: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)"
                )

            # Add performance metrics for the check itself.
            health_check_time = time.time() - start_time
            system_status["health_check_duration"] = f"{health_check_time:.2f}s"
            logs.append(
                f"[{current_time}] INFO - Health check completed in {health_check_time:.2f}s"
            )

            return system_status, components, "\n".join(logs)

        except Exception as e:
            self._log_safe(f"โ Error running health check: {e}", "error")
            error_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return (
                {
                    "overall_health": "๐ด Error",
                    "error": str(e),
                    "last_check": error_time,
                },
                [["System", "โ Error", f"Health check failed: {str(e)}"]],
                f"[{error_time}] ERROR - Health check failed: {str(e)}",
            )
+
    def _check_vector_db_health(self) -> Tuple[str, str]:
        """๐ฒ Check Vector Database health status.

        Returns:
            Tuple of (status label, human-readable details). Prefers the
            vector DB's own health_check(); falls back to get_stats(); never
            raises - errors are folded into the returned status.
        """
        try:
            if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db:
                vector_db = self.rag_system.vector_db

                # Try to get health check from vector DB (preferred API).
                if hasattr(vector_db, "health_check"):
                    health_result = vector_db.health_check()
                    if health_result.get("status") == "healthy":
                        return (
                            "โ
 Healthy",
                            f"Pinecone connected - {health_result.get('checks', {}).get('index_stats', 'OK')}",
                        )
                    else:
                        return (
                            "โ ๏ธ Degraded",
                            f"Issues detected: {health_result.get('error', 'Unknown')}",
                        )

                # Fallback: check if we can get stats.
                elif hasattr(vector_db, "get_stats"):
                    stats = vector_db.get_stats()
                    if stats.get("status") == "connected":
                        total_vectors = stats.get("total_vectors", 0)
                        return (
                            "โ
 Healthy",
                            f"Pinecone connected - {total_vectors} vectors stored",
                        )
                    else:
                        return (
                            "โ Error",
                            f"Connection failed: {stats.get('error', 'Unknown')}",
                        )

                else:
                    # DB object exists but exposes no diagnostic API.
                    return (
                        "โ ๏ธ Limited",
                        "Vector DB available but health check not implemented",
                    )
            else:
                return "โ Not Available", "Vector database not initialized"

        except Exception as e:
            return "โ Error", f"Health check failed: {str(e)[:50]}"
+
    def _check_document_processor_health(self) -> Tuple[str, str]:
        """๐ Check Document Processor health status.

        Returns:
            Tuple of (status label, details). Presence of a
            process_document method is treated as "all formats supported";
            no actual processing is attempted.
        """
        try:
            if (
                hasattr(self.rag_system, "document_processor")
                and self.rag_system.document_processor
            ):
                # Check if document processor has required dependencies.
                try:
                    # Test basic functionality.
                    processor = self.rag_system.document_processor

                    # Check if it has the required methods.
                    if hasattr(processor, "process_document"):
                        # NOTE(review): this list is asserted, not probed -
                        # actual format support depends on installed libs.
                        supported_formats = [
                            "PDF",
                            "DOCX",
                            "CSV",
                            "XLSX",
                            "PPTX",
                            "TXT",
                            "MD",
                        ]
                        return (
                            "โ
 Healthy",
                            f"All formats supported: {', '.join(supported_formats)}",
                        )
                    else:
                        return "โ ๏ธ Limited", "Basic functionality available"

                except ImportError as e:
                    return (
                        "โ Dependencies Missing",
                        f"Missing libraries: {str(e)[:30]}",
                    )
            else:
                return "โ Not Available", "Document processor not initialized"

        except Exception as e:
            return "โ Error", f"Health check failed: {str(e)[:50]}"
+
    def _check_response_generator_health(self) -> Tuple[str, str]:
        """๐ง Check Response Generator health status.

        Returns:
            Tuple of (status label, details). Healthy when a Gemini or
            OpenAI key is present in the generator config or environment;
            no API call is made.
        """
        try:
            if (
                hasattr(self.rag_system, "response_generator")
                and self.rag_system.response_generator
            ):
                response_gen = self.rag_system.response_generator

                # Check if it has required configuration.
                config = getattr(response_gen, "config", {})

                # Check API keys availability - config takes precedence over env.
                gemini_key = config.get("gemini_api_key") or os.getenv("GEMINI_API_KEY")
                openai_key = config.get("openai_api_key") or os.getenv("OPENAI_API_KEY")

                # Gemini is preferred when both keys are configured.
                if gemini_key:
                    return "โ
 Healthy", "Gemini LLM available for response generation"
                elif openai_key:
                    return "โ
 Healthy", "OpenAI LLM available for response generation"
                else:
                    return "โ ๏ธ Limited", "No LLM API keys configured"
            else:
                return "โ Not Available", "Response generator not initialized"

        except Exception as e:
            return "โ Error", f"Health check failed: {str(e)[:50]}"
+
    def _check_live_search_health(self) -> Tuple[str, str]:
        """๐ Check Live Search health status.

        Returns:
            Tuple of (status label, details). Live search is optional:
            without TAVILY_API_KEY the feature is reported as disabled, not
            as an error. No API call is made.
        """
        try:
            # Check if Tavily API key is available in the environment.
            tavily_key = os.getenv("TAVILY_API_KEY")

            if tavily_key:
                # Check if live search components are available - either a
                # dedicated processor or a query router qualifies.
                if (
                    hasattr(self.rag_system, "live_search_processor")
                    and self.rag_system.live_search_processor
                ):
                    return "โ
 Healthy", "Tavily API configured - Live search available"
                elif (
                    hasattr(self.rag_system, "query_router")
                    and self.rag_system.query_router
                ):
                    return "โ
 Healthy", "Query router available - Live search enabled"
                else:
                    return (
                        "โ ๏ธ Limited",
                        "Tavily API key available but components not initialized",
                    )
            else:
                return (
                    "โ ๏ธ Optional",
                    "Tavily API key not configured - Live search disabled",
                )

        except Exception as e:
            return "โ Error", f"Health check failed: {str(e)[:50]}"
+
+ def _get_stats_string(self) -> str:
+ """Get formatted stats string."""
+ return f"Documents: {self.total_documents} | Chunks: {self.total_chunks} | Queries: {self.query_count}"
+
+ def launch(self, **kwargs):
+ """
+ Launch the Gradio interface.
+
+ Args:
+ **kwargs: Additional arguments for gr.Interface.launch()
+ """
+ if not self.interface:
+ self._log_safe(" Interface not created", "error")
+ return
+
+ # Merge default config with provided kwargs
+ launch_config = {
+ "share": self.share,
+ "server_name": "0.0.0.0",
+ "server_port": 7860,
+ "show_error": True,
+ "quiet": False,
+ }
+ launch_config.update(kwargs)
+
+ self._log_safe(f"Launching Gradio interface with config: {launch_config}")
+ self.interface.launch(**launch_config)