""" Gradio UI Module This module provides an intuitive interface for document upload, URL input, and querying using Gradio. Technology: Gradio """ import logging import os import sys import tempfile import json import time from typing import Dict, List, Any, Optional, Tuple from datetime import datetime from pathlib import Path try: import gradio as gr except ImportError: logging.warning("Gradio not available.") class GradioApp: """ Provides a comprehensive Gradio-based user interface for the RAG system. Features: - Document upload with progress tracking - URL processing with status updates - Interactive Q&A interface with source display - Knowledge base management - System status and health monitoring - Analytics dashboard """ def __init__(self, rag_system, config: Optional[Dict[str, Any]] = None): """ Initialize the GradioApp with the RAG system. Args: rag_system: Instance of the complete RAG system config: Configuration dictionary with UI parameters """ self.rag_system = rag_system self.config = config or {} self.logger = self._setup_unicode_logger() # ๐Ÿ”ง Initialize settings manager from utils.settings_manager import SettingsManager config_manager = getattr(rag_system, "config_manager", None) self.settings_manager = SettingsManager(config_manager) # UI Configuration self.title = self.config.get("title", "AI Embedded Knowledge Agent") self.description = self.config.get( "description", "Upload documents or provide URLs to build your knowledge base, then ask questions!", ) self.theme = self.config.get("theme", "default") self.share = self.config.get("share", False) # Features configuration self.features = self.config.get("features", {}) self.enable_file_upload = self.features.get("file_upload", True) self.enable_url_input = self.features.get("url_input", True) self.enable_query_interface = self.features.get("query_interface", True) self.enable_source_display = self.features.get("source_display", True) self.enable_confidence_display = self.features.get("confidence_display", True) # State management self.processing_status = "Ready" self.total_documents = 0 self.total_chunks = 0 self.query_count = 0 # Initialize interface self.interface = None self._create_interface() self._log_safe("GradioApp initialized with advanced features") def _setup_unicode_logger(self): """๐Ÿ”ง Setup Unicode-safe logger for cross-platform compatibility.""" logger = logging.getLogger(__name__) # โœ… Configure handler with UTF-8 encoding for Windows compatibility if not logger.handlers: handler = logging.StreamHandler(sys.stdout) # ๐ŸŒ Force UTF-8 encoding on Windows to handle emojis if sys.platform.startswith("win"): try: # โšก Try to reconfigure stdout with UTF-8 encoding handler.stream = open( sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1 ) except Exception: # ๐Ÿ”„ Fallback to default if reconfiguration fails pass formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def _log_safe(self, message: str, level: str = "info"): """๐Ÿ›ก๏ธ Unicode-safe logging that handles emojis on Windows.""" try: # โœ… Pre-process message to be safe for Windows cp1252 encoding safe_message = self._make_message_safe(message) getattr(self.logger, level)(safe_message) except UnicodeEncodeError: # ๐Ÿ”„ Additional fallback: Remove all non-ASCII characters ascii_message = message.encode("ascii", "ignore").decode("ascii") getattr(self.logger, level)(f"[ENCODING_SAFE] {ascii_message}") except 
Exception as e: # ๐Ÿšจ Last resort: Basic logging without special characters basic_message = ( str(message).replace("๐ŸŒ", "[LIVE]").replace("๐Ÿ“š", "[LOCAL]") ) try: getattr(self.logger, level)(f"[SAFE] {basic_message}") except: print(f"[FALLBACK] {basic_message}") # Direct print as last resort def _make_message_safe(self, message: str) -> str: """๐Ÿ”„ Convert emoji characters to safe text equivalents.""" emoji_map = { "๐Ÿ”": "[SEARCH]", "โœ…": "[SUCCESS]", "โŒ": "[ERROR]", "๐Ÿš€": "[ROCKET]", "๐Ÿ“„": "[DOC]", "๐Ÿ”—": "[LINK]", "โšก": "[FAST]", "๐ŸŽฏ": "[TARGET]", "๐ŸŸข": "[GREEN]", "๐ŸŸก": "[YELLOW]", "๐Ÿ”ด": "[RED]", "๐Ÿ“Š": "[CHART]", "๐Ÿ•ท๏ธ": "[SPIDER]", "๐Ÿ’ก": "[IDEA]", "๐Ÿ”„": "[REFRESH]", "๐Ÿ“š": "[BOOKS]", "๐Ÿฉบ": "[HEALTH]", "๐Ÿ“ˆ": "[ANALYTICS]", "๐ŸŒ": "[LIVE]", "๐ŸŒ": "[WORLD]", "๐Ÿ”ง": "[TOOL]", "๐Ÿ›ก๏ธ": "[SHIELD]", "๐ŸŽจ": "[DESIGN]", "๐Ÿ“": "[NOTE]", "๐Ÿ—‘๏ธ": "[DELETE]", "๐Ÿ’พ": "[SAVE]", "๐Ÿ“": "[FOLDER]", "๐Ÿ””": "[BELL]", "โš™๏ธ": "[SETTINGS]", "๐Ÿงช": "[TEST]", "๐Ÿ“ค": "[EXPORT]", "๐Ÿ”Œ": "[PORT]", "๐ŸŒฒ": "[TREE]", "๐Ÿ”ฅ": "[FIRE]", "๐Ÿ”‘": "[KEY]", "๐Ÿ› ๏ธ": "[WRENCH]", "๐Ÿ’ป": "[COMPUTER]", "๐Ÿ—๏ธ": "[BUILDING]", "โ“": "[QUESTION]", "๐Ÿชฒ": "[BUG]", "๐Ÿชƒ": "[BOOMERANG]", "๐Ÿ“˜": "[BOOK]", "๐Ÿงน": "[BROOM]", "๐Ÿ”ฌ": "[MICROSCOPE]", "๐Ÿค–": "[ROBOT]", # Added for Auto mode "๐Ÿ”„": "[HYBRID]", # Added for Hybrid mode } safe_message = message for emoji, replacement in emoji_map.items(): safe_message = safe_message.replace(emoji, replacement) return safe_message def _create_interface(self): """๐ŸŽจ Create the modern full-width Gradio interface.""" # ๐ŸŒŸ Use modern theme with custom CSS theme = gr.themes.Soft( primary_hue="blue", secondary_hue="purple", neutral_hue="slate", font=gr.themes.GoogleFont("Inter"), font_mono=gr.themes.GoogleFont("JetBrains Mono"), ).set( body_background_fill="*neutral_50", body_text_color="*neutral_800", button_primary_background_fill="linear-gradient(135deg, #667eea 0%, #764ba2 100%)", button_primary_background_fill_hover="linear-gradient(135deg, #5a67d8 0%, #6b46c1 100%)", button_primary_text_color="white", input_background_fill="*neutral_50", block_background_fill="white", block_border_width="1px", block_border_color="*neutral_200", block_radius="12px", container_radius="20px", ) with gr.Blocks( title=self.title, theme=theme, css=self._get_custom_css(), head=""" """, ) as interface: # ๐ŸŽฏ Modern Header with Gradient Background with gr.Row(elem_classes="app-header"): with gr.Column(): gr.HTML( f"""
🚀 {self.title}
{self.description}
""" ) # ๐Ÿ“Š Enhanced Status Bar with Modern Design with gr.Row(elem_classes="status-bar"): with gr.Column(): status_display = gr.HTML( value="""
🟢 System Status: Ready
""", elem_classes="status-display", ) with gr.Column(): stats_display = gr.HTML( value="""
📊 Stats: Documents: 0 | Chunks: 0 | Queries: 0
""", elem_classes="stats-display", ) # Store interface components for updates early self.status_display = status_display self.stats_display = stats_display # ๐ŸŽจ Modern Interface Tabs with Enhanced Styling with gr.Tabs(elem_classes="tab-nav") as tabs: # ๐Ÿ“„ Document Upload Tab if self.enable_file_upload: with gr.TabItem( "๐Ÿ“„ Upload Documents", id="upload_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): upload_components = self._create_upload_tab() # ๐Ÿ”— URL Processing Tab if self.enable_url_input: with gr.TabItem( "๐Ÿ”— Add URLs", id="url_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): url_components = self._create_url_tab() # โ“ Query Interface Tab (Primary Tab) if self.enable_query_interface: with gr.TabItem( "โ“ Ask Questions", id="query_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): query_components = self._create_query_tab() # ๐Ÿ“š Knowledge Base Management Tab with gr.TabItem( "๐Ÿ“š Knowledge Base", id="kb_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): kb_components = self._create_knowledge_base_tab() # ๐Ÿ“ˆ Analytics Dashboard Tab with gr.TabItem( "๐Ÿ“ˆ Analytics", id="analytics_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): analytics_components = self._create_analytics_tab() # ๐Ÿฉบ System Health Tab with gr.TabItem( "๐Ÿฉบ System Health", id="health_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): health_components = self._create_health_tab() # โš™๏ธ Settings Tab with gr.TabItem( "โš™๏ธ Settings", id="settings_tab", elem_classes="tab-item" ): with gr.Column(elem_classes="feature-card fade-in"): settings_components = self._create_settings_tab() self.interface = interface def _create_upload_tab(self): """๐ŸŽจ Create the modern document upload tab with full-width design.""" # ๐Ÿ“Š Upload Statistics Cards with gr.Row(elem_classes="analytics-grid"): with gr.Column(elem_classes="stat-card accent-blue"): gr.HTML( """
7+
Supported Formats
""" ) with gr.Column(elem_classes="stat-card accent-green"): gr.HTML( """
∞
File Size Limit
""" ) with gr.Column(elem_classes="stat-card accent-purple"): gr.HTML( """
⚡
Fast Processing
""" ) # ๐ŸŽฏ Main Upload Interface with gr.Row(elem_classes="grid-2"): with gr.Column(elem_classes="metric-card"): gr.HTML( """

📄 Upload Documents

Drag & drop files or click to browse. Multiple files supported.

""" ) # ๐Ÿ“‹ Supported Formats Display gr.HTML( """
✅ Supported Formats:
📄 PDF • 📝 DOCX • 📊 CSV • 📈 XLSX • 🎯 PPTX • 📄 TXT • 📝 MD
""" ) file_upload = gr.File( label="๐Ÿ“ Select Files", file_count="multiple", file_types=[ ".pdf", ".docx", ".csv", ".xlsx", ".pptx", ".txt", ".md", ], height=250, elem_classes="input-field", ) # ๐ŸŽจ Action Buttons with Modern Styling with gr.Row(): upload_btn = gr.Button( "๐Ÿš€ Process Documents", variant="primary", size="lg", elem_classes="btn-primary", ) clear_upload_btn = gr.Button( "๐Ÿ—‘๏ธ Clear", variant="secondary", elem_classes="btn-secondary" ) with gr.Column(elem_classes="metric-card"): gr.HTML( """

📊 Processing Results

Real-time processing status and detailed results will appear here.

""" ) upload_output = gr.Textbox( label="๐Ÿ“‹ Processing Log", lines=18, interactive=False, placeholder="๐Ÿ”„ Upload results will appear here...\n\n๐Ÿ’ก Tips:\nโ€ข Multiple files can be processed simultaneously\nโ€ข Processing time depends on file size and complexity\nโ€ข Check the status bar for real-time updates", elem_classes="input-field", ) # ๐Ÿ“ˆ Processing Tips with gr.Accordion("๐Ÿ’ก Processing Tips & Best Practices", open=False): gr.HTML( """

📄 File Preparation

⚡ Performance Tips

🎯 Quality Guidelines

""" ) # Event handlers upload_btn.click( fn=self._process_documents, inputs=[file_upload], outputs=[upload_output, self.status_display, self.stats_display], ) clear_upload_btn.click( fn=lambda: ("", "Ready "), outputs=[upload_output, self.status_display] ) return { "file_upload": file_upload, "upload_btn": upload_btn, "upload_output": upload_output, } def _create_url_tab(self): """Create the URL processing tab.""" with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Add URLs") gr.Markdown("Enter URLs to extract content from web pages") url_input = gr.Textbox( label="URLs (one per line)", lines=8, placeholder="https://example.com\nhttps://another-site.com\n...", ) with gr.Accordion("โš™๏ธ Advanced Crawling Options", open=False): gr.Markdown("๐Ÿ•ท๏ธ **Crawl Configuration**") max_depth = gr.Slider( label="๐Ÿ” Crawl Depth (How deep to follow links)", minimum=1, maximum=5, value=1, step=1, info="Higher depth = more pages but slower processing", ) follow_links = gr.Checkbox( label="๐Ÿ”— Follow Internal Links", value=True, info="Automatically discover and process linked pages", ) gr.Markdown("โšก **Performance Tips:**") gr.Markdown("โ€ข Depth 1: Single page only") gr.Markdown("โ€ข Depth 2-3: Good for small sites") gr.Markdown("โ€ข Depth 4-5: Use carefully, can be slow") with gr.Row(): url_btn = gr.Button("๐Ÿš€ Process URLs", variant="primary", size="lg") clear_url_btn = gr.Button("๐Ÿ—‘๏ธ Clear", variant="secondary") # Progress indicator with gr.Row(): progress_info = gr.Textbox( label="๐Ÿ”„ Processing Status", value="Ready to process URLs...", interactive=False, visible=True, ) with gr.Column(scale=1): gr.Markdown("### Processing Results") url_output = gr.Textbox( label="Results", lines=15, interactive=False, placeholder="URL processing results will appear here...", ) # Event handlers url_btn.click( fn=self._process_urls, inputs=[url_input, max_depth, follow_links], outputs=[ url_output, self.status_display, self.stats_display, progress_info, ], ) clear_url_btn.click( fn=lambda: ("", "Ready ๐ŸŸข", "Ready to process URLs..."), outputs=[url_output, self.status_display, progress_info], ) return { "url_input": url_input, "url_btn": url_btn, "url_output": url_output, } def _create_query_tab(self): """๐ŸŽจ Create the modern query interface tab with enhanced UX.""" # ๐ŸŽฏ Quick Action Cards with gr.Row(elem_classes="analytics-grid"): with gr.Column(elem_classes="stat-card accent-blue"): gr.HTML( """
🤖
AI-Powered Search
""" ) with gr.Column(elem_classes="stat-card accent-green"): gr.HTML( """
🌐
Live Web Search
""" ) with gr.Column(elem_classes="stat-card accent-purple"): gr.HTML( """
📚
Local Knowledge
""" ) with gr.Column(elem_classes="stat-card accent-orange"): gr.HTML( """
⚡
Instant Results
""" ) # ๐Ÿ” Main Query Interface with gr.Row(elem_classes="grid-2"): with gr.Column(elem_classes="metric-card"): gr.HTML( """

โ“ Ask Your Question

Ask anything about your documents or get real-time information from the web.

""" ) # ๐ŸŽฏ Enhanced Search Input with gr.Column(elem_classes="search-container"): query_input = gr.Textbox( label="๐Ÿ” Your Question", lines=4, placeholder="๐Ÿ’ก Try asking:\nโ€ข 'What are the main points in the uploaded document?'\nโ€ข 'Latest news about AI developments'\nโ€ข 'Summarize the key findings from my research papers'", elem_classes="search-input", ) # ๐ŸŽจ Quick Query Suggestions gr.HTML( """
💡 Quick Suggestions:
📄 Summarize 🔍 Key Findings 🌐 Latest News
""" ) # โš™๏ธ Advanced Query Options with gr.Accordion("โš™๏ธ Advanced Query Options", open=False): with gr.Row(): include_sources = gr.Checkbox( label="๐Ÿ“š Include Sources", value=True, info="Show source documents and references", ) max_results = gr.Slider( label="๐Ÿ“Š Max Results", minimum=1, maximum=10, value=5, step=1, info="Maximum number of results to return", ) # ๐ŸŒ Enhanced Search Mode Selection with gr.Group(): gr.HTML( """

๐Ÿ” Search Mode & Options

""" ) search_mode = gr.Dropdown( label="๐ŸŽฏ Search Mode", choices=[ ("๐Ÿค– Auto (Smart Routing)", "auto"), ("๐Ÿ“š Local Only (Stored Documents)", "local_only"), ("๐ŸŒ Live Only (Web Search)", "live_only"), ("๐Ÿ”„ Hybrid (Local + Live)", "hybrid"), ], value="auto", info="Choose how to search for information", ) use_live_search = gr.Checkbox( label="๐Ÿ” Enable Live Web Search", value=False, info="Enable web search (will use hybrid mode by default)", ) with gr.Row(): search_depth = gr.Dropdown( label="๐Ÿ•ท๏ธ Search Depth", choices=["basic", "advanced"], value="basic", info="Basic: faster, Advanced: more comprehensive", visible=False, ) time_range = gr.Dropdown( label="โฐ Time Range", choices=["day", "week", "month", "year"], value="month", info="How recent should the web results be", visible=False, ) # ๐Ÿ’ก Dynamic options visibility use_live_search.change( fn=lambda enabled: ( gr.update(visible=enabled), gr.update(visible=enabled), gr.update(value="hybrid" if enabled else "auto"), ), inputs=[use_live_search], outputs=[search_depth, time_range, search_mode], ) # ๐Ÿ“ Search Mode Guide with gr.Accordion("โ„น๏ธ Search Mode Guide", open=False): gr.HTML( """

🤖 Auto Mode

Intelligently chooses the best search method based on your query

📚 Local Only

Searches only your uploaded documents

๐ŸŒ Live Only

Searches only the web for real-time information

🔄 Hybrid

Combines both local documents and live web search

""" ) # ๐Ÿš€ Action Buttons with gr.Row(): query_btn = gr.Button( "๐Ÿš€ Get Answer", variant="primary", size="lg", elem_classes="btn-primary", ) clear_query_btn = gr.Button( "๐Ÿ—‘๏ธ Clear", variant="secondary", elem_classes="btn-secondary" ) with gr.Column(elem_classes="metric-card"): gr.HTML( """

💬 AI Response

Intelligent answers with source citations and confidence scoring.

""" ) response_output = gr.Markdown( label="๐Ÿค– AI Response", value="๐Ÿ”ฎ **Your intelligent answer will appear here...**\n\n๐Ÿ’ก **Tips for better results:**\n- Be specific in your questions\n- Use natural language\n- Ask follow-up questions for clarification\n- Check the confidence score and sources", height=450, elem_classes="input-field", ) # ๐Ÿ“Š Response Metadata with gr.Row(): confidence_display = gr.Textbox( label="๐ŸŽฏ Confidence & Performance", interactive=False, visible=self.enable_confidence_display, elem_classes="input-field", ) # ๐Ÿ“š Sources Display sources_output = gr.JSON( label="๐Ÿ“š Sources & References", visible=self.enable_source_display, elem_classes="input-field", ) # ๐Ÿ“ˆ Query Performance Tips with gr.Accordion("๐ŸŽฏ Query Optimization Tips", open=False): gr.HTML( """

🎯 Question Formulation

๐Ÿ” Search Strategy

📚 Source Utilization

""" ) # Event handlers query_btn.click( fn=self._process_query, inputs=[ query_input, include_sources, max_results, use_live_search, search_depth, time_range, search_mode, ], outputs=[ response_output, confidence_display, sources_output, self.status_display, self.stats_display, ], ) clear_query_btn.click( fn=lambda: ("", "", {}, "Ready ๐ŸŸข"), outputs=[ response_output, confidence_display, sources_output, self.status_display, ], ) return { "query_input": query_input, "query_btn": query_btn, "response_output": response_output, "sources_output": sources_output, "use_live_search": use_live_search, "search_depth": search_depth, "time_range": time_range, "search_mode": search_mode, } def _create_knowledge_base_tab(self): """Create the knowledge base management tab.""" with gr.Column(): gr.Markdown("### ๐Ÿ“š Knowledge Base Management") with gr.Row(): refresh_btn = gr.Button("Refresh", variant="secondary") export_btn = gr.Button("๐Ÿ“ค Export", variant="secondary") clear_kb_btn = gr.Button("Clear All", variant="stop") # Knowledge base stats with enhanced embedding model info kb_stats = gr.JSON( label="๐Ÿ“Š Knowledge Base Statistics", value={ "total_documents": 0, "total_chunks": 0, "storage_size": "0 MB", "embedding_model": "Loading...", "embedding_status": "Checking...", "vector_db_status": "Checking...", }, ) # ๐Ÿค– Embedding Model Status Display embedding_model_status = gr.JSON( label="๐Ÿค– Embedding Model Information", value={ "model_name": "Loading...", "provider": "Checking...", "status": "Initializing...", "api_status": "Checking connection...", "dimension": "Unknown", "performance": "Gathering stats...", }, ) # Document list document_list = gr.Dataframe( headers=["Source", "Type", "Chunks", "Added"], datatype=["str", "str", "number", "str"], label="๐Ÿ“„ Documents in Knowledge Base", interactive=False, ) # Event handlers refresh_btn.click( fn=self._refresh_knowledge_base, outputs=[kb_stats, embedding_model_status, document_list], ) return { "kb_stats": kb_stats, "embedding_model_status": embedding_model_status, "document_list": document_list, } def _create_analytics_tab(self): """Create the analytics dashboard tab with real-time data.""" with gr.Column(): gr.Markdown("### ๐Ÿ“ˆ Analytics Dashboard") gr.Markdown("Real-time insights into your RAG system performance") with gr.Row(): refresh_analytics_btn = gr.Button( "๐Ÿ”„ Refresh Analytics", variant="secondary" ) export_analytics_btn = gr.Button( "๐Ÿ“Š Export Report", variant="secondary" ) with gr.Row(): with gr.Column(): query_analytics = gr.JSON( label="๐Ÿ” Query Analytics", value=self._get_initial_query_analytics(), ) with gr.Column(): system_metrics = gr.JSON( label="โšก System Metrics", value=self._get_initial_system_metrics(), ) with gr.Row(): with gr.Column(): performance_metrics = gr.JSON( label="๐Ÿš€ Performance Metrics", value=self._get_initial_performance_metrics(), ) with gr.Column(): usage_stats = gr.JSON( label="๐Ÿ“Š Usage Statistics", value=self._get_initial_usage_stats(), ) # Query history with enhanced information query_history = gr.Dataframe( headers=[ "Query", "Results", "Confidence", "Processing Time", "Timestamp", ], datatype=["str", "number", "number", "str", "str"], label="๐Ÿ“ Recent Query History", interactive=False, value=self._get_initial_query_history(), ) # Event handlers refresh_analytics_btn.click( fn=self._refresh_analytics, outputs=[ query_analytics, system_metrics, performance_metrics, usage_stats, query_history, ], ) return { "query_analytics": query_analytics, "system_metrics": system_metrics, 
"performance_metrics": performance_metrics, "usage_stats": usage_stats, "query_history": query_history, } def _get_initial_query_analytics(self) -> Dict[str, Any]: """Get initial query analytics data.""" return { "total_queries": self.query_count, "average_confidence": "N/A", "most_common_topics": [], "query_success_rate": "100%", "cache_hit_rate": "0%", "status": "๐Ÿ“Š Ready to track queries", } def _get_initial_system_metrics(self) -> Dict[str, Any]: """Get initial system metrics.""" # Get real embedding model info embedding_info = self._get_embedding_model_info() return { "documents_processed": self.total_documents, "chunks_stored": self.total_chunks, "embedding_model": embedding_info.get("model_name", "Gemini"), "embedding_status": embedding_info.get("status", "Checking..."), "embedding_provider": embedding_info.get("provider", "Google"), "vector_db": "Pinecone", "uptime": "Just started", "status": "๐ŸŸข System operational", } def _get_initial_performance_metrics(self) -> Dict[str, Any]: """Get initial performance metrics.""" return { "avg_query_time": "N/A", "avg_embedding_time": "N/A", "avg_retrieval_time": "N/A", "memory_usage": "Normal", "throughput": "N/A queries/min", "status": "โšก Performance tracking active", } def _get_initial_usage_stats(self) -> Dict[str, Any]: """Get initial usage statistics.""" return { "documents_uploaded": 0, "urls_processed": 0, "successful_queries": 0, "failed_queries": 0, "peak_usage_time": "N/A", "status": "๐Ÿ“ˆ Usage tracking enabled", } def _get_initial_query_history(self) -> List[List[str]]: """Get initial query history.""" return [ ["No queries yet", "0", "0.0", "0.0s", "Start asking questions!"], ["Upload documents first", "0", "0.0", "0.0s", "Build your knowledge base"], [ "Try the examples above", "0", "0.0", "0.0s", "Get started with sample queries", ], ] def _refresh_analytics( self, ) -> Tuple[ Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any], List[List[str]] ]: """Refresh all analytics data.""" try: # Get real analytics from query processor if available query_analytics = self._get_real_query_analytics() system_metrics = self._get_real_system_metrics() performance_metrics = self._get_real_performance_metrics() usage_stats = self._get_real_usage_stats() query_history = self._get_real_query_history() return ( query_analytics, system_metrics, performance_metrics, usage_stats, query_history, ) except Exception as e: self._log_safe(f"โŒ Error refreshing analytics: {e}", "error") return ( {"error": str(e)}, {"error": str(e)}, {"error": str(e)}, {"error": str(e)}, [["Error loading history", "0", "0.0", "0.0s", str(e)]], ) def _get_real_query_analytics(self) -> Dict[str, Any]: """Get real query analytics from the system.""" try: analytics = { "total_queries": self.query_count, "documents_in_kb": self.total_documents, "chunks_available": self.total_chunks, "last_updated": datetime.now().strftime("%H:%M:%S"), } # Get analytics from query processor if available if hasattr(self.rag_system, "query_processor") and hasattr( self.rag_system.query_processor, "get_query_analytics" ): processor_analytics = ( self.rag_system.query_processor.get_query_analytics() ) analytics.update(processor_analytics) # Calculate additional metrics if self.query_count > 0: analytics["avg_results_per_query"] = round( self.total_chunks / max(self.query_count, 1), 2 ) analytics["system_utilization"] = ( "Active" if self.query_count > 5 else "Light" ) else: analytics["avg_results_per_query"] = 0 analytics["system_utilization"] = "Idle" analytics["status"] = "๐ŸŸข 
Analytics active" return analytics except Exception as e: return {"error": f"Analytics unavailable: {str(e)}", "status": "โŒ Error"} def _get_real_system_metrics(self) -> Dict[str, Any]: """Get real system metrics with embedding model info.""" try: # Get embedding model information embedding_info = self._get_embedding_model_info() metrics = { "documents_processed": self.total_documents, "chunks_stored": self.total_chunks, "queries_processed": self.query_count, "last_updated": datetime.now().strftime("%H:%M:%S"), "embedding_model": embedding_info.get("model_name", "Unknown"), "embedding_status": embedding_info.get("status", "Unknown"), "embedding_provider": embedding_info.get("provider", "Unknown"), "embedding_dimension": embedding_info.get("dimension", "Unknown"), } # Get system status if hasattr(self.rag_system, "get_system_status"): system_status = self.rag_system.get_system_status() metrics.update( { "overall_health": system_status.get( "overall_status", "unknown" ), "components_healthy": sum( system_status.get("components", {}).values() ), "total_components": len(system_status.get("components", {})), } ) # Add component status with embedding model details components = [] if hasattr(self.rag_system, "embedding_generator"): components.append( f"Embedding Generator ({embedding_info.get('model_name', 'Unknown')})" ) if hasattr(self.rag_system, "vector_db"): components.append("Vector Database") if hasattr(self.rag_system, "query_processor"): components.append("Query Processor") metrics["active_components"] = components metrics["status"] = "๐ŸŸข System healthy" return metrics except Exception as e: return { "error": f"System metrics unavailable: {str(e)}", "status": "โŒ Error", } def _get_real_performance_metrics(self) -> Dict[str, Any]: """Get real performance metrics.""" try: # Basic performance tracking metrics = { "total_processing_time": "N/A", "avg_query_response": "N/A", "system_load": "Normal", "last_updated": datetime.now().strftime("%H:%M:%S"), } # If we have query history, calculate averages if hasattr(self.rag_system, "query_processor") and hasattr( self.rag_system.query_processor, "query_history" ): history = self.rag_system.query_processor.query_history if history: # Calculate average processing time if available processing_times = [ q.get("processing_time", 0) for q in history if "processing_time" in q ] if processing_times: avg_time = sum(processing_times) / len(processing_times) metrics["avg_query_response"] = f"{avg_time:.2f}s" metrics["queries_per_minute"] = ( f"{self.query_count / max(1, 1):.1f}" # Rough estimate ) metrics["throughput"] = "Good" if self.query_count > 0 else "Idle" metrics["status"] = "โšก Performance tracking active" return metrics except Exception as e: return { "error": f"Performance metrics unavailable: {str(e)}", "status": "โŒ Error", } def _get_real_usage_stats(self) -> Dict[str, Any]: """Get real usage statistics.""" try: stats = { "documents_uploaded": self.total_documents, "urls_processed": 0, # Would need to track this separately "successful_queries": self.query_count, # Assuming all successful for now "failed_queries": 0, # Would need error tracking "total_chunks_created": self.total_chunks, "last_updated": datetime.now().strftime("%H:%M:%S"), } # Calculate usage patterns if self.query_count > 0: stats["most_active_feature"] = "Query Processing" stats["usage_trend"] = "Growing" if self.query_count > 5 else "Starting" else: stats["most_active_feature"] = "Document Upload" stats["usage_trend"] = "Initial Setup" stats["status"] = "๐Ÿ“Š Usage 
tracking active" return stats except Exception as e: return {"error": f"Usage stats unavailable: {str(e)}", "status": "โŒ Error"} def _get_real_query_history(self) -> List[List[str]]: """Get real query history.""" try: history_data = [] # Get query history from query processor if available if hasattr(self.rag_system, "query_processor") and hasattr( self.rag_system.query_processor, "query_history" ): history = self.rag_system.query_processor.query_history[ -10: ] # Last 10 queries for query_item in history: query_text = ( query_item.get("query", "Unknown")[:50] + "..." if len(query_item.get("query", "")) > 50 else query_item.get("query", "Unknown") ) result_count = query_item.get("result_count", 0) confidence = "N/A" # Would need to store this processing_time = ( f"{query_item.get('processing_time', 0):.2f}s" if "processing_time" in query_item else "N/A" ) timestamp = ( query_item.get("timestamp", datetime.now()).strftime("%H:%M:%S") if "timestamp" in query_item else "Unknown" ) history_data.append( [ query_text, str(result_count), confidence, processing_time, timestamp, ] ) # If no real history, show helpful placeholder if not history_data: history_data = [ ["No queries yet", "0", "0.0", "0.0s", "Ask your first question!"], [ "Upload documents to get started", "0", "0.0", "0.0s", "Build your knowledge base", ], [ "Try asking about your documents", "0", "0.0", "0.0s", "Get intelligent answers", ], ] return history_data except Exception as e: return [["Error loading history", "0", "0.0", "0.0s", str(e)]] def _create_settings_tab(self): """Create the comprehensive settings management tab.""" with gr.Column(): gr.Markdown("### โš™๏ธ Environment Variables Settings") gr.Markdown( "Configure API keys and system settings with secure storage options" ) # ๐Ÿ”„ Refresh and action buttons with gr.Row(): refresh_settings_btn = gr.Button("๐Ÿ”„ Refresh", variant="secondary") load_env_btn = gr.Button("๐Ÿ“ Load from .env", variant="secondary") clear_cache_btn = gr.Button("๐Ÿ—‘๏ธ Clear Cache", variant="secondary") export_btn = gr.Button("๐Ÿ“ค Export Settings", variant="secondary") # ๐Ÿ“Š Settings status display settings_status = gr.Textbox( label="๐Ÿ”” Status", value="Ready to configure settings", interactive=False, container=False, ) # ๐Ÿ”ง Main settings interface with gr.Tabs(): # API Keys Tab with gr.TabItem("๐Ÿ”‘ API Keys"): api_keys_components = self._create_api_keys_section() # System Settings Tab with gr.TabItem("๐Ÿ› ๏ธ System Settings"): system_settings_components = self._create_system_settings_section() # Storage Options Tab with gr.TabItem("๐Ÿ’พ Storage & Export"): storage_components = self._create_storage_section() # ๐Ÿ“‹ Settings overview with gr.Accordion("๐Ÿ“‹ Current Settings Overview", open=False): settings_overview = gr.JSON( label="Environment Variables Status", value={} ) # Event handlers for main buttons refresh_settings_btn.click( fn=self._refresh_all_settings, outputs=[ settings_status, settings_overview, *api_keys_components.values(), *system_settings_components.values(), ], ) load_env_btn.click( fn=self._load_from_env_file, outputs=[settings_status, settings_overview], ) clear_cache_btn.click( fn=self._clear_settings_cache, outputs=[settings_status, settings_overview], ) export_btn.click(fn=self._export_settings, outputs=[settings_status]) return { "settings_status": settings_status, "settings_overview": settings_overview, **api_keys_components, **system_settings_components, **storage_components, } def _create_api_keys_section(self): """Create the API keys configuration 
section.""" components = {} with gr.Column(): gr.Markdown("#### ๐Ÿ”‘ API Keys Configuration") gr.Markdown( "Configure your API keys for AI services. Keys are masked for security." ) # Gemini API Key with gr.Group(): gr.Markdown("**๐Ÿค– Google Gemini API** (Required)") with gr.Row(): gemini_key = gr.Textbox( label="GEMINI_API_KEY", placeholder="AIzaSy...", type="password", info="Required for embeddings and LLM functionality", ) gemini_test_btn = gr.Button( "๐Ÿงช Test", variant="secondary", size="sm" ) gemini_status = gr.Textbox( label="Status", value="Not configured", interactive=False, container=False, ) with gr.Row(): gemini_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) gemini_env_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) gr.Markdown( "๐Ÿ’ก [Get your Gemini API key](https://aistudio.google.com/)" ) # Pinecone API Key with gr.Group(): gr.Markdown("**๐ŸŒฒ Pinecone API (Required)**") with gr.Row(): pinecone_key = gr.Textbox( label="PINECONE_API_KEY", placeholder="pc-...", type="password", info="For vector database storage", ) pinecone_test_btn = gr.Button( "๐Ÿงช Test", variant="secondary", size="sm" ) pinecone_status = gr.Textbox( label="Status", value="Not configured", interactive=False, container=False, ) with gr.Row(): pinecone_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) pinecone_env_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) gr.Markdown("๐Ÿ’ก [Get your Pinecone API key](https://www.pinecone.io/)") # OpenAI API Key with gr.Group(): gr.Markdown("**๐Ÿ”ฅ OpenAI API** (Optional)") with gr.Row(): openai_key = gr.Textbox( label="OPENAI_API_KEY", placeholder="sk-...", type="password", info="For alternative LLM functionality", ) openai_test_btn = gr.Button( "๐Ÿงช Test", variant="secondary", size="sm" ) openai_status = gr.Textbox( label="Status", value="Not configured", interactive=False, container=False, ) with gr.Row(): openai_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) openai_env_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) gr.Markdown( "๐Ÿ’ก [Get your OpenAI API key](https://platform.openai.com/api-keys)" ) # Tavily API Key with gr.Group(): gr.Markdown("**๐ŸŒ Tavily API** (Optional - for Live Search)") with gr.Row(): tavily_key = gr.Textbox( label="TAVILY_API_KEY", placeholder="tvly-...", type="password", info="For real-time web search functionality", ) tavily_test_btn = gr.Button( "๐Ÿงช Test", variant="secondary", size="sm" ) tavily_status = gr.Textbox( label="Status", value="Not configured", interactive=False, container=False, ) with gr.Row(): tavily_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) tavily_env_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) gr.Markdown( "๐Ÿ’ก [Get your Tavily API key](https://app.tavily.com/sign-in)" ) # Store components for event handling components.update( { "gemini_key": gemini_key, "gemini_status": gemini_status, "pinecone_key": pinecone_key, "pinecone_status": pinecone_status, "openai_key": openai_key, "openai_status": openai_status, "tavily_key": tavily_key, "tavily_status": tavily_status, } ) # Event handlers for API keys gemini_test_btn.click( fn=lambda: self._test_api_connection("GEMINI_API_KEY"), outputs=[gemini_status], ) gemini_cache_btn.click( fn=lambda key: self._save_setting("GEMINI_API_KEY", key, "cache"), inputs=[gemini_key], outputs=[gemini_status], ) gemini_env_btn.click( fn=lambda key: self._save_setting("GEMINI_API_KEY", 
key, "env_file"), inputs=[gemini_key], outputs=[gemini_status], ) pinecone_test_btn.click( fn=lambda: self._test_api_connection("PINECONE_API_KEY"), outputs=[pinecone_status], ) pinecone_cache_btn.click( fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "cache"), inputs=[pinecone_key], outputs=[pinecone_status], ) pinecone_env_btn.click( fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "env_file"), inputs=[pinecone_key], outputs=[pinecone_status], ) openai_test_btn.click( fn=lambda: self._test_api_connection("OPENAI_API_KEY"), outputs=[openai_status], ) openai_cache_btn.click( fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "cache"), inputs=[openai_key], outputs=[openai_status], ) openai_env_btn.click( fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "env_file"), inputs=[openai_key], outputs=[openai_status], ) tavily_test_btn.click( fn=lambda: self._test_api_connection("TAVILY_API_KEY"), outputs=[tavily_status], ) tavily_cache_btn.click( fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "cache"), inputs=[tavily_key], outputs=[tavily_status], ) tavily_env_btn.click( fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "env_file"), inputs=[tavily_key], outputs=[tavily_status], ) return components def _create_system_settings_section(self): """Create the system settings configuration section.""" components = {} with gr.Column(): gr.Markdown("#### ๐Ÿ› ๏ธ System Configuration") gr.Markdown("Configure system-level settings and preferences") # Pinecone Environment with gr.Group(): gr.Markdown("**๐ŸŒ Pinecone Environment**") pinecone_env = gr.Dropdown( label="PINECONE_ENVIRONMENT", choices=[ "us-east-1", "us-west1-gcp", "eu-west1-gcp", "asia-southeast1-gcp", ], value="us-east-1", info="Pinecone server region", ) with gr.Row(): pinecone_env_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) pinecone_env_file_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) # Pinecone Index Name with gr.Group(): gr.Markdown("**๐Ÿ“Š Pinecone Index Name**") pinecone_index = gr.Textbox( label="PINECONE_INDEX_NAME", value="rag-ai-index", placeholder="rag-ai-index", info="Name of your Pinecone index", ) with gr.Row(): pinecone_index_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) pinecone_index_file_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) # Gradio Share with gr.Group(): gr.Markdown("**๐ŸŒ Gradio Public Sharing**") gradio_share = gr.Dropdown( label="GRADIO_SHARE", choices=["false", "true"], value="false", info="Enable public sharing of the interface", ) with gr.Row(): gradio_share_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) gradio_share_file_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) # Port Configuration with gr.Group(): gr.Markdown("**๐Ÿ”Œ Server Port**") port_setting = gr.Number( label="PORT", value=7860, minimum=1000, maximum=65535, info="Server port number (requires restart)", ) with gr.Row(): port_cache_btn = gr.Button( "๐Ÿ’พ Save to Cache", variant="primary", size="sm" ) port_file_btn = gr.Button( "๐Ÿ“ Save to .env", variant="primary", size="sm" ) # System settings status system_status = gr.Textbox( label="System Settings Status", value="Ready", interactive=False, container=False, ) components.update( { "pinecone_env": pinecone_env, "pinecone_index": pinecone_index, "gradio_share": gradio_share, "port_setting": port_setting, "system_status": system_status, } ) # Event handlers for system settings 
pinecone_env_cache_btn.click( fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "cache"), inputs=[pinecone_env], outputs=[system_status], ) pinecone_env_file_btn.click( fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "env_file"), inputs=[pinecone_env], outputs=[system_status], ) pinecone_index_cache_btn.click( fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "cache"), inputs=[pinecone_index], outputs=[system_status], ) pinecone_index_file_btn.click( fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "env_file"), inputs=[pinecone_index], outputs=[system_status], ) gradio_share_cache_btn.click( fn=lambda val: self._save_setting("GRADIO_SHARE", val, "cache"), inputs=[gradio_share], outputs=[system_status], ) gradio_share_file_btn.click( fn=lambda val: self._save_setting("GRADIO_SHARE", val, "env_file"), inputs=[gradio_share], outputs=[system_status], ) port_cache_btn.click( fn=lambda val: self._save_setting("PORT", str(int(val)), "cache"), inputs=[port_setting], outputs=[system_status], ) port_file_btn.click( fn=lambda val: self._save_setting("PORT", str(int(val)), "env_file"), inputs=[port_setting], outputs=[system_status], ) return components def _create_storage_section(self): """Create the storage and export section.""" components = {} with gr.Column(): gr.Markdown("#### ๐Ÿ’พ Storage & Export Options") gr.Markdown("Manage how your settings are stored and exported") with gr.Row(): with gr.Column(): gr.Markdown("**๐Ÿ’พ Cache Storage**") gr.Markdown("โ€ข Temporary storage in memory") gr.Markdown("โ€ข Lost when application restarts") gr.Markdown("โ€ข Good for testing configurations") with gr.Column(): gr.Markdown("**๐Ÿ“ .env File Storage**") gr.Markdown("โ€ข Persistent storage in .env file") gr.Markdown("โ€ข Survives application restarts") gr.Markdown("โ€ข Recommended for production use") # Export options with gr.Group(): gr.Markdown("**๐Ÿ“ค Export Settings**") with gr.Row(): include_sensitive = gr.Checkbox( label="Include API Keys (masked)", value=False, info="Include API keys in export (they will be masked)", ) export_format = gr.Dropdown( label="Export Format", choices=["JSON", "ENV"], value="JSON", info="Choose export format", ) export_output = gr.Textbox( label="Export Output", lines=10, interactive=False, placeholder="Exported settings will appear here...", ) export_settings_btn = gr.Button("๐Ÿ“ค Generate Export", variant="primary") # Storage status storage_status = gr.Textbox( label="Storage Status", value="Ready", interactive=False, container=False, ) components.update( { "include_sensitive": include_sensitive, "export_format": export_format, "export_output": export_output, "storage_status": storage_status, } ) # Export event handler export_settings_btn.click( fn=self._generate_export, inputs=[include_sensitive, export_format], outputs=[export_output, storage_status], ) return components def _create_health_tab(self): """Create the system health monitoring tab.""" with gr.Column(): gr.Markdown("### System Health") with gr.Row(): health_check_btn = gr.Button("Run Health Check", variant="primary") restart_btn = gr.Button("Restart Services", variant="secondary") # System status system_status = gr.JSON( label="System Status", value={}, ) # Component status component_status = gr.Dataframe( headers=["Component", "Status", "Details"], datatype=["str", "str", "str"], label="Component Status", interactive=False, ) # Logs system_logs = gr.Textbox( label=" System Logs", lines=10, interactive=False, placeholder="System logs will appear here...", 
) # Event handlers health_check_btn.click( fn=self._run_health_check, outputs=[system_status, component_status, system_logs], ) return { "system_status": system_status, "component_status": component_status, "system_logs": system_logs, } def _get_custom_css(self) -> str: """๐ŸŽจ Get modern full-width custom CSS for the interface.""" return """ /* ๐ŸŒŸ Global Container - Full Width */ .gradio-container { max-width: 100% !important; width: 100% !important; margin: 0 !important; padding: 0 20px !important; } /* ๐ŸŽจ Modern Color Scheme */ :root { --primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%); --secondary-gradient: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); --success-gradient: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); --warning-gradient: linear-gradient(135deg, #43e97b 0%, #38f9d7 100%); --dark-bg: #1a1a2e; --dark-card: #16213e; --light-bg: #f8fafc; --light-card: #ffffff; --text-primary: #2d3748; --text-secondary: #718096; --border-color: #e2e8f0; --red: #f55b75; --shadow-sm: 0 1px 3px 0 rgba(0, 0, 0, 0.1); --shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1); --shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1); } /* ๐ŸŒ™ Dark Theme Support */ .dark { --text-primary: #f7fafc; --text-secondary: #cbd5e0; --border-color: #4a5568; } /* ๐Ÿ“ฑ Full Width Layout */ .main-container { width: 100% !important; max-width: 100% !important; } /* ๐ŸŽฏ Header Styling */ .app-header { background: var(--primary-gradient); color: white; padding: 2rem; border-radius: 0 0 20px 20px; margin-bottom: 2rem; box-shadow: var(--shadow-lg); } .app-title { font-size: 2.5rem; font-weight: 700; margin-bottom: 0.5rem; text-shadow: 0 2px 4px rgba(0,0,0,0.3); } .app-description { font-size: 1.1rem; opacity: 0.9; margin-bottom: 0; } /* ๐Ÿ“Š Status Bar Enhancement */ .status-bar { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 1rem 2rem; border-radius: 15px; margin-bottom: 2rem; box-shadow: var(--shadow-md); display: grid; grid-template-columns: 1fr 1fr; gap: 2rem; } .status-item { display: flex; align-items: center; gap: 0.5rem; } .status-icon { font-size: 1.2rem; } /* ๐ŸŽจ Tab Styling */ .tab-nav { background: var(--dark-bg); border-radius: 15px; padding: 0.5rem; margin-bottom: 2rem; box-shadow: var(--shadow-sm); border: 1px solid var(--border-color); } .tab-item { border-radius: 10px !important; padding: 1rem 1.5rem !important; font-weight: 600 !important; transition: all 0.3s ease !important; border: none !important; } .tab-item.selected { background: var(--primary-gradient) !important; color: white !important; box-shadow: var(--shadow-md); } /* ๐ŸŽฏ Card Components */ .metric-card { background: var(--dark-bg); border: 1px solid var(--border-color); border-radius: 15px; padding: 1.5rem; margin: 1rem 0; box-shadow: var(--shadow-sm); transition: all 0.3s ease; } .metric-card:hover { # transform: translateY(-5px); box-shadow: var(--shadow-lg); # border-color: #667eea; } .feature-card { background: var(--dark-bg); border: 1px solid var(--border-color); border-radius: 20px; padding: 2rem; margin: 1rem 0; box-shadow: var(--shadow-md); transition: all 0.3s ease; position: relative; overflow: hidden; } .feature-card:hover { transform: translateY(-8px); box-shadow: var(--shadow-lg); } /* ๐ŸŽจ Button Enhancements */ .btn-primary { background: var(--primary-gradient) !important; border: none !important; border-radius: 12px !important; padding: 0.75rem 2rem !important; font-weight: 600 !important; font-size: 1rem !important; transition: all 0.3s ease !important; 
box-shadow: var(--shadow-sm) !important; } .btn-primary:hover { transform: translateY(-2px) !important; box-shadow: var(--shadow-lg) !important; } .btn-secondary { background: var(--red) !important; border: none !important; border-radius: 12px !important; padding: 0.75rem 1.5rem !important; font-weight: 600 !important; transition: all 0.3s ease !important; } .btn-success { background: var(--success-gradient) !important; border: none !important; border-radius: 12px !important; padding: 0.75rem 1.5rem !important; font-weight: 600 !important; } .btn-warning { background: var(--warning-gradient) !important; border: none !important; border-radius: 12px !important; padding: 0.75rem 1.5rem !important; font-weight: 600 !important; } /* ๐Ÿ“ Input Field Styling */ .input-field { border: 2px solid var(--border-color) !important; border-radius: 12px !important; padding: 1rem !important; font-size: 1rem !important; transition: all 0.3s ease !important; background: var(--dark-bg) !important; } .input-field:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; outline: none !important; } /* ๐Ÿ“Š Progress Indicators */ .progress-bar { background: var(--primary-gradient); height: 8px; border-radius: 4px; transition: width 0.3s ease; } .progress-container { background: var(--border-color); height: 8px; border-radius: 4px; overflow: hidden; } /* ๐ŸŽฏ Grid Layouts */ .grid-2 { display: grid; grid-template-columns: 1fr 1fr; gap: 2rem; } .grid-3 { display: grid; grid-template-columns: repeat(3, 1fr); gap: 1.5rem; } .grid-4 { display: grid; grid-template-columns: repeat(4, 1fr); gap: 1rem; } /* ๐Ÿ“ฑ Responsive Design */ @media (max-width: 1200px) { .grid-4 { grid-template-columns: repeat(2, 1fr); } .grid-3 { grid-template-columns: repeat(2, 1fr); } } @media (max-width: 768px) { .gradio-container { padding: 0 10px !important; } .grid-2, .grid-3, .grid-4 { grid-template-columns: 1fr; gap: 1rem; } .status-bar { grid-template-columns: 1fr; gap: 1rem; padding: 1rem; } .app-title { font-size: 2rem; } .feature-card { padding: 1.5rem; } } /* ๐ŸŒŸ Animation Classes */ .fade-in { animation: fadeIn 0.5s ease-in; } .slide-up { animation: slideUp 0.6s ease-out; } @keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } } @keyframes slideUp { from { opacity: 0; transform: translateY(30px); } to { opacity: 1; transform: translateY(0); } } /* ๐ŸŽจ Accent Colors */ .accent-blue { border-left: 4px solid #3b82f6; } .accent-green { border-left: 4px solid #10b981; } .accent-purple { border-left: 4px solid #8b5cf6; } .accent-orange { border-left: 4px solid #f59e0b; } .accent-red { border-left: 4px solid #ef4444; } /* ๐Ÿ” Search Enhancement */ .search-container { position: relative; margin-bottom: 2rem; } .search-input { width: 100%; padding: 1rem 1rem 1rem 3rem; border: 2px solid var(--border-color); border-radius: 25px; font-size: 1.1rem; transition: all 0.3s ease; } .search-input:focus { border-color: #667eea; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1); } .search-icon { position: absolute; left: 1rem; top: 50%; transform: translateY(-50%); color: var(--text-secondary); } /* ๐Ÿ“ˆ Analytics Dashboard */ .analytics-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 1.5rem; margin: 2rem 0; } .stat-card { background: var(--dark-bg); border-radius: 15px; padding: 1.5rem; box-shadow: var(--shadow-sm); border: 1px solid var(--border-color); transition: all 0.3s ease; } .stat-card:hover { transform: translateY(-3px); box-shadow: var(--shadow-md); } 
.stat-value { font-size: 2rem; font-weight: 700; color: #667eea; margin-bottom: 0.5rem; } .stat-label { color: var(--text-secondary); font-size: 0.9rem; text-transform: uppercase; letter-spacing: 0.5px; } /* ๐Ÿš€ Loading States */ .loading { position: relative; overflow: hidden; } .loading::after { content: ''; position: absolute; top: 0; left: -100%; width: 100%; height: 100%; background: linear-gradient(90deg, transparent, rgba(255,255,255,0.4), transparent); animation: loading 1.5s infinite; } @keyframes loading { 0% { left: -100%; } 100% { left: 100%; } } /* ๐ŸŽจ Custom Scrollbar */ ::-webkit-scrollbar { width: 8px; } ::-webkit-scrollbar-track { background: var(--light-bg); } ::-webkit-scrollbar-thumb { background: var(--primary-gradient); border-radius: 4px; } ::-webkit-scrollbar-thumb:hover { background: #5a67d8; } """ # ๐Ÿ”ง Settings Management Methods def _refresh_all_settings(self): """Refresh all settings and return updated values.""" try: settings = self.settings_manager.get_current_settings() # Create overview for display overview = {} for var_name, config in settings.items(): overview[var_name] = { "value": config["value"] if config["is_set"] else "Not set", "source": config["source"], "status": ( "โœ… Valid" if config["is_valid"] else "โŒ Invalid" if config["is_set"] else "โš ๏ธ Not set" ), "required": config["is_required"], } # Return status and all component updates status_msg = "๐Ÿ”„ Settings refreshed successfully" # Get current values for form fields gemini_val = settings.get("GEMINI_API_KEY", {}).get("raw_value", "") pinecone_val = settings.get("PINECONE_API_KEY", {}).get("raw_value", "") openai_val = settings.get("OPENAI_API_KEY", {}).get("raw_value", "") tavily_val = settings.get("TAVILY_API_KEY", {}).get("raw_value", "") pinecone_env_val = settings.get("PINECONE_ENVIRONMENT", {}).get( "raw_value", "us-east-1" ) pinecone_index_val = settings.get("PINECONE_INDEX_NAME", {}).get( "raw_value", "rag-ai-index" ) gradio_share_val = settings.get("GRADIO_SHARE", {}).get( "raw_value", "false" ) port_val = int(settings.get("PORT", {}).get("raw_value", "7860")) return ( status_msg, overview, gemini_val, settings.get("GEMINI_API_KEY", {}).get("value", "Not configured"), pinecone_val, settings.get("PINECONE_API_KEY", {}).get("value", "Not configured"), openai_val, settings.get("OPENAI_API_KEY", {}).get("value", "Not configured"), tavily_val, settings.get("TAVILY_API_KEY", {}).get("value", "Not configured"), pinecone_env_val, pinecone_index_val, gradio_share_val, port_val, "โœ… Settings loaded", ) except Exception as e: self._log_safe(f" Error refreshing settings: {e}", "error") return ( f" Error refreshing settings: {str(e)}", {}, "", "Error loading", "", "Error loading", "", "Error loading", "", "Error loading", "us-east-1", "rag-ai-index", "false", 7860, "โŒ Error loading", ) def _save_setting(self, var_name: str, value: str, storage_type: str) -> str: """Save a setting with the specified storage type.""" try: result = self.settings_manager.update_setting(var_name, value, storage_type) if result["success"]: self._log_safe(f" Saved {var_name} to {storage_type}") return result["status"] else: self._log_safe( f" Failed to save {var_name}: {result.get('error', 'Unknown error')}", "error", ) return result["status"] except Exception as e: self._log_safe(f" Error saving {var_name}: {e}", "error") return f"โŒ Error: {str(e)}" def _test_api_connection(self, var_name: str) -> str: """Test API connection for the specified variable with optimized performance.""" try: # Show testing 
status immediately status_message = f"๐Ÿ”„ Testing {var_name} connection..." self._log_safe(status_message) # For Gemini, check if we've tested recently (use cached result) if var_name == "GEMINI_API_KEY" and hasattr( self.settings_manager, "_gemini_last_test_time" ): current_time = time.time() if ( self.settings_manager._gemini_last_test_time and current_time - self.settings_manager._gemini_last_test_time < 10 ): self._log_safe( f"โœ… Using cached {var_name} test result (tested recently)" ) return "โœ… Gemini API connected (cached result)" # Perform the actual test result = self.settings_manager.test_connection(var_name) if result["success"]: self._log_safe(f"โœ… {var_name} connection test successful") else: self._log_safe( f" {var_name} connection test failed: {result.get('error', 'Unknown error')}", "warning", ) return result["status"] except Exception as e: self._log_safe(f" Error testing {var_name}: {e}", "error") return f" Test error: {str(e)}" def _load_from_env_file(self) -> Tuple[str, Dict[str, Any]]: """Load settings from .env file.""" try: result = self.settings_manager.load_from_env_file() if result["success"]: self._log_safe( f" Loaded {result['loaded_count']} variables from .env file" ) # Get updated overview settings = self.settings_manager.get_current_settings() overview = {} for var_name, config in settings.items(): overview[var_name] = { "value": config["value"] if config["is_set"] else "Not set", "source": config["source"], "status": ( "โœ… Valid" if config["is_valid"] else "โŒ Invalid" if config["is_set"] else "โš ๏ธ Not set" ), "required": config["is_required"], } return result["status"], overview else: self._log_safe( f" Failed to load from .env: {result.get('error', 'Unknown error')}", "error", ) return result["status"], {} except Exception as e: self._log_safe(f" Error loading from .env file: {e}", "error") return f" Error: {str(e)}", {} def _clear_settings_cache(self) -> Tuple[str, Dict[str, Any]]: """Clear settings cache.""" try: result = self.settings_manager.clear_cache() if result["success"]: self._log_safe(f" Cleared {result['cleared_count']} cached variables") # Get updated overview settings = self.settings_manager.get_current_settings() overview = {} for var_name, config in settings.items(): overview[var_name] = { "value": config["value"] if config["is_set"] else "Not set", "source": config["source"], "status": ( "โœ… Valid" if config["is_valid"] else "โŒ Invalid" if config["is_set"] else "โš ๏ธ Not set" ), "required": config["is_required"], } return result["status"], overview else: self._log_safe( f" Failed to clear cache: {result.get('error', 'Unknown error')}", "error", ) return result["status"], {} except Exception as e: self._log_safe(f" Error clearing cache: {e}", "error") return f" Error: {str(e)}", {} def _export_settings(self) -> str: """Export settings (basic version for main button).""" try: result = self.settings_manager.export_settings(include_sensitive=False) if result["success"]: self._log_safe(" Settings exported successfully") return " Settings exported (check Storage & Export tab for details)" else: self._log_safe( f" Failed to export settings: {result.get('error', 'Unknown error')}", "error", ) return f" Export failed: {result.get('error', 'Unknown error')}" except Exception as e: self._log_safe(f" Error exporting settings: {e}", "error") return f" Error: {str(e)}" def _generate_export( self, include_sensitive: bool, export_format: str ) -> Tuple[str, str]: """Generate detailed export output.""" try: result = 
self.settings_manager.export_settings( include_sensitive=include_sensitive ) if not result["success"]: return ( f" Export failed: {result.get('error', 'Unknown error')}", " Export failed", ) settings_data = result["settings"] if export_format == "JSON": import json export_content = json.dumps( { "export_info": { "timestamp": result["export_timestamp"], "include_sensitive": include_sensitive, "format": "JSON", }, "settings": settings_data, }, indent=2, ) elif export_format == "ENV": export_lines = [ "# Environment Variables Export", f"# Generated on {result['export_timestamp']}", f"# Include sensitive: {include_sensitive}", "", ] for var_name, config in settings_data.items(): if config["is_set"]: value = config["value"] export_lines.append(f"# {config['description']}") export_lines.append(f"{var_name}={value}") export_lines.append("") export_content = "\n".join(export_lines) else: return " Invalid export format", " Invalid format" self._log_safe( f" Generated {export_format} export with {len(settings_data)} variables" ) return export_content, f" {export_format} export generated successfully" except Exception as e: self._log_safe(f" Error generating export: {e}", "error") return f" Error: {str(e)}", " Export generation failed" def _process_documents(self, files) -> Tuple[str, str, str]: """ Process uploaded documents with progress tracking. Args: files: List of uploaded files Returns: Tuple of (processing results, status, stats) """ if not files: return "No files uploaded.", "Ready ", self._get_stats_string() try: self._log_safe(f"Processing {len(files)} uploaded files") results = [] successful = 0 for i, file in enumerate(files): try: # Process each file result = self.rag_system.process_document(file.name) if result.get("status") == "success": successful += 1 self.total_documents += 1 self.total_chunks += result.get("chunks_processed", 0) results.append( f"{os.path.basename(file.name)}: " f"{result.get('chunks_processed', 0)} chunks processed" ) else: results.append( f"โŒ {os.path.basename(file.name)}: " f"{result.get('error', 'Processing failed')}" ) except Exception as e: results.append(f"โŒ {os.path.basename(file.name)}: {str(e)}") # Summary summary = ( f"\nSummary: {successful}/{len(files)} files processed successfully" ) output = "\n".join(results) + summary status = ( f"Processed {successful}/{len(files)} files " if successful > 0 else "Processing failed โŒ" ) return output, status, self._get_stats_string() except Exception as e: self._log_safe(f" Error processing documents: {str(e)}", "error") return f" Error: {str(e)}", "Error ", self._get_stats_string() def _process_urls( self, urls_text: str, max_depth: int = 1, follow_links: bool = True ) -> Tuple[str, str, str, str]: """ Process URLs with advanced crawling options and progress tracking. Args: urls_text: Text containing URLs (one per line) max_depth: Maximum crawling depth follow_links: Whether to follow links Returns: Tuple of (processing results, status, stats, progress_info) """ if not urls_text.strip(): return ( "No URLs provided.", "Ready ๐ŸŸข", self._get_stats_string(), "Ready to process URLs...", ) try: urls = [url.strip() for url in urls_text.split("\n") if url.strip()] self._log_safe( f"Processing {len(urls)} URLs with depth={max_depth}, follow_links={follow_links}" ) results = [] successful = 0 progress_msg = f"๐Ÿš€ Starting crawl of {len(urls)} URLs..." for i, url in enumerate(urls): progress_msg = f"๐Ÿ”„ Processing URL {i+1}/{len(urls)}: {url[:50]}..." 
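# Assumed return shape for rag_system.process_url(...), inferred from the keys read
# in the block below (the authoritative contract lives in the RAG system itself):
#   success: {"status": "success", "chunks_processed": 12,
#             "linked_documents_processed": 3, "depth": 2}
#   failure: {"status": "error", "error": "<reason>"}
# Numbers are illustrative only.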
                # Initialise the running total of linked pages on the first pass so
                # the summary below can report it without re-crawling any URLs.
                if i == 0:
                    total_linked = 0

                try:
                    # Process each URL with advanced options
                    result = self.rag_system.process_url(
                        url, max_depth=max_depth, follow_links=follow_links
                    )

                    if result.get("status") == "success":
                        successful += 1
                        self.total_documents += 1
                        self.total_chunks += result.get("chunks_processed", 0)

                        # Enhanced result display with crawling info
                        chunks = result.get("chunks_processed", 0)
                        linked_docs = result.get("linked_documents_processed", 0)
                        depth = result.get("depth", 0)
                        total_linked += linked_docs

                        result_text = f"✅ {url}:\n"
                        result_text += f"   📄 {chunks} chunks processed"
                        if linked_docs > 0:
                            result_text += f"\n   🔗 {linked_docs} linked pages found"
                        if depth > 0:
                            result_text += f"\n   🕷️ Crawled to depth {depth}"

                        results.append(result_text)
                    else:
                        error_msg = result.get("error", "Processing failed")
                        results.append(f"❌ {url}: {error_msg}")

                        # Add helpful hints for common crawling issues
                        if "depth" in error_msg.lower():
                            results.append("   💡 Try reducing crawl depth")
                        elif "timeout" in error_msg.lower():
                            results.append(
                                "   💡 Site may be slow, try single page mode"
                            )
                        elif "robots" in error_msg.lower():
                            results.append(
                                "   💡 Site blocks crawlers, try direct URL only"
                            )

                except Exception as e:
                    results.append(f"❌ {url}: {str(e)}")

            # Enhanced summary with crawling stats (total_linked is accumulated in
            # the loop above instead of re-processing every URL a second time)
            summary = "\n" + "=" * 50
            summary += "\n📊 **CRAWLING SUMMARY**"
            summary += f"\n✅ URLs processed: {successful}/{len(urls)}"
            if follow_links and max_depth > 1:
                summary += f"\n🔗 Linked pages discovered: {total_linked}"
                summary += f"\n🕷️ Max crawl depth: {max_depth}"
            summary += f"\n📄 Total chunks: {self.total_chunks}"
            summary += "\n" + "=" * 50

            output = "\n".join(results) + summary
            status = (
                f"Processed {successful}/{len(urls)} URLs ✅"
                if successful > 0
                else "Processing failed ❌"
            )
            final_progress = (
                f"✅ Completed! Processed {successful}/{len(urls)} URLs successfully"
            )

            return output, status, self._get_stats_string(), final_progress

        except Exception as e:
            self._log_safe(f"❌ Error processing URLs: {str(e)}", "error")
            error_progress = "❌ Error occurred during processing"
            return (
                f"❌ Error: {str(e)}",
                "Error ❌",
                self._get_stats_string(),
                error_progress,
            )

    def _process_query(
        self,
        query: str,
        include_sources: bool = True,
        max_results: int = 5,
        use_live_search: bool = False,
        search_depth: str = "basic",
        time_range: str = "month",
        search_mode: str = "auto",
    ) -> Tuple[str, str, Dict[str, Any], str, str]:
        """
        Process a user query with enhanced response formatting and live search options.
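
        The search_mode value selects the retrieval strategy: "auto" lets the
        system decide, "local_only" restricts retrieval to the local knowledge
        base, "live_only" uses live web search only, and "hybrid" combines both.
        For backward compatibility, use_live_search=True with search_mode="auto"
        is treated as "hybrid".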
Args: query: User query string include_sources: Whether to include source information max_results: Maximum number of results to return use_live_search: Whether to use live web search search_depth: Search depth for live search time_range: Time range for live search Returns: Tuple of (response, confidence, sources, status, stats) """ if not query.strip(): return ( "Please enter a question.", "", {}, "Ready ๐ŸŸข", self._get_stats_string(), ) try: # โœ… Enhanced search type detection search_type_map = { "auto": "๐Ÿค– Auto", "local_only": "๐Ÿ“š Local Only", "live_only": "๐ŸŒ Live Only", "hybrid": "๐Ÿ”„ Hybrid", } search_type = search_type_map.get(search_mode, "๐Ÿค– Auto") # ๐Ÿ”„ Backward compatibility: if use_live_search is True but mode is auto, use hybrid if use_live_search and search_mode == "auto": search_mode = "hybrid" search_type = "๐Ÿ”„ Hybrid" self._log_safe( f" Processing query ({search_type}): {query[:100]}... " f"(mode: {search_mode}, sources: {include_sources}, max_results: {max_results})" ) # ๐Ÿš€ Route query based on search mode if search_mode in ["live_only", "hybrid"] or use_live_search: # Use enhanced RAG system with search mode result = self.rag_system.query( query, max_results=max_results, use_live_search=( search_mode in ["live_only", "hybrid"] or use_live_search ), search_mode=search_mode, ) else: # Use traditional local RAG system result = self.rag_system.query( query, max_results=max_results, search_mode=search_mode ) self.query_count += 1 response = result.get("response", "No response generated.") confidence = result.get("confidence", 0.0) sources = result.get("sources", []) # ๐ŸŽฏ Format confidence display with search type indicator confidence_text = f"๐ŸŽฏ Confidence: {confidence:.1%}" if confidence >= 0.8: confidence_text += " ๐ŸŸข High" elif confidence >= 0.5: confidence_text += " ๐ŸŸก Medium" else: confidence_text += " ๐Ÿ”ด Low" # Add processing details with search type context_items = result.get("context_items", 0) processing_time = result.get("processing_time", 0) search_indicator = "๐ŸŒ" if use_live_search else "๐Ÿ“š" confidence_text += f" | {search_indicator} {search_type} | โšก {processing_time:.2f}s | ๐Ÿ“„ {context_items} items" # ๐Ÿ“Š Format sources for display based on user preference sources_display = {} if include_sources and sources: # Limit sources based on max_results limited_sources = sources[:max_results] sources_display = { "confidence": f"{confidence:.3f}", "total_sources": len(sources), "showing": len(limited_sources), "max_requested": max_results, "sources": limited_sources, "search_type": search_type, "query_options": { "include_sources": include_sources, "max_results": max_results, "use_live_search": use_live_search, "search_depth": search_depth if use_live_search else None, "time_range": time_range if use_live_search else None, }, } # ๐ŸŒ Add live search specific metadata if use_live_search: sources_display.update( { "live_search_params": { "search_depth": search_depth, "time_range": time_range, "routing_decision": result.get( "routing_decision", "live_search" ), } } ) elif not include_sources: sources_display = { "message": "๐Ÿ”’ Sources hidden by user preference", "total_sources": len(sources), "search_type": search_type, "query_options": { "include_sources": include_sources, "max_results": max_results, "use_live_search": use_live_search, }, } # ๐Ÿ“ˆ Enhanced status with search type status_icon = "๐ŸŒ" if use_live_search else "๐Ÿ“š" status = f"โœ… {status_icon} Query processed (confidence: {confidence:.1%}, {len(sources)} sources)" 
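
            # Illustrative shape of sources_display for a query with
            # include_sources=True (values below are made up; the structure mirrors
            # the dictionary built above):
            #     {
            #         "confidence": "0.872",
            #         "total_sources": 7,
            #         "showing": 5,
            #         "max_requested": 5,
            #         "sources": [...],
            #         "search_type": "🔄 Hybrid",
            #         "query_options": {...},
            #         "live_search_params": {...},  # present only for live search
            #     }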
return ( response, confidence_text, sources_display, status, self._get_stats_string(), ) except Exception as e: self._log_safe(f" Error processing query: {str(e)}", "error") return ( f" Error: {str(e)}", "Error", {}, "Error ", self._get_stats_string(), ) def _process_live_query( self, query: str, max_results: int, search_depth: str, time_range: str ) -> Dict[str, Any]: """ Process query using live search via MCP Tavily integration. Args: query: User query max_results: Maximum results to return search_depth: Search depth parameter time_range: Time range for search Returns: Dictionary with search results and metadata """ try: self._log_safe(f" Performing live search with Tavily API...") # ๐Ÿš€ Use MCP Tavily tool for live search # This will be the actual MCP integration point search_results = self._call_tavily_mcp( query, max_results, search_depth, time_range ) # ๐Ÿ”„ Process and format results for RAG response generation if search_results and search_results.get("results"): # Format for response generator formatted_context = [] for result in search_results["results"]: formatted_context.append( { "text": result.get("content", ""), "source": result.get("url", "web_search"), "title": result.get("title", "Web Result"), "score": result.get("score", 0.0), "metadata": { "type": "web_result", "search_engine": "tavily", "url": result.get("url", ""), "title": result.get("title", ""), }, } ) # ๐Ÿง  Generate response using the response generator with live context if hasattr(self.rag_system, "response_generator"): response_result = ( self.rag_system.response_generator.generate_response( query, formatted_context ) ) # ๐Ÿ“Š Combine live search metadata with response response_result.update( { "context_items": len(formatted_context), "search_type": "live_web", "routing_decision": "live_search", "live_search_params": { "search_depth": search_depth, "time_range": time_range, "total_web_results": len(search_results["results"]), }, } ) return response_result else: # ๐Ÿ“ Fallback: simple response formatting combined_content = "\n\n".join( [ f"**{result.get('title', 'Web Result')}**\n{result.get('content', '')}" for result in search_results["results"][:3] ] ) return { "response": f"Based on live web search:\n\n{combined_content}", "sources": search_results["results"], "confidence": 0.8, "context_items": len(search_results["results"]), "search_type": "live_web", } else: return { "response": "No live search results found. Please try a different query or check your internet connection.", "sources": [], "confidence": 0.0, "context_items": 0, "error": "No live search results", } except Exception as e: self._log_safe(f" Live search error: {str(e)}", "error") # ๐Ÿ”„ Fallback to local search self._log_safe(" Falling back to local search...", "warning") return self.rag_system.query(query, max_results=max_results) def _call_tavily_mcp( self, query: str, max_results: int, search_depth: str, time_range: str ) -> Dict[str, Any]: """ Call Tavily API using the live search module. 
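
        Results from LiveSearchManager.search_web() are normalized into a plain
        dict with "results", "total_results", "search_params" and "status" keys
        so the UI layer does not depend on the raw Tavily response format.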
Args: query: Search query max_results: Maximum results search_depth: Search depth time_range: Time range Returns: Tavily search results """ try: # ๐ŸŒ Use the live search module with Tavily Python SDK from src.rag.live_search import LiveSearchManager self._log_safe( f" Tavily API call: query='{query}', depth={search_depth}, range={time_range}" ) # โœ… Initialize live search manager live_search = LiveSearchManager() # ๐Ÿš€ Perform the search using Tavily Python SDK search_results = live_search.search_web( query=query, max_results=max_results, search_depth=search_depth, time_range=time_range, ) # ๐Ÿ“Š Format results for UI consumption if ( search_results and search_results.get("results") and not search_results.get("error") ): formatted_results = [] for result in search_results.get("results", []): formatted_results.append( { "title": result.get("title", ""), "content": result.get("content", ""), "url": result.get("url", ""), "score": result.get("score", 0.0), "published_date": result.get("published_date", ""), } ) return { "results": formatted_results, "total_results": len(formatted_results), "search_params": { "query": query, "max_results": max_results, "search_depth": search_depth, "time_range": time_range, }, "status": "success", "analytics": search_results.get("analytics", {}), } else: # ๐Ÿšจ Handle search failure error_msg = search_results.get("error", "Unknown search error") self._log_safe(f" Tavily search failed: {error_msg}", "warning") return { "results": [], "total_results": 0, "search_params": { "query": query, "max_results": max_results, "search_depth": search_depth, "time_range": time_range, }, "status": "failed", "error": error_msg, } except Exception as e: self._log_safe(f" Tavily API call failed: {str(e)}", "error") return { "results": [], "total_results": 0, "error": str(e), "status": "error", } def _refresh_knowledge_base( self, ) -> Tuple[Dict[str, Any], Dict[str, Any], List[List[str]]]: """ Refresh knowledge base information with real data from vector DB and embedding model. 
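
        Statistics are read from the vector database (_get_real_kb_stats) and the
        embedding generator (_get_embedding_model_info); when neither is available,
        the counters tracked by this UI instance are used as a fallback.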
Returns: Tuple of (kb_stats, embedding_model_status, document_list) """ try: # Get real knowledge base statistics kb_info = self._get_real_kb_stats() # Get embedding model information embedding_info = self._get_embedding_model_info() # ๐Ÿ“Š Knowledge Base Stats kb_stats = { "total_documents": kb_info.get("total_documents", self.total_documents), "total_chunks": kb_info.get("total_chunks", self.total_chunks), "storage_size": f"{kb_info.get('total_chunks', self.total_chunks) * 0.5:.1f} MB", "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "vector_db_status": kb_info.get("vector_db_status", "Unknown"), "embedding_model": embedding_info.get("model_name", "Unknown"), "embedding_status": embedding_info.get("status", "Unknown"), "index_health": kb_info.get("index_health", "Unknown"), } # ๐Ÿค– Embedding Model Status embedding_status = { "model_name": embedding_info.get("model_name", "Unknown"), "provider": embedding_info.get("provider", "Unknown"), "status": embedding_info.get("status", "Unknown"), "api_status": embedding_info.get("api_status", "Unknown"), "dimension": embedding_info.get("dimension", "Unknown"), "performance": { "total_requests": embedding_info.get("total_requests", 0), "success_rate": embedding_info.get("success_rate", "0%"), "cache_hit_rate": embedding_info.get("cache_hit_rate", "0%"), "batch_size": embedding_info.get("batch_size", "Unknown"), "max_text_length": embedding_info.get("max_text_length", "Unknown"), "caching_enabled": embedding_info.get("caching_enabled", False), }, "last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } # Get real document list from vector DB documents = self._get_real_document_list() # If no real documents, show helpful message if not documents: documents = [ [ "๐Ÿ“ No documents yet", "Info", "0", "Upload documents to get started", ], ["๐Ÿ”— Try adding URLs", "Info", "0", "Use the 'Add URLs' tab"], [ "๐Ÿ“š Knowledge base empty", "Info", "0", "Start building your knowledge base!", ], ] return kb_stats, embedding_status, documents except Exception as e: self._log_safe(f" Error refreshing knowledge base: {e}", "error") # Fallback stats fallback_kb_stats = { "total_documents": self.total_documents, "total_chunks": self.total_chunks, "storage_size": f"{self.total_chunks * 0.5:.1f} MB", "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "error": str(e), } fallback_embedding_status = { "model_name": "Error", "provider": "Unknown", "status": "โŒ Error", "api_status": "โŒ Error", "dimension": "Unknown", "performance": {"error": str(e)}, "last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } return fallback_kb_stats, fallback_embedding_status, [] def _get_real_kb_stats(self) -> Dict[str, Any]: """Get real knowledge base statistics from the RAG system.""" try: # ๐Ÿ” Get embedding model info first embedding_model_info = self._get_embedding_model_info() if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: # Try to get stats from vector DB vector_stats = ( self.rag_system.vector_db.get_stats() if hasattr(self.rag_system.vector_db, "get_stats") else {} ) return { "total_documents": vector_stats.get( "total_vectors", self.total_documents ), "total_chunks": vector_stats.get( "total_vectors", self.total_chunks ), "vector_db_status": "โœ… Connected" if vector_stats else "โš ๏ธ Limited", "embedding_model": embedding_model_info.get( "model_name", "Unknown" ), "embedding_model_status": embedding_model_info.get( "status", "Unknown" ), "embedding_dimension": embedding_model_info.get( "dimension", "Unknown" 
), "embedding_provider": embedding_model_info.get( "provider", "Unknown" ), "index_health": ( "โœ… Healthy" if vector_stats.get("total_vectors", 0) > 0 else "โš ๏ธ Empty" ), } else: return { "total_documents": self.total_documents, "total_chunks": self.total_chunks, "vector_db_status": "โŒ Not Connected", "embedding_model": embedding_model_info.get( "model_name", "Unknown" ), "embedding_model_status": embedding_model_info.get( "status", "โŒ Not Available" ), "embedding_dimension": embedding_model_info.get( "dimension", "Unknown" ), "embedding_provider": embedding_model_info.get( "provider", "Unknown" ), "index_health": "โŒ Unavailable", } except Exception as e: self._log_safe(f"Could not get real KB stats: {e}", "warning") return {} def _get_real_document_list(self) -> List[List[str]]: """Get real document list from the RAG system.""" try: documents = [] # Try to get document metadata from vector DB if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: # Get unique sources from vector DB if hasattr(self.rag_system.vector_db, "get_unique_sources"): sources = self.rag_system.vector_db.get_unique_sources() for source_info in sources: source_name = source_info.get("source", "Unknown") doc_type = self._get_document_type(source_name) chunk_count = source_info.get("chunk_count", 0) added_date = source_info.get("added_date", "Unknown") documents.append( [source_name, doc_type, str(chunk_count), added_date] ) # If vector DB doesn't have get_unique_sources, try alternative approach elif hasattr(self.rag_system.vector_db, "list_documents"): doc_list = self.rag_system.vector_db.list_documents() for doc in doc_list: documents.append( [ doc.get("name", "Unknown"), self._get_document_type(doc.get("name", "")), str(doc.get("chunks", 0)), doc.get("date", "Unknown"), ] ) return documents except Exception as e: self._log_safe(f"Could not get real document list: {e}", "warning") return [] def _get_document_type(self, filename: str) -> str: """Determine document type from filename.""" if not filename: return "Unknown" filename_lower = filename.lower() if filename_lower.endswith(".pdf"): return "๐Ÿ“„ PDF" elif filename_lower.endswith((".doc", ".docx")): return "๐Ÿ“ Word" elif filename_lower.endswith((".xls", ".xlsx")): return "๐Ÿ“Š Excel" elif filename_lower.endswith((".ppt", ".pptx")): return "๐Ÿ“ˆ PowerPoint" elif filename_lower.endswith(".csv"): return "๐Ÿ“‹ CSV" elif filename_lower.endswith((".txt", ".md")): return "๐Ÿ“„ Text" elif "http" in filename_lower: return "๐ŸŒ Web" else: return "๐Ÿ“„ Document" def _get_embedding_model_info(self) -> Dict[str, Any]: """ ๐Ÿค– Get comprehensive embedding model information. 
Returns: Dictionary with embedding model details """ try: model_info = { "model_name": "Unknown", "status": "โŒ Not Available", "dimension": "Unknown", "provider": "Unknown", "api_status": "โŒ Not Connected", } # Check if embedding generator exists and is properly initialized if ( hasattr(self.rag_system, "embedding_generator") and self.rag_system.embedding_generator ): embedding_gen = self.rag_system.embedding_generator # Get model name - check multiple possible attributes model_name = ( getattr(embedding_gen, "model", None) or getattr(embedding_gen, "model_name", None) or "gemini-embedding-exp-03-07" ) # Default Gemini model # Get API client status api_connected = ( hasattr(embedding_gen, "client") and embedding_gen.client is not None ) # Get configuration details config = getattr(embedding_gen, "config", {}) model_info.update( { "model_name": model_name, "status": "โœ… Available" if api_connected else "โš ๏ธ Limited", "provider": ( "Google Gemini" if "gemini" in model_name.lower() else "Unknown" ), "api_status": ( "โœ… Connected" if api_connected else "โŒ Not Connected" ), "dimension": config.get("dimension", "3072"), # Gemini default "batch_size": config.get("batch_size", 5), "max_text_length": config.get("max_text_length", 8192), "caching_enabled": config.get("enable_caching", True), } ) # Get statistics if available if hasattr(embedding_gen, "get_statistics"): try: stats = embedding_gen.get_statistics() model_info.update( { "total_requests": stats.get("total_requests", 0), "successful_requests": stats.get( "successful_requests", 0 ), "cache_hits": stats.get("cache_hits", 0), "cache_hit_rate": f"{stats.get('cache_hit_rate', 0):.1f}%", "success_rate": f"{stats.get('success_rate', 0):.1f}%", } ) except Exception as e: self._log_safe(f"Could not get embedding stats: {e}", "warning") # Test API connection if possible (quick test) if api_connected: try: # Quick test to verify API is working test_embedding = embedding_gen.generate_query_embedding("test") if test_embedding: model_info["api_status"] = "โœ… Connected & Working" model_info["status"] = "โœ… Fully Operational" else: model_info["api_status"] = "โš ๏ธ Connected but Limited" except Exception as e: model_info["api_status"] = f" Connection Error: {str(e)[:50]}" return model_info except Exception as e: self._log_safe(f"Error getting embedding model info: {e}", "error") return { "model_name": "Error", "status": " Error", "dimension": "Unknown", "provider": "Unknown", "api_status": f" Error: {str(e)[:50]}", "error": str(e), } def _run_health_check(self) -> Tuple[Dict[str, Any], List[List[str]], str]: """ ๐Ÿฉบ Run comprehensive real system health check. 
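
        Checks the embedding generator, vector database, document processor,
        response generator, web interface and (optionally) live search, then derives
        the overall rating from the share of healthy components (>= 80% healthy,
        >= 60% degraded, otherwise unhealthy).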
Returns: Tuple of (system status, component status, logs) """ try: import psutil import time from datetime import timedelta # ๐Ÿ“Š Real System Status start_time = time.time() # Get real system metrics memory_info = psutil.virtual_memory() cpu_percent = psutil.cpu_percent(interval=1) # Calculate uptime (approximate) boot_time = psutil.boot_time() uptime_seconds = time.time() - boot_time uptime = str(timedelta(seconds=int(uptime_seconds))) system_status = { "overall_health": "๐ŸŸข Healthy", "uptime": uptime, "memory_usage": f"{memory_info.percent:.1f}%", "memory_available": f"{memory_info.available / (1024**3):.1f} GB", "cpu_usage": f"{cpu_percent:.1f}%", "disk_usage": f"{psutil.disk_usage('/').percent:.1f}%", "last_check": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "system_load": "Normal" if cpu_percent < 80 else "High", } # ๐Ÿ” Real Component Status Check components = [] logs = [] current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logs.append(f"[{current_time}] INFO - System health check initiated") # 1. ๐Ÿค– Embedding Generator Check embedding_info = self._get_embedding_model_info() embedding_status = embedding_info.get("status", "โŒ Unknown") embedding_details = f"{embedding_info.get('model_name', 'Unknown')} - {embedding_info.get('api_status', 'Unknown')}" components.append( ["๐Ÿค– Embedding Generator", embedding_status, embedding_details] ) if "โœ…" in embedding_status: logs.append( f"[{current_time}] INFO - Embedding generator: {embedding_details}" ) else: logs.append( f"[{current_time}] WARN - Embedding generator: {embedding_details}" ) # 2. ๐ŸŒฒ Vector Database Check vector_db_status, vector_db_details = self._check_vector_db_health() components.append( ["๐ŸŒฒ Vector Database", vector_db_status, vector_db_details] ) logs.append(f"[{current_time}] INFO - Vector database: {vector_db_details}") # 3. ๐Ÿ“„ Document Processor Check doc_processor_status, doc_processor_details = ( self._check_document_processor_health() ) components.append( ["๐Ÿ“„ Document Processor", doc_processor_status, doc_processor_details] ) logs.append( f"[{current_time}] INFO - Document processor: {doc_processor_details}" ) # 4. ๐Ÿง  Response Generator Check response_gen_status, response_gen_details = ( self._check_response_generator_health() ) components.append( [" Response Generator", response_gen_status, response_gen_details] ) logs.append( f"[{current_time}] INFO - Response generator: {response_gen_details}" ) # 5. ๐ŸŒ Web Interface Check components.append( ["๐ŸŒ Web Interface", "โœ… Healthy", "Gradio running successfully"] ) logs.append(f"[{current_time}] INFO - Web interface: Running on port 7860") # 6. 
๐Ÿ” Live Search Check (if available) live_search_status, live_search_details = self._check_live_search_health() components.append( ["๐Ÿ” Live Search", live_search_status, live_search_details] ) logs.append(f"[{current_time}] INFO - Live search: {live_search_details}") # Calculate overall health healthy_components = sum(1 for comp in components if "โœ…" in comp[1]) total_components = len(components) health_percentage = (healthy_components / total_components) * 100 if health_percentage >= 80: system_status["overall_health"] = "๐ŸŸข Healthy" logs.append( f"[{current_time}] INFO - Overall system health: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" ) elif health_percentage >= 60: system_status["overall_health"] = "๐ŸŸก Degraded" logs.append( f"[{current_time}] WARN - System degraded: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" ) else: system_status["overall_health"] = "๐Ÿ”ด Unhealthy" logs.append( f"[{current_time}] ERROR - System unhealthy: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" ) # Add performance metrics health_check_time = time.time() - start_time system_status["health_check_duration"] = f"{health_check_time:.2f}s" logs.append( f"[{current_time}] INFO - Health check completed in {health_check_time:.2f}s" ) return system_status, components, "\n".join(logs) except Exception as e: self._log_safe(f"โŒ Error running health check: {e}", "error") error_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return ( { "overall_health": "๐Ÿ”ด Error", "error": str(e), "last_check": error_time, }, [["System", "โŒ Error", f"Health check failed: {str(e)}"]], f"[{error_time}] ERROR - Health check failed: {str(e)}", ) def _check_vector_db_health(self) -> Tuple[str, str]: """๐ŸŒฒ Check Vector Database health status.""" try: if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: vector_db = self.rag_system.vector_db # Try to get health check from vector DB if hasattr(vector_db, "health_check"): health_result = vector_db.health_check() if health_result.get("status") == "healthy": return ( "โœ… Healthy", f"Pinecone connected - {health_result.get('checks', {}).get('index_stats', 'OK')}", ) else: return ( "โš ๏ธ Degraded", f"Issues detected: {health_result.get('error', 'Unknown')}", ) # Fallback: check if we can get stats elif hasattr(vector_db, "get_stats"): stats = vector_db.get_stats() if stats.get("status") == "connected": total_vectors = stats.get("total_vectors", 0) return ( "โœ… Healthy", f"Pinecone connected - {total_vectors} vectors stored", ) else: return ( "โŒ Error", f"Connection failed: {stats.get('error', 'Unknown')}", ) else: return ( "โš ๏ธ Limited", "Vector DB available but health check not implemented", ) else: return "โŒ Not Available", "Vector database not initialized" except Exception as e: return "โŒ Error", f"Health check failed: {str(e)[:50]}" def _check_document_processor_health(self) -> Tuple[str, str]: """๐Ÿ“„ Check Document Processor health status.""" try: if ( hasattr(self.rag_system, "document_processor") and self.rag_system.document_processor ): # Check if document processor has required dependencies try: # Test basic functionality processor = self.rag_system.document_processor # Check if it has the required methods if hasattr(processor, "process_document"): supported_formats = [ "PDF", "DOCX", "CSV", "XLSX", "PPTX", "TXT", "MD", ] return ( "โœ… Healthy", f"All formats supported: {', '.join(supported_formats)}", ) else: return 
"โš ๏ธ Limited", "Basic functionality available" except ImportError as e: return ( "โŒ Dependencies Missing", f"Missing libraries: {str(e)[:30]}", ) else: return "โŒ Not Available", "Document processor not initialized" except Exception as e: return "โŒ Error", f"Health check failed: {str(e)[:50]}" def _check_response_generator_health(self) -> Tuple[str, str]: """๐Ÿง  Check Response Generator health status.""" try: if ( hasattr(self.rag_system, "response_generator") and self.rag_system.response_generator ): response_gen = self.rag_system.response_generator # Check if it has required configuration config = getattr(response_gen, "config", {}) # Check API keys availability gemini_key = config.get("gemini_api_key") or os.getenv("GEMINI_API_KEY") openai_key = config.get("openai_api_key") or os.getenv("OPENAI_API_KEY") if gemini_key: return "โœ… Healthy", "Gemini LLM available for response generation" elif openai_key: return "โœ… Healthy", "OpenAI LLM available for response generation" else: return "โš ๏ธ Limited", "No LLM API keys configured" else: return "โŒ Not Available", "Response generator not initialized" except Exception as e: return "โŒ Error", f"Health check failed: {str(e)[:50]}" def _check_live_search_health(self) -> Tuple[str, str]: """๐Ÿ” Check Live Search health status.""" try: # Check if Tavily API key is available tavily_key = os.getenv("TAVILY_API_KEY") if tavily_key: # Check if live search components are available if ( hasattr(self.rag_system, "live_search_processor") and self.rag_system.live_search_processor ): return "โœ… Healthy", "Tavily API configured - Live search available" elif ( hasattr(self.rag_system, "query_router") and self.rag_system.query_router ): return "โœ… Healthy", "Query router available - Live search enabled" else: return ( "โš ๏ธ Limited", "Tavily API key available but components not initialized", ) else: return ( "โš ๏ธ Optional", "Tavily API key not configured - Live search disabled", ) except Exception as e: return "โŒ Error", f"Health check failed: {str(e)[:50]}" def _get_stats_string(self) -> str: """Get formatted stats string.""" return f"Documents: {self.total_documents} | Chunks: {self.total_chunks} | Queries: {self.query_count}" def launch(self, **kwargs): """ Launch the Gradio interface. Args: **kwargs: Additional arguments for gr.Interface.launch() """ if not self.interface: self._log_safe(" Interface not created", "error") return # Merge default config with provided kwargs launch_config = { "share": self.share, "server_name": "0.0.0.0", "server_port": 7860, "show_error": True, "quiet": False, } launch_config.update(kwargs) self._log_safe(f"Launching Gradio interface with config: {launch_config}") self.interface.launch(**launch_config)