import random
import time
from typing import List, Optional, Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

class SearchRotationArgs(BaseModel):
    """Input schema for SearchRotationTool."""
    query: str = Field(..., description="The search query to look up")

class SearchRotationTool(BaseTool):
    """

    Tool for rotating between multiple search engines with a limit on searches per query.

    

    This tool alternates between different search engines and enforces a maximum

    number of searches per query to manage API usage and costs.

    """
    name: str = Field(
        default="Web Search Rotation",
        description="Search the internet using multiple search engines in rotation"
    )
    description: str = Field(
        default="Use this tool to search for information on the internet using different search engines in rotation.",
        description="Description of the search rotation tool"
    )
    
    search_tools: List[BaseTool] = Field(
        default_factory=list,
        description="List of search tools to rotate between"
    )
    max_searches_per_query: int = Field(
        default=5,
        description="Maximum number of searches allowed per query"
    )
    cache_timeout: int = Field(
        default=300,  # 5 minutes
        description="How long to cache results for similar queries in seconds"
    )
    
    args_schema: Type[BaseModel] = SearchRotationArgs
    
    def __init__(self, **data):
        super().__init__(**data)
        if not self.search_tools:
            raise ValueError("At least one search tool must be provided")
        self._search_count = 0
        self._current_search_query = None
        self._last_used_tool = None
        self._cache = {}  # Simple cache for recent queries
        self._last_search_time = {}  # Track when each tool was last used
        
        # Log available search tools
        tool_names = [tool.name for tool in self.search_tools]
        print(f"SearchRotationTool initialized with tools: {', '.join(tool_names)}")
    
    def _run(self, query: str) -> str:
        """

        Execute a web search using a rotation of search engines.

        

        Args:

            query: The search query to look up

            

        Returns:

            String containing the search results

        """
        print(f"SearchRotationTool executing search for: '{query}'")
        
        # Check cache first for very similar queries
        for cached_query, (timestamp, result) in list(self._cache.items()):
            # Simple similarity check - if query is very similar to a cached query
            if self._is_similar_query(query, cached_query):
                # Check if cache is still valid
                if time.time() - timestamp < self.cache_timeout:
                    print(f"Using cached result for similar query: '{cached_query}'")
                    return f"{result}\n\n[Cached result from similar query: '{cached_query}']"
                else:
                    # Remove expired cache entries to prevent cache bloat
                    print(f"Cache expired for query: '{cached_query}'")
                    self._cache.pop(cached_query, None)
        
        # Reset counter if this is a new query
        if not self._is_similar_query(self._current_search_query, query):
            print(f"New search query detected. Resetting search count.")
            self._current_search_query = query
            self._search_count = 0
        
        # Check if we've reached the search limit
        if self._search_count >= self.max_searches_per_query:
            print(f"Search limit reached ({self._search_count}/{self.max_searches_per_query})")
            return (f"Search limit reached. You've performed {self._search_count} searches "
                    f"for this query. Maximum allowed is {self.max_searches_per_query}.")
        
        # Select the most appropriate search tool based on usage and delay
        search_tool = self._select_optimal_tool()
        print(f"Selected search tool: {search_tool.name}")
        
        # Keep track of which tools we've tried for this specific search attempt
        tried_tools = set()
        max_retry_attempts = min(3, len(self.search_tools))
        retry_count = 0
        
        while retry_count < max_retry_attempts:
            tried_tools.add(search_tool.name)
            
            try:
                # Execute the search
                print(f"Using Tool: {search_tool.name}")
                start_time = time.time()
                result = search_tool.run(query)
                search_time = time.time() - start_time
                
                # Basic validation: reject empty results, results that start with
                # an error message, or results too short to be meaningful (a plain
                # substring check would wrongly discard valid results that merely
                # mention the word "error")
                if not result or result.strip().lower().startswith("error") or len(result.strip()) < 20:
                    # Result might be invalid, try another tool if available
                    print(f"Invalid or error result from {search_tool.name}. Trying another tool.")
                    retry_count += 1
                    search_tool = self._select_next_tool(tried_tools)
                    if not search_tool:  # No more tools to try
                        print("All search tools failed. No more tools to try.")
                        return "All search tools failed to provide meaningful results for this query."
                    continue
                
                # Valid result obtained
                print(f"Valid result obtained from {search_tool.name} in {search_time:.2f}s")
                
                # Update tracking
                self._last_used_tool = search_tool
                self._last_search_time[search_tool.name] = time.time()
                
                # Cache the result
                self._cache[query] = (time.time(), result)
                
                # Increment the counter
                self._search_count += 1
                print(f"Search count incremented to {self._search_count}/{self.max_searches_per_query}")
                
                # Add usage information
                searches_left = self.max_searches_per_query - self._search_count
                usage_info = f"\n\nSearch performed using {search_tool.name} in {search_time:.2f}s. "
                usage_info += f"Searches used: {self._search_count}/{self.max_searches_per_query}. "
                usage_info += f"Searches remaining: {max(0, searches_left)}."
                
                return f"{result}\n{usage_info}"
            
            except Exception as e:
                # If this search tool fails, try another one
                print(f"Exception in {search_tool.name}: {str(e)}")
                retry_count += 1
                search_tool = self._select_next_tool(tried_tools)
                if not search_tool:  # No more tools to try
                    print("All search tools failed with exceptions. No more tools to try.")
                    return f"Error searching with all available search engines: {str(e)}"
        
        # If we've exhausted our retry attempts
        print(f"Failed after {retry_count} retry attempts")
        return "Failed to get search results after multiple attempts with different search engines."
    
    def _select_next_tool(self, tried_tools: set) -> Optional[BaseTool]:
        """Select the next tool that hasn't been tried yet."""
        available_tools = [t for t in self.search_tools if t.name not in tried_tools]
        if not available_tools:
            return None
        
        # Sort by last used time (oldest first) if we have that data
        if self._last_search_time:
            available_tools.sort(key=lambda t: self._last_search_time.get(t.name, 0))
        
        return available_tools[0]
    
    def _select_optimal_tool(self) -> BaseTool:
        """Select the best tool based on recent usage patterns."""
        # If we have no usage history yet, pick a tool at random
        if not self._last_used_tool or not self._last_search_time:
            return random.choice(self.search_tools)
        
        # Try to avoid using the same tool twice in a row
        available_tools = [t for t in self.search_tools if t != self._last_used_tool]
        
        # If we have multiple tools available, choose the one used least recently
        if available_tools:
            # Sort by last used time (oldest first)
            available_tools.sort(key=lambda t: self._last_search_time.get(t.name, 0))
            return available_tools[0]
        
        # Otherwise every tool equals the last-used one (i.e. only one tool
        # is configured), so fall back to it
        return self.search_tools[0]
    
    def _is_similar_query(self, query1, query2):
        """Check if two queries are similar enough to use cached results."""
        if not query1 or not query2:
            return False
            
        # Convert to lowercase and remove common filler words
        q1 = query1.lower()
        q2 = query2.lower()
        
        # If the strings are identical
        if q1 == q2:
            return True
            
        # Remove common filler words to focus on meaningful terms
        filler_words = {'the', 'a', 'an', 'and', 'or', 'but', 'is', 'are', 'was', 'were',
                        'in', 'on', 'at', 'to', 'for', 'with', 'by', 'about', 'like',
                        'through', 'over', 'before', 'between', 'after', 'since', 'without',
                        'under', 'within', 'along', 'following', 'across', 'behind',
                        'beyond', 'plus', 'except', 'up', 'down', 'off', 'me', 'you'}
        
        def clean_and_tokenize(q):
            # Strip punctuation, split on whitespace, then drop filler words
            # and single-character tokens (queries are already lowercased)
            q = ''.join(c for c in q if c.isalnum() or c.isspace())
            return {word for word in q.split() if word not in filler_words and len(word) > 1}
        
        words1 = clean_and_tokenize(q1)
        words2 = clean_and_tokenize(q2)
        
        # If either query has no significant words after cleaning, they're not similar
        if not words1 or not words2:
            return False
        
        # Calculate Jaccard similarity
        intersection = len(words1.intersection(words2))
        union = len(words1.union(words2))
        
        # Shorter queries need proportionally more overlap to count as similar
        min_words = min(len(words1), len(words2))
        
        # For short queries, use strict similarity threshold
        if min_words <= 3:
            # For very short queries, require almost exact match
            return intersection / union > 0.8
        # For normal length queries
        elif min_words <= 6:
            return intersection / union > 0.7
        # For longer queries
        else:
            # Check both Jaccard similarity and absolute intersection size
            # For long queries, having many words in common is important
            absolute_overlap_threshold = min(5, min_words // 2)
            return (intersection / union > 0.6) or (intersection >= absolute_overlap_threshold)
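

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The stub tool below is a hypothetical
# stand-in for real search tools (e.g. provider-backed tools from a package
# such as crewai_tools); a production setup would pass those instead. The
# sketch assumes only the crewai BaseTool API already used above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class StubSearchTool(BaseTool):
        """Minimal fake search tool used to exercise the rotation logic."""
        name: str = "Stub Search"
        description: str = "Returns a canned result for any query."
        args_schema: Type[BaseModel] = SearchRotationArgs

        def _run(self, query: str) -> str:
            # Long enough to pass the 20-character validity check in
            # SearchRotationTool._run
            return f"[{self.name}] canned search results for: {query!r}"

    rotation = SearchRotationTool(
        search_tools=[
            StubSearchTool(name="Stub Engine A"),
            StubSearchTool(name="Stub Engine B"),
        ],
        max_searches_per_query=3,
    )

    # The first call hits a live tool; the identical follow-up is served
    # from the cache because _is_similar_query matches it exactly
    print(rotation.run("latest developments in battery recycling"))
    print(rotation.run("latest developments in battery recycling"))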