File size: 3,250 Bytes
f104fee
 
 
 
 
 
9b006e9
 
f104fee
 
9b006e9
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b006e9
 
 
 
 
 
 
 
 
 
f104fee
9b006e9
f104fee
 
 
9b006e9
 
 
 
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
9b006e9
 
 
 
 
f104fee
 
 
 
 
 
 
 
 
 
9b006e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from langchain.tools import BaseTool
from pydantic import BaseModel, Field, PrivateAttr
import asyncio
import aiohttp
import hashlib
import json
from tenacity import retry, stop_after_attempt, wait_exponential
from src.utils.logger import get_logger
from src.utils.cache_manager import cache_manager

logger = get_logger(__name__)

class Web3ToolInput(BaseModel):
    """Input schema shared by Web3 tools: a free-form query plus optional filters."""
    query: str = Field(description="Search query or parameter")
    filters: Optional[Dict[str, Any]] = Field(default=None, description="Additional filters")

class BaseWeb3Tool(BaseTool, ABC):
    """Abstract base for async Web3 tools.

    Provides a lazily-created shared aiohttp session, response caching via
    ``cache_manager``, and retry-with-backoff for HTTP requests. Subclasses
    implement :meth:`_arun`; :meth:`_run` bridges to synchronous callers.
    """

    name: str = "base_web3_tool"
    description: str = "Base Web3 tool"
    args_schema: type[BaseModel] = Web3ToolInput

    # Shared HTTP session, created on first use (PrivateAttr keeps it out of
    # the pydantic model schema).
    _session: Optional[aiohttp.ClientSession] = PrivateAttr(default=None)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._session = None

    async def get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating a new one if missing or closed.

        Checking ``.closed`` matters: a session closed elsewhere (not via
        :meth:`cleanup`, which also resets it to None) would otherwise be
        handed out and raise on first use.
        """
        if self._session is None or self._session.closed:
            timeout = aiohttp.ClientTimeout(total=30)
            self._session = aiohttp.ClientSession(timeout=timeout)
        return self._session

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8))
    async def make_request(self, url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """GET ``url`` and return the parsed JSON body.

        Successful responses are cached for 5 minutes, keyed on URL + params.
        HTTP errors (including 429 rate limits) raise
        ``aiohttp.ClientResponseError`` and are retried up to 3 times with
        exponential backoff by the ``@retry`` decorator.
        """
        cache_key = self._create_cache_key(url, params or {})

        # Serve from cache when possible to avoid hitting rate limits.
        cached_result = cache_manager.get(cache_key)
        if cached_result is not None:
            logger.debug(f"Cache hit for {url}")
            return cached_result

        logger.debug(f"Cache miss for {url}")
        session = await self.get_session()

        try:
            async with session.get(url, params=params or {}) as response:
                if response.status == 429:
                    # Raise so tenacity retries. Its exponential backoff
                    # (min 2s) already provides the rate-limit pause, so the
                    # previous extra sleep(2) here just doubled the wait.
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message="Rate limited",
                    )
                # Raises for any 4xx/5xx. Previously, non-200 success codes
                # (201, 204, 3xx) fell through and returned None implicitly;
                # now every non-error response is parsed and returned.
                response.raise_for_status()
                result = await response.json()
                # Cache successful responses for 5 minutes.
                cache_manager.set(cache_key, result, ttl=300)
                return result
        except Exception as e:
            logger.error(f"Request failed: {e}")
            raise

    def _create_cache_key(self, url: str, params: Dict[str, Any]) -> str:
        """Create a deterministic cache key from URL and sorted parameters."""
        key_data = f"{url}:{json.dumps(params, sort_keys=True)}"
        # md5 is acceptable here: the digest is a cache identifier, not a
        # security boundary; 16 hex chars keeps keys short.
        return hashlib.md5(key_data.encode()).hexdigest()[:16]

    def _run(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str:
        """Synchronous entry point required by BaseTool; delegates to ``_arun``.

        NOTE(review): ``asyncio.run`` raises RuntimeError if invoked from an
        already-running event loop — sync callers must sit outside any loop.
        """
        return asyncio.run(self._arun(query, filters))

    @abstractmethod
    async def _arun(self, query: str, filters: Optional[Dict[str, Any]] = None) -> str:
        """Async implementation supplied by concrete tool subclasses."""

    async def cleanup(self):
        """Close the shared session and reset it so a later call recreates it."""
        if self._session:
            await self._session.close()
            self._session = None