File size: 7,513 Bytes
20eee66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import asyncio
from typing import Dict, Any, List, Optional
from src.api_clients import coingecko_client
from src.cache_manager import cache_manager
import json

class PortfolioAnalyzer:
    def __init__(self):
        self.symbol_map = {
            "btc": "bitcoin", "eth": "ethereum", "sol": "solana", "ada": "cardano",
            "dot": "polkadot", "bnb": "binancecoin", "usdc": "usd-coin", 
            "usdt": "tether", "xrp": "ripple", "avax": "avalanche-2",
            "link": "chainlink", "matic": "matic-network", "uni": "uniswap"
        }
    
    def _format_coin_id(self, symbol: str) -> str:
        return self.symbol_map.get(symbol.lower(), symbol.lower())
    
    async def analyze_portfolio(self, holdings: List[Dict[str, Any]]) -> Dict[str, Any]:
        try:
            coin_ids = [self._format_coin_id(h["symbol"]) for h in holdings]
            
            tasks = []
            for coin_id in coin_ids:
                tasks.append(coingecko_client.get_coin_data(coin_id))
                tasks.append(coingecko_client.get_price_history(coin_id, days=30))
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            portfolio_value = 0
            portfolio_change_24h = 0
            asset_allocation = []
            risk_metrics = []
            
            for i, holding in enumerate(holdings):
                coin_data_idx = i * 2
                price_history_idx = i * 2 + 1
                
                if not isinstance(results[coin_data_idx], Exception):
                    coin_data = results[coin_data_idx]
                    market_data = coin_data.get("market_data", {})
                    current_price = market_data.get("current_price", {}).get("usd", 0)
                    price_change_24h = market_data.get("price_change_percentage_24h", 0)
                    
                    holding_value = current_price * holding["amount"]
                    portfolio_value += holding_value
                    portfolio_change_24h += holding_value * (price_change_24h / 100)
                    
                    volatility = 0
                    if not isinstance(results[price_history_idx], Exception):
                        price_history = results[price_history_idx]
                        prices = [p[1] for p in price_history.get("prices", [])]
                        if len(prices) > 1:
                            price_changes = [(prices[i] - prices[i-1]) / prices[i-1] for i in range(1, len(prices))]
                            volatility = sum(abs(change) for change in price_changes) / len(price_changes)
                    
                    asset_allocation.append({
                        "symbol": holding["symbol"].upper(),
                        "name": coin_data.get("name", "Unknown"),
                        "value": holding_value,
                        "percentage": 0,
                        "amount": holding["amount"],
                        "price": current_price,
                        "change_24h": price_change_24h
                    })
                    
                    risk_metrics.append({
                        "symbol": holding["symbol"].upper(),
                        "volatility": volatility,
                        "market_cap_rank": coin_data.get("market_cap_rank", 999)
                    })
            
            for asset in asset_allocation:
                asset["percentage"] = (asset["value"] / portfolio_value) * 100 if portfolio_value > 0 else 0
            
            portfolio_change_percentage = (portfolio_change_24h / portfolio_value) * 100 if portfolio_value > 0 else 0
            
            avg_volatility = sum(r["volatility"] for r in risk_metrics) / len(risk_metrics) if risk_metrics else 0
            
            diversification_score = len([a for a in asset_allocation if a["percentage"] >= 5])
            
            risk_level = "Low" if avg_volatility < 0.05 else "Medium" if avg_volatility < 0.10 else "High"
            
            return {
                "total_value": portfolio_value,
                "change_24h": portfolio_change_24h,
                "change_24h_percentage": portfolio_change_percentage,
                "asset_allocation": sorted(asset_allocation, key=lambda x: x["value"], reverse=True),
                "risk_metrics": {
                    "overall_risk": risk_level,
                    "avg_volatility": avg_volatility,
                    "diversification_score": diversification_score,
                    "largest_holding_percentage": max([a["percentage"] for a in asset_allocation]) if asset_allocation else 0
                },
                "recommendations": self._generate_recommendations(asset_allocation, risk_metrics)
            }
            
        except Exception as e:
            raise Exception(f"Portfolio analysis failed: {str(e)}")
    
    def _generate_recommendations(self, allocation: List[Dict[str, Any]], risk_metrics: List[Dict[str, Any]]) -> List[str]:
        recommendations = []
        
        if not allocation:
            return ["Unable to generate recommendations - no valid portfolio data"]
        
        largest_holding = max(allocation, key=lambda x: x["percentage"])
        if largest_holding["percentage"] > 50:
            recommendations.append(f"Consider reducing {largest_holding['symbol']} position (currently {largest_holding['percentage']:.1f}%) to improve diversification")
        
        high_risk_assets = [r for r in risk_metrics if r["volatility"] > 0.15]
        if len(high_risk_assets) > len(allocation) * 0.6:
            recommendations.append("Portfolio has high volatility exposure - consider adding stable assets like BTC or ETH")
        
        small_cap_heavy = len([r for r in risk_metrics if r["market_cap_rank"] > 100])
        if small_cap_heavy > len(allocation) * 0.4:
            recommendations.append("High small-cap exposure detected - consider balancing with top 20 cryptocurrencies")
        
        if len(allocation) < 5:
            recommendations.append("Consider diversifying into 5-10 different cryptocurrencies to reduce risk")
        
        stablecoin_exposure = sum(a["percentage"] for a in allocation if a["symbol"] in ["USDC", "USDT", "DAI"])
        if stablecoin_exposure < 10:
            recommendations.append("Consider allocating 10-20% to stablecoins for portfolio stability")
        
        return recommendations[:5]
    
    async def compare_portfolios(self, portfolio1: List[Dict[str, Any]], portfolio2: List[Dict[str, Any]]) -> Dict[str, Any]:
        analysis1 = await self.analyze_portfolio(portfolio1)
        analysis2 = await self.analyze_portfolio(portfolio2)
        
        return {
            "portfolio_1": analysis1,
            "portfolio_2": analysis2,
            "comparison": {
                "value_difference": analysis2["total_value"] - analysis1["total_value"],
                "performance_difference": analysis2["change_24h_percentage"] - analysis1["change_24h_percentage"],
                "risk_comparison": f"Portfolio 2 is {'higher' if analysis2['risk_metrics']['avg_volatility'] > analysis1['risk_metrics']['avg_volatility'] else 'lower'} risk",
                "diversification_comparison": f"Portfolio 2 is {'more' if analysis2['risk_metrics']['diversification_score'] > analysis1['risk_metrics']['diversification_score'] else 'less'} diversified"
            }
        }

portfolio_analyzer = PortfolioAnalyzer()