# -*- coding: utf-8 -*- """ 智能分析系统(股票) - 股票市场数据分析系统 开发者:熊猫大侠 版本:v2.1.0 许可证:MIT License """ # risk_monitor.py import pandas as pd import numpy as np from datetime import datetime, timedelta class RiskMonitor: def __init__(self, analyzer): self.analyzer = analyzer def analyze_stock_risk(self, stock_code, market_type='A'): """分析单只股票的风险""" try: # 获取股票数据和技术指标 df = self.analyzer.get_stock_data(stock_code, market_type) df = self.analyzer.calculate_indicators(df) # 计算各类风险指标 volatility_risk = self._analyze_volatility_risk(df) trend_risk = self._analyze_trend_risk(df) reversal_risk = self._analyze_reversal_risk(df) volume_risk = self._analyze_volume_risk(df) # 综合评估总体风险 total_risk_score = ( volatility_risk['score'] * 0.3 + trend_risk['score'] * 0.3 + reversal_risk['score'] * 0.25 + volume_risk['score'] * 0.15 ) # 确定风险等级 if total_risk_score >= 80: risk_level = "极高" elif total_risk_score >= 60: risk_level = "高" elif total_risk_score >= 40: risk_level = "中等" elif total_risk_score >= 20: risk_level = "低" else: risk_level = "极低" # 生成风险警报 alerts = [] if volatility_risk['score'] >= 70: alerts.append({ "type": "volatility", "level": "高", "message": f"波动率风险较高 ({volatility_risk['value']:.2f}%),可能面临大幅波动" }) if trend_risk['score'] >= 70: alerts.append({ "type": "trend", "level": "高", "message": f"趋势风险较高,当前处于{trend_risk['trend']}趋势,可能面临加速下跌" }) if reversal_risk['score'] >= 70: alerts.append({ "type": "reversal", "level": "高", "message": f"趋势反转风险较高,技术指标显示可能{reversal_risk['direction']}反转" }) if volume_risk['score'] >= 70: alerts.append({ "type": "volume", "level": "高", "message": f"成交量异常,{volume_risk['pattern']},可能预示价格波动" }) return { "total_risk_score": total_risk_score, "risk_level": risk_level, "volatility_risk": volatility_risk, "trend_risk": trend_risk, "reversal_risk": reversal_risk, "volume_risk": volume_risk, "alerts": alerts } except Exception as e: print(f"分析股票风险出错: {str(e)}") return { "error": f"分析风险时出错: {str(e)}" } def _analyze_volatility_risk(self, df): """分析波动率风险""" # 计算近期波动率 recent_volatility = df.iloc[-1]['Volatility'] # 计算波动率变化 avg_volatility = df['Volatility'].mean() volatility_change = recent_volatility / avg_volatility - 1 # 评估风险分数 if recent_volatility > 5 and volatility_change > 0.5: score = 90 # 极高风险 elif recent_volatility > 4 and volatility_change > 0.3: score = 75 # 高风险 elif recent_volatility > 3 and volatility_change > 0.1: score = 60 # 中高风险 elif recent_volatility > 2: score = 40 # 中等风险 elif recent_volatility > 1: score = 20 # 低风险 else: score = 0 # 极低风险 return { "score": score, "value": recent_volatility, "change": volatility_change * 100, "risk_level": "高" if score >= 60 else "中" if score >= 30 else "低" } def _analyze_trend_risk(self, df): """分析趋势风险""" # 获取均线数据 ma5 = df.iloc[-1]['MA5'] ma20 = df.iloc[-1]['MA20'] ma60 = df.iloc[-1]['MA60'] # 判断当前趋势 if ma5 < ma20 < ma60: trend = "下降" # 判断下跌加速程度 ma5_ma20_gap = (ma20 - ma5) / ma20 * 100 if ma5_ma20_gap > 5: score = 90 # 极高风险 elif ma5_ma20_gap > 3: score = 75 # 高风险 elif ma5_ma20_gap > 1: score = 60 # 中高风险 else: score = 50 # 中等风险 elif ma5 > ma20 > ma60: trend = "上升" score = 20 # 低风险 else: trend = "盘整" score = 40 # 中等风险 return { "score": score, "trend": trend, "risk_level": "高" if score >= 60 else "中" if score >= 30 else "低" } def _analyze_reversal_risk(self, df): """分析趋势反转风险""" # 获取最新指标 rsi = df.iloc[-1]['RSI'] macd = df.iloc[-1]['MACD'] signal = df.iloc[-1]['Signal'] price = df.iloc[-1]['close'] ma20 = df.iloc[-1]['MA20'] # 判断潜在趋势反转信号 reversal_signals = 0 # RSI超买/超卖 if rsi > 75: reversal_signals += 1 direction = "向下" elif rsi < 25: reversal_signals += 1 direction = "向上" else: direction = "无明确方向" # MACD死叉/金叉 if macd > signal and df.iloc[-2]['MACD'] <= df.iloc[-2]['Signal']: reversal_signals += 1 direction = "向上" elif macd < signal and df.iloc[-2]['MACD'] >= df.iloc[-2]['Signal']: reversal_signals += 1 direction = "向下" # 价格与均线关系 if price > ma20 * 1.1: reversal_signals += 1 direction = "向下" elif price < ma20 * 0.9: reversal_signals += 1 direction = "向上" # 评估风险分数 if reversal_signals >= 3: score = 90 # 极高风险 elif reversal_signals == 2: score = 70 # 高风险 elif reversal_signals == 1: score = 40 # 中等风险 else: score = 10 # 低风险 return { "score": score, "reversal_signals": reversal_signals, "direction": direction, "risk_level": "高" if score >= 60 else "中" if score >= 30 else "低" } def _analyze_volume_risk(self, df): """分析成交量风险""" # 计算成交量变化 recent_volume = df.iloc[-1]['volume'] avg_volume = df['volume'].rolling(window=20).mean().iloc[-1] volume_ratio = recent_volume / avg_volume # 判断成交量模式 if volume_ratio > 3: pattern = "成交量暴增" score = 90 # 极高风险 elif volume_ratio > 2: pattern = "成交量显著放大" score = 70 # 高风险 elif volume_ratio > 1.5: pattern = "成交量温和放大" score = 50 # 中等风险 elif volume_ratio < 0.5: pattern = "成交量萎缩" score = 40 # 中低风险 else: pattern = "成交量正常" score = 20 # 低风险 # 价格与成交量背离分析 price_change = (df.iloc[-1]['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close'] volume_change = (recent_volume - df.iloc[-5]['volume']) / df.iloc[-5]['volume'] if price_change > 0.05 and volume_change < -0.3: pattern = "价量背离(价格上涨但量能萎缩)" score = max(score, 80) # 提高风险评分 elif price_change < -0.05 and volume_change < -0.3: pattern = "价量同向(价格下跌且量能萎缩)" score = max(score, 70) # 提高风险评分 elif price_change < -0.05 and volume_change > 0.5: pattern = "价量同向(价格下跌且量能放大)" score = max(score, 85) # 提高风险评分 return { "score": score, "volume_ratio": volume_ratio, "pattern": pattern, "risk_level": "高" if score >= 60 else "中" if score >= 30 else "低" } def analyze_portfolio_risk(self, portfolio): """分析投资组合整体风险""" try: if not portfolio or len(portfolio) == 0: return {"error": "投资组合为空"} # 分析每只股票的风险 stock_risks = {} total_weight = 0 weighted_risk_score = 0 for stock in portfolio: stock_code = stock.get('stock_code') weight = stock.get('weight', 1) market_type = stock.get('market_type', 'A') if not stock_code: continue # 分析股票风险 risk = self.analyze_stock_risk(stock_code, market_type) stock_risks[stock_code] = risk # 计算加权风险分数 total_weight += weight weighted_risk_score += risk.get('total_risk_score', 50) * weight # 计算组合总风险分数 if total_weight > 0: portfolio_risk_score = weighted_risk_score / total_weight else: portfolio_risk_score = 0 # 确定风险等级 if portfolio_risk_score >= 80: risk_level = "极高" elif portfolio_risk_score >= 60: risk_level = "高" elif portfolio_risk_score >= 40: risk_level = "中等" elif portfolio_risk_score >= 20: risk_level = "低" else: risk_level = "极低" # 收集高风险股票 high_risk_stocks = [ { "stock_code": code, "risk_score": risk.get('total_risk_score', 0), "risk_level": risk.get('risk_level', '未知') } for code, risk in stock_risks.items() if risk.get('total_risk_score', 0) >= 60 ] # 收集所有风险警报 all_alerts = [] for code, risk in stock_risks.items(): for alert in risk.get('alerts', []): all_alerts.append({ "stock_code": code, **alert }) # 分析风险集中度 risk_concentration = self._analyze_risk_concentration(portfolio, stock_risks) return { "portfolio_risk_score": portfolio_risk_score, "risk_level": risk_level, "high_risk_stocks": high_risk_stocks, "alerts": all_alerts, "risk_concentration": risk_concentration, "stock_risks": stock_risks } except Exception as e: print(f"分析投资组合风险出错: {str(e)}") return { "error": f"分析投资组合风险时出错: {str(e)}" } def _analyze_risk_concentration(self, portfolio, stock_risks): """分析风险集中度""" # 分析行业集中度 industries = {} for stock in portfolio: stock_code = stock.get('stock_code') stock_info = self.analyzer.get_stock_info(stock_code) industry = stock_info.get('行业', '未知') weight = stock.get('weight', 1) if industry in industries: industries[industry] += weight else: industries[industry] = weight # 找出权重最大的行业 max_industry = max(industries.items(), key=lambda x: x[1]) if industries else ('未知', 0) # 计算高风险股票总权重 high_risk_weight = 0 for stock in portfolio: stock_code = stock.get('stock_code') if stock_code in stock_risks and stock_risks[stock_code].get('total_risk_score', 0) >= 60: high_risk_weight += stock.get('weight', 1) return { "max_industry": max_industry[0], "max_industry_weight": max_industry[1], "high_risk_weight": high_risk_weight }