# NOTE: "Spaces: Running / Running" was a Hugging Face Spaces status banner
# captured during extraction; it is not part of the application source.
import streamlit as st | |
from search_utils import SemanticSearch | |
import logging | |
import time | |
import os | |
import sys | |
import psutil # Added missing import | |
from urllib.parse import urlparse | |
import threading | |
import re | |
# Application-wide logging setup: timestamped INFO-level records sent to
# stderr via a stream handler.
_stream_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[_stream_handler],
)

# Shared logger used throughout this module.
logger = logging.getLogger("SemanticSearchApp")
# Security validation functions | |
def is_valid_url(url):
    """Validate URL format and safety.

    A URL is accepted only when it parses with both a scheme and a
    network location (e.g. ``https://example.com``).

    Args:
        url: Candidate URL string.

    Returns:
        bool: True if the URL has a scheme and netloc, False otherwise
        (including when parsing fails entirely).
    """
    try:
        result = urlparse(url)
        # Require both scheme and host; bare hostnames and relative
        # paths parse with an empty scheme/netloc and are rejected.
        if not all([result.scheme, result.netloc]):
            return False
        # Add additional security checks here (e.g. scheme allow-list)
        return True
    except (ValueError, TypeError, AttributeError):
        # FIX: narrowed the original bare `except:` — it also swallowed
        # SystemExit/KeyboardInterrupt. These are the errors urlparse
        # raises on malformed or non-string input.
        return False
def sanitize_query(query):
    """Sanitize user input to prevent injection attacks.

    Strips every character except word characters (alphanumerics and
    underscore), whitespace, and hyphens, then truncates to 256 chars.

    Args:
        query: Raw user-supplied search string.

    Returns:
        str: The sanitized (possibly empty) query. If sanitization fails
        unexpectedly, the input is coerced to ``str`` and truncated.
    """
    try:
        # Remove non-alphanumeric characters except spaces and hyphens
        clean_query = re.sub(r'[^\w\s-]', '', query)
        return clean_query[:256]  # Truncate to prevent long queries
    except Exception as e:
        logger.error(f"Query sanitization failed: {str(e)}")
        # FIX: the original fallback `query[:256]` re-raised the same
        # TypeError for non-string input (e.g. None); coerce first so the
        # fallback truncation cannot itself fail.
        return str(query)[:256]
# Diagnostics integration
# The diagnostics module is optional: if it cannot be imported the app
# still runs, and the flag below disables the diagnostics sidebar section.
try:
    from diagnostics import diagnose_parquet_files
    diagnostics_available = True  # checked in main() before rendering the diagnostics UI
except ImportError:
    diagnostics_available = False
    logger.warning("Diagnostics module not available")
def add_diagnostics_ui(search_system):
    """Enhanced diagnostics UI with proper directory checks.

    Renders a sidebar expander that, on button press, validates the
    metadata (parquet) directory and the FAISS index directory of the
    given search system, and reports current process memory and CPU use.

    Args:
        search_system: Initialized SemanticSearch instance. Must expose
            ``metadata_mgr.shard_dir``, ``shard_dir``, and
            ``index_shards`` (presumably pathlib.Path dirs and a list of
            FAISS indexes — confirm against SemanticSearch).
    """
    with st.sidebar.expander("π§ Diagnostics", expanded=False):
        if st.button("Run Full System Check"):
            with st.spinner("Performing comprehensive system check..."):
                # Create columns for organized display
                col1, col2 = st.columns(2)
                # Get actual paths from the search system
                metadata_dir = search_system.metadata_mgr.shard_dir
                faiss_dir = search_system.shard_dir  # From SemanticSearch class
                with col1:
                    # Metadata directory check
                    st.subheader("π Metadata Validation")
                    if metadata_dir.exists():
                        # Check directory structure: any parquet shard present?
                        dir_status = any(metadata_dir.glob("*.parquet"))
                        st.write(f"Directory: `{metadata_dir}`")
                        st.write(f"Parquet Files Found: {'β ' if dir_status else 'β'}")
                        # Check individual files.
                        # NOTE(review): diagnose_parquet_files only exists when the
                        # optional diagnostics import succeeded; callers gate this
                        # function on diagnostics_available.
                        if diagnose_parquet_files(str(metadata_dir)):
                            st.success("β Metadata shards valid")
                        else:
                            st.error("β Metadata issues detected")
                    else:
                        st.error("Metadata directory not found")
                with col2:
                    # FAISS index check
                    st.subheader("π FAISS Validation")
                    if faiss_dir.exists():
                        index_files = list(faiss_dir.glob("*.index"))
                        st.write(f"Directory: `{faiss_dir}`")
                        st.write(f"Index Files Found: {len(index_files)}")
                        if len(search_system.index_shards) > 0:
                            st.success(f"β {len(search_system.index_shards)} FAISS shards loaded")
                            st.write(f"Total Vectors: {sum(s.ntotal for s in search_system.index_shards):,}")
                        else:
                            st.error("β No FAISS shards loaded")
                    else:
                        st.error("FAISS directory not found")
                # System resource check (current process RSS + system-wide CPU)
                st.subheader("π» System Resources")
                col_res1, col_res2 = st.columns(2)
                with col_res1:
                    st.metric("Memory Usage",
                              f"{psutil.Process().memory_info().rss // 1024 ** 2} MB",
                              help="Current process memory usage")
                with col_res2:
                    st.metric("CPU Utilization",
                              f"{psutil.cpu_percent()}%",
                              help="Total system CPU usage")
