|
|
|
"""
Intelligent Analysis System (Stocks) - stock market data analysis system

Developer: 熊猫大侠
Version: v2.1.0
License: MIT License
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
|
|
class RiskMonitor:
    """Score the risk of single stocks and of whole portfolios.

    Each stock is scored on four components (volatility, trend, reversal and
    volume), every component yielding an integer ``score``; the components
    are combined into a weighted total. All market data is obtained through
    the ``analyzer`` object supplied at construction time.
    """

    def __init__(self, analyzer):
        """Remember the analyzer used to load data.

        Args:
            analyzer: Object on which this class calls
                ``get_stock_data(stock_code, market_type)``,
                ``calculate_indicators(df)`` and ``get_stock_info(stock_code)``.
        """
        self.analyzer = analyzer

    @staticmethod
    def _overall_level(score):
        """Map an aggregate risk score to the five-step overall risk label."""
        if score >= 80:
            return "极高"
        if score >= 60:
            return "高"
        if score >= 40:
            return "中等"
        if score >= 20:
            return "低"
        return "极低"

    @staticmethod
    def _component_level(score):
        """Map a single component score to the three-step risk label."""
        if score >= 60:
            return "高"
        if score >= 30:
            return "中"
        return "低"

    @staticmethod
    def _relative_change(current, base):
        """Return ``(current - base) / base`` without failing on a zero base.

        A zero base yields ``0.0`` when ``current`` is also zero and a signed
        infinity otherwise, so threshold comparisons on the result stay
        meaningful and no ``ZeroDivisionError`` can escape.
        """
        if base == 0:
            if current == 0:
                return 0.0
            return float("inf") if current > 0 else float("-inf")
        return (current - base) / base

    def analyze_stock_risk(self, stock_code, market_type='A'):
        """Analyze the risk of a single stock.

        Args:
            stock_code: Code passed through to ``analyzer.get_stock_data``.
            market_type: Market identifier passed through to the analyzer.

        Returns:
            On success, a dict with ``total_risk_score``, ``risk_level``, the
            four component dicts (``volatility_risk``, ``trend_risk``,
            ``reversal_risk``, ``volume_risk``) and a list of ``alerts``.
            On any failure, a dict holding a single ``error`` message; the
            error is also printed. No exception is propagated to the caller.
        """
        try:
            df = self.analyzer.get_stock_data(stock_code, market_type)
            df = self.analyzer.calculate_indicators(df)

            volatility_risk = self._analyze_volatility_risk(df)
            trend_risk = self._analyze_trend_risk(df)
            reversal_risk = self._analyze_reversal_risk(df)
            volume_risk = self._analyze_volume_risk(df)

            # Weighted blend of the four component scores (weights sum to 1).
            total_risk_score = (
                volatility_risk['score'] * 0.3 +
                trend_risk['score'] * 0.3 +
                reversal_risk['score'] * 0.25 +
                volume_risk['score'] * 0.15
            )
            risk_level = self._overall_level(total_risk_score)

            # One alert per component whose score reaches 70.
            alerts = []
            if volatility_risk['score'] >= 70:
                alerts.append({
                    "type": "volatility",
                    "level": "高",
                    "message": f"波动率风险较高 ({volatility_risk['value']:.2f}%),可能面临大幅波动",
                })
            if trend_risk['score'] >= 70:
                alerts.append({
                    "type": "trend",
                    "level": "高",
                    "message": f"趋势风险较高,当前处于{trend_risk['trend']}趋势,可能面临加速下跌",
                })
            if reversal_risk['score'] >= 70:
                alerts.append({
                    "type": "reversal",
                    "level": "高",
                    "message": f"趋势反转风险较高,技术指标显示可能{reversal_risk['direction']}反转",
                })
            if volume_risk['score'] >= 70:
                alerts.append({
                    "type": "volume",
                    "level": "高",
                    "message": f"成交量异常,{volume_risk['pattern']},可能预示价格波动",
                })

            return {
                "total_risk_score": total_risk_score,
                "risk_level": risk_level,
                "volatility_risk": volatility_risk,
                "trend_risk": trend_risk,
                "reversal_risk": reversal_risk,
                "volume_risk": volume_risk,
                "alerts": alerts,
            }
        except Exception as e:
            # Boundary handler: callers receive an error dict, never an exception.
            print(f"分析股票风险出错: {str(e)}")
            return {
                "error": f"分析风险时出错: {str(e)}"
            }

    def _analyze_volatility_risk(self, df):
        """Score volatility risk from the ``Volatility`` column.

        Compares the latest value with the mean of the whole column.

        Returns:
            Dict with ``score``, ``value`` (latest volatility), ``change``
            (percentage deviation from the mean) and ``risk_level``.
        """
        recent_volatility = df.iloc[-1]['Volatility']
        avg_volatility = df['Volatility'].mean()

        # A missing or zero average gives no usable baseline; report no change
        # instead of dividing by it and returning NaN/inf.
        if pd.isna(recent_volatility) or pd.isna(avg_volatility) or avg_volatility == 0:
            volatility_change = 0.0
        else:
            volatility_change = recent_volatility / avg_volatility - 1

        if recent_volatility > 5 and volatility_change > 0.5:
            score = 90
        elif recent_volatility > 4 and volatility_change > 0.3:
            score = 75
        elif recent_volatility > 3 and volatility_change > 0.1:
            score = 60
        elif recent_volatility > 2:
            score = 40
        elif recent_volatility > 1:
            score = 20
        else:
            score = 0

        return {
            "score": score,
            "value": recent_volatility,
            "change": volatility_change * 100,
            "risk_level": self._component_level(score),
        }

    def _analyze_trend_risk(self, df):
        """Score trend risk from the latest ``MA5``/``MA20``/``MA60`` values.

        Returns:
            Dict with ``score``, ``trend`` and ``risk_level``.
        """
        latest = df.iloc[-1]
        ma5 = latest['MA5']
        ma20 = latest['MA20']
        ma60 = latest['MA60']

        if ma5 < ma20 < ma60:
            trend = "下降"
            # The further MA5 sits below MA20 (in percent), the higher the score.
            ma5_ma20_gap = (ma20 - ma5) / ma20 * 100
            if ma5_ma20_gap > 5:
                score = 90
            elif ma5_ma20_gap > 3:
                score = 75
            elif ma5_ma20_gap > 1:
                score = 60
            else:
                score = 50
        elif ma5 > ma20 > ma60:
            trend = "上升"
            score = 20
        else:
            trend = "盘整"
            score = 40

        return {
            "score": score,
            "trend": trend,
            "risk_level": self._component_level(score),
        }

    def _analyze_reversal_risk(self, df):
        """Score trend-reversal risk from RSI, MACD crossover and MA20 distance.

        Returns:
            Dict with ``score``, ``reversal_signals`` (number of checks that
            fired), ``direction`` and ``risk_level``.
        """
        latest = df.iloc[-1]
        rsi = latest['RSI']
        macd = latest['MACD']
        signal = latest['Signal']
        price = latest['close']
        ma20 = latest['MA20']

        reversal_signals = 0

        # NOTE(review): signals pointing in opposite directions are still
        # counted together and ``direction`` reflects the last check that
        # fired; kept as in the original scoring.
        if rsi > 75:
            reversal_signals += 1
            direction = "向下"
        elif rsi < 25:
            reversal_signals += 1
            direction = "向上"
        else:
            direction = "无明确方向"

        # A crossover needs the previous row; skip the check on a one-row frame
        # instead of raising IndexError.
        if len(df) >= 2:
            previous = df.iloc[-2]
            if macd > signal and previous['MACD'] <= previous['Signal']:
                reversal_signals += 1
                direction = "向上"
            elif macd < signal and previous['MACD'] >= previous['Signal']:
                reversal_signals += 1
                direction = "向下"

        # Price more than 10% away from MA20 in either direction.
        if price > ma20 * 1.1:
            reversal_signals += 1
            direction = "向下"
        elif price < ma20 * 0.9:
            reversal_signals += 1
            direction = "向上"

        if reversal_signals >= 3:
            score = 90
        elif reversal_signals == 2:
            score = 70
        elif reversal_signals == 1:
            score = 40
        else:
            score = 10

        return {
            "score": score,
            "reversal_signals": reversal_signals,
            "direction": direction,
            "risk_level": self._component_level(score),
        }

    def _analyze_volume_risk(self, df):
        """Score volume risk from the ``volume`` and ``close`` columns.

        The latest volume is compared with the mean of up to the last 20 rows,
        then the price/volume change against the row four positions earlier
        may raise the score.

        Returns:
            Dict with ``score``, ``volume_ratio``, ``pattern`` and
            ``risk_level``.
        """
        recent_volume = df.iloc[-1]['volume']
        # min_periods=1 keeps the average defined when fewer than 20 rows exist;
        # with 20 or more complete rows the value is the same as a plain
        # 20-row window.
        avg_volume = df['volume'].rolling(window=20, min_periods=1).mean().iloc[-1]

        if pd.isna(avg_volume) or avg_volume == 0:
            # No usable baseline: use a neutral ratio so the stock is
            # classified as normal volume rather than dividing by zero.
            volume_ratio = 1.0
        else:
            volume_ratio = recent_volume / avg_volume

        if volume_ratio > 3:
            pattern = "成交量暴增"
            score = 90
        elif volume_ratio > 2:
            pattern = "成交量显著放大"
            score = 70
        elif volume_ratio > 1.5:
            pattern = "成交量温和放大"
            score = 50
        elif volume_ratio < 0.5:
            pattern = "成交量萎缩"
            score = 40
        else:
            pattern = "成交量正常"
            score = 20

        # The price/volume comparison needs the row at position -5; with a
        # shorter history the check is skipped instead of raising IndexError.
        if len(df) >= 5:
            baseline = df.iloc[-5]
            price_change = self._relative_change(df.iloc[-1]['close'], baseline['close'])
            volume_change = self._relative_change(recent_volume, baseline['volume'])

            if price_change > 0.05 and volume_change < -0.3:
                pattern = "价量背离(价格上涨但量能萎缩)"
                score = max(score, 80)
            elif price_change < -0.05 and volume_change < -0.3:
                pattern = "价量同向(价格下跌且量能萎缩)"
                score = max(score, 70)
            elif price_change < -0.05 and volume_change > 0.5:
                pattern = "价量同向(价格下跌且量能放大)"
                score = max(score, 85)

        return {
            "score": score,
            "volume_ratio": volume_ratio,
            "pattern": pattern,
            "risk_level": self._component_level(score),
        }

    def analyze_portfolio_risk(self, portfolio):
        """Analyze the overall risk of a portfolio.

        Args:
            portfolio: Iterable of dicts with ``stock_code`` and optionally
                ``weight`` (default 1) and ``market_type`` (default ``'A'``).
                Entries without a ``stock_code`` are ignored.

        Returns:
            On success, a dict with ``portfolio_risk_score`` (weight-averaged
            stock scores), ``risk_level``, ``high_risk_stocks``, ``alerts``,
            ``risk_concentration`` and the per-stock ``stock_risks``.
            For an empty portfolio or on any failure, a dict holding a single
            ``error`` message.
        """
        try:
            if not portfolio:
                return {"error": "投资组合为空"}

            stock_risks = {}
            total_weight = 0
            weighted_risk_score = 0

            for stock in portfolio:
                stock_code = stock.get('stock_code')
                if not stock_code:
                    continue
                weight = stock.get('weight', 1)
                market_type = stock.get('market_type', 'A')

                risk = self.analyze_stock_risk(stock_code, market_type)
                stock_risks[stock_code] = risk

                total_weight += weight
                # NOTE(review): a stock whose analysis failed has no
                # total_risk_score and is counted with a neutral 50 here,
                # while the high-risk filters below treat it as 0.
                weighted_risk_score += risk.get('total_risk_score', 50) * weight

            if total_weight > 0:
                portfolio_risk_score = weighted_risk_score / total_weight
            else:
                portfolio_risk_score = 0

            risk_level = self._overall_level(portfolio_risk_score)

            high_risk_stocks = [
                {
                    "stock_code": code,
                    "risk_score": risk.get('total_risk_score', 0),
                    "risk_level": risk.get('risk_level', '未知'),
                }
                for code, risk in stock_risks.items()
                if risk.get('total_risk_score', 0) >= 60
            ]

            # Flatten per-stock alerts, tagging each with its stock code.
            all_alerts = []
            for code, risk in stock_risks.items():
                for alert in risk.get('alerts', []):
                    all_alerts.append({
                        "stock_code": code,
                        **alert
                    })

            risk_concentration = self._analyze_risk_concentration(portfolio, stock_risks)

            return {
                "portfolio_risk_score": portfolio_risk_score,
                "risk_level": risk_level,
                "high_risk_stocks": high_risk_stocks,
                "alerts": all_alerts,
                "risk_concentration": risk_concentration,
                "stock_risks": stock_risks,
            }
        except Exception as e:
            # Boundary handler: callers receive an error dict, never an exception.
            print(f"分析投资组合风险出错: {str(e)}")
            return {
                "error": f"分析投资组合风险时出错: {str(e)}"
            }

    def _analyze_risk_concentration(self, portfolio, stock_risks):
        """Summarize how portfolio weight concentrates by industry and risk.

        Args:
            portfolio: Same entries as passed to ``analyze_portfolio_risk``.
            stock_risks: Mapping of stock code to its risk dict.

        Returns:
            Dict with ``max_industry`` (industry holding the most weight),
            ``max_industry_weight`` and ``high_risk_weight`` (summed weight of
            stocks whose total score is at least 60). Weights are the raw
            portfolio weights, not normalized.
        """
        industries = {}
        high_risk_weight = 0

        for stock in portfolio:
            stock_code = stock.get('stock_code')
            # Entries without a code were never analyzed; skip them here too so
            # both views of the portfolio cover the same holdings.
            if not stock_code:
                continue
            weight = stock.get('weight', 1)

            stock_info = self.analyzer.get_stock_info(stock_code)
            if stock_info is None:
                stock_info = {}
            industry = stock_info.get('行业', '未知')
            industries[industry] = industries.get(industry, 0) + weight

            if stock_risks.get(stock_code, {}).get('total_risk_score', 0) >= 60:
                high_risk_weight += weight

        max_industry = max(industries.items(), key=lambda x: x[1]) if industries else ('未知', 0)

        return {
            "max_industry": max_industry[0],
            "max_industry_weight": max_industry[1],
            "high_risk_weight": high_risk_weight,
        }