# payman/src/rag/live_search.py
# (Hugging Face upload header: satyamdev404, "Upload 31 files", commit e0aa230 verified)
"""
Live Search Processor using Tavily Python Client.
Provides real-time web search capabilities for the RAG system.
"""
import hashlib
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
# Module-level logger; note the class below also keeps its own
# instance logger bound to the same name (see __init__).
logger = logging.getLogger(__name__)
class LiveSearchProcessor:
    """Handles live web search using the Tavily Python client.

    Wraps ``tavily.TavilyClient`` with result post-processing and
    relevance re-ranking, a TTL-bounded in-memory result cache, and
    lightweight search-history analytics.
    """

    # Tavily rejects requests for more than this many results per call.
    _TAVILY_MAX_RESULTS = 20
    # How long a cached result stays fresh.
    _CACHE_TTL = timedelta(minutes=30)
    # Upper bound on cached entries; the oldest entry is evicted first.
    _CACHE_MAX_ENTRIES = 128
    # Number of past searches kept for analytics.
    _HISTORY_LIMIT = 50

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the LiveSearchProcessor.

        Args:
            config: Configuration dictionary containing live search settings.
                Recognized keys: ``enabled``, ``max_results``, ``search_depth``,
                ``include_answer``, ``include_raw_content``, ``include_images``,
                ``topic``, ``enable_caching`` and ``enabled_explicitly_set``.
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)
        # Search configuration.
        self.enabled = self.config.get("enabled", False)
        self.max_results = self.config.get("max_results", 5)
        self.search_depth = self.config.get("search_depth", "basic")
        self.include_answer = self.config.get("include_answer", True)
        self.include_raw_content = self.config.get("include_raw_content", False)
        self.include_images = self.config.get("include_images", False)
        self.topic = self.config.get("topic", "general")
        self.enable_caching = self.config.get("enable_caching", True)
        # Search cache (cache key -> result dict) and analytics history.
        self.search_cache: Dict[str, Dict[str, Any]] = {}
        self.search_history: List[Dict[str, Any]] = []
        # Initialize the Tavily client; failure disables live search.
        self.tavily_client = None
        self._initialize_client()
        self.logger.info(f"LiveSearchProcessor initialized - Enabled: {self.enabled}")

    def _initialize_client(self):
        """Initialize the Tavily client from the TAVILY_API_KEY env variable.

        Disables live search when the key is missing, the ``tavily-python``
        package is not installed, or client construction fails.
        """
        try:
            api_key = os.getenv("TAVILY_API_KEY")
            if not api_key:
                self.logger.warning("TAVILY_API_KEY not found in environment variables")
                self.enabled = False
                return
            # Imported lazily so this module still loads without the package.
            from tavily import TavilyClient

            self.tavily_client = TavilyClient(api_key=api_key)
            # Auto-enable when the client initializes successfully and the
            # caller did not explicitly pin the ``enabled`` flag.
            if self.tavily_client and not self.config.get(
                "enabled_explicitly_set", False
            ):
                self.enabled = True
                self.logger.info(
                    "Tavily client initialized successfully - Auto-enabled live search"
                )
            else:
                self.logger.info("Tavily client initialized successfully")
        except ImportError:
            self.logger.error(
                "tavily-python package not installed. Install with: pip install tavily-python"
            )
            self.enabled = False
        except Exception as e:
            self.logger.error(f"Failed to initialize Tavily client: {str(e)}")
            self.enabled = False

    def is_enabled(self) -> bool:
        """Return True when live search is enabled and a client exists."""
        return self.enabled and self.tavily_client is not None

    def search_web(
        self,
        query: str,
        max_results: Optional[int] = None,
        search_depth: Optional[str] = None,
        time_range: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Perform live web search using the Tavily API.

        Args:
            query: Search query string.
            max_results: Maximum number of results to return (defaults to the
                configured value; capped at the Tavily per-request limit).
            search_depth: Search depth ('basic' or 'advanced').
            time_range: Optional time-range filter for search results.

        Returns:
            Dictionary containing search results and metadata. On failure the
            dictionary carries an ``error`` key and an empty result list.
        """
        if not query or not query.strip():
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": "Empty query provided",
                "source": "live_search",
            }
        if not self.is_enabled():
            self.logger.warning("Live search is disabled or client not initialized")
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": "Live search is disabled or Tavily client not initialized",
                "source": "live_search",
            }
        self.logger.info(f"Performing live search: {query[:100]}...")
        start_time = time.time()
        try:
            # Use provided parameters or configured defaults.  The explicit
            # ``is None`` check keeps a caller-supplied 0 from silently
            # falling back to the default (the old ``or`` idiom did).
            search_max_results = (
                self.max_results if max_results is None else max_results
            )
            search_depth_param = search_depth or self.search_depth
            # Serve a copy from cache when a fresh entry exists.
            cache_key = self._generate_cache_key(
                query, search_max_results, search_depth_param
            )
            if self.enable_caching and cache_key in self.search_cache:
                cached_result = self.search_cache[cache_key]
                if self._is_cache_valid(cached_result["timestamp"]):
                    self.logger.info("Returning cached search result")
                    # Return a copy so the cached entry itself is never
                    # mutated by us or by the caller.
                    hit = dict(cached_result)
                    hit["from_cache"] = True
                    return hit
            # Prepare search parameters.
            search_params = {
                "query": query,
                "max_results": min(search_max_results, self._TAVILY_MAX_RESULTS),
                "search_depth": search_depth_param,
                "include_answer": self.include_answer,
                "include_raw_content": self.include_raw_content,
                "include_images": self.include_images,
                "topic": self.topic,
            }
            # Add time_range only when provided.
            if time_range:
                search_params["time_range"] = time_range
            # Perform the search.
            response = self.tavily_client.search(**search_params)
            # Process, score and re-rank the raw results.
            processed_results = self._process_search_results(
                response.get("results", []), query
            )
            # Prepare the final result payload.
            result = {
                "query": query,
                "results": processed_results,
                "total_results": len(processed_results),
                "answer": response.get("answer"),
                "images": response.get("images", []),
                "follow_up_questions": response.get("follow_up_questions", []),
                "search_params": {
                    "max_results": search_max_results,
                    "search_depth": search_depth_param,
                    "time_range": time_range,
                },
                "processing_time": time.time() - start_time,
                "timestamp": datetime.now(),
                "source": "live_search",
                "from_cache": False,
                "search_metadata": {
                    "source": "tavily",
                    "timestamp": datetime.now().isoformat(),
                    "results_count": len(processed_results),
                    "search_depth": search_depth_param,
                    "max_results": search_max_results,
                    "response_time": response.get("response_time"),
                },
            }
            # Cache a copy and keep the cache bounded (it previously grew
            # without limit for the life of the process).
            if self.enable_caching:
                self.search_cache[cache_key] = result.copy()
                while len(self.search_cache) > self._CACHE_MAX_ENTRIES:
                    # dicts preserve insertion order: evict the oldest entry.
                    self.search_cache.pop(next(iter(self.search_cache)))
            # Record for analytics.
            self._add_to_history(query, len(processed_results))
            self.logger.info(
                f"Live search completed in {result['processing_time']:.2f}s"
            )
            return result
        except Exception as e:
            self.logger.error(f"Error in live search: {str(e)}")
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": str(e),
                "processing_time": time.time() - start_time,
                "source": "live_search",
            }

    def search(self, query: str, **kwargs) -> Dict[str, Any]:
        """
        Perform a live web search using the Tavily API.

        Thin alias for :meth:`search_web`.

        Args:
            query: Search query string.
            **kwargs: Additional search parameters forwarded to ``search_web``.

        Returns:
            Dictionary containing search results.
        """
        return self.search_web(query, **kwargs)

    def _process_search_results(
        self, raw_results: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Process and format raw search results from Tavily.

        Args:
            raw_results: Raw results from the Tavily API.
            query: Original search query.

        Returns:
            Results re-ranked by computed relevance score (descending).
        """
        processed_results = []
        query_words = set(query.lower().split())
        for i, result in enumerate(raw_results):
            try:
                # Extract key information.
                title = result.get("title", "")
                url = result.get("url", "")
                content = result.get("content", "")
                raw_content = result.get("raw_content", "")
                score = result.get("score", 0.0)
                # Calculate relevance; pass the original query so exact-phrase
                # matching is deterministic (see _calculate_relevance_score).
                relevance_score = self._calculate_relevance_score(
                    title, content, query_words, score, query=query
                )
                # Format the result; long content is truncated for context use.
                formatted_result = {
                    "title": title,
                    "url": url,
                    "content": content[:500] + "..." if len(content) > 500 else content,
                    "raw_content": raw_content if self.include_raw_content else "",
                    "score": score,
                    "relevance_score": relevance_score,
                    "rank": i + 1,  # rank reflects the engine's original order
                    "source": "web_search",
                    "search_engine": "tavily",
                    "published_date": result.get("published_date"),
                    "metadata": {
                        "title": title,
                        "url": url,
                        "content_length": len(content),
                        "has_raw_content": bool(raw_content),
                        "search_rank": i + 1,
                    },
                }
                processed_results.append(formatted_result)
            except Exception as e:
                # Best-effort: a malformed entry is skipped, not fatal.
                self.logger.warning(f"Error processing search result {i}: {str(e)}")
                continue
        # Sort by our computed relevance score, best first.
        processed_results.sort(key=lambda x: x["relevance_score"], reverse=True)
        return processed_results

    def _calculate_relevance_score(
        self,
        title: str,
        content: str,
        query_words: set,
        base_score: float,
        query: Optional[str] = None,
    ) -> float:
        """
        Calculate a relevance score for a single search result.

        Args:
            title: Result title.
            content: Result content.
            query_words: Set of lowercased query words.
            base_score: Base score from the search engine.
            query: Original query string, used for exact-phrase matching.
                When omitted, the phrase is rebuilt from ``query_words``
                (legacy behavior; set order is arbitrary, so pass ``query``).

        Returns:
            Score rounded to 3 decimals, capped at 1.0.
        """
        try:
            # Start from the engine's own score.
            relevance = base_score
            # Title relevance (higher weight than content).
            title_words = set(title.lower().split())
            title_overlap = len(query_words.intersection(title_words))
            title_boost = (title_overlap / max(len(query_words), 1)) * 0.3
            # Content relevance.
            content_words = set(content.lower().split())
            content_overlap = len(query_words.intersection(content_words))
            content_boost = (content_overlap / max(len(query_words), 1)) * 0.2
            # Exact-phrase bonus.  BUGFIX: the phrase was previously built by
            # joining a *set*, whose iteration order is arbitrary, making
            # multi-word phrase matching nondeterministic.
            query_phrase = (
                query if query is not None else " ".join(query_words)
            ).lower()
            if query_phrase in title.lower():
                relevance += 0.2
            elif query_phrase in content.lower():
                relevance += 0.1
            # Final score, capped at 1.0.
            final_score = min(relevance + title_boost + content_boost, 1.0)
            return round(final_score, 3)
        except Exception as e:
            self.logger.warning(f"Error calculating relevance score: {str(e)}")
            return base_score

    def get_search_context(self, query: str, **kwargs) -> str:
        """
        Get search context suitable for RAG applications.

        Args:
            query: Search query string.
            **kwargs: Additional search parameters forwarded to ``search``.

        Returns:
            Formatted multi-line context string (answer, results, metadata).
        """
        search_results = self.search(query, **kwargs)
        if not search_results.get("results"):
            error_msg = search_results.get("error", "Unknown error")
            return f"No live search results found for: {query}. Error: {error_msg}"
        context_parts = []
        # Add the synthesized answer if available.
        if search_results.get("answer"):
            context_parts.append(f"Answer: {search_results['answer']}")
            context_parts.append("")
        # Add the individual search results.
        context_parts.append("Search Results:")
        for i, result in enumerate(search_results["results"], 1):
            context_parts.append(f"{i}. {result['title']}")
            context_parts.append(f"   URL: {result['url']}")
            context_parts.append(f"   Content: {result['content']}")
            if result.get("published_date"):
                context_parts.append(f"   Published: {result['published_date']}")
            context_parts.append("")
        # Add search metadata footer.
        metadata = search_results.get("search_metadata", {})
        context_parts.append(
            f"Search performed at: {metadata.get('timestamp', 'Unknown')}"
        )
        context_parts.append(f"Source: {metadata.get('source', 'Unknown')}")
        context_parts.append(f"Results count: {metadata.get('results_count', 0)}")
        return "\n".join(context_parts)

    def qna_search(self, query: str, **kwargs) -> str:
        """
        Get a quick answer to a question using Tavily's QnA search.

        Args:
            query: Question to answer.
            **kwargs: Accepted for interface compatibility but not forwarded
                to the Tavily client.

        Returns:
            Answer string, or an explanatory message on failure.
        """
        if not self.is_enabled():
            return "Live search is disabled or not properly configured."
        try:
            # Tavily's dedicated question-answering endpoint.
            answer = self.tavily_client.qna_search(query=query)
            return answer if answer else "No answer found for the given question."
        except Exception as e:
            self.logger.error(f"Error in QnA search: {str(e)}")
            return f"Error getting answer: {str(e)}"

    def _generate_cache_key(
        self, query: str, max_results: int, search_depth: str
    ) -> str:
        """Generate a stable cache key for a (query, parameters) combination.

        The query is lowercased and stripped so trivially different spellings
        share a cache slot.  MD5 is fine here: the digest is a cache key, not
        a security token.
        """
        cache_string = f"{query.lower().strip()}{max_results}{search_depth}"
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Check if a cached result is still fresh (30-minute TTL)."""
        return datetime.now() - timestamp < self._CACHE_TTL

    def _add_to_history(self, query: str, result_count: int):
        """Record a search in the bounded analytics history."""
        self.search_history.append(
            {
                "query": query,
                "timestamp": datetime.now(),
                "result_count": result_count,
                "search_type": "live_web",
            }
        )
        # Keep only the most recent searches.
        if len(self.search_history) > self._HISTORY_LIMIT:
            self.search_history = self.search_history[-self._HISTORY_LIMIT:]

    def health_check(self) -> Dict[str, Any]:
        """
        Perform a health check of the live search service.

        NOTE: a healthy check issues one real (billable) Tavily API call.

        Returns:
            Dictionary containing health status, message and timestamp.
        """
        try:
            if not self.enabled:
                return {
                    "status": "disabled",
                    "message": "Live search is disabled in configuration",
                    "timestamp": datetime.now().isoformat(),
                }
            if not self.tavily_client:
                return {
                    "status": "error",
                    "message": "Tavily client not initialized. Check TAVILY_API_KEY environment variable.",
                    "timestamp": datetime.now().isoformat(),
                }
            # Perform a simple test search against the live API.
            test_result = self.search("test health check", max_results=1)
            if test_result.get("error"):
                return {
                    "status": "error",
                    "message": f"Health check failed: {test_result['error']}",
                    "timestamp": datetime.now().isoformat(),
                }
            return {
                "status": "healthy",
                "message": "Live search service is operational",
                "timestamp": datetime.now().isoformat(),
                "config": {
                    "max_results": self.max_results,
                    "search_depth": self.search_depth,
                    "include_answer": self.include_answer,
                    "topic": self.topic,
                },
            }
        except Exception as e:
            self.logger.error(f"Health check failed: {str(e)}")
            return {
                "status": "error",
                "message": f"Health check failed: {str(e)}",
                "timestamp": datetime.now().isoformat(),
            }

    def get_search_analytics(self) -> Dict[str, Any]:
        """
        Get analytics about search patterns.

        Returns:
            Dictionary with search analytics.  The empty-history branch now
            also carries the populated-branch keys for key consistency; the
            legacy ``cache_hit_rate`` / ``average_results`` keys are kept for
            backward compatibility.
        """
        if not self.search_history:
            return {
                "total_searches": 0,
                "cache_hit_rate": 0.0,
                "average_results": 0.0,
                "average_results_per_search": 0.0,
                "recent_searches": [],
                "cache_size": len(self.search_cache),
                "search_type": "live_web",
            }
        total_searches = len(self.search_history)
        avg_results = (
            sum(s["result_count"] for s in self.search_history) / total_searches
        )
        # Recent search trends (last 10 queries).
        recent_searches = [s["query"] for s in self.search_history[-10:]]
        return {
            "total_searches": total_searches,
            "average_results_per_search": round(avg_results, 2),
            "recent_searches": recent_searches,
            "cache_size": len(self.search_cache),
            "search_type": "live_web",
        }

    def clear_cache(self):
        """Clear the search cache."""
        self.search_cache.clear()
        self.logger.info("Live search cache cleared")

    def clear_history(self):
        """Clear the search history."""
        self.search_history.clear()
        self.logger.info("Live search history cleared")
# Backward-compatibility alias: existing modules import ``LiveSearchManager``;
# both names refer to the same class.
LiveSearchManager = LiveSearchProcessor