Spaces:
Sleeping
Sleeping
import time | |
from typing import Any, Dict, Optional, Type | |
from crewai.tools import BaseTool | |
from pydantic import BaseModel, Field, model_validator, create_model | |
import logging | |
logger = logging.getLogger(__name__) | |
class RateLimitedToolWrapper(BaseTool): | |
""" | |
A wrapper tool that adds an optional time delay after executing another tool. | |
Useful for enforcing rate limits on API calls or simply adding a pause. | |
It also ensures that arguments are correctly passed to the wrapped tool. | |
""" | |
name: str = Field( | |
default="Rate Limited Tool Wrapper", | |
description="A tool that wraps another tool to add a delay after execution" | |
) | |
description: str = Field( | |
default="Wraps another tool to add a delay after execution, enforcing rate limits.", | |
description="The tool's description that will be passed to the agent" | |
) | |
tool: BaseTool = Field( | |
..., | |
description="The tool to be wrapped with rate limiting" | |
) | |
delay: float = Field( | |
default=0.0, | |
description="Delay in seconds to wait after tool execution (0 means no delay)", | |
ge=0.0 | |
) | |
# Create a simple args schema for fallback | |
class RateLimitedToolArgs(BaseModel): | |
query: str = Field(..., description="The search query to pass to the wrapped tool") | |
def __init__(self, **data): | |
# Store the original args_schema if available | |
tool = data.get('tool') | |
# Set args_schema directly in data before initialization | |
if tool and hasattr(tool, 'args_schema') and tool.args_schema is not None: | |
if isinstance(tool.args_schema, type) and issubclass(tool.args_schema, BaseModel): | |
data['args_schema'] = tool.args_schema | |
else: | |
data['args_schema'] = self.RateLimitedToolArgs | |
else: | |
data['args_schema'] = self.RateLimitedToolArgs | |
super().__init__(**data) | |
def _run(self, query: str) -> str: | |
""" | |
Run the wrapped tool with the query parameter and then pause for the specified delay. | |
Args: | |
query: The query string to pass to the wrapped tool. | |
Returns: | |
The result from the wrapped tool. | |
""" | |
logger.debug(f"RateLimitedToolWrapper: Running tool '{self.tool.name}' with query: {query}") | |
try: | |
# Call the tool's run method with the query | |
result = self.tool.run(query) | |
except Exception as e: | |
logger.error(f"Exception running wrapped tool '{self.tool.name}': {e}") | |
# Fall back to trying the _run method directly if the run method fails | |
try: | |
if hasattr(self.tool, '_run'): | |
logger.warning(f"Falling back to direct _run call for tool '{self.tool.name}'") | |
result = self.tool._run(query) | |
else: | |
raise e | |
except Exception as inner_e: | |
logger.error(f"Fallback also failed for tool '{self.tool.name}': {inner_e}") | |
raise inner_e | |
# Enforce the delay only if greater than 0 | |
if self.delay > 0: | |
logger.info(f"Rate limit enforced: Waiting {self.delay:.2f} seconds after running {self.tool.name}.") | |
time.sleep(self.delay) | |
return result |