"""

Live Search Processor using Tavily Python Client.

Provides real-time web search capabilities for the RAG system.

"""

import logging
import os
import time
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class LiveSearchProcessor:
    """Handles live web search using Tavily Python Client."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """

        Initialize the LiveSearchProcessor.



        Args:

            config: Configuration dictionary containing live search settings

        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Search configuration
        self.enabled = self.config.get("enabled", False)
        self.max_results = self.config.get("max_results", 5)
        self.search_depth = self.config.get("search_depth", "basic")
        self.include_answer = self.config.get("include_answer", True)
        self.include_raw_content = self.config.get("include_raw_content", False)
        self.include_images = self.config.get("include_images", False)
        self.topic = self.config.get("topic", "general")
        self.enable_caching = self.config.get("enable_caching", True)

        # Search cache and analytics
        self.search_cache = {}
        self.search_history = []

        # Initialize Tavily client
        self.tavily_client = None
        self._initialize_client()

        self.logger.info(f"LiveSearchProcessor initialized - Enabled: {self.enabled}")

    def _initialize_client(self):
        """Initialize the Tavily client."""
        try:
            # Get API key from environment variable
            api_key = os.getenv("TAVILY_API_KEY")

            if not api_key:
                self.logger.warning("TAVILY_API_KEY not found in environment variables")
                self.enabled = False
                return

            # Import and initialize Tavily client
            from tavily import TavilyClient

            self.tavily_client = TavilyClient(api_key=api_key)

            # ✅ Auto-enable if client initializes successfully and no explicit config
            if self.tavily_client and not self.config.get(
                "enabled_explicitly_set", False
            ):
                self.enabled = True
                self.logger.info(
                    "Tavily client initialized successfully - Auto-enabled live search"
                )
            else:
                self.logger.info("Tavily client initialized successfully")

        except ImportError:
            self.logger.error(
                "tavily-python package not installed. Install with: pip install tavily-python"
            )
            self.enabled = False
        except Exception as e:
            self.logger.error(f"Failed to initialize Tavily client: {str(e)}")
            self.enabled = False

    def is_enabled(self) -> bool:
        """Check if live search is enabled."""
        return self.enabled and self.tavily_client is not None

    def search_web(
        self,
        query: str,
        max_results: Optional[int] = None,
        search_depth: Optional[str] = None,
        time_range: Optional[str] = None,
    ) -> Dict[str, Any]:
        """

        Perform live web search using Tavily API.



        Args:

            query: Search query string

            max_results: Maximum number of results to return

            search_depth: Search depth ('basic' or 'advanced')

            time_range: Time range for search results



        Returns:

            Dictionary containing search results and metadata

        """
        if not query or not query.strip():
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": "Empty query provided",
                "source": "live_search",
            }

        if not self.is_enabled():
            self.logger.warning("Live search is disabled or client not initialized")
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": "Live search is disabled or Tavily client not initialized",
                "source": "live_search",
            }

        self.logger.info(f"Performing live search: {query[:100]}...")
        start_time = time.time()

        try:
            # Use provided parameters or defaults
            search_max_results = max_results or self.max_results
            search_depth_param = search_depth or self.search_depth

            # Check cache first
            cache_key = self._generate_cache_key(
                query, search_max_results, search_depth_param
            )
            if self.enable_caching and cache_key in self.search_cache:
                cached_result = self.search_cache[cache_key]
                if self._is_cache_valid(cached_result["timestamp"]):
                    self.logger.info("Returning cached search result")
                    cached_result["from_cache"] = True
                    return cached_result

            # Prepare search parameters
            search_params = {
                "query": query,
                "max_results": min(search_max_results, 20),  # Tavily limit
                "search_depth": search_depth_param,
                "include_answer": self.include_answer,
                "include_raw_content": self.include_raw_content,
                "include_images": self.include_images,
                "topic": self.topic,
            }

            # Add time_range if provided
            if time_range:
                search_params["time_range"] = time_range

            # Perform the search
            response = self.tavily_client.search(**search_params)

            # Process and format results
            processed_results = self._process_search_results(
                response.get("results", []), query
            )

            # Prepare final result
            result = {
                "query": query,
                "results": processed_results,
                "total_results": len(processed_results),
                "answer": response.get("answer"),
                "images": response.get("images", []),
                "follow_up_questions": response.get("follow_up_questions", []),
                "search_params": {
                    "max_results": search_max_results,
                    "search_depth": search_depth_param,
                    "time_range": time_range,
                },
                "processing_time": time.time() - start_time,
                "timestamp": datetime.now(),
                "source": "live_search",
                "from_cache": False,
                "search_metadata": {
                    "source": "tavily",
                    "timestamp": datetime.now().isoformat(),
                    "results_count": len(processed_results),
                    "search_depth": search_depth_param,
                    "max_results": search_max_results,
                    "response_time": response.get("response_time"),
                },
            }

            # Cache the result
            if self.enable_caching:
                self.search_cache[cache_key] = result.copy()

            # Add to search history
            self._add_to_history(query, len(processed_results))

            self.logger.info(
                f"Live search completed in {result['processing_time']:.2f}s"
            )
            return result

        except Exception as e:
            self.logger.error(f"Error in live search: {str(e)}")
            return {
                "query": query,
                "results": [],
                "total_results": 0,
                "error": str(e),
                "processing_time": time.time() - start_time,
                "source": "live_search",
            }

    def search(self, query: str, **kwargs) -> Dict[str, Any]:
        """

        Perform a live web search using Tavily API.



        Args:

            query: Search query string

            **kwargs: Additional search parameters



        Returns:

            Dictionary containing search results

        """
        return self.search_web(query, **kwargs)

    def _process_search_results(
        self, raw_results: List[Dict[str, Any]], query: str
    ) -> List[Dict[str, Any]]:
        """
        Process and format raw search results from Tavily.

        Args:
            raw_results: Raw results from Tavily API
            query: Original search query

        Returns:
            Processed and formatted results
        """
        processed_results = []
        query_words = set(query.lower().split())

        for i, result in enumerate(raw_results):
            try:
                # Extract key information
                title = result.get("title", "")
                url = result.get("url", "")
                content = result.get("content", "")
                raw_content = result.get("raw_content", "")
                score = result.get("score", 0.0)

                # Calculate relevance score
                relevance_score = self._calculate_relevance_score(
                    title, content, query, query_words, score
                )

                # Format result
                formatted_result = {
                    "title": title,
                    "url": url,
                    "content": content[:500] + "..." if len(content) > 500 else content,
                    "raw_content": raw_content if self.include_raw_content else "",
                    "score": score,
                    "relevance_score": relevance_score,
                    "rank": i + 1,
                    "source": "web_search",
                    "search_engine": "tavily",
                    "published_date": result.get("published_date"),
                    "metadata": {
                        "title": title,
                        "url": url,
                        "content_length": len(content),
                        "has_raw_content": bool(raw_content),
                        "search_rank": i + 1,
                    },
                }

                processed_results.append(formatted_result)

            except Exception as e:
                self.logger.warning(f"Error processing search result {i}: {str(e)}")
                continue

        # Sort by relevance score
        processed_results.sort(key=lambda x: x["relevance_score"], reverse=True)

        return processed_results

    def _calculate_relevance_score(
        self,
        title: str,
        content: str,
        query: str,
        query_words: set,
        base_score: float,
    ) -> float:
        """
        Calculate a relevance score for a single search result.

        Args:
            title: Result title
            content: Result content
            query: Original search query string (used for exact phrase matching)
            query_words: Set of lowercased query words
            base_score: Base score from the search engine

        Returns:
            Calculated relevance score
        """
        try:
            # Start with base score
            relevance = base_score

            # Title relevance (higher weight)
            title_words = set(title.lower().split())
            title_overlap = len(query_words.intersection(title_words))
            title_boost = (title_overlap / max(len(query_words), 1)) * 0.3

            # Content relevance
            content_words = set(content.lower().split())
            content_overlap = len(query_words.intersection(content_words))
            content_boost = (content_overlap / max(len(query_words), 1)) * 0.2

            # Exact phrase matching bonus (original query preserves word order)
            query_phrase = query.lower().strip()
            if query_phrase and query_phrase in title.lower():
                relevance += 0.2
            elif query_phrase and query_phrase in content.lower():
                relevance += 0.1

            # Final score calculation
            final_score = min(relevance + title_boost + content_boost, 1.0)

            return round(final_score, 3)

        except Exception as e:
            self.logger.warning(f"Error calculating relevance score: {str(e)}")
            return base_score

    def get_search_context(self, query: str, **kwargs) -> str:
        """

        Get search context suitable for RAG applications.



        Args:

            query: Search query string

            **kwargs: Additional search parameters



        Returns:

            Formatted context string

        """
        search_results = self.search(query, **kwargs)

        if not search_results.get("results"):
            error_msg = search_results.get("error", "Unknown error")
            return f"No live search results found for: {query}. Error: {error_msg}"

        context_parts = []

        # Add answer if available
        if search_results.get("answer"):
            context_parts.append(f"Answer: {search_results['answer']}")
            context_parts.append("")

        # Add search results
        context_parts.append("Search Results:")
        for i, result in enumerate(search_results["results"], 1):
            context_parts.append(f"{i}. {result['title']}")
            context_parts.append(f"   URL: {result['url']}")
            context_parts.append(f"   Content: {result['content']}")
            if result.get("published_date"):
                context_parts.append(f"   Published: {result['published_date']}")
            context_parts.append("")

        # Add metadata
        metadata = search_results.get("search_metadata", {})
        context_parts.append(
            f"Search performed at: {metadata.get('timestamp', 'Unknown')}"
        )
        context_parts.append(f"Source: {metadata.get('source', 'Unknown')}")
        context_parts.append(f"Results count: {metadata.get('results_count', 0)}")

        return "\n".join(context_parts)

    def qna_search(self, query: str, **kwargs) -> str:
        """

        Get a quick answer to a question using Tavily's QnA search.



        Args:

            query: Question to answer

            **kwargs: Additional search parameters



        Returns:

            Answer string

        """
        if not self.is_enabled():
            return "Live search is disabled or not properly configured."

        try:
            # Use Tavily's QnA search method
            answer = self.tavily_client.qna_search(query=query)
            return answer if answer else "No answer found for the given question."

        except Exception as e:
            self.logger.error(f"Error in QnA search: {str(e)}")
            return f"Error getting answer: {str(e)}"

    def _generate_cache_key(
        self, query: str, max_results: int, search_depth: str
    ) -> str:
        """Generate cache key for search results."""
        import hashlib

        cache_string = f"{query.lower().strip()}{max_results}{search_depth}"
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Check if cached result is still valid (30 minutes for live search)."""
        return datetime.now() - timestamp < timedelta(minutes=30)

    def _add_to_history(self, query: str, result_count: int):
        """Add search to history for analytics."""
        self.search_history.append(
            {
                "query": query,
                "timestamp": datetime.now(),
                "result_count": result_count,
                "search_type": "live_web",
            }
        )

        # Keep only last 50 searches
        if len(self.search_history) > 50:
            self.search_history = self.search_history[-50:]

    def health_check(self) -> Dict[str, Any]:
        """

        Perform a health check of the live search service.



        Returns:

            Dictionary containing health status

        """
        try:
            if not self.enabled:
                return {
                    "status": "disabled",
                    "message": "Live search is disabled in configuration",
                    "timestamp": datetime.now().isoformat(),
                }

            if not self.tavily_client:
                return {
                    "status": "error",
                    "message": "Tavily client not initialized. Check TAVILY_API_KEY environment variable.",
                    "timestamp": datetime.now().isoformat(),
                }

            # Perform a simple test search
            test_result = self.search("test health check", max_results=1)

            if test_result.get("error"):
                return {
                    "status": "error",
                    "message": f"Health check failed: {test_result['error']}",
                    "timestamp": datetime.now().isoformat(),
                }

            return {
                "status": "healthy",
                "message": "Live search service is operational",
                "timestamp": datetime.now().isoformat(),
                "config": {
                    "max_results": self.max_results,
                    "search_depth": self.search_depth,
                    "include_answer": self.include_answer,
                    "topic": self.topic,
                },
            }

        except Exception as e:
            self.logger.error(f"Health check failed: {str(e)}")
            return {
                "status": "error",
                "message": f"Health check failed: {str(e)}",
                "timestamp": datetime.now().isoformat(),
            }

    def get_search_analytics(self) -> Dict[str, Any]:
        """

        Get analytics about search patterns.



        Returns:

            Dictionary with search analytics

        """
        if not self.search_history:
            return {"total_searches": 0, "cache_hit_rate": 0.0, "average_results": 0.0}

        total_searches = len(self.search_history)
        avg_results = (
            sum(s["result_count"] for s in self.search_history) / total_searches
        )

        # Recent search trends
        recent_searches = [s["query"] for s in self.search_history[-10:]]

        return {
            "total_searches": total_searches,
            "average_results_per_search": round(avg_results, 2),
            "recent_searches": recent_searches,
            "cache_size": len(self.search_cache),
            "search_type": "live_web",
        }

    def clear_cache(self):
        """Clear the search cache."""
        self.search_cache.clear()
        self.logger.info("Live search cache cleared")

    def clear_history(self):
        """Clear the search history."""
        self.search_history.clear()
        self.logger.info("Live search history cleared")


# 🔄 Compatibility alias for existing imports
LiveSearchManager = LiveSearchProcessor
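

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# Assumes the tavily-python package is installed and TAVILY_API_KEY is set in
# the environment; the config keys mirror the defaults read in __init__ above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = LiveSearchProcessor(
        config={
            "enabled": True,
            "max_results": 3,
            "search_depth": "basic",
            "include_answer": True,
        }
    )

    # Verify connectivity and configuration before issuing real queries.
    status = processor.health_check()
    print(f"Health: {status['status']} - {status['message']}")

    if processor.is_enabled():
        # Structured results plus metadata for downstream ranking or display.
        results = processor.search_web("retrieval-augmented generation")
        print(f"Found {results['total_results']} results")

        # Flattened context string suitable for prompting an LLM in a RAG pipeline.
        context = processor.get_search_context("retrieval-augmented generation")
        print(context[:500])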