# stock / capital_flow_analyzer.py
# Uploaded by gitdeem ("Upload capital_flow_analyzer.py"), revision 4820aff (verified).
# capital_flow_analyzer.py
import logging
import traceback
import akshare as ak
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class CapitalFlowAnalyzer:
    """Fetch, normalise and score capital-flow (fund-flow) data pulled through akshare.

    Results of the public ``get_*`` methods are kept in ``self.data_cache``
    for ``CACHE_TTL_SECONDS``. When an akshare call fails the methods log the
    error and return randomly generated mock data instead of raising.
    """

    # How long (in seconds) a cached result is considered fresh.
    CACHE_TTL_SECONDS = 3600

    # (output key, source column) pairs shared by the per-stock flow tables.
    # The order here is the key order of the produced dictionaries.
    _FLOW_FIELDS = (
        ("main_net_inflow", "主力净流入-净额"),
        ("main_net_inflow_percent", "主力净流入-净占比"),
        ("super_large_net_inflow", "超大单净流入-净额"),
        ("super_large_net_inflow_percent", "超大单净流入-净占比"),
        ("large_net_inflow", "大单净流入-净额"),
        ("large_net_inflow_percent", "大单净流入-净占比"),
        ("medium_net_inflow", "中单净流入-净额"),
        ("medium_net_inflow_percent", "中单净流入-净占比"),
        ("small_net_inflow", "小单净流入-净额"),
        ("small_net_inflow_percent", "小单净流入-净占比"),
    )

    # (threshold, points) ladders used by calculate_capital_flow_score; the
    # first tier whose threshold is strictly exceeded wins.
    _PERCENT_TIERS = ((3, 20), (1, 15), (0, 10))
    _MAIN_RATIO_TIERS = ((0.7, 20), (0.5, 15), (0.3, 10))
    _ORDER_RATIO_TIERS = ((0.7, 15), (0.5, 10), (0.3, 5))

    def __init__(self):
        """Create an empty result cache and a module-named logger."""
        self.data_cache = {}
        # NOTE: basicConfig configures the root logger; it is a no-op when the
        # root logger already has handlers.
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Cache helpers
    # ------------------------------------------------------------------
    def _cache_get(self, cache_key):
        """Return the fresh cached value for *cache_key*, or None if absent/expired."""
        entry = self.data_cache.get(cache_key)
        if entry is None:
            return None
        cached_at, value = entry
        age = (datetime.now() - cached_at).total_seconds()
        return value if age < self.CACHE_TTL_SECONDS else None

    def _cache_put(self, cache_key, value):
        """Store *value* under *cache_key* together with the current time."""
        self.data_cache[cache_key] = (datetime.now(), value)

    # ------------------------------------------------------------------
    # Concept / industry fund flow
    # ------------------------------------------------------------------
    def get_concept_fund_flow(self, period="10日排行"):
        """Return concept/industry fund-flow rows for *period*.

        Args:
            period: Value passed to ``ak.stock_fund_flow_concept`` as ``symbol``.

        Returns:
            A list of dicts. On the normal path each dict has the keys
            ``rank``, ``sector``, ``company_count``, ``sector_index``,
            ``change_percent``, ``inflow``, ``outflow`` and ``net_flow``.
            If the first fetch raises ``ValueError`` the rows are returned
            with their raw column names (plus ``sector``/``change_percent``
            when derivable). Mock data is returned if everything fails.
        """
        try:
            self.logger.info("Getting concept fund flow for period: %s", period)

            cache_key = f"concept_fund_flow_{period}"
            cached = self._cache_get(cache_key)
            if cached is not None:
                return cached

            try:
                concept_data = ak.stock_fund_flow_concept(symbol=period)
                # Log the real column names to ease debugging of schema drift.
                self.logger.info("Actual columns: %s", list(concept_data.columns))
                self.logger.info("Column count: %s", len(concept_data.columns))
                result = self._normalize_concept_rows(concept_data)
            except ValueError as e:
                self.logger.error("ValueError in stock_fund_flow_concept: %s", e)
                # A ValueError may indicate a column mismatch: fetch again and
                # pass the rows through with their original column names.
                try:
                    concept_data = ak.stock_fund_flow_concept(symbol=period)
                    self.logger.info("Retrying with original columns: %s",
                                     list(concept_data.columns))
                    result = self._raw_concept_rows(concept_data)
                except Exception as e2:
                    self.logger.error("Failed to process with original columns: %s", e2)
                    result = self._generate_mock_concept_fund_flow(period)

            self._cache_put(cache_key, result)
            return result
        except Exception as e:
            self.logger.error("Error getting concept fund flow: %s", e)
            self.logger.error(traceback.format_exc())
            return self._generate_mock_concept_fund_flow(period)

    def _normalize_concept_rows(self, concept_data):
        """Convert a concept fund-flow DataFrame into the standard list of dicts.

        Column names are resolved once (they do not change between rows);
        rows whose values cannot be converted are logged and skipped.
        """
        columns = list(concept_data.columns)

        def first_exact(*names):
            """Return the first of *names* that is a column, else None."""
            return next((name for name in names if name in columns), None)

        def first_containing(*keywords, excluding=None):
            """Return the first string column containing any keyword, else None."""
            for col in columns:
                if not isinstance(col, str):
                    continue
                if excluding is not None and excluding in col:
                    continue
                if any(keyword in col for keyword in keywords):
                    return col
            return None

        rank_col = first_exact("序号", "排名")
        sector_col = (first_exact("行业", "概念名称", "名称")
                      or first_containing("名称", "行业"))
        count_col = first_exact("公司家数")
        index_col = first_exact("行业指数", "概念指数")
        change_col = (first_exact("阶段涨跌幅", "涨跌幅")
                      or first_containing("涨跌"))

        if "流入资金" in columns and "流出资金" in columns:
            inflow_col, outflow_col = "流入资金", "流出资金"
            # When no net column exists the net flow is derived below.
            net_col = first_exact("净额")
        elif "主力净流入-净额" in columns:
            # Only a net figure is available in this layout.
            inflow_col = outflow_col = None
            net_col = "主力净流入-净额"
        else:
            inflow_col = first_containing("流入", excluding="净")
            outflow_col = first_containing("流出", excluding="净")
            net_col = first_containing("净")

        result = []
        for _, row in concept_data.iterrows():
            try:
                inflow = float(row[inflow_col]) if inflow_col else 0
                outflow = float(row[outflow_col]) if outflow_col else 0
                net_flow = float(row[net_col]) if net_col else inflow - outflow
                result.append({
                    "rank": int(row[rank_col]) if rank_col else 0,
                    "sector": row[sector_col] if sector_col else "",
                    "company_count": int(row[count_col]) if count_col else 0,
                    "sector_index": float(row[index_col]) if index_col else 0,
                    "change_percent": (self._parse_percent(row[change_col])
                                       if change_col else 0),
                    "inflow": inflow,
                    "outflow": outflow,
                    "net_flow": net_flow,
                })
            except Exception as e:
                self.logger.warning("Error processing row in concept fund flow: %s", e)
                continue
        return result

    def _raw_concept_rows(self, concept_data):
        """Convert rows keeping the original column names (fallback path).

        Percent strings are parsed, other values are converted to float when
        possible and kept unchanged otherwise. ``sector`` and
        ``change_percent`` are added when a matching source column exists.
        """
        result = []
        for _, row in concept_data.iterrows():
            try:
                item = {}
                for col in concept_data.columns:
                    try:
                        value = row[col]
                        if isinstance(value, str) and '%' in value:
                            item[col] = self._parse_percent(value)
                        else:
                            try:
                                item[col] = float(value)
                            except (ValueError, TypeError):
                                item[col] = value
                    except Exception:
                        item[col] = row[col]

                # Standardised keys so downstream code can treat both paths alike.
                if "名称" in item or "概念名称" in item or "行业" in item:
                    item["sector"] = item.get(
                        "名称", item.get("概念名称", item.get("行业", "")))
                if "涨跌幅" in item:
                    item["change_percent"] = item["涨跌幅"]
                result.append(item)
            except Exception as e:
                self.logger.warning("Error processing row with original columns: %s", e)
                continue
        return result

    # ------------------------------------------------------------------
    # Individual stock ranking
    # ------------------------------------------------------------------
    def get_individual_fund_flow_rank(self, period="10日"):
        """Return the per-stock fund-flow ranking for *period*.

        Args:
            period: Value passed to ``ak.stock_individual_fund_flow_rank`` as
                ``indicator`` (e.g. "今日", "10日").

        Returns:
            A list of dicts with ``rank``, ``code``, ``name``, ``price``,
            ``change_percent`` and the ten flow fields of ``_FLOW_FIELDS``;
            mock data if the fetch fails.
        """
        try:
            self.logger.info("Getting individual fund flow ranking for period: %s", period)

            cache_key = f"individual_fund_flow_rank_{period}"
            cached = self._cache_get(cache_key)
            if cached is not None:
                return cached

            stock_data = ak.stock_individual_fund_flow_rank(indicator=period)
            result = self._build_rank_rows(stock_data, period)

            self._cache_put(cache_key, result)
            return result
        except Exception as e:
            self.logger.error("Error getting individual fund flow ranking: %s", e)
            self.logger.error(traceback.format_exc())
            return self._generate_mock_individual_fund_flow_rank(period)

    @staticmethod
    def _period_value(row, period, suffix):
        """Read ``<period><suffix>`` from *row*, falling back to ``<suffix>``, then 0.

        The prefixed name is tried for every period (including "今日") so that
        prefixed "today" columns are not silently read as 0.
        """
        prefixed = f"{period}{suffix}"
        if prefixed in row:
            return row[prefixed]
        return row.get(suffix, 0)

    def _build_rank_rows(self, stock_data, period):
        """Convert a ranking DataFrame into a list of dicts; bad rows are skipped."""
        result = []
        for _, row in stock_data.iterrows():
            try:
                item = {
                    "rank": int(row.get("序号", 0)),
                    "code": row.get("代码", ""),
                    "name": row.get("名称", ""),
                    "price": float(row.get("最新价", 0)),
                    "change_percent": float(self._period_value(row, period, "涨跌幅")),
                }
                for key, suffix in self._FLOW_FIELDS:
                    item[key] = float(self._period_value(row, period, suffix))
                result.append(item)
            except Exception as e:
                self.logger.warning(
                    "Error processing row in individual fund flow rank: %s", e)
                continue
        return result

    # ------------------------------------------------------------------
    # Individual stock history
    # ------------------------------------------------------------------
    def get_individual_fund_flow(self, stock_code, market_type="", re_date="10日"):
        """Return the daily fund-flow history of one stock plus a 10-day summary.

        Args:
            stock_code: Stock code string; its first digit is used to infer
                the market when *market_type* is empty.
            market_type: "sh" or "sz"; inferred ("6" -> sh, "0"/"3" -> sz,
                otherwise sh) when empty.
            re_date: Accepted for interface compatibility; not used.

        Returns:
            ``{"stock_code": ..., "data": [...], "summary": {...}}`` with
            ``data`` ordered newest first (``summary`` is omitted when no row
            could be parsed); mock data if the fetch fails.
        """
        try:
            self.logger.info("Getting fund flow for stock: %s, market: %s",
                             stock_code, market_type)

            # Resolve the market before building the cache key so that an
            # explicit and an inferred market share one cache entry.
            if not market_type:
                if stock_code.startswith('6'):
                    market_type = "sh"
                elif stock_code.startswith(('0', '3')):
                    market_type = "sz"
                else:
                    market_type = "sh"  # Default to Shanghai

            cache_key = f"individual_fund_flow_{stock_code}_{market_type}"
            cached = self._cache_get(cache_key)
            if cached is not None:
                return cached

            flow_data = ak.stock_individual_fund_flow(stock=stock_code, market=market_type)
            result = self._build_individual_fund_flow(stock_code, flow_data)

            self._cache_put(cache_key, result)
            return result
        except Exception as e:
            self.logger.error("Error getting individual fund flow: %s", e)
            self.logger.error(traceback.format_exc())
            return self._generate_mock_individual_fund_flow(stock_code, market_type)

    def _build_individual_fund_flow(self, stock_code, flow_data):
        """Convert a per-stock flow DataFrame into the result dict.

        Rows are sorted newest first before the summary is computed: the
        summary and the scoring code both take the leading rows as the
        "recent" window, and the mock generator uses the same ordering.
        """
        records = []
        for _, row in flow_data.iterrows():
            try:
                item = {
                    "date": row.get("日期", ""),
                    "price": float(row.get("收盘价", 0)),
                    "change_percent": float(row.get("涨跌幅", 0)),
                }
                for key, column in self._FLOW_FIELDS:
                    item[key] = float(row.get(column, 0))
                records.append(item)
            except Exception as e:
                self.logger.warning("Error processing row in individual fund flow: %s", e)
                continue

        # str() lets date objects and ISO date strings share one sort key.
        records.sort(key=lambda rec: str(rec["date"]), reverse=True)

        result = {"stock_code": stock_code, "data": records}
        if records:
            result["summary"] = self._summarize_recent(records)
        return result

    @staticmethod
    def _summarize_recent(records, days=10):
        """Summarise main-force flow over the first *days* entries of *records*."""
        recent = records[:days]
        return {
            "recent_days": len(recent),
            "total_main_net_inflow": sum(rec["main_net_inflow"] for rec in recent),
            "avg_main_net_inflow_percent": np.mean(
                [rec["main_net_inflow_percent"] for rec in recent]),
            "positive_days": sum(1 for rec in recent if rec["main_net_inflow"] > 0),
            "negative_days": sum(1 for rec in recent if rec["main_net_inflow"] <= 0),
        }

    # ------------------------------------------------------------------
    # Sector constituents
    # ------------------------------------------------------------------
    def get_sector_stocks(self, sector):
        """Return the stocks belonging to the industry board *sector*.

        Returns:
            A list of dicts with ``code``, ``name``, ``price``,
            ``change_percent``, ``main_net_inflow`` and
            ``main_net_inflow_percent`` (the last two are always 0 for real
            data). Mock data is returned, and cached, when the API yields
            nothing usable.
        """
        try:
            self.logger.info("Getting stocks for sector: %s", sector)

            cache_key = f"sector_stocks_{sector}"
            cached = self._cache_get(cache_key)
            if cached is not None:
                return cached

            try:
                # Industry board constituents via the Eastmoney interface.
                stocks = ak.stock_board_industry_cons_em(symbol=sector)
                if not stocks.empty and '代码' in stocks.columns:
                    result = self._build_sector_rows(stocks)
                    self._cache_put(cache_key, result)
                    return result
            except Exception as e:
                self.logger.warning("Failed to get sector stocks from API: %s", e)

            # Reaching this point means the API gave no usable data.
            result = self._generate_mock_sector_stocks(sector)
            self._cache_put(cache_key, result)
            return result
        except Exception as e:
            self.logger.error("Error getting sector stocks: %s", e)
            self.logger.error(traceback.format_exc())
            return self._generate_mock_sector_stocks(sector)

    def _build_sector_rows(self, stocks):
        """Convert a board-constituents DataFrame into a list of dicts."""
        result = []
        for _, row in stocks.iterrows():
            try:
                result.append({
                    "code": row.get("代码", ""),
                    "name": row.get("名称", ""),
                    "price": float(row.get("最新价", 0)),
                    "change_percent": float(row.get("涨跌幅", 0)) if "涨跌幅" in row else 0,
                    # Flow figures are not part of this table.
                    "main_net_inflow": 0,
                    "main_net_inflow_percent": 0,
                })
            except Exception as e:
                # Kept quiet at the default INFO level, but no longer invisible.
                self.logger.debug("Error processing row in sector stocks: %s", e)
                continue
        return result

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------
    @staticmethod
    def _tiered_score(value, tiers):
        """Return the points of the first ``(threshold, points)`` tier that
        *value* strictly exceeds, or 0 when it exceeds none."""
        for threshold, points in tiers:
            if value > threshold:
                return points
        return 0

    def calculate_capital_flow_score(self, stock_code, market_type=""):
        """Score a stock's recent capital flow on a 0-100 scale.

        The score is the sum of three parts computed over the summary window
        of ``get_individual_fund_flow``:

        * ``main_force`` (0-40): average main net-inflow percent plus the
          share of days with positive main net inflow;
        * ``large_order`` (0-30): share of days with positive super-large and
          large order net inflow;
        * ``small_order`` (0-30): share of days with positive medium and
          small order net inflow.

        Returns:
            A dict with ``total``, ``main_force``, ``large_order``,
            ``small_order`` and ``details`` (the fund-flow data used). All
            scores are 0 when no data is available; on an unexpected error an
            additional ``error`` key holds the message.
        """
        try:
            self.logger.info("Calculating capital flow score for stock: %s", stock_code)

            fund_flow = self.get_individual_fund_flow(stock_code, market_type)
            if not fund_flow or not fund_flow.get("data") or not fund_flow.get("summary"):
                return {
                    "total": 0,
                    "main_force": 0,
                    "large_order": 0,
                    "small_order": 0,
                    "details": {}
                }

            summary = fund_flow["summary"]
            recent_days = summary["recent_days"]
            window = fund_flow["data"][:recent_days]

            def positive_ratio(key):
                """Share of days in the window on which *key* is positive."""
                if recent_days <= 0:
                    return 0
                return sum(1 for item in window if item[key] > 0) / recent_days

            main_ratio = summary["positive_days"] / recent_days if recent_days > 0 else 0
            main_force_score = (
                self._tiered_score(summary["avg_main_net_inflow_percent"],
                                   self._PERCENT_TIERS)
                + self._tiered_score(main_ratio, self._MAIN_RATIO_TIERS)
            )
            large_order_score = sum(
                self._tiered_score(positive_ratio(key), self._ORDER_RATIO_TIERS)
                for key in ("super_large_net_inflow", "large_net_inflow")
            )
            small_order_score = sum(
                self._tiered_score(positive_ratio(key), self._ORDER_RATIO_TIERS)
                for key in ("medium_net_inflow", "small_net_inflow")
            )

            return {
                "total": main_force_score + large_order_score + small_order_score,
                "main_force": main_force_score,
                "large_order": large_order_score,
                "small_order": small_order_score,
                "details": fund_flow
            }
        except Exception as e:
            self.logger.error("Error calculating capital flow score: %s", e)
            self.logger.error(traceback.format_exc())
            return {
                "total": 0,
                "main_force": 0,
                "large_order": 0,
                "small_order": 0,
                "details": {},
                "error": str(e)
            }

    def _parse_percent(self, percent_str):
        """Convert a value such as ``"3.2%"`` (or a plain number) to float.

        Returns 0.0 when the value cannot be converted.
        """
        try:
            if isinstance(percent_str, str) and '%' in percent_str:
                return float(percent_str.replace('%', ''))
            return float(percent_str)
        except (ValueError, TypeError):
            return 0.0

    # ------------------------------------------------------------------
    # Mock data generators (used when the API is unavailable)
    # ------------------------------------------------------------------
    def _generate_mock_concept_fund_flow(self, period):
        """Generate random concept fund-flow rows, sorted by net flow descending.

        *period* is accepted for signature symmetry and is not used.
        """
        sectors = [
            "新能源", "医药", "半导体", "芯片", "人工智能", "大数据", "云计算", "5G",
            "汽车", "消费", "金融", "互联网", "游戏", "农业", "化工", "建筑", "军工",
            "钢铁", "有色金属", "煤炭", "石油"
        ]
        positive_count = len(sectors) // 2
        result = []
        for i, sector in enumerate(sectors):
            # First half of the list gets net inflow, second half net outflow.
            is_positive = i < positive_count
            if is_positive:
                in_range, out_range, change_range = (10, 50), (5, 20), (0, 5)
            else:
                in_range, out_range, change_range = (5, 20), (10, 50), (-5, 0)
            inflow = round(np.random.uniform(*in_range), 2)
            outflow = round(np.random.uniform(*out_range), 2)
            change_percent = round(np.random.uniform(*change_range), 2)
            result.append({
                "rank": i + 1,
                "sector": sector,
                "company_count": np.random.randint(10, 100),
                "sector_index": round(np.random.uniform(1000, 5000), 2),
                "change_percent": change_percent,
                "inflow": inflow,
                "outflow": outflow,
                "net_flow": round(inflow - outflow, 2),
            })
        return sorted(result, key=lambda x: x["net_flow"], reverse=True)

    @staticmethod
    def _mock_flow_fields(is_positive, main_low, main_high, pct_high, minor_limit):
        """Generate the ten random flow fields shared by the mock generators.

        Main-force figures are drawn from ``[main_low, main_high]`` and
        ``[1, pct_high]`` (negated when *is_positive* is false); medium and
        small order figures are drawn from ``±minor_limit`` and ``±2``.
        """
        if is_positive:
            main = round(np.random.uniform(main_low, main_high), 2)
            main_pct = round(np.random.uniform(1, pct_high), 2)
        else:
            main = round(np.random.uniform(-main_high, -main_low), 2)
            main_pct = round(np.random.uniform(-pct_high, -1), 2)
        fields = {
            "main_net_inflow": main,
            "main_net_inflow_percent": main_pct,
        }
        for size in ("super_large", "large"):
            fields[f"{size}_net_inflow"] = round(main * np.random.uniform(0.3, 0.5), 2)
            fields[f"{size}_net_inflow_percent"] = round(
                main_pct * np.random.uniform(0.3, 0.5), 2)
        for size in ("medium", "small"):
            fields[f"{size}_net_inflow"] = round(
                np.random.uniform(-minor_limit, minor_limit), 2)
            fields[f"{size}_net_inflow_percent"] = round(np.random.uniform(-2, 2), 2)
        return fields

    def _generate_mock_individual_fund_flow_rank(self, period):
        """Generate a random ranking for 20 sample stocks, sorted by main net
        inflow descending. *period* is not used."""
        stocks = [
            {"code": "600000", "name": "浦发银行"}, {"code": "600036", "name": "招商银行"},
            {"code": "601318", "name": "中国平安"}, {"code": "600519", "name": "贵州茅台"},
            {"code": "000858", "name": "五粮液"}, {"code": "000333", "name": "美的集团"},
            {"code": "600276", "name": "恒瑞医药"}, {"code": "601888", "name": "中国中免"},
            {"code": "600030", "name": "中信证券"}, {"code": "601166", "name": "兴业银行"},
            {"code": "600887", "name": "伊利股份"}, {"code": "601398", "name": "工商银行"},
            {"code": "600028", "name": "中国石化"}, {"code": "601988", "name": "中国银行"},
            {"code": "601857", "name": "中国石油"}, {"code": "600019", "name": "宝钢股份"},
            {"code": "600050", "name": "中国联通"}, {"code": "601328", "name": "交通银行"},
            {"code": "601668", "name": "中国建筑"}, {"code": "601288", "name": "农业银行"}
        ]
        positive_count = len(stocks) // 2
        result = []
        for i, stock in enumerate(stocks):
            # First half of the list gets net inflow, second half net outflow.
            is_positive = i < positive_count
            flows = self._mock_flow_fields(is_positive, 1e6, 5e7, 10, 1e6)
            change_range = (0, 5) if is_positive else (-5, 0)
            change_percent = round(np.random.uniform(*change_range), 2)
            item = {
                "rank": i + 1,
                "code": stock["code"],
                "name": stock["name"],
                "price": round(np.random.uniform(10, 100), 2),
                "change_percent": change_percent,
            }
            item.update(flows)
            result.append(item)
        return sorted(result, key=lambda x: x["main_net_inflow"], reverse=True)

    def _generate_mock_individual_fund_flow(self, stock_code, market_type):
        """Generate 30 calendar days of random flow data for *stock_code*.

        The data is ordered newest first and carries the same ``summary`` as
        real data. *market_type* is not used.
        """
        end_date = datetime.now()
        records = []

        # Simple random walk for the price.
        current_price = np.random.uniform(10, 100)
        for i in range(30):
            date = (end_date - timedelta(days=i)).strftime('%Y-%m-%d')
            # Daily change between -2% and +2%.
            change_percent = np.random.uniform(-2, 2)
            price = round(current_price * (1 + change_percent / 100), 2)
            current_price = price

            # Flow direction follows the sign of the price change.
            flows = self._mock_flow_fields(change_percent > 0, 1e5, 5e6, 5, 1e5)
            item = {
                "date": date,
                "price": price,
                "change_percent": round(change_percent, 2),
            }
            item.update(flows)
            records.append(item)

        # Newest first.
        records.sort(key=lambda x: x["date"], reverse=True)

        return {
            "stock_code": stock_code,
            "data": records,
            "summary": self._summarize_recent(records),
        }

    def _generate_mock_sector_stocks(self, sector):
        """Generate 20-49 random stocks for *sector*, sorted by main net
        inflow descending."""
        num_stocks = np.random.randint(20, 50)
        result = []
        for i in range(num_stocks):
            prefix = "6" if np.random.random() > 0.5 else "0"
            change_percent = round(np.random.uniform(-5, 5), 2)
            result.append({
                # Six-digit code: market prefix + zero-padded running number.
                "code": f"{prefix}{i:05d}",
                "name": f"{sector}股票{i + 1}",
                "price": round(np.random.uniform(10, 100), 2),
                "change_percent": change_percent,
                "main_net_inflow": round(np.random.uniform(-1e6, 1e6), 2),
                "main_net_inflow_percent": round(np.random.uniform(-5, 5), 2),
            })
        return sorted(result, key=lambda x: x["main_net_inflow"], reverse=True)