web3-copilot / src /portfolio_analyzer.py
“Transcendental-Programmer”
feat: Initial commit with project structure and initial files
20eee66
raw
history blame
7.51 kB
import asyncio
from typing import Dict, Any, List, Optional
from src.api_clients import coingecko_client
from src.cache_manager import cache_manager
import json
class PortfolioAnalyzer:
    """Value a crypto portfolio and derive simple risk metrics and advice.

    Market data is fetched through the module-level ``coingecko_client``
    (``get_coin_data`` and ``get_price_history``).  The response layout
    consumed here (``market_data.current_price.usd``, ``prices`` as a list
    of ``[timestamp, price]`` pairs, ...) is presumed to follow the
    CoinGecko API -- NOTE(review): confirm against ``src.api_clients``.
    """

    # Rank substituted when a coin has no usable market-cap rank (key missing
    # or explicitly null); it is large enough to count as "small cap" below.
    _UNRANKED = 999

    # Symbols whose allocation counts towards stablecoin exposure.
    _STABLECOINS = frozenset({"USDC", "USDT", "DAI"})

    def __init__(self):
        # Ticker symbol (lower case) -> coin id passed to ``coingecko_client``.
        self.symbol_map = {
            "btc": "bitcoin", "eth": "ethereum", "sol": "solana", "ada": "cardano",
            "dot": "polkadot", "bnb": "binancecoin", "usdc": "usd-coin",
            "usdt": "tether", "xrp": "ripple", "avax": "avalanche-2",
            "link": "chainlink", "matic": "matic-network", "uni": "uniswap",
        }

    def _format_coin_id(self, symbol: str) -> str:
        """Return the coin id for *symbol*, case-insensitively.

        Symbols missing from ``symbol_map`` are returned lower-cased as-is.
        """
        key = symbol.lower()
        return self.symbol_map.get(key, key)

    @staticmethod
    def _mean_abs_return(prices: List[float]) -> float:
        """Return the mean absolute step-to-step relative change of *prices*.

        Pairs whose earlier price is zero (or missing) are skipped so a bad
        data point cannot raise ``ZeroDivisionError``.  Returns 0 when fewer
        than one usable pair exists.
        """
        relative_changes = [
            (current - previous) / previous
            for previous, current in zip(prices, prices[1:])
            if previous
        ]
        if not relative_changes:
            return 0
        return sum(abs(change) for change in relative_changes) / len(relative_changes)

    async def analyze_portfolio(self, holdings: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Fetch market data for *holdings* and summarise the portfolio.

        Args:
            holdings: One dict per position; each must provide ``"symbol"``
                (ticker string) and ``"amount"`` (numeric quantity).

        Returns:
            A dict with ``total_value``, ``change_24h``,
            ``change_24h_percentage``, ``asset_allocation`` (sorted by value,
            largest first), ``risk_metrics`` and ``recommendations``.
            Holdings whose coin-data request failed are left out of every
            figure instead of failing the whole analysis.

        Raises:
            Exception: Any other failure, re-raised with the message prefix
                ``"Portfolio analysis failed: "`` and the original error
                attached as ``__cause__``.
        """
        try:
            coin_ids = [self._format_coin_id(h["symbol"]) for h in holdings]

            # Two requests per coin, interleaved: results[2*k] is the coin
            # data and results[2*k + 1] the 30-day price history of holding k.
            tasks = []
            for coin_id in coin_ids:
                tasks.append(coingecko_client.get_coin_data(coin_id))
                tasks.append(coingecko_client.get_price_history(coin_id, days=30))
            results = await asyncio.gather(*tasks, return_exceptions=True)

            portfolio_value = 0
            portfolio_change_24h = 0
            asset_allocation = []
            risk_metrics = []

            for position, holding in enumerate(holdings):
                coin_data = results[position * 2]
                price_history = results[position * 2 + 1]

                # BaseException rather than Exception: a cancelled request is
                # reported by gather() as CancelledError, which is not an
                # Exception subclass on Python 3.8+.
                if isinstance(coin_data, BaseException):
                    continue

                # ``or`` fallbacks also cover keys that are present but null;
                # ``.get(key, default)`` alone would hand back None there.
                market_data = coin_data.get("market_data") or {}
                current_price = (market_data.get("current_price") or {}).get("usd") or 0
                price_change_24h = market_data.get("price_change_percentage_24h") or 0

                holding_value = current_price * holding["amount"]
                portfolio_value += holding_value
                # NOTE: applies the 24h percentage to the *current* value, so
                # this is an approximation of the absolute 24h change.
                portfolio_change_24h += holding_value * (price_change_24h / 100)

                volatility = 0
                if not isinstance(price_history, BaseException):
                    prices = [point[1] for point in (price_history.get("prices") or [])]
                    volatility = self._mean_abs_return(prices)

                symbol = holding["symbol"].upper()
                asset_allocation.append({
                    "symbol": symbol,
                    "name": coin_data.get("name", "Unknown"),
                    "value": holding_value,
                    "percentage": 0,  # filled in once the total is known
                    "amount": holding["amount"],
                    "price": current_price,
                    "change_24h": price_change_24h,
                })
                risk_metrics.append({
                    "symbol": symbol,
                    "volatility": volatility,
                    "market_cap_rank": coin_data.get("market_cap_rank") or self._UNRANKED,
                })

            has_value = portfolio_value > 0
            for asset in asset_allocation:
                asset["percentage"] = (asset["value"] / portfolio_value) * 100 if has_value else 0

            portfolio_change_percentage = (
                (portfolio_change_24h / portfolio_value) * 100 if has_value else 0
            )
            avg_volatility = (
                sum(r["volatility"] for r in risk_metrics) / len(risk_metrics)
                if risk_metrics else 0
            )
            # Number of positions that make up at least 5% of the portfolio.
            diversification_score = sum(1 for a in asset_allocation if a["percentage"] >= 5)

            if avg_volatility < 0.05:
                risk_level = "Low"
            elif avg_volatility < 0.10:
                risk_level = "Medium"
            else:
                risk_level = "High"

            return {
                "total_value": portfolio_value,
                "change_24h": portfolio_change_24h,
                "change_24h_percentage": portfolio_change_percentage,
                "asset_allocation": sorted(asset_allocation, key=lambda x: x["value"], reverse=True),
                "risk_metrics": {
                    "overall_risk": risk_level,
                    "avg_volatility": avg_volatility,
                    "diversification_score": diversification_score,
                    "largest_holding_percentage": max(
                        (a["percentage"] for a in asset_allocation), default=0
                    ),
                },
                "recommendations": self._generate_recommendations(asset_allocation, risk_metrics),
            }
        except Exception as e:
            # Same exception type and message as before; ``from e`` keeps the
            # original traceback reachable for debugging.
            raise Exception(f"Portfolio analysis failed: {str(e)}") from e

    def _generate_recommendations(self, allocation: List[Dict[str, Any]], risk_metrics: List[Dict[str, Any]]) -> List[str]:
        """Build up to five plain-text suggestions for the portfolio.

        Args:
            allocation: Entries with at least ``"symbol"`` and ``"percentage"``.
            risk_metrics: Entries with ``"volatility"`` and
                ``"market_cap_rank"``; a null rank is treated as unranked.

        Returns:
            A list of recommendation strings, or a single explanatory message
            when *allocation* is empty.
        """
        if not allocation:
            return ["Unable to generate recommendations - no valid portfolio data"]

        recommendations = []
        position_count = len(allocation)

        # Concentration: a single position above half of the portfolio.
        largest_holding = max(allocation, key=lambda x: x["percentage"])
        if largest_holding["percentage"] > 50:
            recommendations.append(f"Consider reducing {largest_holding['symbol']} position (currently {largest_holding['percentage']:.1f}%) to improve diversification")

        # Volatility: more than 60% of positions are highly volatile.
        high_risk_count = sum(1 for r in risk_metrics if r["volatility"] > 0.15)
        if high_risk_count > position_count * 0.6:
            recommendations.append("Portfolio has high volatility exposure - consider adding stable assets like BTC or ETH")

        # Small caps: more than 40% of positions rank outside the top 100.
        small_cap_count = sum(
            1 for r in risk_metrics
            if (r["market_cap_rank"] or self._UNRANKED) > 100
        )
        if small_cap_count > position_count * 0.4:
            recommendations.append("High small-cap exposure detected - consider balancing with top 20 cryptocurrencies")

        if position_count < 5:
            recommendations.append("Consider diversifying into 5-10 different cryptocurrencies to reduce risk")

        stablecoin_exposure = sum(
            a["percentage"] for a in allocation if a["symbol"] in self._STABLECOINS
        )
        if stablecoin_exposure < 10:
            recommendations.append("Consider allocating 10-20% to stablecoins for portfolio stability")

        return recommendations[:5]

    async def compare_portfolios(self, portfolio1: List[Dict[str, Any]], portfolio2: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze two portfolios and describe how the second differs.

        Differences are computed as portfolio 2 minus portfolio 1.  When the
        two are tied on volatility or diversification score, portfolio 2 is
        reported as ``'lower'`` risk / ``'less'`` diversified.

        Raises:
            Exception: Propagated from ``analyze_portfolio``.
        """
        # Analyzed one after the other, so only one portfolio's requests are
        # in flight at a time.
        analysis1 = await self.analyze_portfolio(portfolio1)
        analysis2 = await self.analyze_portfolio(portfolio2)

        risk1 = analysis1["risk_metrics"]
        risk2 = analysis2["risk_metrics"]
        risk_word = "higher" if risk2["avg_volatility"] > risk1["avg_volatility"] else "lower"
        spread_word = "more" if risk2["diversification_score"] > risk1["diversification_score"] else "less"

        return {
            "portfolio_1": analysis1,
            "portfolio_2": analysis2,
            "comparison": {
                "value_difference": analysis2["total_value"] - analysis1["total_value"],
                "performance_difference": analysis2["change_24h_percentage"] - analysis1["change_24h_percentage"],
                "risk_comparison": f"Portfolio 2 is {risk_word} risk",
                "diversification_comparison": f"Portfolio 2 is {spread_word} diversified",
            },
        }
# Module-level PortfolioAnalyzer instance, created once when this module is imported.
portfolio_analyzer = PortfolioAnalyzer()