stock / risk_monitor.py
gitdeem's picture
Upload 32 files
f5d52f6 verified
# -*- coding: utf-8 -*-
"""
智能分析系统(股票) - 股票市场数据分析系统
开发者:熊猫大侠
版本:v2.1.0
许可证:MIT License
"""
# risk_monitor.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class RiskMonitor:
def __init__(self, analyzer):
self.analyzer = analyzer
def analyze_stock_risk(self, stock_code, market_type='A'):
"""分析单只股票的风险"""
try:
# 获取股票数据和技术指标
df = self.analyzer.get_stock_data(stock_code, market_type)
df = self.analyzer.calculate_indicators(df)
# 计算各类风险指标
volatility_risk = self._analyze_volatility_risk(df)
trend_risk = self._analyze_trend_risk(df)
reversal_risk = self._analyze_reversal_risk(df)
volume_risk = self._analyze_volume_risk(df)
# 综合评估总体风险
total_risk_score = (
volatility_risk['score'] * 0.3 +
trend_risk['score'] * 0.3 +
reversal_risk['score'] * 0.25 +
volume_risk['score'] * 0.15
)
# 确定风险等级
if total_risk_score >= 80:
risk_level = "极高"
elif total_risk_score >= 60:
risk_level = "高"
elif total_risk_score >= 40:
risk_level = "中等"
elif total_risk_score >= 20:
risk_level = "低"
else:
risk_level = "极低"
# 生成风险警报
alerts = []
if volatility_risk['score'] >= 70:
alerts.append({
"type": "volatility",
"level": "高",
"message": f"波动率风险较高 ({volatility_risk['value']:.2f}%),可能面临大幅波动"
})
if trend_risk['score'] >= 70:
alerts.append({
"type": "trend",
"level": "高",
"message": f"趋势风险较高,当前处于{trend_risk['trend']}趋势,可能面临加速下跌"
})
if reversal_risk['score'] >= 70:
alerts.append({
"type": "reversal",
"level": "高",
"message": f"趋势反转风险较高,技术指标显示可能{reversal_risk['direction']}反转"
})
if volume_risk['score'] >= 70:
alerts.append({
"type": "volume",
"level": "高",
"message": f"成交量异常,{volume_risk['pattern']},可能预示价格波动"
})
return {
"total_risk_score": total_risk_score,
"risk_level": risk_level,
"volatility_risk": volatility_risk,
"trend_risk": trend_risk,
"reversal_risk": reversal_risk,
"volume_risk": volume_risk,
"alerts": alerts
}
except Exception as e:
print(f"分析股票风险出错: {str(e)}")
return {
"error": f"分析风险时出错: {str(e)}"
}
def _analyze_volatility_risk(self, df):
"""分析波动率风险"""
# 计算近期波动率
recent_volatility = df.iloc[-1]['Volatility']
# 计算波动率变化
avg_volatility = df['Volatility'].mean()
volatility_change = recent_volatility / avg_volatility - 1
# 评估风险分数
if recent_volatility > 5 and volatility_change > 0.5:
score = 90 # 极高风险
elif recent_volatility > 4 and volatility_change > 0.3:
score = 75 # 高风险
elif recent_volatility > 3 and volatility_change > 0.1:
score = 60 # 中高风险
elif recent_volatility > 2:
score = 40 # 中等风险
elif recent_volatility > 1:
score = 20 # 低风险
else:
score = 0 # 极低风险
return {
"score": score,
"value": recent_volatility,
"change": volatility_change * 100,
"risk_level": "高" if score >= 60 else "中" if score >= 30 else "低"
}
def _analyze_trend_risk(self, df):
"""分析趋势风险"""
# 获取均线数据
ma5 = df.iloc[-1]['MA5']
ma20 = df.iloc[-1]['MA20']
ma60 = df.iloc[-1]['MA60']
# 判断当前趋势
if ma5 < ma20 < ma60:
trend = "下降"
# 判断下跌加速程度
ma5_ma20_gap = (ma20 - ma5) / ma20 * 100
if ma5_ma20_gap > 5:
score = 90 # 极高风险
elif ma5_ma20_gap > 3:
score = 75 # 高风险
elif ma5_ma20_gap > 1:
score = 60 # 中高风险
else:
score = 50 # 中等风险
elif ma5 > ma20 > ma60:
trend = "上升"
score = 20 # 低风险
else:
trend = "盘整"
score = 40 # 中等风险
return {
"score": score,
"trend": trend,
"risk_level": "高" if score >= 60 else "中" if score >= 30 else "低"
}
def _analyze_reversal_risk(self, df):
"""分析趋势反转风险"""
# 获取最新指标
rsi = df.iloc[-1]['RSI']
macd = df.iloc[-1]['MACD']
signal = df.iloc[-1]['Signal']
price = df.iloc[-1]['close']
ma20 = df.iloc[-1]['MA20']
# 判断潜在趋势反转信号
reversal_signals = 0
# RSI超买/超卖
if rsi > 75:
reversal_signals += 1
direction = "向下"
elif rsi < 25:
reversal_signals += 1
direction = "向上"
else:
direction = "无明确方向"
# MACD死叉/金叉
if macd > signal and df.iloc[-2]['MACD'] <= df.iloc[-2]['Signal']:
reversal_signals += 1
direction = "向上"
elif macd < signal and df.iloc[-2]['MACD'] >= df.iloc[-2]['Signal']:
reversal_signals += 1
direction = "向下"
# 价格与均线关系
if price > ma20 * 1.1:
reversal_signals += 1
direction = "向下"
elif price < ma20 * 0.9:
reversal_signals += 1
direction = "向上"
# 评估风险分数
if reversal_signals >= 3:
score = 90 # 极高风险
elif reversal_signals == 2:
score = 70 # 高风险
elif reversal_signals == 1:
score = 40 # 中等风险
else:
score = 10 # 低风险
return {
"score": score,
"reversal_signals": reversal_signals,
"direction": direction,
"risk_level": "高" if score >= 60 else "中" if score >= 30 else "低"
}
def _analyze_volume_risk(self, df):
"""分析成交量风险"""
# 计算成交量变化
recent_volume = df.iloc[-1]['volume']
avg_volume = df['volume'].rolling(window=20).mean().iloc[-1]
volume_ratio = recent_volume / avg_volume
# 判断成交量模式
if volume_ratio > 3:
pattern = "成交量暴增"
score = 90 # 极高风险
elif volume_ratio > 2:
pattern = "成交量显著放大"
score = 70 # 高风险
elif volume_ratio > 1.5:
pattern = "成交量温和放大"
score = 50 # 中等风险
elif volume_ratio < 0.5:
pattern = "成交量萎缩"
score = 40 # 中低风险
else:
pattern = "成交量正常"
score = 20 # 低风险
# 价格与成交量背离分析
price_change = (df.iloc[-1]['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close']
volume_change = (recent_volume - df.iloc[-5]['volume']) / df.iloc[-5]['volume']
if price_change > 0.05 and volume_change < -0.3:
pattern = "价量背离(价格上涨但量能萎缩)"
score = max(score, 80) # 提高风险评分
elif price_change < -0.05 and volume_change < -0.3:
pattern = "价量同向(价格下跌且量能萎缩)"
score = max(score, 70) # 提高风险评分
elif price_change < -0.05 and volume_change > 0.5:
pattern = "价量同向(价格下跌且量能放大)"
score = max(score, 85) # 提高风险评分
return {
"score": score,
"volume_ratio": volume_ratio,
"pattern": pattern,
"risk_level": "高" if score >= 60 else "中" if score >= 30 else "低"
}
def analyze_portfolio_risk(self, portfolio):
"""分析投资组合整体风险"""
try:
if not portfolio or len(portfolio) == 0:
return {"error": "投资组合为空"}
# 分析每只股票的风险
stock_risks = {}
total_weight = 0
weighted_risk_score = 0
for stock in portfolio:
stock_code = stock.get('stock_code')
weight = stock.get('weight', 1)
market_type = stock.get('market_type', 'A')
if not stock_code:
continue
# 分析股票风险
risk = self.analyze_stock_risk(stock_code, market_type)
stock_risks[stock_code] = risk
# 计算加权风险分数
total_weight += weight
weighted_risk_score += risk.get('total_risk_score', 50) * weight
# 计算组合总风险分数
if total_weight > 0:
portfolio_risk_score = weighted_risk_score / total_weight
else:
portfolio_risk_score = 0
# 确定风险等级
if portfolio_risk_score >= 80:
risk_level = "极高"
elif portfolio_risk_score >= 60:
risk_level = "高"
elif portfolio_risk_score >= 40:
risk_level = "中等"
elif portfolio_risk_score >= 20:
risk_level = "低"
else:
risk_level = "极低"
# 收集高风险股票
high_risk_stocks = [
{
"stock_code": code,
"risk_score": risk.get('total_risk_score', 0),
"risk_level": risk.get('risk_level', '未知')
}
for code, risk in stock_risks.items()
if risk.get('total_risk_score', 0) >= 60
]
# 收集所有风险警报
all_alerts = []
for code, risk in stock_risks.items():
for alert in risk.get('alerts', []):
all_alerts.append({
"stock_code": code,
**alert
})
# 分析风险集中度
risk_concentration = self._analyze_risk_concentration(portfolio, stock_risks)
return {
"portfolio_risk_score": portfolio_risk_score,
"risk_level": risk_level,
"high_risk_stocks": high_risk_stocks,
"alerts": all_alerts,
"risk_concentration": risk_concentration,
"stock_risks": stock_risks
}
except Exception as e:
print(f"分析投资组合风险出错: {str(e)}")
return {
"error": f"分析投资组合风险时出错: {str(e)}"
}
def _analyze_risk_concentration(self, portfolio, stock_risks):
"""分析风险集中度"""
# 分析行业集中度
industries = {}
for stock in portfolio:
stock_code = stock.get('stock_code')
stock_info = self.analyzer.get_stock_info(stock_code)
industry = stock_info.get('行业', '未知')
weight = stock.get('weight', 1)
if industry in industries:
industries[industry] += weight
else:
industries[industry] = weight
# 找出权重最大的行业
max_industry = max(industries.items(), key=lambda x: x[1]) if industries else ('未知', 0)
# 计算高风险股票总权重
high_risk_weight = 0
for stock in portfolio:
stock_code = stock.get('stock_code')
if stock_code in stock_risks and stock_risks[stock_code].get('total_risk_score', 0) >= 60:
high_risk_weight += stock.get('weight', 1)
return {
"max_industry": max_industry[0],
"max_industry_weight": max_industry[1],
"high_risk_weight": high_risk_weight
}