Spaces:
Running
Running
import asyncio | |
from typing import Dict, Any, List, Optional | |
from src.api_clients import coingecko_client | |
from src.cache_manager import cache_manager | |
import json | |
class PortfolioAnalyzer: | |
def __init__(self): | |
self.symbol_map = { | |
"btc": "bitcoin", "eth": "ethereum", "sol": "solana", "ada": "cardano", | |
"dot": "polkadot", "bnb": "binancecoin", "usdc": "usd-coin", | |
"usdt": "tether", "xrp": "ripple", "avax": "avalanche-2", | |
"link": "chainlink", "matic": "matic-network", "uni": "uniswap" | |
} | |
def _format_coin_id(self, symbol: str) -> str: | |
return self.symbol_map.get(symbol.lower(), symbol.lower()) | |
async def analyze_portfolio(self, holdings: List[Dict[str, Any]]) -> Dict[str, Any]: | |
try: | |
coin_ids = [self._format_coin_id(h["symbol"]) for h in holdings] | |
tasks = [] | |
for coin_id in coin_ids: | |
tasks.append(coingecko_client.get_coin_data(coin_id)) | |
tasks.append(coingecko_client.get_price_history(coin_id, days=30)) | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
portfolio_value = 0 | |
portfolio_change_24h = 0 | |
asset_allocation = [] | |
risk_metrics = [] | |
for i, holding in enumerate(holdings): | |
coin_data_idx = i * 2 | |
price_history_idx = i * 2 + 1 | |
if not isinstance(results[coin_data_idx], Exception): | |
coin_data = results[coin_data_idx] | |
market_data = coin_data.get("market_data", {}) | |
current_price = market_data.get("current_price", {}).get("usd", 0) | |
price_change_24h = market_data.get("price_change_percentage_24h", 0) | |
holding_value = current_price * holding["amount"] | |
portfolio_value += holding_value | |
portfolio_change_24h += holding_value * (price_change_24h / 100) | |
volatility = 0 | |
if not isinstance(results[price_history_idx], Exception): | |
price_history = results[price_history_idx] | |
prices = [p[1] for p in price_history.get("prices", [])] | |
if len(prices) > 1: | |
price_changes = [(prices[i] - prices[i-1]) / prices[i-1] for i in range(1, len(prices))] | |
volatility = sum(abs(change) for change in price_changes) / len(price_changes) | |
asset_allocation.append({ | |
"symbol": holding["symbol"].upper(), | |
"name": coin_data.get("name", "Unknown"), | |
"value": holding_value, | |
"percentage": 0, | |
"amount": holding["amount"], | |
"price": current_price, | |
"change_24h": price_change_24h | |
}) | |
risk_metrics.append({ | |
"symbol": holding["symbol"].upper(), | |
"volatility": volatility, | |
"market_cap_rank": coin_data.get("market_cap_rank", 999) | |
}) | |
for asset in asset_allocation: | |
asset["percentage"] = (asset["value"] / portfolio_value) * 100 if portfolio_value > 0 else 0 | |
portfolio_change_percentage = (portfolio_change_24h / portfolio_value) * 100 if portfolio_value > 0 else 0 | |
avg_volatility = sum(r["volatility"] for r in risk_metrics) / len(risk_metrics) if risk_metrics else 0 | |
diversification_score = len([a for a in asset_allocation if a["percentage"] >= 5]) | |
risk_level = "Low" if avg_volatility < 0.05 else "Medium" if avg_volatility < 0.10 else "High" | |
return { | |
"total_value": portfolio_value, | |
"change_24h": portfolio_change_24h, | |
"change_24h_percentage": portfolio_change_percentage, | |
"asset_allocation": sorted(asset_allocation, key=lambda x: x["value"], reverse=True), | |
"risk_metrics": { | |
"overall_risk": risk_level, | |
"avg_volatility": avg_volatility, | |
"diversification_score": diversification_score, | |
"largest_holding_percentage": max([a["percentage"] for a in asset_allocation]) if asset_allocation else 0 | |
}, | |
"recommendations": self._generate_recommendations(asset_allocation, risk_metrics) | |
} | |
except Exception as e: | |
raise Exception(f"Portfolio analysis failed: {str(e)}") | |
def _generate_recommendations(self, allocation: List[Dict[str, Any]], risk_metrics: List[Dict[str, Any]]) -> List[str]: | |
recommendations = [] | |
if not allocation: | |
return ["Unable to generate recommendations - no valid portfolio data"] | |
largest_holding = max(allocation, key=lambda x: x["percentage"]) | |
if largest_holding["percentage"] > 50: | |
recommendations.append(f"Consider reducing {largest_holding['symbol']} position (currently {largest_holding['percentage']:.1f}%) to improve diversification") | |
high_risk_assets = [r for r in risk_metrics if r["volatility"] > 0.15] | |
if len(high_risk_assets) > len(allocation) * 0.6: | |
recommendations.append("Portfolio has high volatility exposure - consider adding stable assets like BTC or ETH") | |
small_cap_heavy = len([r for r in risk_metrics if r["market_cap_rank"] > 100]) | |
if small_cap_heavy > len(allocation) * 0.4: | |
recommendations.append("High small-cap exposure detected - consider balancing with top 20 cryptocurrencies") | |
if len(allocation) < 5: | |
recommendations.append("Consider diversifying into 5-10 different cryptocurrencies to reduce risk") | |
stablecoin_exposure = sum(a["percentage"] for a in allocation if a["symbol"] in ["USDC", "USDT", "DAI"]) | |
if stablecoin_exposure < 10: | |
recommendations.append("Consider allocating 10-20% to stablecoins for portfolio stability") | |
return recommendations[:5] | |
async def compare_portfolios(self, portfolio1: List[Dict[str, Any]], portfolio2: List[Dict[str, Any]]) -> Dict[str, Any]: | |
analysis1 = await self.analyze_portfolio(portfolio1) | |
analysis2 = await self.analyze_portfolio(portfolio2) | |
return { | |
"portfolio_1": analysis1, | |
"portfolio_2": analysis2, | |
"comparison": { | |
"value_difference": analysis2["total_value"] - analysis1["total_value"], | |
"performance_difference": analysis2["change_24h_percentage"] - analysis1["change_24h_percentage"], | |
"risk_comparison": f"Portfolio 2 is {'higher' if analysis2['risk_metrics']['avg_volatility'] > analysis1['risk_metrics']['avg_volatility'] else 'lower'} risk", | |
"diversification_comparison": f"Portfolio 2 is {'more' if analysis2['risk_metrics']['diversification_score'] > analysis1['risk_metrics']['diversification_score'] else 'less'} diversified" | |
} | |
} | |
portfolio_analyzer = PortfolioAnalyzer() |