Spaces:
Running
Running
from typing import Dict, Any, Optional | |
from pydantic import BaseModel, PrivateAttr | |
from src.tools.base_tool import BaseWeb3Tool, Web3ToolInput | |
from src.utils.logger import get_logger | |
import aiohttp | |
import json | |
logger = get_logger(__name__) | |
class DeFiLlamaTool(BaseWeb3Tool): | |
name: str = "defillama_data" | |
description: str = """Get real DeFi protocol data, TVL, and yields from DeFiLlama API. | |
Useful for: DeFi analysis, protocol rankings, TVL trends, chain analysis. | |
Input: protocol name, chain name, or general DeFi query.""" | |
args_schema: type[BaseModel] = Web3ToolInput | |
_base_url: str = PrivateAttr(default="https://api.llama.fi") | |
def __init__(self): | |
super().__init__() | |
async def make_request(self, url: str, timeout: int = 10) -> Optional[Dict[str, Any]]: | |
"""Make HTTP request to DeFiLlama API""" | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response: | |
if response.status == 200: | |
data = await response.json() | |
logger.info(f"β DeFiLlama API call successful: {url}") | |
return data | |
else: | |
logger.error(f"β DeFiLlama API error: {response.status} for {url}") | |
return None | |
except Exception as e: | |
logger.error(f"β DeFiLlama API request failed: {e}") | |
return None | |
async def _arun(self, query: str, filters: Optional[Dict[str, Any]] = None, **kwargs) -> str: | |
try: | |
filters = filters or {} | |
query_lower = query.lower() | |
# Route based on query type | |
if "protocol" in query_lower and any(name in query_lower for name in ["uniswap", "aave", "compound", "curve"]): | |
return await self._get_protocol_data(query) | |
elif any(word in query_lower for word in ["chain", "ethereum", "polygon", "avalanche", "bsc"]): | |
return await self._get_chain_tvl(query) | |
elif "tvl" in query_lower or "total value locked" in query_lower: | |
return await self._get_tvl_overview() | |
elif "top" in query_lower or "ranking" in query_lower: | |
return await self._get_top_protocols() | |
else: | |
return await self._search_protocols(query) | |
except Exception as e: | |
logger.error(f"DeFiLlama error: {e}") | |
return f"β οΈ DeFiLlama service temporarily unavailable: {str(e)}" | |
async def _get_top_protocols(self) -> str: | |
"""Get top protocols using /protocols endpoint""" | |
try: | |
data = await self.make_request(f"{self._base_url}/protocols") | |
if not data or not isinstance(data, list): | |
return "β οΈ DeFi protocol data temporarily unavailable" | |
# Sort by TVL and take top 10 | |
top_protocols = sorted([p for p in data if p.get("tvl") is not None and p.get("tvl", 0) > 0], | |
key=lambda x: x.get("tvl", 0), reverse=True)[:10] | |
if not top_protocols: | |
return "β οΈ No valid protocol data available" | |
result = "π¦ **Top DeFi Protocols by TVL:**\n\n" | |
for i, protocol in enumerate(top_protocols, 1): | |
name = protocol.get("name", "Unknown") | |
tvl = protocol.get("tvl", 0) | |
change_1d = protocol.get("change_1d", 0) | |
chain = protocol.get("chain", "Multi-chain") | |
emoji = "π" if change_1d >= 0 else "π" | |
tvl_formatted = f"${tvl/1e9:.2f}B" if tvl >= 1e9 else f"${tvl/1e6:.1f}M" | |
change_formatted = f"({change_1d:+.2f}%)" if change_1d is not None else "(N/A)" | |
result += f"{i}. **{name}** ({chain}): {tvl_formatted} TVL {emoji} {change_formatted}\n" | |
return result | |
except Exception as e: | |
logger.error(f"Top protocols error: {e}") | |
return "β οΈ DeFi protocol data temporarily unavailable" | |
async def _get_protocol_data(self, protocol_name: str) -> str: | |
"""Get specific protocol data using /protocol/{protocol} endpoint""" | |
try: | |
# First get all protocols to find the slug | |
protocols = await self.make_request(f"{self._base_url}/protocols") | |
if not protocols: | |
return f"β Cannot fetch protocols list" | |
# Find matching protocol | |
matching_protocol = None | |
for p in protocols: | |
if protocol_name.lower() in p.get("name", "").lower(): | |
matching_protocol = p | |
break | |
if not matching_protocol: | |
return f"β Protocol '{protocol_name}' not found" | |
# Get detailed protocol data | |
protocol_slug = matching_protocol.get("slug", protocol_name.lower()) | |
detailed_data = await self.make_request(f"{self._base_url}/protocol/{protocol_slug}") | |
if detailed_data: | |
# Use detailed data if available | |
name = detailed_data.get("name", matching_protocol.get("name")) | |
tvl = detailed_data.get("tvl", matching_protocol.get("tvl", 0)) | |
change_1d = detailed_data.get("change_1d", matching_protocol.get("change_1d", 0)) | |
change_7d = detailed_data.get("change_7d", matching_protocol.get("change_7d", 0)) | |
chains = detailed_data.get("chains", [matching_protocol.get("chain", "Unknown")]) | |
category = detailed_data.get("category", matching_protocol.get("category", "Unknown")) | |
description = detailed_data.get("description", "No description available") | |
else: | |
# Fallback to basic protocol data | |
name = matching_protocol.get("name", "Unknown") | |
tvl = matching_protocol.get("tvl", 0) | |
change_1d = matching_protocol.get("change_1d", 0) | |
change_7d = matching_protocol.get("change_7d", 0) | |
chains = [matching_protocol.get("chain", "Unknown")] | |
category = matching_protocol.get("category", "Unknown") | |
description = "No description available" | |
result = f"ποΈ **{name} Protocol Analysis:**\n\n" | |
result += f"π **Description**: {description[:200]}{'...' if len(description) > 200 else ''}\n\n" | |
result += f"π° **Current TVL**: ${tvl/1e9:.2f}B\n" | |
result += f"π **24h Change**: {change_1d:+.2f}%\n" | |
result += f"π **7d Change**: {change_7d:+.2f}%\n" | |
result += f"βοΈ **Chains**: {', '.join(chains) if isinstance(chains, list) else str(chains)}\n" | |
result += f"π·οΈ **Category**: {category}\n" | |
return result | |
except Exception as e: | |
logger.error(f"Protocol data error: {e}") | |
return f"β οΈ Error fetching data for {protocol_name}: {str(e)}" | |
async def _get_tvl_overview(self) -> str: | |
"""Get TVL overview using /protocols and /v2/chains endpoints""" | |
try: | |
# Get protocols and chains data | |
protocols_data = await self.make_request(f"{self._base_url}/protocols") | |
chains_data = await self.make_request(f"{self._base_url}/v2/chains") | |
if not protocols_data: | |
return "β οΈ TVL overview data unavailable" | |
# Calculate total TVL | |
total_tvl = sum(p.get("tvl", 0) for p in protocols_data if p.get("tvl") is not None and p.get("tvl", 0) > 0) | |
result = "π **DeFi TVL Overview:**\n\n" | |
result += f"π° **Total DeFi TVL**: ${total_tvl/1e9:.2f}B\n\n" | |
# Add chain data if available | |
if chains_data and isinstance(chains_data, list): | |
top_chains = sorted([c for c in chains_data if c.get("tvl") is not None and c.get("tvl", 0) > 0], | |
key=lambda x: x.get("tvl", 0), reverse=True)[:5] | |
result += "**Top Chains by TVL:**\n" | |
for i, chain in enumerate(top_chains, 1): | |
name = chain.get("name", "Unknown") | |
tvl = chain.get("tvl", 0) | |
result += f"{i}. **{name}**: ${tvl/1e9:.2f}B\n" | |
# Add top protocol categories | |
categories = {} | |
for protocol in protocols_data: | |
if protocol.get("tvl") is not None and protocol.get("tvl", 0) > 0: | |
category = protocol.get("category", "Other") | |
categories[category] = categories.get(category, 0) + protocol.get("tvl", 0) | |
if categories: | |
result += "\n**Top Categories by TVL:**\n" | |
sorted_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True)[:5] | |
for i, (category, tvl) in enumerate(sorted_categories, 1): | |
result += f"{i}. **{category}**: ${tvl/1e9:.2f}B\n" | |
return result | |
except Exception as e: | |
logger.error(f"TVL overview error: {e}") | |
return await self._get_top_protocols() | |
async def _get_chain_tvl(self, chain_query: str) -> str: | |
"""Get chain TVL data using /v2/historicalChainTvl/{chain} endpoint""" | |
try: | |
# Map common chain names | |
chain_mapping = { | |
"ethereum": "Ethereum", | |
"eth": "Ethereum", | |
"polygon": "Polygon", | |
"matic": "Polygon", | |
"bsc": "BSC", | |
"binance": "BSC", | |
"avalanche": "Avalanche", | |
"avax": "Avalanche", | |
"arbitrum": "Arbitrum", | |
"optimism": "Optimism", | |
"fantom": "Fantom", | |
"solana": "Solana", | |
"sol": "Solana" | |
} | |
# Extract chain name from query | |
chain_name = None | |
for key, value in chain_mapping.items(): | |
if key in chain_query.lower(): | |
chain_name = value | |
break | |
if not chain_name: | |
# Try to get all chains first | |
chains_data = await self.make_request(f"{self._base_url}/v2/chains") | |
if chains_data: | |
result = "βοΈ **Available Chains:**\n\n" | |
sorted_chains = sorted([c for c in chains_data if c.get("tvl", 0) > 0], | |
key=lambda x: x.get("tvl", 0), reverse=True)[:10] | |
for i, chain in enumerate(sorted_chains, 1): | |
name = chain.get("name", "Unknown") | |
tvl = chain.get("tvl", 0) | |
result += f"{i}. **{name}**: ${tvl/1e9:.2f}B TVL\n" | |
return result | |
else: | |
return f"β Chain '{chain_query}' not recognized. Try: ethereum, polygon, bsc, avalanche, etc." | |
# Get historical TVL for the chain | |
historical_data = await self.make_request(f"{self._base_url}/v2/historicalChainTvl/{chain_name}") | |
if not historical_data: | |
return f"β No data available for {chain_name}" | |
# Get current TVL (last entry) | |
current_tvl = historical_data[-1]["tvl"] if historical_data else 0 | |
result = f"βοΈ **{chain_name} Chain Analysis:**\n\n" | |
result += f"π° **Current TVL**: ${current_tvl/1e9:.2f}B\n" | |
# Calculate changes if we have enough data | |
if len(historical_data) >= 2: | |
prev_tvl = historical_data[-2]["tvl"] | |
daily_change = ((current_tvl - prev_tvl) / prev_tvl) * 100 if prev_tvl > 0 else 0 | |
emoji = "π" if daily_change >= 0 else "π" | |
result += f"οΏ½ **24h Change**: {daily_change:+.2f}% {emoji}\n" | |
if len(historical_data) >= 7: | |
week_ago_tvl = historical_data[-7]["tvl"] | |
weekly_change = ((current_tvl - week_ago_tvl) / week_ago_tvl) * 100 if week_ago_tvl > 0 else 0 | |
emoji = "π" if weekly_change >= 0 else "π" | |
result += f"π **7d Change**: {weekly_change:+.2f}% {emoji}\n" | |
return result | |
except Exception as e: | |
logger.error(f"Chain TVL error: {e}") | |
return f"β οΈ Error fetching chain data: {str(e)}" | |
async def _search_protocols(self, query: str) -> str: | |
"""Search protocols by name""" | |
try: | |
protocols = await self.make_request(f"{self._base_url}/protocols") | |
if not protocols: | |
return "β οΈ No protocol data available" | |
# Search for matching protocols | |
query_lower = query.lower() | |
matching = [] | |
for p in protocols: | |
name = p.get("name", "").lower() | |
category = p.get("category", "").lower() | |
if (query_lower in name or | |
query_lower in category or | |
any(word in name for word in query_lower.split())): | |
matching.append(p) | |
# Sort by TVL and limit results | |
matching = sorted([p for p in matching if p.get("tvl") is not None and p.get("tvl", 0) > 0], | |
key=lambda x: x.get("tvl", 0), reverse=True)[:8] | |
if not matching: | |
return f"β No protocols found matching '{query}'" | |
result = f"π **Protocols matching '{query}':**\n\n" | |
for i, protocol in enumerate(matching, 1): | |
name = protocol.get("name", "Unknown") | |
tvl = protocol.get("tvl", 0) | |
chain = protocol.get("chain", "Multi-chain") | |
category = protocol.get("category", "Unknown") | |
change_1d = protocol.get("change_1d", 0) | |
emoji = "π" if change_1d >= 0 else "π" | |
tvl_formatted = f"${tvl/1e9:.2f}B" if tvl >= 1e9 else f"${tvl/1e6:.1f}M" | |
result += f"{i}. **{name}** ({category})\n" | |
result += f" π° {tvl_formatted} TVL on {chain} {emoji} {change_1d:+.1f}%\n\n" | |
return result | |
except Exception as e: | |
logger.error(f"Search protocols error: {e}") | |
return f"β οΈ Search temporarily unavailable: {str(e)}" | |