Spaces:
Sleeping
Sleeping
""" | |
Gradio UI Module | |
This module provides an intuitive interface for document upload, | |
URL input, and querying using Gradio. | |
Technology: Gradio | |
""" | |
import logging | |
import os | |
import sys | |
import tempfile | |
import json | |
import time | |
from typing import Dict, List, Any, Optional, Tuple | |
from datetime import datetime | |
from pathlib import Path | |
try: | |
import gradio as gr | |
except ImportError: | |
logging.warning("Gradio not available.") | |
class GradioApp: | |
""" | |
Provides a comprehensive Gradio-based user interface for the RAG system. | |
Features: | |
- Document upload with progress tracking | |
- URL processing with status updates | |
- Interactive Q&A interface with source display | |
- Knowledge base management | |
- System status and health monitoring | |
- Analytics dashboard | |
""" | |
def __init__(self, rag_system, config: Optional[Dict[str, Any]] = None): | |
""" | |
Initialize the GradioApp with the RAG system. | |
Args: | |
rag_system: Instance of the complete RAG system | |
config: Configuration dictionary with UI parameters | |
""" | |
self.rag_system = rag_system | |
self.config = config or {} | |
self.logger = self._setup_unicode_logger() | |
# 🔧 Initialize settings manager | |
from utils.settings_manager import SettingsManager | |
config_manager = getattr(rag_system, "config_manager", None) | |
self.settings_manager = SettingsManager(config_manager) | |
# UI Configuration | |
self.title = self.config.get("title", "AI Embedded Knowledge Agent") | |
self.description = self.config.get( | |
"description", | |
"Upload documents or provide URLs to build your knowledge base, then ask questions!", | |
) | |
self.theme = self.config.get("theme", "default") | |
self.share = self.config.get("share", False) | |
# Features configuration | |
self.features = self.config.get("features", {}) | |
self.enable_file_upload = self.features.get("file_upload", True) | |
self.enable_url_input = self.features.get("url_input", True) | |
self.enable_query_interface = self.features.get("query_interface", True) | |
self.enable_source_display = self.features.get("source_display", True) | |
self.enable_confidence_display = self.features.get("confidence_display", True) | |
# State management | |
self.processing_status = "Ready" | |
self.total_documents = 0 | |
self.total_chunks = 0 | |
self.query_count = 0 | |
# Initialize interface | |
self.interface = None | |
self._create_interface() | |
self._log_safe("GradioApp initialized with advanced features") | |
def _setup_unicode_logger(self): | |
"""🔧 Setup Unicode-safe logger for cross-platform compatibility.""" | |
logger = logging.getLogger(__name__) | |
# ✅ Configure handler with UTF-8 encoding for Windows compatibility | |
if not logger.handlers: | |
handler = logging.StreamHandler(sys.stdout) | |
# 🌍 Force UTF-8 encoding on Windows to handle emojis | |
if sys.platform.startswith("win"): | |
try: | |
# ⚡ Try to reconfigure stdout with UTF-8 encoding | |
handler.stream = open( | |
sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1 | |
) | |
except Exception: | |
# 🔄 Fallback to default if reconfiguration fails | |
pass | |
formatter = logging.Formatter( | |
"%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
) | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
logger.setLevel(logging.INFO) | |
return logger | |
def _log_safe(self, message: str, level: str = "info"): | |
"""🛡️ Unicode-safe logging that handles emojis on Windows.""" | |
try: | |
# ✅ Pre-process message to be safe for Windows cp1252 encoding | |
safe_message = self._make_message_safe(message) | |
getattr(self.logger, level)(safe_message) | |
except UnicodeEncodeError: | |
# 🔄 Additional fallback: Remove all non-ASCII characters | |
ascii_message = message.encode("ascii", "ignore").decode("ascii") | |
getattr(self.logger, level)(f"[ENCODING_SAFE] {ascii_message}") | |
except Exception as e: | |
# 🚨 Last resort: Basic logging without special characters | |
basic_message = ( | |
str(message).replace("🌐", "[LIVE]").replace("📚", "[LOCAL]") | |
) | |
try: | |
getattr(self.logger, level)(f"[SAFE] {basic_message}") | |
except: | |
print(f"[FALLBACK] {basic_message}") # Direct print as last resort | |
def _make_message_safe(self, message: str) -> str: | |
"""🔄 Convert emoji characters to safe text equivalents.""" | |
emoji_map = { | |
"🔍": "[SEARCH]", | |
"✅": "[SUCCESS]", | |
"❌": "[ERROR]", | |
"🚀": "[ROCKET]", | |
"📄": "[DOC]", | |
"🔗": "[LINK]", | |
"⚡": "[FAST]", | |
"🎯": "[TARGET]", | |
"🟢": "[GREEN]", | |
"🟡": "[YELLOW]", | |
"🔴": "[RED]", | |
"📊": "[CHART]", | |
"🕷️": "[SPIDER]", | |
"💡": "[IDEA]", | |
"🔄": "[REFRESH]", | |
"📚": "[BOOKS]", | |
"🩺": "[HEALTH]", | |
"📈": "[ANALYTICS]", | |
"🌐": "[LIVE]", | |
"🌍": "[WORLD]", | |
"🔧": "[TOOL]", | |
"🛡️": "[SHIELD]", | |
"🎨": "[DESIGN]", | |
"📝": "[NOTE]", | |
"🗑️": "[DELETE]", | |
"💾": "[SAVE]", | |
"📁": "[FOLDER]", | |
"🔔": "[BELL]", | |
"⚙️": "[SETTINGS]", | |
"🧪": "[TEST]", | |
"📤": "[EXPORT]", | |
"🔌": "[PORT]", | |
"🌲": "[TREE]", | |
"🔥": "[FIRE]", | |
"🔑": "[KEY]", | |
"🛠️": "[WRENCH]", | |
"💻": "[COMPUTER]", | |
"🏗️": "[BUILDING]", | |
"❓": "[QUESTION]", | |
"🪲": "[BUG]", | |
"🪃": "[BOOMERANG]", | |
"📘": "[BOOK]", | |
"🧹": "[BROOM]", | |
"🔬": "[MICROSCOPE]", | |
"🤖": "[ROBOT]", # Added for Auto mode | |
"🔄": "[HYBRID]", # Added for Hybrid mode | |
} | |
safe_message = message | |
for emoji, replacement in emoji_map.items(): | |
safe_message = safe_message.replace(emoji, replacement) | |
return safe_message | |
def _create_interface(self): | |
"""🎨 Create the modern full-width Gradio interface.""" | |
# 🌟 Use modern theme with custom CSS | |
theme = gr.themes.Soft( | |
primary_hue="blue", | |
secondary_hue="purple", | |
neutral_hue="slate", | |
font=gr.themes.GoogleFont("Inter"), | |
font_mono=gr.themes.GoogleFont("JetBrains Mono"), | |
).set( | |
body_background_fill="*neutral_50", | |
body_text_color="*neutral_800", | |
button_primary_background_fill="linear-gradient(135deg, #667eea 0%, #764ba2 100%)", | |
button_primary_background_fill_hover="linear-gradient(135deg, #5a67d8 0%, #6b46c1 100%)", | |
button_primary_text_color="white", | |
input_background_fill="*neutral_50", | |
block_background_fill="white", | |
block_border_width="1px", | |
block_border_color="*neutral_200", | |
block_radius="12px", | |
container_radius="20px", | |
) | |
with gr.Blocks( | |
title=self.title, | |
theme=theme, | |
css=self._get_custom_css(), | |
head=""" | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<link rel="preconnect" href="https://fonts.googleapis.com"> | |
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> | |
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap" rel="stylesheet"> | |
""", | |
) as interface: | |
# 🎯 Modern Header with Gradient Background | |
with gr.Row(elem_classes="app-header"): | |
with gr.Column(): | |
gr.HTML( | |
f""" | |
<div class="app-title">🚀 {self.title}</div> | |
<div class="app-description">{self.description}</div> | |
""" | |
) | |
# 📊 Enhanced Status Bar with Modern Design | |
with gr.Row(elem_classes="status-bar"): | |
with gr.Column(): | |
status_display = gr.HTML( | |
value=""" | |
<div class="status-item"> | |
<span class="status-icon">🟢</span> | |
<span><strong>System Status:</strong> Ready</span> | |
</div> | |
""", | |
elem_classes="status-display", | |
) | |
with gr.Column(): | |
stats_display = gr.HTML( | |
value=""" | |
<div class="status-item"> | |
<span class="status-icon">📊</span> | |
<span><strong>Stats:</strong> Documents: 0 | Chunks: 0 | Queries: 0</span> | |
</div> | |
""", | |
elem_classes="stats-display", | |
) | |
# Store interface components for updates early | |
self.status_display = status_display | |
self.stats_display = stats_display | |
# 🎨 Modern Interface Tabs with Enhanced Styling | |
with gr.Tabs(elem_classes="tab-nav") as tabs: | |
# 📄 Document Upload Tab | |
if self.enable_file_upload: | |
with gr.TabItem( | |
"📄 Upload Documents", id="upload_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
upload_components = self._create_upload_tab() | |
# 🔗 URL Processing Tab | |
if self.enable_url_input: | |
with gr.TabItem( | |
"🔗 Add URLs", id="url_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
url_components = self._create_url_tab() | |
# ❓ Query Interface Tab (Primary Tab) | |
if self.enable_query_interface: | |
with gr.TabItem( | |
"❓ Ask Questions", id="query_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
query_components = self._create_query_tab() | |
# 📚 Knowledge Base Management Tab | |
with gr.TabItem( | |
"📚 Knowledge Base", id="kb_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
kb_components = self._create_knowledge_base_tab() | |
# 📈 Analytics Dashboard Tab | |
with gr.TabItem( | |
"📈 Analytics", id="analytics_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
analytics_components = self._create_analytics_tab() | |
# 🩺 System Health Tab | |
with gr.TabItem( | |
"🩺 System Health", id="health_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
health_components = self._create_health_tab() | |
# ⚙️ Settings Tab | |
with gr.TabItem( | |
"⚙️ Settings", id="settings_tab", elem_classes="tab-item" | |
): | |
with gr.Column(elem_classes="feature-card fade-in"): | |
settings_components = self._create_settings_tab() | |
self.interface = interface | |
def _create_upload_tab(self): | |
"""🎨 Create the modern document upload tab with full-width design.""" | |
# 📊 Upload Statistics Cards | |
with gr.Row(elem_classes="analytics-grid"): | |
with gr.Column(elem_classes="stat-card accent-blue"): | |
gr.HTML( | |
""" | |
<div class="stat-value">7+</div> | |
<div class="stat-label">Supported Formats</div> | |
""" | |
) | |
with gr.Column(elem_classes="stat-card accent-green"): | |
gr.HTML( | |
""" | |
<div class="stat-value">∞</div> | |
<div class="stat-label">File Size Limit</div> | |
""" | |
) | |
with gr.Column(elem_classes="stat-card accent-purple"): | |
gr.HTML( | |
""" | |
<div class="stat-value">⚡</div> | |
<div class="stat-label">Fast Processing</div> | |
""" | |
) | |
# 🎯 Main Upload Interface | |
with gr.Row(elem_classes="grid-2"): | |
with gr.Column(elem_classes="metric-card"): | |
gr.HTML( | |
""" | |
<h3 style="margin-top: 0; color: #667eea; font-weight: 600;"> | |
📄 Upload Documents | |
</h3> | |
<p style="color: #718096; margin-bottom: 1.5rem;"> | |
Drag & drop files or click to browse. Multiple files supported. | |
</p> | |
""" | |
) | |
# 📋 Supported Formats Display | |
gr.HTML( | |
""" | |
<div style="background: linear-gradient(135deg, #1c1c32 0%, #1c1c32 100%); | |
color: white; padding: 1rem; border-radius: 12px; margin-bottom: 1.5rem;"> | |
<strong>✅ Supported Formats:</strong><br> | |
📄 PDF • 📝 DOCX • 📊 CSV • 📈 XLSX • 🎯 PPTX • 📄 TXT • 📝 MD | |
</div> | |
""" | |
) | |
file_upload = gr.File( | |
label="📁 Select Files", | |
file_count="multiple", | |
file_types=[ | |
".pdf", | |
".docx", | |
".csv", | |
".xlsx", | |
".pptx", | |
".txt", | |
".md", | |
], | |
height=250, | |
elem_classes="input-field", | |
) | |
# 🎨 Action Buttons with Modern Styling | |
with gr.Row(): | |
upload_btn = gr.Button( | |
"🚀 Process Documents", | |
variant="primary", | |
size="lg", | |
elem_classes="btn-primary", | |
) | |
clear_upload_btn = gr.Button( | |
"🗑️ Clear", variant="secondary", elem_classes="btn-secondary" | |
) | |
with gr.Column(elem_classes="metric-card"): | |
gr.HTML( | |
""" | |
<h3 style="margin-top: 0; color: #1a1a2e; font-weight: 600;"> | |
📊 Processing Results | |
</h3> | |
<p style="color: #718096; margin-bottom: 1.5rem;"> | |
Real-time processing status and detailed results will appear here. | |
</p> | |
""" | |
) | |
upload_output = gr.Textbox( | |
label="📋 Processing Log", | |
lines=18, | |
interactive=False, | |
placeholder="🔄 Upload results will appear here...\n\n💡 Tips:\n• Multiple files can be processed simultaneously\n• Processing time depends on file size and complexity\n• Check the status bar for real-time updates", | |
elem_classes="input-field", | |
) | |
# 📈 Processing Tips | |
with gr.Accordion("💡 Processing Tips & Best Practices", open=False): | |
gr.HTML( | |
""" | |
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 1rem;"> | |
<div class="metric-card accent-blue"> | |
<h4>📄 File Preparation</h4> | |
<ul> | |
<li>Ensure text is readable and not scanned images</li> | |
<li>Remove password protection from PDFs</li> | |
<li>Use descriptive filenames</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-green"> | |
<h4>⚡ Performance Tips</h4> | |
<ul> | |
<li>Smaller files process faster</li> | |
<li>Batch upload related documents</li> | |
<li>Monitor system resources</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-purple"> | |
<h4>🎯 Quality Guidelines</h4> | |
<ul> | |
<li>High-quality text improves search accuracy</li> | |
<li>Structured documents work better</li> | |
<li>Remove unnecessary formatting</li> | |
</ul> | |
</div> | |
</div> | |
""" | |
) | |
# Event handlers | |
upload_btn.click( | |
fn=self._process_documents, | |
inputs=[file_upload], | |
outputs=[upload_output, self.status_display, self.stats_display], | |
) | |
clear_upload_btn.click( | |
fn=lambda: ("", "Ready "), outputs=[upload_output, self.status_display] | |
) | |
return { | |
"file_upload": file_upload, | |
"upload_btn": upload_btn, | |
"upload_output": upload_output, | |
} | |
def _create_url_tab(self): | |
"""Create the URL processing tab.""" | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Add URLs") | |
gr.Markdown("Enter URLs to extract content from web pages") | |
url_input = gr.Textbox( | |
label="URLs (one per line)", | |
lines=8, | |
placeholder="https://example.com\nhttps://another-site.com\n...", | |
) | |
with gr.Accordion("⚙️ Advanced Crawling Options", open=False): | |
gr.Markdown("🕷️ **Crawl Configuration**") | |
max_depth = gr.Slider( | |
label="🔍 Crawl Depth (How deep to follow links)", | |
minimum=1, | |
maximum=5, | |
value=1, | |
step=1, | |
info="Higher depth = more pages but slower processing", | |
) | |
follow_links = gr.Checkbox( | |
label="🔗 Follow Internal Links", | |
value=True, | |
info="Automatically discover and process linked pages", | |
) | |
gr.Markdown("⚡ **Performance Tips:**") | |
gr.Markdown("• Depth 1: Single page only") | |
gr.Markdown("• Depth 2-3: Good for small sites") | |
gr.Markdown("• Depth 4-5: Use carefully, can be slow") | |
with gr.Row(): | |
url_btn = gr.Button("🚀 Process URLs", variant="primary", size="lg") | |
clear_url_btn = gr.Button("🗑️ Clear", variant="secondary") | |
# Progress indicator | |
with gr.Row(): | |
progress_info = gr.Textbox( | |
label="🔄 Processing Status", | |
value="Ready to process URLs...", | |
interactive=False, | |
visible=True, | |
) | |
with gr.Column(scale=1): | |
gr.Markdown("### Processing Results") | |
url_output = gr.Textbox( | |
label="Results", | |
lines=15, | |
interactive=False, | |
placeholder="URL processing results will appear here...", | |
) | |
# Event handlers | |
url_btn.click( | |
fn=self._process_urls, | |
inputs=[url_input, max_depth, follow_links], | |
outputs=[ | |
url_output, | |
self.status_display, | |
self.stats_display, | |
progress_info, | |
], | |
) | |
clear_url_btn.click( | |
fn=lambda: ("", "Ready 🟢", "Ready to process URLs..."), | |
outputs=[url_output, self.status_display, progress_info], | |
) | |
return { | |
"url_input": url_input, | |
"url_btn": url_btn, | |
"url_output": url_output, | |
} | |
def _create_query_tab(self): | |
"""🎨 Create the modern query interface tab with enhanced UX.""" | |
# 🎯 Quick Action Cards | |
with gr.Row(elem_classes="analytics-grid"): | |
with gr.Column(elem_classes="stat-card accent-blue"): | |
gr.HTML( | |
""" | |
<div class="stat-value">🤖</div> | |
<div class="stat-label">AI-Powered Search</div> | |
""" | |
) | |
with gr.Column(elem_classes="stat-card accent-green"): | |
gr.HTML( | |
""" | |
<div class="stat-value">🌐</div> | |
<div class="stat-label">Live Web Search</div> | |
""" | |
) | |
with gr.Column(elem_classes="stat-card accent-purple"): | |
gr.HTML( | |
""" | |
<div class="stat-value">📚</div> | |
<div class="stat-label">Local Knowledge</div> | |
""" | |
) | |
with gr.Column(elem_classes="stat-card accent-orange"): | |
gr.HTML( | |
""" | |
<div class="stat-value">⚡</div> | |
<div class="stat-label">Instant Results</div> | |
""" | |
) | |
# 🔍 Main Query Interface | |
with gr.Row(elem_classes="grid-2"): | |
with gr.Column(elem_classes="metric-card"): | |
gr.HTML( | |
""" | |
<h3 style="margin-top: 0; color: #667eea; font-weight: 600;"> | |
❓ Ask Your Question | |
</h3> | |
<p style="color: #718096; margin-bottom: 1.5rem;"> | |
Ask anything about your documents or get real-time information from the web. | |
</p> | |
""" | |
) | |
# 🎯 Enhanced Search Input | |
with gr.Column(elem_classes="search-container"): | |
query_input = gr.Textbox( | |
label="🔍 Your Question", | |
lines=4, | |
placeholder="💡 Try asking:\n• 'What are the main points in the uploaded document?'\n• 'Latest news about AI developments'\n• 'Summarize the key findings from my research papers'", | |
elem_classes="search-input", | |
) | |
# 🎨 Quick Query Suggestions | |
gr.HTML( | |
""" | |
<div style="margin: 1rem 0;"> | |
<strong style="color: #667eea;">💡 Quick Suggestions:</strong> | |
<div style="display: flex; flex-wrap: wrap; gap: 0.5rem; margin-top: 0.5rem;"> | |
<span style="background: #f0f9ff; color: #1e40af; padding: 0.25rem 0.75rem; border-radius: 20px; font-size: 0.875rem; cursor: pointer;" onclick="document.querySelector('textarea').value='Summarize the main points'">📄 Summarize</span> | |
<span style="background: #f0fdf4; color: #166534; padding: 0.25rem 0.75rem; border-radius: 20px; font-size: 0.875rem; cursor: pointer;" onclick="document.querySelector('textarea').value='What are the key findings?'">🔍 Key Findings</span> | |
<span style="background: #fef7ff; color: #7c2d12; padding: 0.25rem 0.75rem; border-radius: 20px; font-size: 0.875rem; cursor: pointer;" onclick="document.querySelector('textarea').value='Latest developments in this field'">🌐 Latest News</span> | |
</div> | |
</div> | |
""" | |
) | |
# ⚙️ Advanced Query Options | |
with gr.Accordion("⚙️ Advanced Query Options", open=False): | |
with gr.Row(): | |
include_sources = gr.Checkbox( | |
label="📚 Include Sources", | |
value=True, | |
info="Show source documents and references", | |
) | |
max_results = gr.Slider( | |
label="📊 Max Results", | |
minimum=1, | |
maximum=10, | |
value=5, | |
step=1, | |
info="Maximum number of results to return", | |
) | |
# 🌐 Enhanced Search Mode Selection | |
with gr.Group(): | |
gr.HTML( | |
""" | |
<h4 style="color: #667eea; margin-bottom: 1rem;">🔍 Search Mode & Options</h4> | |
""" | |
) | |
search_mode = gr.Dropdown( | |
label="🎯 Search Mode", | |
choices=[ | |
("🤖 Auto (Smart Routing)", "auto"), | |
("📚 Local Only (Stored Documents)", "local_only"), | |
("🌐 Live Only (Web Search)", "live_only"), | |
("🔄 Hybrid (Local + Live)", "hybrid"), | |
], | |
value="auto", | |
info="Choose how to search for information", | |
) | |
use_live_search = gr.Checkbox( | |
label="🔍 Enable Live Web Search", | |
value=False, | |
info="Enable web search (will use hybrid mode by default)", | |
) | |
with gr.Row(): | |
search_depth = gr.Dropdown( | |
label="🕷️ Search Depth", | |
choices=["basic", "advanced"], | |
value="basic", | |
info="Basic: faster, Advanced: more comprehensive", | |
visible=False, | |
) | |
time_range = gr.Dropdown( | |
label="⏰ Time Range", | |
choices=["day", "week", "month", "year"], | |
value="month", | |
info="How recent should the web results be", | |
visible=False, | |
) | |
# 💡 Dynamic options visibility | |
use_live_search.change( | |
fn=lambda enabled: ( | |
gr.update(visible=enabled), | |
gr.update(visible=enabled), | |
gr.update(value="hybrid" if enabled else "auto"), | |
), | |
inputs=[use_live_search], | |
outputs=[search_depth, time_range, search_mode], | |
) | |
# 📝 Search Mode Guide | |
with gr.Accordion("ℹ️ Search Mode Guide", open=False): | |
gr.HTML( | |
""" | |
<div style="display: grid; gap: 1rem;"> | |
<div class="metric-card accent-blue"> | |
<h4>🤖 Auto Mode</h4> | |
<p>Intelligently chooses the best search method based on your query</p> | |
<ul> | |
<li>Time-sensitive queries → Live search</li> | |
<li>Conceptual questions → Local documents</li> | |
<li>Factual queries → Hybrid approach</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-green"> | |
<h4>📚 Local Only</h4> | |
<p>Search only in your uploaded documents</p> | |
<ul> | |
<li>Fastest response time</li> | |
<li>Uses your knowledge base</li> | |
<li>No internet required</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-purple"> | |
<h4>🌐 Live Only</h4> | |
<p>Search only the web for real-time information</p> | |
<ul> | |
<li>Latest information</li> | |
<li>Current events and news</li> | |
<li>Requires Tavily API key</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-orange"> | |
<h4>🔄 Hybrid</h4> | |
<p>Combines both local documents and live web search</p> | |
<ul> | |
<li>Best of both worlds</li> | |
<li>Comprehensive results</li> | |
<li>Balanced approach (recommended)</li> | |
</ul> | |
</div> | |
</div> | |
""" | |
) | |
# 🚀 Action Buttons | |
with gr.Row(): | |
query_btn = gr.Button( | |
"🚀 Get Answer", | |
variant="primary", | |
size="lg", | |
elem_classes="btn-primary", | |
) | |
clear_query_btn = gr.Button( | |
"🗑️ Clear", variant="secondary", elem_classes="btn-secondary" | |
) | |
with gr.Column(elem_classes="metric-card"): | |
gr.HTML( | |
""" | |
<h3 style="margin-top: 0; color: #667eea; font-weight: 600;"> | |
💬 AI Response | |
</h3> | |
<p style="color: #718096; margin-bottom: .5rem;"> | |
Intelligent answers with source citations and confidence scoring. | |
</p> | |
""" | |
) | |
response_output = gr.Markdown( | |
label="🤖 AI Response", | |
value="🔮 **Your intelligent answer will appear here...**\n\n💡 **Tips for better results:**\n- Be specific in your questions\n- Use natural language\n- Ask follow-up questions for clarification\n- Check the confidence score and sources", | |
height=450, | |
elem_classes="input-field", | |
) | |
# 📊 Response Metadata | |
with gr.Row(): | |
confidence_display = gr.Textbox( | |
label="🎯 Confidence & Performance", | |
interactive=False, | |
visible=self.enable_confidence_display, | |
elem_classes="input-field", | |
) | |
# 📚 Sources Display | |
sources_output = gr.JSON( | |
label="📚 Sources & References", | |
visible=self.enable_source_display, | |
elem_classes="input-field", | |
) | |
# 📈 Query Performance Tips | |
with gr.Accordion("🎯 Query Optimization Tips", open=False): | |
gr.HTML( | |
""" | |
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 1rem;"> | |
<div class="metric-card accent-blue"> | |
<h4>🎯 Question Formulation</h4> | |
<ul> | |
<li>Be specific and clear</li> | |
<li>Use natural language</li> | |
<li>Include context when needed</li> | |
<li>Ask one question at a time</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-green"> | |
<h4>🔍 Search Strategy</h4> | |
<ul> | |
<li>Use Auto mode for best results</li> | |
<li>Enable live search for current info</li> | |
<li>Adjust max results based on need</li> | |
<li>Check confidence scores</li> | |
</ul> | |
</div> | |
<div class="metric-card accent-purple"> | |
<h4>📚 Source Utilization</h4> | |
<ul> | |
<li>Review source citations</li> | |
<li>Cross-reference multiple sources</li> | |
<li>Verify critical information</li> | |
<li>Use sources for deeper research</li> | |
</ul> | |
</div> | |
</div> | |
""" | |
) | |
# Event handlers | |
query_btn.click( | |
fn=self._process_query, | |
inputs=[ | |
query_input, | |
include_sources, | |
max_results, | |
use_live_search, | |
search_depth, | |
time_range, | |
search_mode, | |
], | |
outputs=[ | |
response_output, | |
confidence_display, | |
sources_output, | |
self.status_display, | |
self.stats_display, | |
], | |
) | |
clear_query_btn.click( | |
fn=lambda: ("", "", {}, "Ready 🟢"), | |
outputs=[ | |
response_output, | |
confidence_display, | |
sources_output, | |
self.status_display, | |
], | |
) | |
return { | |
"query_input": query_input, | |
"query_btn": query_btn, | |
"response_output": response_output, | |
"sources_output": sources_output, | |
"use_live_search": use_live_search, | |
"search_depth": search_depth, | |
"time_range": time_range, | |
"search_mode": search_mode, | |
} | |
def _create_knowledge_base_tab(self): | |
"""Create the knowledge base management tab.""" | |
with gr.Column(): | |
gr.Markdown("### 📚 Knowledge Base Management") | |
with gr.Row(): | |
refresh_btn = gr.Button("Refresh", variant="secondary") | |
export_btn = gr.Button("📤 Export", variant="secondary") | |
clear_kb_btn = gr.Button("Clear All", variant="stop") | |
# Knowledge base stats with enhanced embedding model info | |
kb_stats = gr.JSON( | |
label="📊 Knowledge Base Statistics", | |
value={ | |
"total_documents": 0, | |
"total_chunks": 0, | |
"storage_size": "0 MB", | |
"embedding_model": "Loading...", | |
"embedding_status": "Checking...", | |
"vector_db_status": "Checking...", | |
}, | |
) | |
# 🤖 Embedding Model Status Display | |
embedding_model_status = gr.JSON( | |
label="🤖 Embedding Model Information", | |
value={ | |
"model_name": "Loading...", | |
"provider": "Checking...", | |
"status": "Initializing...", | |
"api_status": "Checking connection...", | |
"dimension": "Unknown", | |
"performance": "Gathering stats...", | |
}, | |
) | |
# Document list | |
document_list = gr.Dataframe( | |
headers=["Source", "Type", "Chunks", "Added"], | |
datatype=["str", "str", "number", "str"], | |
label="📄 Documents in Knowledge Base", | |
interactive=False, | |
) | |
# Event handlers | |
refresh_btn.click( | |
fn=self._refresh_knowledge_base, | |
outputs=[kb_stats, embedding_model_status, document_list], | |
) | |
return { | |
"kb_stats": kb_stats, | |
"embedding_model_status": embedding_model_status, | |
"document_list": document_list, | |
} | |
def _create_analytics_tab(self): | |
"""Create the analytics dashboard tab with real-time data.""" | |
with gr.Column(): | |
gr.Markdown("### 📈 Analytics Dashboard") | |
gr.Markdown("Real-time insights into your RAG system performance") | |
with gr.Row(): | |
refresh_analytics_btn = gr.Button( | |
"🔄 Refresh Analytics", variant="secondary" | |
) | |
export_analytics_btn = gr.Button( | |
"📊 Export Report", variant="secondary" | |
) | |
with gr.Row(): | |
with gr.Column(): | |
query_analytics = gr.JSON( | |
label="🔍 Query Analytics", | |
value=self._get_initial_query_analytics(), | |
) | |
with gr.Column(): | |
system_metrics = gr.JSON( | |
label="⚡ System Metrics", | |
value=self._get_initial_system_metrics(), | |
) | |
with gr.Row(): | |
with gr.Column(): | |
performance_metrics = gr.JSON( | |
label="🚀 Performance Metrics", | |
value=self._get_initial_performance_metrics(), | |
) | |
with gr.Column(): | |
usage_stats = gr.JSON( | |
label="📊 Usage Statistics", | |
value=self._get_initial_usage_stats(), | |
) | |
# Query history with enhanced information | |
query_history = gr.Dataframe( | |
headers=[ | |
"Query", | |
"Results", | |
"Confidence", | |
"Processing Time", | |
"Timestamp", | |
], | |
datatype=["str", "number", "number", "str", "str"], | |
label="📝 Recent Query History", | |
interactive=False, | |
value=self._get_initial_query_history(), | |
) | |
# Event handlers | |
refresh_analytics_btn.click( | |
fn=self._refresh_analytics, | |
outputs=[ | |
query_analytics, | |
system_metrics, | |
performance_metrics, | |
usage_stats, | |
query_history, | |
], | |
) | |
return { | |
"query_analytics": query_analytics, | |
"system_metrics": system_metrics, | |
"performance_metrics": performance_metrics, | |
"usage_stats": usage_stats, | |
"query_history": query_history, | |
} | |
def _get_initial_query_analytics(self) -> Dict[str, Any]: | |
"""Get initial query analytics data.""" | |
return { | |
"total_queries": self.query_count, | |
"average_confidence": "N/A", | |
"most_common_topics": [], | |
"query_success_rate": "100%", | |
"cache_hit_rate": "0%", | |
"status": "📊 Ready to track queries", | |
} | |
def _get_initial_system_metrics(self) -> Dict[str, Any]: | |
"""Get initial system metrics.""" | |
# Get real embedding model info | |
embedding_info = self._get_embedding_model_info() | |
return { | |
"documents_processed": self.total_documents, | |
"chunks_stored": self.total_chunks, | |
"embedding_model": embedding_info.get("model_name", "Gemini"), | |
"embedding_status": embedding_info.get("status", "Checking..."), | |
"embedding_provider": embedding_info.get("provider", "Google"), | |
"vector_db": "Pinecone", | |
"uptime": "Just started", | |
"status": "🟢 System operational", | |
} | |
def _get_initial_performance_metrics(self) -> Dict[str, Any]: | |
"""Get initial performance metrics.""" | |
return { | |
"avg_query_time": "N/A", | |
"avg_embedding_time": "N/A", | |
"avg_retrieval_time": "N/A", | |
"memory_usage": "Normal", | |
"throughput": "N/A queries/min", | |
"status": "⚡ Performance tracking active", | |
} | |
def _get_initial_usage_stats(self) -> Dict[str, Any]: | |
"""Get initial usage statistics.""" | |
return { | |
"documents_uploaded": 0, | |
"urls_processed": 0, | |
"successful_queries": 0, | |
"failed_queries": 0, | |
"peak_usage_time": "N/A", | |
"status": "📈 Usage tracking enabled", | |
} | |
def _get_initial_query_history(self) -> List[List[str]]: | |
"""Get initial query history.""" | |
return [ | |
["No queries yet", "0", "0.0", "0.0s", "Start asking questions!"], | |
["Upload documents first", "0", "0.0", "0.0s", "Build your knowledge base"], | |
[ | |
"Try the examples above", | |
"0", | |
"0.0", | |
"0.0s", | |
"Get started with sample queries", | |
], | |
] | |
def _refresh_analytics( | |
self, | |
) -> Tuple[ | |
Dict[str, Any], Dict[str, Any], Dict[str, Any], Dict[str, Any], List[List[str]] | |
]: | |
"""Refresh all analytics data.""" | |
try: | |
# Get real analytics from query processor if available | |
query_analytics = self._get_real_query_analytics() | |
system_metrics = self._get_real_system_metrics() | |
performance_metrics = self._get_real_performance_metrics() | |
usage_stats = self._get_real_usage_stats() | |
query_history = self._get_real_query_history() | |
return ( | |
query_analytics, | |
system_metrics, | |
performance_metrics, | |
usage_stats, | |
query_history, | |
) | |
except Exception as e: | |
self._log_safe(f"❌ Error refreshing analytics: {e}", "error") | |
return ( | |
{"error": str(e)}, | |
{"error": str(e)}, | |
{"error": str(e)}, | |
{"error": str(e)}, | |
[["Error loading history", "0", "0.0", "0.0s", str(e)]], | |
) | |
def _get_real_query_analytics(self) -> Dict[str, Any]: | |
"""Get real query analytics from the system.""" | |
try: | |
analytics = { | |
"total_queries": self.query_count, | |
"documents_in_kb": self.total_documents, | |
"chunks_available": self.total_chunks, | |
"last_updated": datetime.now().strftime("%H:%M:%S"), | |
} | |
# Get analytics from query processor if available | |
if hasattr(self.rag_system, "query_processor") and hasattr( | |
self.rag_system.query_processor, "get_query_analytics" | |
): | |
processor_analytics = ( | |
self.rag_system.query_processor.get_query_analytics() | |
) | |
analytics.update(processor_analytics) | |
# Calculate additional metrics | |
if self.query_count > 0: | |
analytics["avg_results_per_query"] = round( | |
self.total_chunks / max(self.query_count, 1), 2 | |
) | |
analytics["system_utilization"] = ( | |
"Active" if self.query_count > 5 else "Light" | |
) | |
else: | |
analytics["avg_results_per_query"] = 0 | |
analytics["system_utilization"] = "Idle" | |
analytics["status"] = "🟢 Analytics active" | |
return analytics | |
except Exception as e: | |
return {"error": f"Analytics unavailable: {str(e)}", "status": "❌ Error"} | |
def _get_real_system_metrics(self) -> Dict[str, Any]: | |
"""Get real system metrics with embedding model info.""" | |
try: | |
# Get embedding model information | |
embedding_info = self._get_embedding_model_info() | |
metrics = { | |
"documents_processed": self.total_documents, | |
"chunks_stored": self.total_chunks, | |
"queries_processed": self.query_count, | |
"last_updated": datetime.now().strftime("%H:%M:%S"), | |
"embedding_model": embedding_info.get("model_name", "Unknown"), | |
"embedding_status": embedding_info.get("status", "Unknown"), | |
"embedding_provider": embedding_info.get("provider", "Unknown"), | |
"embedding_dimension": embedding_info.get("dimension", "Unknown"), | |
} | |
# Get system status | |
if hasattr(self.rag_system, "get_system_status"): | |
system_status = self.rag_system.get_system_status() | |
metrics.update( | |
{ | |
"overall_health": system_status.get( | |
"overall_status", "unknown" | |
), | |
"components_healthy": sum( | |
system_status.get("components", {}).values() | |
), | |
"total_components": len(system_status.get("components", {})), | |
} | |
) | |
# Add component status with embedding model details | |
components = [] | |
if hasattr(self.rag_system, "embedding_generator"): | |
components.append( | |
f"Embedding Generator ({embedding_info.get('model_name', 'Unknown')})" | |
) | |
if hasattr(self.rag_system, "vector_db"): | |
components.append("Vector Database") | |
if hasattr(self.rag_system, "query_processor"): | |
components.append("Query Processor") | |
metrics["active_components"] = components | |
metrics["status"] = "🟢 System healthy" | |
return metrics | |
except Exception as e: | |
return { | |
"error": f"System metrics unavailable: {str(e)}", | |
"status": "❌ Error", | |
} | |
def _get_real_performance_metrics(self) -> Dict[str, Any]: | |
"""Get real performance metrics.""" | |
try: | |
# Basic performance tracking | |
metrics = { | |
"total_processing_time": "N/A", | |
"avg_query_response": "N/A", | |
"system_load": "Normal", | |
"last_updated": datetime.now().strftime("%H:%M:%S"), | |
} | |
# If we have query history, calculate averages | |
if hasattr(self.rag_system, "query_processor") and hasattr( | |
self.rag_system.query_processor, "query_history" | |
): | |
history = self.rag_system.query_processor.query_history | |
if history: | |
# Calculate average processing time if available | |
processing_times = [ | |
q.get("processing_time", 0) | |
for q in history | |
if "processing_time" in q | |
] | |
if processing_times: | |
avg_time = sum(processing_times) / len(processing_times) | |
metrics["avg_query_response"] = f"{avg_time:.2f}s" | |
metrics["queries_per_minute"] = ( | |
f"{self.query_count / max(1, 1):.1f}" # Rough estimate | |
) | |
metrics["throughput"] = "Good" if self.query_count > 0 else "Idle" | |
metrics["status"] = "⚡ Performance tracking active" | |
return metrics | |
except Exception as e: | |
return { | |
"error": f"Performance metrics unavailable: {str(e)}", | |
"status": "❌ Error", | |
} | |
def _get_real_usage_stats(self) -> Dict[str, Any]: | |
"""Get real usage statistics.""" | |
try: | |
stats = { | |
"documents_uploaded": self.total_documents, | |
"urls_processed": 0, # Would need to track this separately | |
"successful_queries": self.query_count, # Assuming all successful for now | |
"failed_queries": 0, # Would need error tracking | |
"total_chunks_created": self.total_chunks, | |
"last_updated": datetime.now().strftime("%H:%M:%S"), | |
} | |
# Calculate usage patterns | |
if self.query_count > 0: | |
stats["most_active_feature"] = "Query Processing" | |
stats["usage_trend"] = "Growing" if self.query_count > 5 else "Starting" | |
else: | |
stats["most_active_feature"] = "Document Upload" | |
stats["usage_trend"] = "Initial Setup" | |
stats["status"] = "📊 Usage tracking active" | |
return stats | |
except Exception as e: | |
return {"error": f"Usage stats unavailable: {str(e)}", "status": "❌ Error"} | |
def _get_real_query_history(self) -> List[List[str]]: | |
"""Get real query history.""" | |
try: | |
history_data = [] | |
# Get query history from query processor if available | |
if hasattr(self.rag_system, "query_processor") and hasattr( | |
self.rag_system.query_processor, "query_history" | |
): | |
history = self.rag_system.query_processor.query_history[ | |
-10: | |
] # Last 10 queries | |
for query_item in history: | |
query_text = ( | |
query_item.get("query", "Unknown")[:50] + "..." | |
if len(query_item.get("query", "")) > 50 | |
else query_item.get("query", "Unknown") | |
) | |
result_count = query_item.get("result_count", 0) | |
confidence = "N/A" # Would need to store this | |
processing_time = ( | |
f"{query_item.get('processing_time', 0):.2f}s" | |
if "processing_time" in query_item | |
else "N/A" | |
) | |
timestamp = ( | |
query_item.get("timestamp", datetime.now()).strftime("%H:%M:%S") | |
if "timestamp" in query_item | |
else "Unknown" | |
) | |
history_data.append( | |
[ | |
query_text, | |
str(result_count), | |
confidence, | |
processing_time, | |
timestamp, | |
] | |
) | |
# If no real history, show helpful placeholder | |
if not history_data: | |
history_data = [ | |
["No queries yet", "0", "0.0", "0.0s", "Ask your first question!"], | |
[ | |
"Upload documents to get started", | |
"0", | |
"0.0", | |
"0.0s", | |
"Build your knowledge base", | |
], | |
[ | |
"Try asking about your documents", | |
"0", | |
"0.0", | |
"0.0s", | |
"Get intelligent answers", | |
], | |
] | |
return history_data | |
except Exception as e: | |
return [["Error loading history", "0", "0.0", "0.0s", str(e)]] | |
def _create_settings_tab(self): | |
"""Create the comprehensive settings management tab.""" | |
with gr.Column(): | |
gr.Markdown("### ⚙️ Environment Variables Settings") | |
gr.Markdown( | |
"Configure API keys and system settings with secure storage options" | |
) | |
# 🔄 Refresh and action buttons | |
with gr.Row(): | |
refresh_settings_btn = gr.Button("🔄 Refresh", variant="secondary") | |
load_env_btn = gr.Button("📁 Load from .env", variant="secondary") | |
clear_cache_btn = gr.Button("🗑️ Clear Cache", variant="secondary") | |
export_btn = gr.Button("📤 Export Settings", variant="secondary") | |
# 📊 Settings status display | |
settings_status = gr.Textbox( | |
label="🔔 Status", | |
value="Ready to configure settings", | |
interactive=False, | |
container=False, | |
) | |
# 🔧 Main settings interface | |
with gr.Tabs(): | |
# API Keys Tab | |
with gr.TabItem("🔑 API Keys"): | |
api_keys_components = self._create_api_keys_section() | |
# System Settings Tab | |
with gr.TabItem("🛠️ System Settings"): | |
system_settings_components = self._create_system_settings_section() | |
# Storage Options Tab | |
with gr.TabItem("💾 Storage & Export"): | |
storage_components = self._create_storage_section() | |
# 📋 Settings overview | |
with gr.Accordion("📋 Current Settings Overview", open=False): | |
settings_overview = gr.JSON( | |
label="Environment Variables Status", value={} | |
) | |
# Event handlers for main buttons | |
refresh_settings_btn.click( | |
fn=self._refresh_all_settings, | |
outputs=[ | |
settings_status, | |
settings_overview, | |
*api_keys_components.values(), | |
*system_settings_components.values(), | |
], | |
) | |
load_env_btn.click( | |
fn=self._load_from_env_file, | |
outputs=[settings_status, settings_overview], | |
) | |
clear_cache_btn.click( | |
fn=self._clear_settings_cache, | |
outputs=[settings_status, settings_overview], | |
) | |
export_btn.click(fn=self._export_settings, outputs=[settings_status]) | |
return { | |
"settings_status": settings_status, | |
"settings_overview": settings_overview, | |
**api_keys_components, | |
**system_settings_components, | |
**storage_components, | |
} | |
def _create_api_keys_section(self): | |
"""Create the API keys configuration section.""" | |
components = {} | |
with gr.Column(): | |
gr.Markdown("#### 🔑 API Keys Configuration") | |
gr.Markdown( | |
"Configure your API keys for AI services. Keys are masked for security." | |
) | |
# Gemini API Key | |
with gr.Group(): | |
gr.Markdown("**🤖 Google Gemini API** (Required)") | |
with gr.Row(): | |
gemini_key = gr.Textbox( | |
label="GEMINI_API_KEY", | |
placeholder="AIzaSy...", | |
type="password", | |
info="Required for embeddings and LLM functionality", | |
) | |
gemini_test_btn = gr.Button( | |
"🧪 Test", variant="secondary", size="sm" | |
) | |
gemini_status = gr.Textbox( | |
label="Status", | |
value="Not configured", | |
interactive=False, | |
container=False, | |
) | |
with gr.Row(): | |
gemini_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
gemini_env_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
gr.Markdown( | |
"💡 [Get your Gemini API key](https://aistudio.google.com/)" | |
) | |
# Pinecone API Key | |
with gr.Group(): | |
gr.Markdown("**🌲 Pinecone API (Required)**") | |
with gr.Row(): | |
pinecone_key = gr.Textbox( | |
label="PINECONE_API_KEY", | |
placeholder="pc-...", | |
type="password", | |
info="For vector database storage", | |
) | |
pinecone_test_btn = gr.Button( | |
"🧪 Test", variant="secondary", size="sm" | |
) | |
pinecone_status = gr.Textbox( | |
label="Status", | |
value="Not configured", | |
interactive=False, | |
container=False, | |
) | |
with gr.Row(): | |
pinecone_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
pinecone_env_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
gr.Markdown("💡 [Get your Pinecone API key](https://www.pinecone.io/)") | |
# OpenAI API Key | |
with gr.Group(): | |
gr.Markdown("**🔥 OpenAI API** (Optional)") | |
with gr.Row(): | |
openai_key = gr.Textbox( | |
label="OPENAI_API_KEY", | |
placeholder="sk-...", | |
type="password", | |
info="For alternative LLM functionality", | |
) | |
openai_test_btn = gr.Button( | |
"🧪 Test", variant="secondary", size="sm" | |
) | |
openai_status = gr.Textbox( | |
label="Status", | |
value="Not configured", | |
interactive=False, | |
container=False, | |
) | |
with gr.Row(): | |
openai_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
openai_env_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
gr.Markdown( | |
"💡 [Get your OpenAI API key](https://platform.openai.com/api-keys)" | |
) | |
# Tavily API Key | |
with gr.Group(): | |
gr.Markdown("**🌐 Tavily API** (Optional - for Live Search)") | |
with gr.Row(): | |
tavily_key = gr.Textbox( | |
label="TAVILY_API_KEY", | |
placeholder="tvly-...", | |
type="password", | |
info="For real-time web search functionality", | |
) | |
tavily_test_btn = gr.Button( | |
"🧪 Test", variant="secondary", size="sm" | |
) | |
tavily_status = gr.Textbox( | |
label="Status", | |
value="Not configured", | |
interactive=False, | |
container=False, | |
) | |
with gr.Row(): | |
tavily_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
tavily_env_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
gr.Markdown( | |
"💡 [Get your Tavily API key](https://app.tavily.com/sign-in)" | |
) | |
# Store components for event handling | |
components.update( | |
{ | |
"gemini_key": gemini_key, | |
"gemini_status": gemini_status, | |
"pinecone_key": pinecone_key, | |
"pinecone_status": pinecone_status, | |
"openai_key": openai_key, | |
"openai_status": openai_status, | |
"tavily_key": tavily_key, | |
"tavily_status": tavily_status, | |
} | |
) | |
# Event handlers for API keys | |
gemini_test_btn.click( | |
fn=lambda: self._test_api_connection("GEMINI_API_KEY"), | |
outputs=[gemini_status], | |
) | |
gemini_cache_btn.click( | |
fn=lambda key: self._save_setting("GEMINI_API_KEY", key, "cache"), | |
inputs=[gemini_key], | |
outputs=[gemini_status], | |
) | |
gemini_env_btn.click( | |
fn=lambda key: self._save_setting("GEMINI_API_KEY", key, "env_file"), | |
inputs=[gemini_key], | |
outputs=[gemini_status], | |
) | |
pinecone_test_btn.click( | |
fn=lambda: self._test_api_connection("PINECONE_API_KEY"), | |
outputs=[pinecone_status], | |
) | |
pinecone_cache_btn.click( | |
fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "cache"), | |
inputs=[pinecone_key], | |
outputs=[pinecone_status], | |
) | |
pinecone_env_btn.click( | |
fn=lambda key: self._save_setting("PINECONE_API_KEY", key, "env_file"), | |
inputs=[pinecone_key], | |
outputs=[pinecone_status], | |
) | |
openai_test_btn.click( | |
fn=lambda: self._test_api_connection("OPENAI_API_KEY"), | |
outputs=[openai_status], | |
) | |
openai_cache_btn.click( | |
fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "cache"), | |
inputs=[openai_key], | |
outputs=[openai_status], | |
) | |
openai_env_btn.click( | |
fn=lambda key: self._save_setting("OPENAI_API_KEY", key, "env_file"), | |
inputs=[openai_key], | |
outputs=[openai_status], | |
) | |
tavily_test_btn.click( | |
fn=lambda: self._test_api_connection("TAVILY_API_KEY"), | |
outputs=[tavily_status], | |
) | |
tavily_cache_btn.click( | |
fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "cache"), | |
inputs=[tavily_key], | |
outputs=[tavily_status], | |
) | |
tavily_env_btn.click( | |
fn=lambda key: self._save_setting("TAVILY_API_KEY", key, "env_file"), | |
inputs=[tavily_key], | |
outputs=[tavily_status], | |
) | |
return components | |
def _create_system_settings_section(self): | |
"""Create the system settings configuration section.""" | |
components = {} | |
with gr.Column(): | |
gr.Markdown("#### 🛠️ System Configuration") | |
gr.Markdown("Configure system-level settings and preferences") | |
# Pinecone Environment | |
with gr.Group(): | |
gr.Markdown("**🌍 Pinecone Environment**") | |
pinecone_env = gr.Dropdown( | |
label="PINECONE_ENVIRONMENT", | |
choices=[ | |
"us-east-1", | |
"us-west1-gcp", | |
"eu-west1-gcp", | |
"asia-southeast1-gcp", | |
], | |
value="us-east-1", | |
info="Pinecone server region", | |
) | |
with gr.Row(): | |
pinecone_env_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
pinecone_env_file_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
# Pinecone Index Name | |
with gr.Group(): | |
gr.Markdown("**📊 Pinecone Index Name**") | |
pinecone_index = gr.Textbox( | |
label="PINECONE_INDEX_NAME", | |
value="rag-ai-index", | |
placeholder="rag-ai-index", | |
info="Name of your Pinecone index", | |
) | |
with gr.Row(): | |
pinecone_index_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
pinecone_index_file_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
# Gradio Share | |
with gr.Group(): | |
gr.Markdown("**🌐 Gradio Public Sharing**") | |
gradio_share = gr.Dropdown( | |
label="GRADIO_SHARE", | |
choices=["false", "true"], | |
value="false", | |
info="Enable public sharing of the interface", | |
) | |
with gr.Row(): | |
gradio_share_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
gradio_share_file_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
# Port Configuration | |
with gr.Group(): | |
gr.Markdown("**🔌 Server Port**") | |
port_setting = gr.Number( | |
label="PORT", | |
value=7860, | |
minimum=1000, | |
maximum=65535, | |
info="Server port number (requires restart)", | |
) | |
with gr.Row(): | |
port_cache_btn = gr.Button( | |
"💾 Save to Cache", variant="primary", size="sm" | |
) | |
port_file_btn = gr.Button( | |
"📁 Save to .env", variant="primary", size="sm" | |
) | |
# System settings status | |
system_status = gr.Textbox( | |
label="System Settings Status", | |
value="Ready", | |
interactive=False, | |
container=False, | |
) | |
components.update( | |
{ | |
"pinecone_env": pinecone_env, | |
"pinecone_index": pinecone_index, | |
"gradio_share": gradio_share, | |
"port_setting": port_setting, | |
"system_status": system_status, | |
} | |
) | |
# Event handlers for system settings | |
pinecone_env_cache_btn.click( | |
fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "cache"), | |
inputs=[pinecone_env], | |
outputs=[system_status], | |
) | |
pinecone_env_file_btn.click( | |
fn=lambda val: self._save_setting("PINECONE_ENVIRONMENT", val, "env_file"), | |
inputs=[pinecone_env], | |
outputs=[system_status], | |
) | |
pinecone_index_cache_btn.click( | |
fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "cache"), | |
inputs=[pinecone_index], | |
outputs=[system_status], | |
) | |
pinecone_index_file_btn.click( | |
fn=lambda val: self._save_setting("PINECONE_INDEX_NAME", val, "env_file"), | |
inputs=[pinecone_index], | |
outputs=[system_status], | |
) | |
gradio_share_cache_btn.click( | |
fn=lambda val: self._save_setting("GRADIO_SHARE", val, "cache"), | |
inputs=[gradio_share], | |
outputs=[system_status], | |
) | |
gradio_share_file_btn.click( | |
fn=lambda val: self._save_setting("GRADIO_SHARE", val, "env_file"), | |
inputs=[gradio_share], | |
outputs=[system_status], | |
) | |
port_cache_btn.click( | |
fn=lambda val: self._save_setting("PORT", str(int(val)), "cache"), | |
inputs=[port_setting], | |
outputs=[system_status], | |
) | |
port_file_btn.click( | |
fn=lambda val: self._save_setting("PORT", str(int(val)), "env_file"), | |
inputs=[port_setting], | |
outputs=[system_status], | |
) | |
return components | |
def _create_storage_section(self): | |
"""Create the storage and export section.""" | |
components = {} | |
with gr.Column(): | |
gr.Markdown("#### 💾 Storage & Export Options") | |
gr.Markdown("Manage how your settings are stored and exported") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("**💾 Cache Storage**") | |
gr.Markdown("• Temporary storage in memory") | |
gr.Markdown("• Lost when application restarts") | |
gr.Markdown("• Good for testing configurations") | |
with gr.Column(): | |
gr.Markdown("**📁 .env File Storage**") | |
gr.Markdown("• Persistent storage in .env file") | |
gr.Markdown("• Survives application restarts") | |
gr.Markdown("• Recommended for production use") | |
# Export options | |
with gr.Group(): | |
gr.Markdown("**📤 Export Settings**") | |
with gr.Row(): | |
include_sensitive = gr.Checkbox( | |
label="Include API Keys (masked)", | |
value=False, | |
info="Include API keys in export (they will be masked)", | |
) | |
export_format = gr.Dropdown( | |
label="Export Format", | |
choices=["JSON", "ENV"], | |
value="JSON", | |
info="Choose export format", | |
) | |
export_output = gr.Textbox( | |
label="Export Output", | |
lines=10, | |
interactive=False, | |
placeholder="Exported settings will appear here...", | |
) | |
export_settings_btn = gr.Button("📤 Generate Export", variant="primary") | |
# Storage status | |
storage_status = gr.Textbox( | |
label="Storage Status", | |
value="Ready", | |
interactive=False, | |
container=False, | |
) | |
components.update( | |
{ | |
"include_sensitive": include_sensitive, | |
"export_format": export_format, | |
"export_output": export_output, | |
"storage_status": storage_status, | |
} | |
) | |
# Export event handler | |
export_settings_btn.click( | |
fn=self._generate_export, | |
inputs=[include_sensitive, export_format], | |
outputs=[export_output, storage_status], | |
) | |
return components | |
def _create_health_tab(self): | |
"""Create the system health monitoring tab.""" | |
with gr.Column(): | |
gr.Markdown("### System Health") | |
with gr.Row(): | |
health_check_btn = gr.Button("Run Health Check", variant="primary") | |
restart_btn = gr.Button("Restart Services", variant="secondary") | |
# System status | |
system_status = gr.JSON( | |
label="System Status", | |
value={}, | |
) | |
# Component status | |
component_status = gr.Dataframe( | |
headers=["Component", "Status", "Details"], | |
datatype=["str", "str", "str"], | |
label="Component Status", | |
interactive=False, | |
) | |
# Logs | |
system_logs = gr.Textbox( | |
label=" System Logs", | |
lines=10, | |
interactive=False, | |
placeholder="System logs will appear here...", | |
) | |
# Event handlers | |
health_check_btn.click( | |
fn=self._run_health_check, | |
outputs=[system_status, component_status, system_logs], | |
) | |
return { | |
"system_status": system_status, | |
"component_status": component_status, | |
"system_logs": system_logs, | |
} | |
def _get_custom_css(self) -> str: | |
"""🎨 Get modern full-width custom CSS for the interface.""" | |
return """ | |
/* 🌟 Global Container - Full Width */ | |
.gradio-container { | |
max-width: 100% !important; | |
width: 100% !important; | |
margin: 0 !important; | |
padding: 0 20px !important; | |
} | |
/* 🎨 Modern Color Scheme */ | |
:root { | |
--primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
--secondary-gradient: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); | |
--success-gradient: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); | |
--warning-gradient: linear-gradient(135deg, #43e97b 0%, #38f9d7 100%); | |
--dark-bg: #1a1a2e; | |
--dark-card: #16213e; | |
--light-bg: #f8fafc; | |
--light-card: #ffffff; | |
--text-primary: #2d3748; | |
--text-secondary: #718096; | |
--border-color: #e2e8f0; | |
--red: #f55b75; | |
--shadow-sm: 0 1px 3px 0 rgba(0, 0, 0, 0.1); | |
--shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.1); | |
--shadow-lg: 0 10px 15px -3px rgba(0, 0, 0, 0.1); | |
} | |
/* 🌙 Dark Theme Support */ | |
.dark { | |
--text-primary: #f7fafc; | |
--text-secondary: #cbd5e0; | |
--border-color: #4a5568; | |
} | |
/* 📱 Full Width Layout */ | |
.main-container { | |
width: 100% !important; | |
max-width: 100% !important; | |
} | |
/* 🎯 Header Styling */ | |
.app-header { | |
background: var(--primary-gradient); | |
color: white; | |
padding: 2rem; | |
border-radius: 0 0 20px 20px; | |
margin-bottom: 2rem; | |
box-shadow: var(--shadow-lg); | |
} | |
.app-title { | |
font-size: 2.5rem; | |
font-weight: 700; | |
margin-bottom: 0.5rem; | |
text-shadow: 0 2px 4px rgba(0,0,0,0.3); | |
} | |
.app-description { | |
font-size: 1.1rem; | |
opacity: 0.9; | |
margin-bottom: 0; | |
} | |
/* 📊 Status Bar Enhancement */ | |
.status-bar { | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
color: white; | |
padding: 1rem 2rem; | |
border-radius: 15px; | |
margin-bottom: 2rem; | |
box-shadow: var(--shadow-md); | |
display: grid; | |
grid-template-columns: 1fr 1fr; | |
gap: 2rem; | |
} | |
.status-item { | |
display: flex; | |
align-items: center; | |
gap: 0.5rem; | |
} | |
.status-icon { | |
font-size: 1.2rem; | |
} | |
/* 🎨 Tab Styling */ | |
.tab-nav { | |
background: var(--dark-bg); | |
border-radius: 15px; | |
padding: 0.5rem; | |
margin-bottom: 2rem; | |
box-shadow: var(--shadow-sm); | |
border: 1px solid var(--border-color); | |
} | |
.tab-item { | |
border-radius: 10px !important; | |
padding: 1rem 1.5rem !important; | |
font-weight: 600 !important; | |
transition: all 0.3s ease !important; | |
border: none !important; | |
} | |
.tab-item.selected { | |
background: var(--primary-gradient) !important; | |
color: white !important; | |
box-shadow: var(--shadow-md); | |
} | |
/* 🎯 Card Components */ | |
.metric-card { | |
background: var(--dark-bg); | |
border: 1px solid var(--border-color); | |
border-radius: 15px; | |
padding: 1.5rem; | |
margin: 1rem 0; | |
box-shadow: var(--shadow-sm); | |
transition: all 0.3s ease; | |
} | |
.metric-card:hover { | |
# transform: translateY(-5px); | |
box-shadow: var(--shadow-lg); | |
# border-color: #667eea; | |
} | |
.feature-card { | |
background: var(--dark-bg); | |
border: 1px solid var(--border-color); | |
border-radius: 20px; | |
padding: 2rem; | |
margin: 1rem 0; | |
box-shadow: var(--shadow-md); | |
transition: all 0.3s ease; | |
position: relative; | |
overflow: hidden; | |
} | |
.feature-card:hover { | |
transform: translateY(-8px); | |
box-shadow: var(--shadow-lg); | |
} | |
/* 🎨 Button Enhancements */ | |
.btn-primary { | |
background: var(--primary-gradient) !important; | |
border: none !important; | |
border-radius: 12px !important; | |
padding: 0.75rem 2rem !important; | |
font-weight: 600 !important; | |
font-size: 1rem !important; | |
transition: all 0.3s ease !important; | |
box-shadow: var(--shadow-sm) !important; | |
} | |
.btn-primary:hover { | |
transform: translateY(-2px) !important; | |
box-shadow: var(--shadow-lg) !important; | |
} | |
.btn-secondary { | |
background: var(--red) !important; | |
border: none !important; | |
border-radius: 12px !important; | |
padding: 0.75rem 1.5rem !important; | |
font-weight: 600 !important; | |
transition: all 0.3s ease !important; | |
} | |
.btn-success { | |
background: var(--success-gradient) !important; | |
border: none !important; | |
border-radius: 12px !important; | |
padding: 0.75rem 1.5rem !important; | |
font-weight: 600 !important; | |
} | |
.btn-warning { | |
background: var(--warning-gradient) !important; | |
border: none !important; | |
border-radius: 12px !important; | |
padding: 0.75rem 1.5rem !important; | |
font-weight: 600 !important; | |
} | |
/* 📝 Input Field Styling */ | |
.input-field { | |
border: 2px solid var(--border-color) !important; | |
border-radius: 12px !important; | |
padding: 1rem !important; | |
font-size: 1rem !important; | |
transition: all 0.3s ease !important; | |
background: var(--dark-bg) !important; | |
} | |
.input-field:focus { | |
border-color: #667eea !important; | |
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; | |
outline: none !important; | |
} | |
/* 📊 Progress Indicators */ | |
.progress-bar { | |
background: var(--primary-gradient); | |
height: 8px; | |
border-radius: 4px; | |
transition: width 0.3s ease; | |
} | |
.progress-container { | |
background: var(--border-color); | |
height: 8px; | |
border-radius: 4px; | |
overflow: hidden; | |
} | |
/* 🎯 Grid Layouts */ | |
.grid-2 { | |
display: grid; | |
grid-template-columns: 1fr 1fr; | |
gap: 2rem; | |
} | |
.grid-3 { | |
display: grid; | |
grid-template-columns: repeat(3, 1fr); | |
gap: 1.5rem; | |
} | |
.grid-4 { | |
display: grid; | |
grid-template-columns: repeat(4, 1fr); | |
gap: 1rem; | |
} | |
/* 📱 Responsive Design */ | |
@media (max-width: 1200px) { | |
.grid-4 { grid-template-columns: repeat(2, 1fr); } | |
.grid-3 { grid-template-columns: repeat(2, 1fr); } | |
} | |
@media (max-width: 768px) { | |
.gradio-container { | |
padding: 0 10px !important; | |
} | |
.grid-2, .grid-3, .grid-4 { | |
grid-template-columns: 1fr; | |
gap: 1rem; | |
} | |
.status-bar { | |
grid-template-columns: 1fr; | |
gap: 1rem; | |
padding: 1rem; | |
} | |
.app-title { | |
font-size: 2rem; | |
} | |
.feature-card { | |
padding: 1.5rem; | |
} | |
} | |
/* 🌟 Animation Classes */ | |
.fade-in { | |
animation: fadeIn 0.5s ease-in; | |
} | |
.slide-up { | |
animation: slideUp 0.6s ease-out; | |
} | |
@keyframes fadeIn { | |
from { opacity: 0; } | |
to { opacity: 1; } | |
} | |
@keyframes slideUp { | |
from { | |
opacity: 0; | |
transform: translateY(30px); | |
} | |
to { | |
opacity: 1; | |
transform: translateY(0); | |
} | |
} | |
/* 🎨 Accent Colors */ | |
.accent-blue { border-left: 4px solid #3b82f6; } | |
.accent-green { border-left: 4px solid #10b981; } | |
.accent-purple { border-left: 4px solid #8b5cf6; } | |
.accent-orange { border-left: 4px solid #f59e0b; } | |
.accent-red { border-left: 4px solid #ef4444; } | |
/* 🔍 Search Enhancement */ | |
.search-container { | |
position: relative; | |
margin-bottom: 2rem; | |
} | |
.search-input { | |
width: 100%; | |
padding: 1rem 1rem 1rem 3rem; | |
border: 2px solid var(--border-color); | |
border-radius: 25px; | |
font-size: 1.1rem; | |
transition: all 0.3s ease; | |
} | |
.search-input:focus { | |
border-color: #667eea; | |
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1); | |
} | |
.search-icon { | |
position: absolute; | |
left: 1rem; | |
top: 50%; | |
transform: translateY(-50%); | |
color: var(--text-secondary); | |
} | |
/* 📈 Analytics Dashboard */ | |
.analytics-grid { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); | |
gap: 1.5rem; | |
margin: 2rem 0; | |
} | |
.stat-card { | |
background: var(--dark-bg); | |
border-radius: 15px; | |
padding: 1.5rem; | |
box-shadow: var(--shadow-sm); | |
border: 1px solid var(--border-color); | |
transition: all 0.3s ease; | |
} | |
.stat-card:hover { | |
transform: translateY(-3px); | |
box-shadow: var(--shadow-md); | |
} | |
.stat-value { | |
font-size: 2rem; | |
font-weight: 700; | |
color: #667eea; | |
margin-bottom: 0.5rem; | |
} | |
.stat-label { | |
color: var(--text-secondary); | |
font-size: 0.9rem; | |
text-transform: uppercase; | |
letter-spacing: 0.5px; | |
} | |
/* 🚀 Loading States */ | |
.loading { | |
position: relative; | |
overflow: hidden; | |
} | |
.loading::after { | |
content: ''; | |
position: absolute; | |
top: 0; | |
left: -100%; | |
width: 100%; | |
height: 100%; | |
background: linear-gradient(90deg, transparent, rgba(255,255,255,0.4), transparent); | |
animation: loading 1.5s infinite; | |
} | |
@keyframes loading { | |
0% { left: -100%; } | |
100% { left: 100%; } | |
} | |
/* 🎨 Custom Scrollbar */ | |
::-webkit-scrollbar { | |
width: 8px; | |
} | |
::-webkit-scrollbar-track { | |
background: var(--light-bg); | |
} | |
::-webkit-scrollbar-thumb { | |
background: var(--primary-gradient); | |
border-radius: 4px; | |
} | |
::-webkit-scrollbar-thumb:hover { | |
background: #5a67d8; | |
} | |
""" | |
# 🔧 Settings Management Methods | |
def _refresh_all_settings(self): | |
"""Refresh all settings and return updated values.""" | |
try: | |
settings = self.settings_manager.get_current_settings() | |
# Create overview for display | |
overview = {} | |
for var_name, config in settings.items(): | |
overview[var_name] = { | |
"value": config["value"] if config["is_set"] else "Not set", | |
"source": config["source"], | |
"status": ( | |
"✅ Valid" | |
if config["is_valid"] | |
else "❌ Invalid" if config["is_set"] else "⚠️ Not set" | |
), | |
"required": config["is_required"], | |
} | |
# Return status and all component updates | |
status_msg = "🔄 Settings refreshed successfully" | |
# Get current values for form fields | |
gemini_val = settings.get("GEMINI_API_KEY", {}).get("raw_value", "") | |
pinecone_val = settings.get("PINECONE_API_KEY", {}).get("raw_value", "") | |
openai_val = settings.get("OPENAI_API_KEY", {}).get("raw_value", "") | |
tavily_val = settings.get("TAVILY_API_KEY", {}).get("raw_value", "") | |
pinecone_env_val = settings.get("PINECONE_ENVIRONMENT", {}).get( | |
"raw_value", "us-east-1" | |
) | |
pinecone_index_val = settings.get("PINECONE_INDEX_NAME", {}).get( | |
"raw_value", "rag-ai-index" | |
) | |
gradio_share_val = settings.get("GRADIO_SHARE", {}).get( | |
"raw_value", "false" | |
) | |
port_val = int(settings.get("PORT", {}).get("raw_value", "7860")) | |
return ( | |
status_msg, | |
overview, | |
gemini_val, | |
settings.get("GEMINI_API_KEY", {}).get("value", "Not configured"), | |
pinecone_val, | |
settings.get("PINECONE_API_KEY", {}).get("value", "Not configured"), | |
openai_val, | |
settings.get("OPENAI_API_KEY", {}).get("value", "Not configured"), | |
tavily_val, | |
settings.get("TAVILY_API_KEY", {}).get("value", "Not configured"), | |
pinecone_env_val, | |
pinecone_index_val, | |
gradio_share_val, | |
port_val, | |
"✅ Settings loaded", | |
) | |
except Exception as e: | |
self._log_safe(f" Error refreshing settings: {e}", "error") | |
return ( | |
f" Error refreshing settings: {str(e)}", | |
{}, | |
"", | |
"Error loading", | |
"", | |
"Error loading", | |
"", | |
"Error loading", | |
"", | |
"Error loading", | |
"us-east-1", | |
"rag-ai-index", | |
"false", | |
7860, | |
"❌ Error loading", | |
) | |
def _save_setting(self, var_name: str, value: str, storage_type: str) -> str: | |
"""Save a setting with the specified storage type.""" | |
try: | |
result = self.settings_manager.update_setting(var_name, value, storage_type) | |
if result["success"]: | |
self._log_safe(f" Saved {var_name} to {storage_type}") | |
return result["status"] | |
else: | |
self._log_safe( | |
f" Failed to save {var_name}: {result.get('error', 'Unknown error')}", | |
"error", | |
) | |
return result["status"] | |
except Exception as e: | |
self._log_safe(f" Error saving {var_name}: {e}", "error") | |
return f"❌ Error: {str(e)}" | |
def _test_api_connection(self, var_name: str) -> str: | |
"""Test API connection for the specified variable with optimized performance.""" | |
try: | |
# Show testing status immediately | |
status_message = f"🔄 Testing {var_name} connection..." | |
self._log_safe(status_message) | |
# For Gemini, check if we've tested recently (use cached result) | |
if var_name == "GEMINI_API_KEY" and hasattr( | |
self.settings_manager, "_gemini_last_test_time" | |
): | |
current_time = time.time() | |
if ( | |
self.settings_manager._gemini_last_test_time | |
and current_time - self.settings_manager._gemini_last_test_time < 10 | |
): | |
self._log_safe( | |
f"✅ Using cached {var_name} test result (tested recently)" | |
) | |
return "✅ Gemini API connected (cached result)" | |
# Perform the actual test | |
result = self.settings_manager.test_connection(var_name) | |
if result["success"]: | |
self._log_safe(f"✅ {var_name} connection test successful") | |
else: | |
self._log_safe( | |
f" {var_name} connection test failed: {result.get('error', 'Unknown error')}", | |
"warning", | |
) | |
return result["status"] | |
except Exception as e: | |
self._log_safe(f" Error testing {var_name}: {e}", "error") | |
return f" Test error: {str(e)}" | |
def _load_from_env_file(self) -> Tuple[str, Dict[str, Any]]: | |
"""Load settings from .env file.""" | |
try: | |
result = self.settings_manager.load_from_env_file() | |
if result["success"]: | |
self._log_safe( | |
f" Loaded {result['loaded_count']} variables from .env file" | |
) | |
# Get updated overview | |
settings = self.settings_manager.get_current_settings() | |
overview = {} | |
for var_name, config in settings.items(): | |
overview[var_name] = { | |
"value": config["value"] if config["is_set"] else "Not set", | |
"source": config["source"], | |
"status": ( | |
"✅ Valid" | |
if config["is_valid"] | |
else "❌ Invalid" if config["is_set"] else "⚠️ Not set" | |
), | |
"required": config["is_required"], | |
} | |
return result["status"], overview | |
else: | |
self._log_safe( | |
f" Failed to load from .env: {result.get('error', 'Unknown error')}", | |
"error", | |
) | |
return result["status"], {} | |
except Exception as e: | |
self._log_safe(f" Error loading from .env file: {e}", "error") | |
return f" Error: {str(e)}", {} | |
def _clear_settings_cache(self) -> Tuple[str, Dict[str, Any]]: | |
"""Clear settings cache.""" | |
try: | |
result = self.settings_manager.clear_cache() | |
if result["success"]: | |
self._log_safe(f" Cleared {result['cleared_count']} cached variables") | |
# Get updated overview | |
settings = self.settings_manager.get_current_settings() | |
overview = {} | |
for var_name, config in settings.items(): | |
overview[var_name] = { | |
"value": config["value"] if config["is_set"] else "Not set", | |
"source": config["source"], | |
"status": ( | |
"✅ Valid" | |
if config["is_valid"] | |
else "❌ Invalid" if config["is_set"] else "⚠️ Not set" | |
), | |
"required": config["is_required"], | |
} | |
return result["status"], overview | |
else: | |
self._log_safe( | |
f" Failed to clear cache: {result.get('error', 'Unknown error')}", | |
"error", | |
) | |
return result["status"], {} | |
except Exception as e: | |
self._log_safe(f" Error clearing cache: {e}", "error") | |
return f" Error: {str(e)}", {} | |
def _export_settings(self) -> str: | |
"""Export settings (basic version for main button).""" | |
try: | |
result = self.settings_manager.export_settings(include_sensitive=False) | |
if result["success"]: | |
self._log_safe(" Settings exported successfully") | |
return " Settings exported (check Storage & Export tab for details)" | |
else: | |
self._log_safe( | |
f" Failed to export settings: {result.get('error', 'Unknown error')}", | |
"error", | |
) | |
return f" Export failed: {result.get('error', 'Unknown error')}" | |
except Exception as e: | |
self._log_safe(f" Error exporting settings: {e}", "error") | |
return f" Error: {str(e)}" | |
def _generate_export( | |
self, include_sensitive: bool, export_format: str | |
) -> Tuple[str, str]: | |
"""Generate detailed export output.""" | |
try: | |
result = self.settings_manager.export_settings( | |
include_sensitive=include_sensitive | |
) | |
if not result["success"]: | |
return ( | |
f" Export failed: {result.get('error', 'Unknown error')}", | |
" Export failed", | |
) | |
settings_data = result["settings"] | |
if export_format == "JSON": | |
import json | |
export_content = json.dumps( | |
{ | |
"export_info": { | |
"timestamp": result["export_timestamp"], | |
"include_sensitive": include_sensitive, | |
"format": "JSON", | |
}, | |
"settings": settings_data, | |
}, | |
indent=2, | |
) | |
elif export_format == "ENV": | |
export_lines = [ | |
"# Environment Variables Export", | |
f"# Generated on {result['export_timestamp']}", | |
f"# Include sensitive: {include_sensitive}", | |
"", | |
] | |
for var_name, config in settings_data.items(): | |
if config["is_set"]: | |
value = config["value"] | |
export_lines.append(f"# {config['description']}") | |
export_lines.append(f"{var_name}={value}") | |
export_lines.append("") | |
export_content = "\n".join(export_lines) | |
else: | |
return " Invalid export format", " Invalid format" | |
self._log_safe( | |
f" Generated {export_format} export with {len(settings_data)} variables" | |
) | |
return export_content, f" {export_format} export generated successfully" | |
except Exception as e: | |
self._log_safe(f" Error generating export: {e}", "error") | |
return f" Error: {str(e)}", " Export generation failed" | |
def _process_documents(self, files) -> Tuple[str, str, str]: | |
""" | |
Process uploaded documents with progress tracking. | |
Args: | |
files: List of uploaded files | |
Returns: | |
Tuple of (processing results, status, stats) | |
""" | |
if not files: | |
return "No files uploaded.", "Ready ", self._get_stats_string() | |
try: | |
self._log_safe(f"Processing {len(files)} uploaded files") | |
results = [] | |
successful = 0 | |
for i, file in enumerate(files): | |
try: | |
# Process each file | |
result = self.rag_system.process_document(file.name) | |
if result.get("status") == "success": | |
successful += 1 | |
self.total_documents += 1 | |
self.total_chunks += result.get("chunks_processed", 0) | |
results.append( | |
f"{os.path.basename(file.name)}: " | |
f"{result.get('chunks_processed', 0)} chunks processed" | |
) | |
else: | |
results.append( | |
f"❌ {os.path.basename(file.name)}: " | |
f"{result.get('error', 'Processing failed')}" | |
) | |
except Exception as e: | |
results.append(f"❌ {os.path.basename(file.name)}: {str(e)}") | |
# Summary | |
summary = ( | |
f"\nSummary: {successful}/{len(files)} files processed successfully" | |
) | |
output = "\n".join(results) + summary | |
status = ( | |
f"Processed {successful}/{len(files)} files " | |
if successful > 0 | |
else "Processing failed ❌" | |
) | |
return output, status, self._get_stats_string() | |
except Exception as e: | |
self._log_safe(f" Error processing documents: {str(e)}", "error") | |
return f" Error: {str(e)}", "Error ", self._get_stats_string() | |
def _process_urls( | |
self, urls_text: str, max_depth: int = 1, follow_links: bool = True | |
) -> Tuple[str, str, str, str]: | |
""" | |
Process URLs with advanced crawling options and progress tracking. | |
Args: | |
urls_text: Text containing URLs (one per line) | |
max_depth: Maximum crawling depth | |
follow_links: Whether to follow links | |
Returns: | |
Tuple of (processing results, status, stats, progress_info) | |
""" | |
if not urls_text.strip(): | |
return ( | |
"No URLs provided.", | |
"Ready 🟢", | |
self._get_stats_string(), | |
"Ready to process URLs...", | |
) | |
try: | |
urls = [url.strip() for url in urls_text.split("\n") if url.strip()] | |
self._log_safe( | |
f"Processing {len(urls)} URLs with depth={max_depth}, follow_links={follow_links}" | |
) | |
results = [] | |
successful = 0 | |
progress_msg = f"🚀 Starting crawl of {len(urls)} URLs..." | |
for i, url in enumerate(urls): | |
progress_msg = f"🔄 Processing URL {i+1}/{len(urls)}: {url[:50]}..." | |
try: | |
# Process each URL with advanced options | |
result = self.rag_system.process_url( | |
url, max_depth=max_depth, follow_links=follow_links | |
) | |
if result.get("status") == "success": | |
successful += 1 | |
self.total_documents += 1 | |
self.total_chunks += result.get("chunks_processed", 0) | |
# Enhanced result display with crawling info | |
chunks = result.get("chunks_processed", 0) | |
linked_docs = result.get("linked_documents_processed", 0) | |
depth = result.get("depth", 0) | |
result_text = f"✅ {url}:\n" | |
result_text += f" 📄 {chunks} chunks processed" | |
if linked_docs > 0: | |
result_text += f"\n 🔗 {linked_docs} linked pages found" | |
if depth > 0: | |
result_text += f"\n 🕷️ Crawled to depth {depth}" | |
results.append(result_text) | |
else: | |
error_msg = result.get("error", "Processing failed") | |
results.append(f"❌ {url}: {error_msg}") | |
# Add helpful hints for common crawling issues | |
if "depth" in error_msg.lower(): | |
results.append(" 💡 Try reducing crawl depth") | |
elif "timeout" in error_msg.lower(): | |
results.append( | |
" 💡 Site may be slow, try single page mode" | |
) | |
elif "robots" in error_msg.lower(): | |
results.append( | |
" 💡 Site blocks crawlers, try direct URL only" | |
) | |
except Exception as e: | |
results.append(f"❌ {url}: {str(e)}") | |
# Enhanced Summary with crawling stats | |
total_linked = sum( | |
result.get("linked_documents_processed", 0) | |
for result in [ | |
self.rag_system.process_url(url, max_depth, follow_links) | |
for url in urls | |
] | |
if result.get("status") == "success" | |
) | |
summary = f"\n" + "=" * 50 | |
summary += f"\n📊 **CRAWLING SUMMARY**" | |
summary += f"\n✅ URLs processed: {successful}/{len(urls)}" | |
if follow_links and max_depth > 1: | |
summary += f"\n🔗 Linked pages discovered: {total_linked}" | |
summary += f"\n🕷️ Max crawl depth: {max_depth}" | |
summary += f"\n📄 Total chunks: {self.total_chunks}" | |
summary += "\n" + "=" * 50 | |
output = "\n".join(results) + summary | |
status = ( | |
f"Processed {successful}/{len(urls)} URLs " | |
if successful > 0 | |
else "Processing failed " | |
) | |
final_progress = ( | |
f"✅ Completed! Processed {successful}/{len(urls)} URLs successfully" | |
) | |
return output, status, self._get_stats_string(), final_progress | |
except Exception as e: | |
self._log_safe(f" Error processing URLs: {str(e)}", "error") | |
error_progress = f" Error occurred during processing" | |
return ( | |
f" Error: {str(e)}", | |
"Error ", | |
self._get_stats_string(), | |
error_progress, | |
) | |
def _process_query( | |
self, | |
query: str, | |
include_sources: bool = True, | |
max_results: int = 5, | |
use_live_search: bool = False, | |
search_depth: str = "basic", | |
time_range: str = "month", | |
search_mode: str = "auto", | |
) -> Tuple[str, str, Dict[str, Any], str, str]: | |
""" | |
Process a user query with enhanced response formatting and live search options. | |
Args: | |
query: User query string | |
include_sources: Whether to include source information | |
max_results: Maximum number of results to return | |
use_live_search: Whether to use live web search | |
search_depth: Search depth for live search | |
time_range: Time range for live search | |
Returns: | |
Tuple of (response, confidence, sources, status, stats) | |
""" | |
if not query.strip(): | |
return ( | |
"Please enter a question.", | |
"", | |
{}, | |
"Ready 🟢", | |
self._get_stats_string(), | |
) | |
try: | |
# ✅ Enhanced search type detection | |
search_type_map = { | |
"auto": "🤖 Auto", | |
"local_only": "📚 Local Only", | |
"live_only": "🌐 Live Only", | |
"hybrid": "🔄 Hybrid", | |
} | |
search_type = search_type_map.get(search_mode, "🤖 Auto") | |
# 🔄 Backward compatibility: if use_live_search is True but mode is auto, use hybrid | |
if use_live_search and search_mode == "auto": | |
search_mode = "hybrid" | |
search_type = "🔄 Hybrid" | |
self._log_safe( | |
f" Processing query ({search_type}): {query[:100]}... " | |
f"(mode: {search_mode}, sources: {include_sources}, max_results: {max_results})" | |
) | |
# 🚀 Route query based on search mode | |
if search_mode in ["live_only", "hybrid"] or use_live_search: | |
# Use enhanced RAG system with search mode | |
result = self.rag_system.query( | |
query, | |
max_results=max_results, | |
use_live_search=( | |
search_mode in ["live_only", "hybrid"] or use_live_search | |
), | |
search_mode=search_mode, | |
) | |
else: | |
# Use traditional local RAG system | |
result = self.rag_system.query( | |
query, max_results=max_results, search_mode=search_mode | |
) | |
self.query_count += 1 | |
response = result.get("response", "No response generated.") | |
confidence = result.get("confidence", 0.0) | |
sources = result.get("sources", []) | |
# 🎯 Format confidence display with search type indicator | |
confidence_text = f"🎯 Confidence: {confidence:.1%}" | |
if confidence >= 0.8: | |
confidence_text += " 🟢 High" | |
elif confidence >= 0.5: | |
confidence_text += " 🟡 Medium" | |
else: | |
confidence_text += " 🔴 Low" | |
# Add processing details with search type | |
context_items = result.get("context_items", 0) | |
processing_time = result.get("processing_time", 0) | |
search_indicator = "🌐" if use_live_search else "📚" | |
confidence_text += f" | {search_indicator} {search_type} | ⚡ {processing_time:.2f}s | 📄 {context_items} items" | |
# 📊 Format sources for display based on user preference | |
sources_display = {} | |
if include_sources and sources: | |
# Limit sources based on max_results | |
limited_sources = sources[:max_results] | |
sources_display = { | |
"confidence": f"{confidence:.3f}", | |
"total_sources": len(sources), | |
"showing": len(limited_sources), | |
"max_requested": max_results, | |
"sources": limited_sources, | |
"search_type": search_type, | |
"query_options": { | |
"include_sources": include_sources, | |
"max_results": max_results, | |
"use_live_search": use_live_search, | |
"search_depth": search_depth if use_live_search else None, | |
"time_range": time_range if use_live_search else None, | |
}, | |
} | |
# 🌐 Add live search specific metadata | |
if use_live_search: | |
sources_display.update( | |
{ | |
"live_search_params": { | |
"search_depth": search_depth, | |
"time_range": time_range, | |
"routing_decision": result.get( | |
"routing_decision", "live_search" | |
), | |
} | |
} | |
) | |
elif not include_sources: | |
sources_display = { | |
"message": "🔒 Sources hidden by user preference", | |
"total_sources": len(sources), | |
"search_type": search_type, | |
"query_options": { | |
"include_sources": include_sources, | |
"max_results": max_results, | |
"use_live_search": use_live_search, | |
}, | |
} | |
# 📈 Enhanced status with search type | |
status_icon = "🌐" if use_live_search else "📚" | |
status = f"✅ {status_icon} Query processed (confidence: {confidence:.1%}, {len(sources)} sources)" | |
return ( | |
response, | |
confidence_text, | |
sources_display, | |
status, | |
self._get_stats_string(), | |
) | |
except Exception as e: | |
self._log_safe(f" Error processing query: {str(e)}", "error") | |
return ( | |
f" Error: {str(e)}", | |
"Error", | |
{}, | |
"Error ", | |
self._get_stats_string(), | |
) | |
def _process_live_query( | |
self, query: str, max_results: int, search_depth: str, time_range: str | |
) -> Dict[str, Any]: | |
""" | |
Process query using live search via MCP Tavily integration. | |
Args: | |
query: User query | |
max_results: Maximum results to return | |
search_depth: Search depth parameter | |
time_range: Time range for search | |
Returns: | |
Dictionary with search results and metadata | |
""" | |
try: | |
self._log_safe(f" Performing live search with Tavily API...") | |
# 🚀 Use MCP Tavily tool for live search | |
# This will be the actual MCP integration point | |
search_results = self._call_tavily_mcp( | |
query, max_results, search_depth, time_range | |
) | |
# 🔄 Process and format results for RAG response generation | |
if search_results and search_results.get("results"): | |
# Format for response generator | |
formatted_context = [] | |
for result in search_results["results"]: | |
formatted_context.append( | |
{ | |
"text": result.get("content", ""), | |
"source": result.get("url", "web_search"), | |
"title": result.get("title", "Web Result"), | |
"score": result.get("score", 0.0), | |
"metadata": { | |
"type": "web_result", | |
"search_engine": "tavily", | |
"url": result.get("url", ""), | |
"title": result.get("title", ""), | |
}, | |
} | |
) | |
# 🧠 Generate response using the response generator with live context | |
if hasattr(self.rag_system, "response_generator"): | |
response_result = ( | |
self.rag_system.response_generator.generate_response( | |
query, formatted_context | |
) | |
) | |
# 📊 Combine live search metadata with response | |
response_result.update( | |
{ | |
"context_items": len(formatted_context), | |
"search_type": "live_web", | |
"routing_decision": "live_search", | |
"live_search_params": { | |
"search_depth": search_depth, | |
"time_range": time_range, | |
"total_web_results": len(search_results["results"]), | |
}, | |
} | |
) | |
return response_result | |
else: | |
# 📝 Fallback: simple response formatting | |
combined_content = "\n\n".join( | |
[ | |
f"**{result.get('title', 'Web Result')}**\n{result.get('content', '')}" | |
for result in search_results["results"][:3] | |
] | |
) | |
return { | |
"response": f"Based on live web search:\n\n{combined_content}", | |
"sources": search_results["results"], | |
"confidence": 0.8, | |
"context_items": len(search_results["results"]), | |
"search_type": "live_web", | |
} | |
else: | |
return { | |
"response": "No live search results found. Please try a different query or check your internet connection.", | |
"sources": [], | |
"confidence": 0.0, | |
"context_items": 0, | |
"error": "No live search results", | |
} | |
except Exception as e: | |
self._log_safe(f" Live search error: {str(e)}", "error") | |
# 🔄 Fallback to local search | |
self._log_safe(" Falling back to local search...", "warning") | |
return self.rag_system.query(query, max_results=max_results) | |
def _call_tavily_mcp( | |
self, query: str, max_results: int, search_depth: str, time_range: str | |
) -> Dict[str, Any]: | |
""" | |
Call Tavily API using the live search module. | |
Args: | |
query: Search query | |
max_results: Maximum results | |
search_depth: Search depth | |
time_range: Time range | |
Returns: | |
Tavily search results | |
""" | |
try: | |
# 🌐 Use the live search module with Tavily Python SDK | |
from src.rag.live_search import LiveSearchManager | |
self._log_safe( | |
f" Tavily API call: query='{query}', depth={search_depth}, range={time_range}" | |
) | |
# ✅ Initialize live search manager | |
live_search = LiveSearchManager() | |
# 🚀 Perform the search using Tavily Python SDK | |
search_results = live_search.search_web( | |
query=query, | |
max_results=max_results, | |
search_depth=search_depth, | |
time_range=time_range, | |
) | |
# 📊 Format results for UI consumption | |
if ( | |
search_results | |
and search_results.get("results") | |
and not search_results.get("error") | |
): | |
formatted_results = [] | |
for result in search_results.get("results", []): | |
formatted_results.append( | |
{ | |
"title": result.get("title", ""), | |
"content": result.get("content", ""), | |
"url": result.get("url", ""), | |
"score": result.get("score", 0.0), | |
"published_date": result.get("published_date", ""), | |
} | |
) | |
return { | |
"results": formatted_results, | |
"total_results": len(formatted_results), | |
"search_params": { | |
"query": query, | |
"max_results": max_results, | |
"search_depth": search_depth, | |
"time_range": time_range, | |
}, | |
"status": "success", | |
"analytics": search_results.get("analytics", {}), | |
} | |
else: | |
# 🚨 Handle search failure | |
error_msg = search_results.get("error", "Unknown search error") | |
self._log_safe(f" Tavily search failed: {error_msg}", "warning") | |
return { | |
"results": [], | |
"total_results": 0, | |
"search_params": { | |
"query": query, | |
"max_results": max_results, | |
"search_depth": search_depth, | |
"time_range": time_range, | |
}, | |
"status": "failed", | |
"error": error_msg, | |
} | |
except Exception as e: | |
self._log_safe(f" Tavily API call failed: {str(e)}", "error") | |
return { | |
"results": [], | |
"total_results": 0, | |
"error": str(e), | |
"status": "error", | |
} | |
def _refresh_knowledge_base( | |
self, | |
) -> Tuple[Dict[str, Any], Dict[str, Any], List[List[str]]]: | |
""" | |
Refresh knowledge base information with real data from vector DB and embedding model. | |
Returns: | |
Tuple of (kb_stats, embedding_model_status, document_list) | |
""" | |
try: | |
# Get real knowledge base statistics | |
kb_info = self._get_real_kb_stats() | |
# Get embedding model information | |
embedding_info = self._get_embedding_model_info() | |
# 📊 Knowledge Base Stats | |
kb_stats = { | |
"total_documents": kb_info.get("total_documents", self.total_documents), | |
"total_chunks": kb_info.get("total_chunks", self.total_chunks), | |
"storage_size": f"{kb_info.get('total_chunks', self.total_chunks) * 0.5:.1f} MB", | |
"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"vector_db_status": kb_info.get("vector_db_status", "Unknown"), | |
"embedding_model": embedding_info.get("model_name", "Unknown"), | |
"embedding_status": embedding_info.get("status", "Unknown"), | |
"index_health": kb_info.get("index_health", "Unknown"), | |
} | |
# 🤖 Embedding Model Status | |
embedding_status = { | |
"model_name": embedding_info.get("model_name", "Unknown"), | |
"provider": embedding_info.get("provider", "Unknown"), | |
"status": embedding_info.get("status", "Unknown"), | |
"api_status": embedding_info.get("api_status", "Unknown"), | |
"dimension": embedding_info.get("dimension", "Unknown"), | |
"performance": { | |
"total_requests": embedding_info.get("total_requests", 0), | |
"success_rate": embedding_info.get("success_rate", "0%"), | |
"cache_hit_rate": embedding_info.get("cache_hit_rate", "0%"), | |
"batch_size": embedding_info.get("batch_size", "Unknown"), | |
"max_text_length": embedding_info.get("max_text_length", "Unknown"), | |
"caching_enabled": embedding_info.get("caching_enabled", False), | |
}, | |
"last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
} | |
# Get real document list from vector DB | |
documents = self._get_real_document_list() | |
# If no real documents, show helpful message | |
if not documents: | |
documents = [ | |
[ | |
"📝 No documents yet", | |
"Info", | |
"0", | |
"Upload documents to get started", | |
], | |
["🔗 Try adding URLs", "Info", "0", "Use the 'Add URLs' tab"], | |
[ | |
"📚 Knowledge base empty", | |
"Info", | |
"0", | |
"Start building your knowledge base!", | |
], | |
] | |
return kb_stats, embedding_status, documents | |
except Exception as e: | |
self._log_safe(f" Error refreshing knowledge base: {e}", "error") | |
# Fallback stats | |
fallback_kb_stats = { | |
"total_documents": self.total_documents, | |
"total_chunks": self.total_chunks, | |
"storage_size": f"{self.total_chunks * 0.5:.1f} MB", | |
"last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"error": str(e), | |
} | |
fallback_embedding_status = { | |
"model_name": "Error", | |
"provider": "Unknown", | |
"status": "❌ Error", | |
"api_status": "❌ Error", | |
"dimension": "Unknown", | |
"performance": {"error": str(e)}, | |
"last_checked": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
} | |
return fallback_kb_stats, fallback_embedding_status, [] | |
def _get_real_kb_stats(self) -> Dict[str, Any]: | |
"""Get real knowledge base statistics from the RAG system.""" | |
try: | |
# 🔍 Get embedding model info first | |
embedding_model_info = self._get_embedding_model_info() | |
if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: | |
# Try to get stats from vector DB | |
vector_stats = ( | |
self.rag_system.vector_db.get_stats() | |
if hasattr(self.rag_system.vector_db, "get_stats") | |
else {} | |
) | |
return { | |
"total_documents": vector_stats.get( | |
"total_vectors", self.total_documents | |
), | |
"total_chunks": vector_stats.get( | |
"total_vectors", self.total_chunks | |
), | |
"vector_db_status": "✅ Connected" if vector_stats else "⚠️ Limited", | |
"embedding_model": embedding_model_info.get( | |
"model_name", "Unknown" | |
), | |
"embedding_model_status": embedding_model_info.get( | |
"status", "Unknown" | |
), | |
"embedding_dimension": embedding_model_info.get( | |
"dimension", "Unknown" | |
), | |
"embedding_provider": embedding_model_info.get( | |
"provider", "Unknown" | |
), | |
"index_health": ( | |
"✅ Healthy" | |
if vector_stats.get("total_vectors", 0) > 0 | |
else "⚠️ Empty" | |
), | |
} | |
else: | |
return { | |
"total_documents": self.total_documents, | |
"total_chunks": self.total_chunks, | |
"vector_db_status": "❌ Not Connected", | |
"embedding_model": embedding_model_info.get( | |
"model_name", "Unknown" | |
), | |
"embedding_model_status": embedding_model_info.get( | |
"status", "❌ Not Available" | |
), | |
"embedding_dimension": embedding_model_info.get( | |
"dimension", "Unknown" | |
), | |
"embedding_provider": embedding_model_info.get( | |
"provider", "Unknown" | |
), | |
"index_health": "❌ Unavailable", | |
} | |
except Exception as e: | |
self._log_safe(f"Could not get real KB stats: {e}", "warning") | |
return {} | |
def _get_real_document_list(self) -> List[List[str]]: | |
"""Get real document list from the RAG system.""" | |
try: | |
documents = [] | |
# Try to get document metadata from vector DB | |
if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: | |
# Get unique sources from vector DB | |
if hasattr(self.rag_system.vector_db, "get_unique_sources"): | |
sources = self.rag_system.vector_db.get_unique_sources() | |
for source_info in sources: | |
source_name = source_info.get("source", "Unknown") | |
doc_type = self._get_document_type(source_name) | |
chunk_count = source_info.get("chunk_count", 0) | |
added_date = source_info.get("added_date", "Unknown") | |
documents.append( | |
[source_name, doc_type, str(chunk_count), added_date] | |
) | |
# If vector DB doesn't have get_unique_sources, try alternative approach | |
elif hasattr(self.rag_system.vector_db, "list_documents"): | |
doc_list = self.rag_system.vector_db.list_documents() | |
for doc in doc_list: | |
documents.append( | |
[ | |
doc.get("name", "Unknown"), | |
self._get_document_type(doc.get("name", "")), | |
str(doc.get("chunks", 0)), | |
doc.get("date", "Unknown"), | |
] | |
) | |
return documents | |
except Exception as e: | |
self._log_safe(f"Could not get real document list: {e}", "warning") | |
return [] | |
def _get_document_type(self, filename: str) -> str: | |
"""Determine document type from filename.""" | |
if not filename: | |
return "Unknown" | |
filename_lower = filename.lower() | |
if filename_lower.endswith(".pdf"): | |
return "📄 PDF" | |
elif filename_lower.endswith((".doc", ".docx")): | |
return "📝 Word" | |
elif filename_lower.endswith((".xls", ".xlsx")): | |
return "📊 Excel" | |
elif filename_lower.endswith((".ppt", ".pptx")): | |
return "📈 PowerPoint" | |
elif filename_lower.endswith(".csv"): | |
return "📋 CSV" | |
elif filename_lower.endswith((".txt", ".md")): | |
return "📄 Text" | |
elif "http" in filename_lower: | |
return "🌐 Web" | |
else: | |
return "📄 Document" | |
def _get_embedding_model_info(self) -> Dict[str, Any]: | |
""" | |
🤖 Get comprehensive embedding model information. | |
Returns: | |
Dictionary with embedding model details | |
""" | |
try: | |
model_info = { | |
"model_name": "Unknown", | |
"status": "❌ Not Available", | |
"dimension": "Unknown", | |
"provider": "Unknown", | |
"api_status": "❌ Not Connected", | |
} | |
# Check if embedding generator exists and is properly initialized | |
if ( | |
hasattr(self.rag_system, "embedding_generator") | |
and self.rag_system.embedding_generator | |
): | |
embedding_gen = self.rag_system.embedding_generator | |
# Get model name - check multiple possible attributes | |
model_name = ( | |
getattr(embedding_gen, "model", None) | |
or getattr(embedding_gen, "model_name", None) | |
or "gemini-embedding-exp-03-07" | |
) # Default Gemini model | |
# Get API client status | |
api_connected = ( | |
hasattr(embedding_gen, "client") | |
and embedding_gen.client is not None | |
) | |
# Get configuration details | |
config = getattr(embedding_gen, "config", {}) | |
model_info.update( | |
{ | |
"model_name": model_name, | |
"status": "✅ Available" if api_connected else "⚠️ Limited", | |
"provider": ( | |
"Google Gemini" | |
if "gemini" in model_name.lower() | |
else "Unknown" | |
), | |
"api_status": ( | |
"✅ Connected" if api_connected else "❌ Not Connected" | |
), | |
"dimension": config.get("dimension", "3072"), # Gemini default | |
"batch_size": config.get("batch_size", 5), | |
"max_text_length": config.get("max_text_length", 8192), | |
"caching_enabled": config.get("enable_caching", True), | |
} | |
) | |
# Get statistics if available | |
if hasattr(embedding_gen, "get_statistics"): | |
try: | |
stats = embedding_gen.get_statistics() | |
model_info.update( | |
{ | |
"total_requests": stats.get("total_requests", 0), | |
"successful_requests": stats.get( | |
"successful_requests", 0 | |
), | |
"cache_hits": stats.get("cache_hits", 0), | |
"cache_hit_rate": f"{stats.get('cache_hit_rate', 0):.1f}%", | |
"success_rate": f"{stats.get('success_rate', 0):.1f}%", | |
} | |
) | |
except Exception as e: | |
self._log_safe(f"Could not get embedding stats: {e}", "warning") | |
# Test API connection if possible (quick test) | |
if api_connected: | |
try: | |
# Quick test to verify API is working | |
test_embedding = embedding_gen.generate_query_embedding("test") | |
if test_embedding: | |
model_info["api_status"] = "✅ Connected & Working" | |
model_info["status"] = "✅ Fully Operational" | |
else: | |
model_info["api_status"] = "⚠️ Connected but Limited" | |
except Exception as e: | |
model_info["api_status"] = f" Connection Error: {str(e)[:50]}" | |
return model_info | |
except Exception as e: | |
self._log_safe(f"Error getting embedding model info: {e}", "error") | |
return { | |
"model_name": "Error", | |
"status": " Error", | |
"dimension": "Unknown", | |
"provider": "Unknown", | |
"api_status": f" Error: {str(e)[:50]}", | |
"error": str(e), | |
} | |
def _run_health_check(self) -> Tuple[Dict[str, Any], List[List[str]], str]: | |
""" | |
🩺 Run comprehensive real system health check. | |
Returns: | |
Tuple of (system status, component status, logs) | |
""" | |
try: | |
import psutil | |
import time | |
from datetime import timedelta | |
# 📊 Real System Status | |
start_time = time.time() | |
# Get real system metrics | |
memory_info = psutil.virtual_memory() | |
cpu_percent = psutil.cpu_percent(interval=1) | |
# Calculate uptime (approximate) | |
boot_time = psutil.boot_time() | |
uptime_seconds = time.time() - boot_time | |
uptime = str(timedelta(seconds=int(uptime_seconds))) | |
system_status = { | |
"overall_health": "🟢 Healthy", | |
"uptime": uptime, | |
"memory_usage": f"{memory_info.percent:.1f}%", | |
"memory_available": f"{memory_info.available / (1024**3):.1f} GB", | |
"cpu_usage": f"{cpu_percent:.1f}%", | |
"disk_usage": f"{psutil.disk_usage('/').percent:.1f}%", | |
"last_check": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
"system_load": "Normal" if cpu_percent < 80 else "High", | |
} | |
# 🔍 Real Component Status Check | |
components = [] | |
logs = [] | |
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
logs.append(f"[{current_time}] INFO - System health check initiated") | |
# 1. 🤖 Embedding Generator Check | |
embedding_info = self._get_embedding_model_info() | |
embedding_status = embedding_info.get("status", "❌ Unknown") | |
embedding_details = f"{embedding_info.get('model_name', 'Unknown')} - {embedding_info.get('api_status', 'Unknown')}" | |
components.append( | |
["🤖 Embedding Generator", embedding_status, embedding_details] | |
) | |
if "✅" in embedding_status: | |
logs.append( | |
f"[{current_time}] INFO - Embedding generator: {embedding_details}" | |
) | |
else: | |
logs.append( | |
f"[{current_time}] WARN - Embedding generator: {embedding_details}" | |
) | |
# 2. 🌲 Vector Database Check | |
vector_db_status, vector_db_details = self._check_vector_db_health() | |
components.append( | |
["🌲 Vector Database", vector_db_status, vector_db_details] | |
) | |
logs.append(f"[{current_time}] INFO - Vector database: {vector_db_details}") | |
# 3. 📄 Document Processor Check | |
doc_processor_status, doc_processor_details = ( | |
self._check_document_processor_health() | |
) | |
components.append( | |
["📄 Document Processor", doc_processor_status, doc_processor_details] | |
) | |
logs.append( | |
f"[{current_time}] INFO - Document processor: {doc_processor_details}" | |
) | |
# 4. 🧠 Response Generator Check | |
response_gen_status, response_gen_details = ( | |
self._check_response_generator_health() | |
) | |
components.append( | |
[" Response Generator", response_gen_status, response_gen_details] | |
) | |
logs.append( | |
f"[{current_time}] INFO - Response generator: {response_gen_details}" | |
) | |
# 5. 🌐 Web Interface Check | |
components.append( | |
["🌐 Web Interface", "✅ Healthy", "Gradio running successfully"] | |
) | |
logs.append(f"[{current_time}] INFO - Web interface: Running on port 7860") | |
# 6. 🔍 Live Search Check (if available) | |
live_search_status, live_search_details = self._check_live_search_health() | |
components.append( | |
["🔍 Live Search", live_search_status, live_search_details] | |
) | |
logs.append(f"[{current_time}] INFO - Live search: {live_search_details}") | |
# Calculate overall health | |
healthy_components = sum(1 for comp in components if "✅" in comp[1]) | |
total_components = len(components) | |
health_percentage = (healthy_components / total_components) * 100 | |
if health_percentage >= 80: | |
system_status["overall_health"] = "🟢 Healthy" | |
logs.append( | |
f"[{current_time}] INFO - Overall system health: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" | |
) | |
elif health_percentage >= 60: | |
system_status["overall_health"] = "🟡 Degraded" | |
logs.append( | |
f"[{current_time}] WARN - System degraded: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" | |
) | |
else: | |
system_status["overall_health"] = "🔴 Unhealthy" | |
logs.append( | |
f"[{current_time}] ERROR - System unhealthy: {health_percentage:.0f}% ({healthy_components}/{total_components} components healthy)" | |
) | |
# Add performance metrics | |
health_check_time = time.time() - start_time | |
system_status["health_check_duration"] = f"{health_check_time:.2f}s" | |
logs.append( | |
f"[{current_time}] INFO - Health check completed in {health_check_time:.2f}s" | |
) | |
return system_status, components, "\n".join(logs) | |
except Exception as e: | |
self._log_safe(f"❌ Error running health check: {e}", "error") | |
error_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
return ( | |
{ | |
"overall_health": "🔴 Error", | |
"error": str(e), | |
"last_check": error_time, | |
}, | |
[["System", "❌ Error", f"Health check failed: {str(e)}"]], | |
f"[{error_time}] ERROR - Health check failed: {str(e)}", | |
) | |
def _check_vector_db_health(self) -> Tuple[str, str]: | |
"""🌲 Check Vector Database health status.""" | |
try: | |
if hasattr(self.rag_system, "vector_db") and self.rag_system.vector_db: | |
vector_db = self.rag_system.vector_db | |
# Try to get health check from vector DB | |
if hasattr(vector_db, "health_check"): | |
health_result = vector_db.health_check() | |
if health_result.get("status") == "healthy": | |
return ( | |
"✅ Healthy", | |
f"Pinecone connected - {health_result.get('checks', {}).get('index_stats', 'OK')}", | |
) | |
else: | |
return ( | |
"⚠️ Degraded", | |
f"Issues detected: {health_result.get('error', 'Unknown')}", | |
) | |
# Fallback: check if we can get stats | |
elif hasattr(vector_db, "get_stats"): | |
stats = vector_db.get_stats() | |
if stats.get("status") == "connected": | |
total_vectors = stats.get("total_vectors", 0) | |
return ( | |
"✅ Healthy", | |
f"Pinecone connected - {total_vectors} vectors stored", | |
) | |
else: | |
return ( | |
"❌ Error", | |
f"Connection failed: {stats.get('error', 'Unknown')}", | |
) | |
else: | |
return ( | |
"⚠️ Limited", | |
"Vector DB available but health check not implemented", | |
) | |
else: | |
return "❌ Not Available", "Vector database not initialized" | |
except Exception as e: | |
return "❌ Error", f"Health check failed: {str(e)[:50]}" | |
def _check_document_processor_health(self) -> Tuple[str, str]: | |
"""📄 Check Document Processor health status.""" | |
try: | |
if ( | |
hasattr(self.rag_system, "document_processor") | |
and self.rag_system.document_processor | |
): | |
# Check if document processor has required dependencies | |
try: | |
# Test basic functionality | |
processor = self.rag_system.document_processor | |
# Check if it has the required methods | |
if hasattr(processor, "process_document"): | |
supported_formats = [ | |
"PDF", | |
"DOCX", | |
"CSV", | |
"XLSX", | |
"PPTX", | |
"TXT", | |
"MD", | |
] | |
return ( | |
"✅ Healthy", | |
f"All formats supported: {', '.join(supported_formats)}", | |
) | |
else: | |
return "⚠️ Limited", "Basic functionality available" | |
except ImportError as e: | |
return ( | |
"❌ Dependencies Missing", | |
f"Missing libraries: {str(e)[:30]}", | |
) | |
else: | |
return "❌ Not Available", "Document processor not initialized" | |
except Exception as e: | |
return "❌ Error", f"Health check failed: {str(e)[:50]}" | |
def _check_response_generator_health(self) -> Tuple[str, str]: | |
"""🧠 Check Response Generator health status.""" | |
try: | |
if ( | |
hasattr(self.rag_system, "response_generator") | |
and self.rag_system.response_generator | |
): | |
response_gen = self.rag_system.response_generator | |
# Check if it has required configuration | |
config = getattr(response_gen, "config", {}) | |
# Check API keys availability | |
gemini_key = config.get("gemini_api_key") or os.getenv("GEMINI_API_KEY") | |
openai_key = config.get("openai_api_key") or os.getenv("OPENAI_API_KEY") | |
if gemini_key: | |
return "✅ Healthy", "Gemini LLM available for response generation" | |
elif openai_key: | |
return "✅ Healthy", "OpenAI LLM available for response generation" | |
else: | |
return "⚠️ Limited", "No LLM API keys configured" | |
else: | |
return "❌ Not Available", "Response generator not initialized" | |
except Exception as e: | |
return "❌ Error", f"Health check failed: {str(e)[:50]}" | |
def _check_live_search_health(self) -> Tuple[str, str]: | |
"""🔍 Check Live Search health status.""" | |
try: | |
# Check if Tavily API key is available | |
tavily_key = os.getenv("TAVILY_API_KEY") | |
if tavily_key: | |
# Check if live search components are available | |
if ( | |
hasattr(self.rag_system, "live_search_processor") | |
and self.rag_system.live_search_processor | |
): | |
return "✅ Healthy", "Tavily API configured - Live search available" | |
elif ( | |
hasattr(self.rag_system, "query_router") | |
and self.rag_system.query_router | |
): | |
return "✅ Healthy", "Query router available - Live search enabled" | |
else: | |
return ( | |
"⚠️ Limited", | |
"Tavily API key available but components not initialized", | |
) | |
else: | |
return ( | |
"⚠️ Optional", | |
"Tavily API key not configured - Live search disabled", | |
) | |
except Exception as e: | |
return "❌ Error", f"Health check failed: {str(e)[:50]}" | |
def _get_stats_string(self) -> str: | |
"""Get formatted stats string.""" | |
return f"Documents: {self.total_documents} | Chunks: {self.total_chunks} | Queries: {self.query_count}" | |
def launch(self, **kwargs): | |
""" | |
Launch the Gradio interface. | |
Args: | |
**kwargs: Additional arguments for gr.Interface.launch() | |
""" | |
if not self.interface: | |
self._log_safe(" Interface not created", "error") | |
return | |
# Merge default config with provided kwargs | |
launch_config = { | |
"share": self.share, | |
"server_name": "0.0.0.0", | |
"server_port": 7860, | |
"show_error": True, | |
"quiet": False, | |
} | |
launch_config.update(kwargs) | |
self._log_safe(f"Launching Gradio interface with config: {launch_config}") | |
self.interface.launch(**launch_config) | |