def main():
    """Streamlit entry point.

    Configures the page, builds (and caches) the SemanticSearch system,
    renders the sanitized search UI, and shows a monitoring/diagnostics
    sidebar with health-check and cache-management controls.
    """
    st.set_page_config(
        page_title="Semantic Search Engine",
        page_icon="π",
        layout="wide"
    )

    # Initialize search system with enhanced caching
    # FIX: the original helper carried no caching decorator, so a fresh
    # SemanticSearch was built on every Streamlit rerun, and the sidebar's
    # "Clear Cache" button (st.cache_resource.clear()) had nothing to clear.
    @st.cache_resource
    def init_search_system():
        """Create, initialize, and cache the shared SemanticSearch instance."""
        try:
            system = SemanticSearch()
            system.initialize_system()
            logger.info("Search system initialized successfully")
            return system
        except Exception as e:
            logger.error(f"System initialization failed: {str(e)}")
            st.error("Critical system initialization error. Check logs.")
            st.stop()

    # Custom CSS with enhanced visual design
    st.markdown("""
    <style>
    div[data-testid="stExpander"] div[role="button"] p {
        font-size: 1.2rem;
        font-weight: bold;
        color: #1e88e5;
    }
    a.source-link {
        color: #1a73e8 !important;
        text-decoration: none !important;
        border-bottom: 2px solid transparent;
        transition: all 0.3s ease;
    }
    a.source-link:hover {
        border-bottom-color: #1a73e8;
        opacity: 0.9;
    }
    .similarity-badge {
        padding: 0.2em 0.5em;
        border-radius: 4px;
        background: #e3f2fd;
        color: #1e88e5;
        font-weight: 500;
    }
    </style>
    """, unsafe_allow_html=True)

    try:
        search_system = init_search_system()
    except Exception as e:
        st.error(f"Failed to initialize search system: {str(e)}")
        st.stop()

    # Main UI components
    st.title("π Semantic Search Engine")

    # Search input with sanitization
    query = st.text_input("Enter your search query:",
                          placeholder="Search documents...",
                          max_chars=200)

    if query:
        try:
            # Sanitize and validate query
            clean_query = sanitize_query(query)
            if not clean_query:
                st.warning("Please enter a valid search query")
                st.stop()

            with st.spinner("π Searching through documents..."):
                start_time = time.time()
                results = search_system.search(clean_query, 5)
                search_duration = time.time() - start_time

                if not results.empty:
                    st.subheader(f"Top Results ({search_duration:.2f}s)")
                    # Visualize results with enhanced formatting
                    for _, row in results.iterrows():
                        with st.expander(f"{row['title']}"):
                            # Similarity visualization
                            col1, col2 = st.columns([3, 1])
                            with col1:
                                st.markdown(f"**Summary**: {row['summary']}")
                            with col2:
                                st.markdown(
                                    f"<div class='similarity-badge'>"
                                    f"Confidence: {row['similarity']:.1%}"
                                    f"</div>",
                                    unsafe_allow_html=True
                                )
                                st.progress(float(row['similarity']))
                            if row['source']:
                                st.markdown(row['source'], unsafe_allow_html=True)
                            else:
                                st.warning("Invalid source URL")
                else:
                    st.warning("No matching documents found")
                    st.info("Try these tips:")
                    st.markdown("""
                    - Use more specific keywords
                    - Check your spelling
                    - Avoid special characters
                    """)
        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            st.error("Search operation failed. Please try again.")

    # System monitoring sidebar
    with st.sidebar:
        st.subheader("π System Status")
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Total Documents",
                      f"{search_system.metadata_mgr.total_docs:,}",
                      help="Total indexed documents in system")
        with col2:
            st.metric("FAISS Shards",
                      len(search_system.index_shards),
                      help="Number of loaded vector index shards")
        st.metric("Active Memory",
                  f"{psutil.Process().memory_info().rss // 1024 ** 2} MB",
                  help="Current memory usage by the application")

        # Diagnostics section (only when the optional module imported)
        if diagnostics_available:
            add_diagnostics_ui(search_system)
        else:
            st.warning("Diagnostics module not available")

        # Health check with error handling
        if st.button("π©Ί Run Health Check"):
            try:
                system_stats = {
                    "shards_loaded": len(search_system.index_shards),
                    "metadata_records": search_system.metadata_mgr.total_docs,
                    "memory_usage": f"{psutil.Process().memory_info().rss // 1024 ** 2} MB",
                    "active_threads": threading.active_count(),
                    # os.getloadavg() is unavailable on Windows; the
                    # surrounding try/except reports that as a failure.
                    "system_load": f"{os.getloadavg()[0]:.2f}"
                }
                st.json(system_stats)
            except Exception as e:
                st.error(f"Health check failed: {str(e)}")

        # Cache management: drops the cached SemanticSearch so the next
        # rerun rebuilds it via init_search_system().
        if st.button("β»οΈ Clear Cache"):
            try:
                st.cache_resource.clear()
                st.rerun()
            except Exception as e:
                st.error(f"Cache clearance failed: {str(e)}")
# Script entry point: launch the app when executed directly (e.g. via
# `streamlit run <file>`); no-op when imported as a module.
if __name__ == "__main__":
    main()