""" Live Search Processor using Tavily Python Client. Provides real-time web search capabilities for the RAG system. """ import logging import os import time from typing import Dict, List, Any, Optional from datetime import datetime, timedelta logger = logging.getLogger(__name__) class LiveSearchProcessor: """Handles live web search using Tavily Python Client.""" def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the LiveSearchProcessor. Args: config: Configuration dictionary containing live search settings """ self.config = config or {} self.logger = logging.getLogger(__name__) # Search configuration self.enabled = self.config.get("enabled", False) self.max_results = self.config.get("max_results", 5) self.search_depth = self.config.get("search_depth", "basic") self.include_answer = self.config.get("include_answer", True) self.include_raw_content = self.config.get("include_raw_content", False) self.include_images = self.config.get("include_images", False) self.topic = self.config.get("topic", "general") self.enable_caching = self.config.get("enable_caching", True) # Search cache and analytics self.search_cache = {} self.search_history = [] # Initialize Tavily client self.tavily_client = None self._initialize_client() self.logger.info(f"LiveSearchProcessor initialized - Enabled: {self.enabled}") def _initialize_client(self): """Initialize the Tavily client.""" try: # Get API key from environment variable api_key = os.getenv("TAVILY_API_KEY") if not api_key: self.logger.warning("TAVILY_API_KEY not found in environment variables") self.enabled = False return # Import and initialize Tavily client from tavily import TavilyClient self.tavily_client = TavilyClient(api_key=api_key) # ✅ Auto-enable if client initializes successfully and no explicit config if self.tavily_client and not self.config.get( "enabled_explicitly_set", False ): self.enabled = True self.logger.info( "Tavily client initialized successfully - Auto-enabled live search" ) else: self.logger.info("Tavily client initialized successfully") except ImportError: self.logger.error( "tavily-python package not installed. Install with: pip install tavily-python" ) self.enabled = False except Exception as e: self.logger.error(f"Failed to initialize Tavily client: {str(e)}") self.enabled = False def is_enabled(self) -> bool: """Check if live search is enabled.""" return self.enabled and self.tavily_client is not None def search_web( self, query: str, max_results: Optional[int] = None, search_depth: Optional[str] = None, time_range: Optional[str] = None, ) -> Dict[str, Any]: """ Perform live web search using Tavily API. Args: query: Search query string max_results: Maximum number of results to return search_depth: Search depth ('basic' or 'advanced') time_range: Time range for search results Returns: Dictionary containing search results and metadata """ if not query or not query.strip(): return { "query": query, "results": [], "total_results": 0, "error": "Empty query provided", "source": "live_search", } if not self.is_enabled(): self.logger.warning("Live search is disabled or client not initialized") return { "query": query, "results": [], "total_results": 0, "error": "Live search is disabled or Tavily client not initialized", "source": "live_search", } self.logger.info(f"Performing live search: {query[:100]}...") start_time = time.time() try: # Use provided parameters or defaults search_max_results = max_results or self.max_results search_depth_param = search_depth or self.search_depth # Check cache first cache_key = self._generate_cache_key( query, search_max_results, search_depth_param ) if self.enable_caching and cache_key in self.search_cache: cached_result = self.search_cache[cache_key] if self._is_cache_valid(cached_result["timestamp"]): self.logger.info("Returning cached search result") cached_result["from_cache"] = True return cached_result # Prepare search parameters search_params = { "query": query, "max_results": min(search_max_results, 20), # Tavily limit "search_depth": search_depth_param, "include_answer": self.include_answer, "include_raw_content": self.include_raw_content, "include_images": self.include_images, "topic": self.topic, } # Add time_range if provided if time_range: search_params["time_range"] = time_range # Perform the search response = self.tavily_client.search(**search_params) # Process and format results processed_results = self._process_search_results( response.get("results", []), query ) # Prepare final result result = { "query": query, "results": processed_results, "total_results": len(processed_results), "answer": response.get("answer"), "images": response.get("images", []), "follow_up_questions": response.get("follow_up_questions", []), "search_params": { "max_results": search_max_results, "search_depth": search_depth_param, "time_range": time_range, }, "processing_time": time.time() - start_time, "timestamp": datetime.now(), "source": "live_search", "from_cache": False, "search_metadata": { "source": "tavily", "timestamp": datetime.now().isoformat(), "results_count": len(processed_results), "search_depth": search_depth_param, "max_results": search_max_results, "response_time": response.get("response_time"), }, } # Cache the result if self.enable_caching: self.search_cache[cache_key] = result.copy() # Add to search history self._add_to_history(query, len(processed_results)) self.logger.info( f"Live search completed in {result['processing_time']:.2f}s" ) return result except Exception as e: self.logger.error(f"Error in live search: {str(e)}") return { "query": query, "results": [], "total_results": 0, "error": str(e), "processing_time": time.time() - start_time, "source": "live_search", } def search(self, query: str, **kwargs) -> Dict[str, Any]: """ Perform a live web search using Tavily API. Args: query: Search query string **kwargs: Additional search parameters Returns: Dictionary containing search results """ return self.search_web(query, **kwargs) def _process_search_results( self, raw_results: List[Dict[str, Any]], query: str ) -> List[Dict[str, Any]]: """ Process and format raw search results from Tavily. Args: raw_results: Raw results from Tavily API query: Original search query Returns: Processed and formatted results """ processed_results = [] query_words = set(query.lower().split()) for i, result in enumerate(raw_results): try: # Extract key information title = result.get("title", "") url = result.get("url", "") content = result.get("content", "") raw_content = result.get("raw_content", "") score = result.get("score", 0.0) # Calculate relevance score relevance_score = self._calculate_relevance_score( title, content, query_words, score ) # Format result formatted_result = { "title": title, "url": url, "content": content[:500] + "..." if len(content) > 500 else content, "raw_content": raw_content if self.include_raw_content else "", "score": score, "relevance_score": relevance_score, "rank": i + 1, "source": "web_search", "search_engine": "tavily", "published_date": result.get("published_date"), "metadata": { "title": title, "url": url, "content_length": len(content), "has_raw_content": bool(raw_content), "search_rank": i + 1, }, } processed_results.append(formatted_result) except Exception as e: self.logger.warning(f"Error processing search result {i}: {str(e)}") continue # Sort by relevance score processed_results.sort(key=lambda x: x["relevance_score"], reverse=True) return processed_results def _calculate_relevance_score( self, title: str, content: str, query_words: set, base_score: float ) -> float: """ Calculate relevance score for search results. Args: title: Result title content: Result content query_words: Set of query words base_score: Base score from search engine Returns: Calculated relevance score """ try: # Start with base score relevance = base_score # Title relevance (higher weight) title_words = set(title.lower().split()) title_overlap = len(query_words.intersection(title_words)) title_boost = (title_overlap / max(len(query_words), 1)) * 0.3 # Content relevance content_words = set(content.lower().split()) content_overlap = len(query_words.intersection(content_words)) content_boost = (content_overlap / max(len(query_words), 1)) * 0.2 # Exact phrase matching bonus query_phrase = " ".join(query_words).lower() if query_phrase in title.lower(): relevance += 0.2 elif query_phrase in content.lower(): relevance += 0.1 # Final score calculation final_score = min(relevance + title_boost + content_boost, 1.0) return round(final_score, 3) except Exception as e: self.logger.warning(f"Error calculating relevance score: {str(e)}") return base_score def get_search_context(self, query: str, **kwargs) -> str: """ Get search context suitable for RAG applications. Args: query: Search query string **kwargs: Additional search parameters Returns: Formatted context string """ search_results = self.search(query, **kwargs) if not search_results.get("results"): error_msg = search_results.get("error", "Unknown error") return f"No live search results found for: {query}. Error: {error_msg}" context_parts = [] # Add answer if available if search_results.get("answer"): context_parts.append(f"Answer: {search_results['answer']}") context_parts.append("") # Add search results context_parts.append("Search Results:") for i, result in enumerate(search_results["results"], 1): context_parts.append(f"{i}. {result['title']}") context_parts.append(f" URL: {result['url']}") context_parts.append(f" Content: {result['content']}") if result.get("published_date"): context_parts.append(f" Published: {result['published_date']}") context_parts.append("") # Add metadata metadata = search_results.get("search_metadata", {}) context_parts.append( f"Search performed at: {metadata.get('timestamp', 'Unknown')}" ) context_parts.append(f"Source: {metadata.get('source', 'Unknown')}") context_parts.append(f"Results count: {metadata.get('results_count', 0)}") return "\n".join(context_parts) def qna_search(self, query: str, **kwargs) -> str: """ Get a quick answer to a question using Tavily's QnA search. Args: query: Question to answer **kwargs: Additional search parameters Returns: Answer string """ if not self.is_enabled(): return "Live search is disabled or not properly configured." try: # Use Tavily's QnA search method answer = self.tavily_client.qna_search(query=query) return answer if answer else "No answer found for the given question." except Exception as e: self.logger.error(f"Error in QnA search: {str(e)}") return f"Error getting answer: {str(e)}" def _generate_cache_key( self, query: str, max_results: int, search_depth: str ) -> str: """Generate cache key for search results.""" import hashlib cache_string = f"{query.lower().strip()}{max_results}{search_depth}" return hashlib.md5(cache_string.encode()).hexdigest() def _is_cache_valid(self, timestamp: datetime) -> bool: """Check if cached result is still valid (30 minutes for live search).""" return datetime.now() - timestamp < timedelta(minutes=30) def _add_to_history(self, query: str, result_count: int): """Add search to history for analytics.""" self.search_history.append( { "query": query, "timestamp": datetime.now(), "result_count": result_count, "search_type": "live_web", } ) # Keep only last 50 searches if len(self.search_history) > 50: self.search_history = self.search_history[-50:] def health_check(self) -> Dict[str, Any]: """ Perform a health check of the live search service. Returns: Dictionary containing health status """ try: if not self.enabled: return { "status": "disabled", "message": "Live search is disabled in configuration", "timestamp": datetime.now().isoformat(), } if not self.tavily_client: return { "status": "error", "message": "Tavily client not initialized. Check TAVILY_API_KEY environment variable.", "timestamp": datetime.now().isoformat(), } # Perform a simple test search test_result = self.search("test health check", max_results=1) if test_result.get("error"): return { "status": "error", "message": f"Health check failed: {test_result['error']}", "timestamp": datetime.now().isoformat(), } return { "status": "healthy", "message": "Live search service is operational", "timestamp": datetime.now().isoformat(), "config": { "max_results": self.max_results, "search_depth": self.search_depth, "include_answer": self.include_answer, "topic": self.topic, }, } except Exception as e: self.logger.error(f"Health check failed: {str(e)}") return { "status": "error", "message": f"Health check failed: {str(e)}", "timestamp": datetime.now().isoformat(), } def get_search_analytics(self) -> Dict[str, Any]: """ Get analytics about search patterns. Returns: Dictionary with search analytics """ if not self.search_history: return {"total_searches": 0, "cache_hit_rate": 0.0, "average_results": 0.0} total_searches = len(self.search_history) avg_results = ( sum(s["result_count"] for s in self.search_history) / total_searches ) # Recent search trends recent_searches = [s["query"] for s in self.search_history[-10:]] return { "total_searches": total_searches, "average_results_per_search": round(avg_results, 2), "recent_searches": recent_searches, "cache_size": len(self.search_cache), "search_type": "live_web", } def clear_cache(self): """Clear the search cache.""" self.search_cache.clear() self.logger.info("Live search cache cleared") def clear_history(self): """Clear the search history.""" self.search_history.clear() self.logger.info("Live search history cleared") # 🔄 Compatibility alias for existing imports LiveSearchManager = LiveSearchProcessor