"""
Quantitative Alpha Mining Platform with LLM Discovery

Author: Spencer Purdy

Description: A platform that leverages LLMs to discover and evaluate alpha
factors, combining classical quantitative approaches with modern ML techniques
for comprehensive market analysis and portfolio construction.
"""

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from datetime import datetime, timedelta
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import json
import random
from typing import Dict, List, Tuple, Optional, Any, Union, Callable
from dataclasses import dataclass, field
from collections import defaultdict
import warnings
import os
import openai

warnings.filterwarnings('ignore')

from scipy import stats
from scipy.optimize import minimize
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from statsmodels.tsa.stattools import adfuller
import statsmodels.api as sm

import ta

from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification

# Seed all random number generators for reproducibility
np.random.seed(42)
torch.manual_seed(42)
random.seed(42)

# Global configuration
RISK_FREE_RATE = 0.02          # Annualized risk-free rate
TRANSACTION_COST = 0.001       # Cost per unit of traded notional
REBALANCE_FREQUENCY = 20       # Trading days between rebalances
MIN_FACTOR_IC = 0.02           # Minimum |IC| for a factor to qualify as active
MAX_FACTOR_CORRELATION = 0.7   # Maximum pairwise correlation between active factors

@dataclass
class AlphaFactor:
    """Data class representing an alpha factor"""
    name: str
    formula: Union[str, Callable]  # formula string or classical factor function
    category: str
    lookback_period: int
    ic_score: float = 0.0
    sharpe_ratio: float = 0.0
    turnover: float = 0.0
    decay_rate: float = 0.0
    regime_performance: Dict[str, float] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class MarketRegime:
    """Data class for market regime identification"""
    regime_type: str
    confidence: float
    characteristics: Dict[str, float]
    start_date: datetime
    end_date: Optional[datetime] = None


class ClassicalAlphaFactors:
    """Implementation of classical alpha factors inspired by WorldQuant's 101 Alphas"""

    @staticmethod
    def safe_rank(series: pd.Series) -> pd.Series:
        """Safely rank a series, handling NaN values"""
        return series.rank(pct=True, na_option='keep')

    @staticmethod
    def safe_rolling(series: pd.Series, window: int, func: str = 'mean') -> pd.Series:
        """Safely apply rolling window operations"""
        if len(series) < window:
            return pd.Series(np.nan, index=series.index)

        if func == 'mean':
            return series.rolling(window, min_periods=1).mean()
        elif func == 'std':
            return series.rolling(window, min_periods=1).std()
        elif func == 'max':
            return series.rolling(window, min_periods=1).max()
        elif func == 'min':
            return series.rolling(window, min_periods=1).min()
        elif func == 'sum':
            return series.rolling(window, min_periods=1).sum()
        else:
            return series.rolling(window, min_periods=1).mean()

    @staticmethod
    def alpha_001(data: pd.DataFrame) -> pd.Series:
        """Alpha#001: Momentum-based factor with volatility adjustment"""
        try:
            returns = data['close'].pct_change().fillna(0)
            condition = returns < 0
            stddev = ClassicalAlphaFactors.safe_rolling(returns, 20, 'std').fillna(0.01)

            signed_power = pd.Series(
                np.where(condition, stddev ** 2, data['close'] ** 2),
                index=data.index
            )

            ts_argmax = signed_power.rolling(5, min_periods=1).apply(
                lambda x: x.argmax() if len(x) > 0 else 0
            )

            result = ClassicalAlphaFactors.safe_rank(ts_argmax) - 0.5
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_002(data: pd.DataFrame) -> pd.Series:
        """Alpha#002: Volume-price correlation factor"""
        try:
            # Guard against zero volume / zero open before logs and ratios
            data_safe = data.copy()
            data_safe['volume'] = data_safe['volume'].replace(0, 1)
            data_safe['open'] = data_safe['open'].replace(0, data_safe['close'])

            log_volume_delta = np.log(data_safe['volume']).diff(2).fillna(0)
            price_change_ratio = ((data_safe['close'] - data_safe['open']) / data_safe['open']).fillna(0)

            rank1 = ClassicalAlphaFactors.safe_rank(log_volume_delta)
            rank2 = ClassicalAlphaFactors.safe_rank(price_change_ratio)

            correlation = rank1.rolling(6, min_periods=1).corr(rank2)
            return (-1 * correlation).fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_003(data: pd.DataFrame) -> pd.Series:
        """Alpha#003: Open-volume rank correlation"""
        try:
            rank_open = ClassicalAlphaFactors.safe_rank(data['open'])
            rank_volume = ClassicalAlphaFactors.safe_rank(data['volume'])

            correlation = rank_open.rolling(10, min_periods=1).corr(rank_volume)
            return (-1 * correlation).fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_004(data: pd.DataFrame) -> pd.Series:
        """Alpha#004: Low price time series rank"""
        try:
            rank_low = ClassicalAlphaFactors.safe_rank(data['low'])
            ts_rank = rank_low.rolling(9, min_periods=1).apply(
                lambda x: ClassicalAlphaFactors.safe_rank(pd.Series(x)).iloc[-1] if len(x) > 0 else 0.5
            )
            return (-1 * ts_rank).fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_005(data: pd.DataFrame) -> pd.Series:
        """Alpha#005: VWAP-based factor"""
        try:
            data_safe = data.copy()
            data_safe['volume'] = data_safe['volume'].replace(0, 1)

            vwap = (data_safe['close'] * data_safe['volume']).cumsum() / data_safe['volume'].cumsum()
            vwap_ma = ClassicalAlphaFactors.safe_rolling(vwap, 10, 'mean')

            rank1 = ClassicalAlphaFactors.safe_rank(data_safe['open'] - vwap_ma)
            rank2 = np.abs(ClassicalAlphaFactors.safe_rank(data_safe['close'] - vwap))

            result = rank1 * (-1 * rank2)
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_006(data: pd.DataFrame) -> pd.Series:
        """Alpha#006: Raw open-volume correlation"""
        try:
            correlation = data['open'].rolling(10, min_periods=1).corr(data['volume'])
            return (-1 * correlation).fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_007(data: pd.DataFrame) -> pd.Series:
        """Alpha#007: Volume-based momentum"""
        try:
            adv20 = ClassicalAlphaFactors.safe_rolling(data['volume'], 20, 'mean')
            condition = adv20 < data['volume']

            close_delta = data['close'].diff(7).fillna(0)
            abs_delta = np.abs(close_delta)

            ts_rank = abs_delta.rolling(60, min_periods=1).apply(
                lambda x: ClassicalAlphaFactors.safe_rank(pd.Series(x)).iloc[-1] if len(x) > 0 else 0.5
            )

            result = pd.Series(
                np.where(condition, -1 * ts_rank * np.sign(close_delta), -1),
                index=data.index
            )
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_008(data: pd.DataFrame) -> pd.Series:
        """Alpha#008: Open-return product factor"""
        try:
            returns = data['close'].pct_change().fillna(0)
            sum_open = ClassicalAlphaFactors.safe_rolling(data['open'], 5, 'sum')
            sum_returns = ClassicalAlphaFactors.safe_rolling(returns, 5, 'sum')

            product = sum_open * sum_returns
            delayed_product = product.shift(10).bfill()

            result = -1 * ClassicalAlphaFactors.safe_rank(product - delayed_product)
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_009(data: pd.DataFrame) -> pd.Series:
        """Alpha#009: Close delta conditional factor"""
        try:
            close_delta = data['close'].diff(1).fillna(0)
            ts_min = ClassicalAlphaFactors.safe_rolling(close_delta, 5, 'min')
            ts_max = ClassicalAlphaFactors.safe_rolling(close_delta, 5, 'max')

            condition1 = ts_min > 0
            condition2 = ts_max < 0

            result = pd.Series(
                np.where(condition1, close_delta,
                         np.where(condition2, close_delta, -1 * close_delta)),
                index=data.index
            )
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def alpha_010(data: pd.DataFrame) -> pd.Series:
        """Alpha#010: Ranked version of alpha_009 with a shorter window"""
        try:
            close_delta = data['close'].diff(1).fillna(0)
            ts_min = ClassicalAlphaFactors.safe_rolling(close_delta, 4, 'min')
            ts_max = ClassicalAlphaFactors.safe_rolling(close_delta, 4, 'max')

            condition1 = ts_min > 0
            condition2 = ts_max < 0

            raw_result = pd.Series(
                np.where(condition1, close_delta,
                         np.where(condition2, close_delta, -1 * close_delta)),
                index=data.index
            )

            result = ClassicalAlphaFactors.safe_rank(raw_result)
            return result.fillna(0)
        except Exception:
            return pd.Series(0, index=data.index)

    @staticmethod
    def get_all_classical_factors() -> List[Callable]:
        """Return a list of all classical alpha factor functions"""
        return [
            ClassicalAlphaFactors.alpha_001,
            ClassicalAlphaFactors.alpha_002,
            ClassicalAlphaFactors.alpha_003,
            ClassicalAlphaFactors.alpha_004,
            ClassicalAlphaFactors.alpha_005,
            ClassicalAlphaFactors.alpha_006,
            ClassicalAlphaFactors.alpha_007,
            ClassicalAlphaFactors.alpha_008,
            ClassicalAlphaFactors.alpha_009,
            ClassicalAlphaFactors.alpha_010,
        ]

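# Illustrative usage (a minimal sketch; `ohlcv` stands for any DataFrame with
# open/high/low/close/volume columns, e.g. from MarketDataGenerator below):
#
#   ohlcv = MarketDataGenerator.generate_market_data(250)
#   signal = ClassicalAlphaFactors.alpha_001(ohlcv)   # pd.Series aligned to ohlcv.index
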
class LLMAlphaGenerator:
    """Generate novel alpha factors using OpenAI's GPT models"""

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        if self.api_key:
            # Uses the legacy (pre-1.0) openai module-level API
            openai.api_key = self.api_key

        self.operators = ['rank', 'ts_rank', 'ts_sum', 'ts_mean', 'ts_std', 'ts_max', 'ts_min',
                          'correlation', 'covariance', 'delta', 'delay', 'log', 'sign', 'abs']
        self.variables = ['open', 'high', 'low', 'close', 'volume', 'returns', 'vwap']
        self.generated_factors = []

    def generate_llm_factor(self, market_context: Dict[str, Any], category: str) -> Tuple[str, str]:
        """Generate a novel alpha factor formula using OpenAI's GPT model"""

        # Fall back to a template factor when no API key is configured
        if not self.api_key:
            return self._generate_fallback_factor(category)

        prompt = f"""You are a quantitative researcher creating novel alpha factors for trading.

Market Context:
- Current Regime: {market_context.get('regime', 'unknown')}
- Average Volatility: {market_context.get('volatility', 0.02):.1%}
- Trend Strength: {market_context.get('trend_strength', 0.5):.1%}

Task: Generate a novel alpha factor formula for the '{category}' category.

Available operators: {', '.join(self.operators)}
Available variables: {', '.join(self.variables)}

Requirements:
1. The formula must be executable Python code using pandas operations
2. Use time-series operators (ts_*) with appropriate lookback periods
3. The factor should capture {category} characteristics
4. Include rank transformations to make the factor cross-sectionally comparable
5. The formula should be between 50-150 characters

Examples of good alpha factors:
- rank(ts_sum(returns, 20)) * rank(volume / ts_mean(volume, 20))
- -1 * correlation(rank(close), rank(volume), 10)
- sign(returns) * ts_std(returns, 20) / ts_mean(abs(returns), 20)

Generate ONE formula that captures {category} patterns. Return ONLY the formula, no explanation."""

        try:
            # Legacy ChatCompletion endpoint (openai<1.0)
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a quantitative finance expert specializing in alpha factor research."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=150
            )

            formula = response.choices[0].message.content.strip()

            if self.validate_formula(formula):
                name = f"LLM_{category}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                self.generated_factors.append({'name': name, 'formula': formula, 'category': category})
                return name, formula
            else:
                return self._generate_fallback_factor(category)

        except Exception as e:
            print(f"LLM generation error: {e}")
            return self._generate_fallback_factor(category)

    def _generate_fallback_factor(self, category: str) -> Tuple[str, str]:
        """Generate a fallback factor if LLM generation fails"""
        templates = {
            'momentum': "rank(ts_sum(returns, 20)) * rank(volume / ts_mean(volume, 20))",
            'mean_reversion': "-1 * (close - ts_mean(close, 20)) / ts_std(close, 20)",
            'volatility': "ts_std(returns, 20) / ts_mean(abs(returns), 20)",
            'microstructure': "(high - low) / (high + low) * rank(volume)",
            'price': "rank(close / ts_max(high, 20))",
            'volume': "rank(volume / ts_mean(volume, 50))",
            'fundamental': "rank(close * volume / ts_sum(volume, 10))",
            'alternative': "rank(ts_std(volume, 10) / ts_mean(volume, 30))"
        }

        formula = templates.get(category, templates['momentum'])
        name = f"Fallback_{category}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        return name, formula

    def validate_formula(self, formula: str) -> bool:
        """Validate that a formula is syntactically plausible and safe to evaluate"""
        try:
            # Parentheses must balance
            if formula.count('(') != formula.count(')'):
                return False

            # Reject obviously dangerous constructs
            dangerous_ops = ['eval', 'exec', 'import', '__', 'lambda', 'os', 'sys']
            for op in dangerous_ops:
                if op in formula:
                    return False

            # Require at least one known operator and one known variable
            has_operator = any(op in formula for op in self.operators)
            has_variable = any(var in formula for var in self.variables)

            return has_operator and has_variable

        except Exception:
            return False

    def evaluate_formula(self, formula: str, data: pd.DataFrame) -> pd.Series:
        """Safely evaluate a formula on market data"""
        try:
            safe_data = data.copy()
            safe_data['volume'] = safe_data['volume'].replace(0, 1)

            # Derived series available to formulas
            returns = safe_data['close'].pct_change().fillna(0)
            vwap = (safe_data['close'] * safe_data['volume']).cumsum() / safe_data['volume'].cumsum()
            vwap = vwap.fillna(safe_data['close'])
            adv20 = safe_data['volume'].rolling(20, min_periods=1).mean()

            context = {
                'open': safe_data['open'],
                'high': safe_data['high'],
                'low': safe_data['low'],
                'close': safe_data['close'],
                'volume': safe_data['volume'],
                'returns': returns,
                'vwap': vwap,
                'adv20': adv20
            }

            # NaN-safe implementations of the formula operators
            def safe_rank(x):
                return x.rank(pct=True, na_option='keep').fillna(0.5)

            def safe_ts_rank(x, n):
                return x.rolling(n, min_periods=1).apply(
                    lambda y: y.rank(pct=True).iloc[-1] if len(y) > 0 else 0.5
                ).fillna(0.5)

            def safe_ts_sum(x, n):
                return x.rolling(n, min_periods=1).sum().fillna(0)

            def safe_ts_mean(x, n):
                return x.rolling(n, min_periods=1).mean().fillna(x.fillna(0))

            def safe_ts_std(x, n):
                result = x.rolling(n, min_periods=1).std()
                return result.fillna(0.001)

            def safe_ts_max(x, n):
                return x.rolling(n, min_periods=1).max().fillna(x.fillna(0))

            def safe_ts_min(x, n):
                return x.rolling(n, min_periods=1).min().fillna(x.fillna(0))

            def safe_correlation(x, y, n):
                return x.rolling(n, min_periods=1).corr(y).fillna(0)

            def safe_covariance(x, y, n):
                return x.rolling(n, min_periods=1).cov(y).fillna(0)

            def safe_delta(x, n):
                return x.diff(n).fillna(0)

            def safe_delay(x, n):
                return x.shift(n).bfill().fillna(0)

            def safe_log(x):
                return np.log(x.clip(lower=0.001))

            def safe_sign(x):
                return np.sign(x).fillna(0)

            def safe_abs(x):
                return np.abs(x).fillna(0)

            safe_functions = {
                'rank': safe_rank,
                'ts_rank': safe_ts_rank,
                'ts_sum': safe_ts_sum,
                'ts_mean': safe_ts_mean,
                'ts_std': safe_ts_std,
                'ts_max': safe_ts_max,
                'ts_min': safe_ts_min,
                'correlation': safe_correlation,
                'covariance': safe_covariance,
                'delta': safe_delta,
                'delay': safe_delay,
                'log': safe_log,
                'sign': safe_sign,
                'abs': safe_abs,
                'np': np,
                'pd': pd
            }

            eval_namespace = {**context, **safe_functions}

            # Evaluate with builtins disabled; validate_formula has already
            # rejected dangerous constructs
            result = eval(formula, {"__builtins__": {}}, eval_namespace)

            if not isinstance(result, pd.Series):
                result = pd.Series(result, index=data.index)

            result = result.replace([np.inf, -np.inf], 0)
            result = result.fillna(0)

            return result

        except Exception as e:
            print(f"Error evaluating formula '{formula}': {e}")
            return pd.Series(0, index=data.index)

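# Illustrative usage (a minimal sketch; with no API key the generator falls
# back to the built-in templates, so no network call is made):
#
#   gen = LLMAlphaGenerator()
#   name, formula = gen.generate_llm_factor({'regime': 'volatile'}, 'momentum')
#   values = gen.evaluate_formula(formula, ohlcv)   # pd.Series of factor values
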
class AlternativeDataPipeline:
    """Extract sentiment scores from alternative data sources"""

    def __init__(self):
        # Load FinBERT for financial sentiment; fall back to a keyword
        # heuristic if the model cannot be loaded
        try:
            self.sentiment_analyzer = pipeline(
                "sentiment-analysis",
                model="ProsusAI/finbert",
                device=-1
            )
        except Exception:
            self.sentiment_analyzer = None

        # Simulated data sources for demonstration
        self.data_sources = {
            'earnings_calls': self._generate_earnings_call_snippets,
            'sec_filings': self._generate_sec_filing_snippets,
            'news': self._generate_news_snippets,
            'social_media': self._generate_social_media_snippets
        }

    def _generate_earnings_call_snippets(self) -> List[str]:
        """Generate simulated earnings call transcripts"""
        positive_phrases = [
            "We exceeded our revenue guidance for the quarter with strong performance across all segments",
            "Our strategic initiatives are yielding positive results with improved margins",
            "Customer acquisition costs have decreased while lifetime value continues to grow",
            "We're seeing strong demand for our products in emerging markets",
            "Our R&D investments are beginning to show promising returns"
        ]

        negative_phrases = [
            "We faced headwinds in our core markets due to increased competition",
            "Supply chain disruptions continue to impact our margins",
            "We're revising our guidance downward for the upcoming quarter",
            "Customer churn rates have increased beyond our expectations",
            "Regulatory challenges in key markets are affecting our expansion plans"
        ]

        neutral_phrases = [
            "We maintained our market position despite challenging conditions",
            "Our performance was in line with analyst expectations",
            "We continue to execute on our long-term strategic plan",
            "Market conditions remain mixed with both opportunities and challenges",
            "We're monitoring the situation closely and will adjust as needed"
        ]

        # Randomly choose an overall tone for the simulated call
        market_sentiment = random.choice(['positive', 'negative', 'neutral'])

        if market_sentiment == 'positive':
            return random.sample(positive_phrases, min(3, len(positive_phrases))) + \
                   random.sample(neutral_phrases, min(1, len(neutral_phrases)))
        elif market_sentiment == 'negative':
            return random.sample(negative_phrases, min(3, len(negative_phrases))) + \
                   random.sample(neutral_phrases, min(1, len(neutral_phrases)))
        else:
            return random.sample(neutral_phrases, min(2, len(neutral_phrases))) + \
                   random.sample(positive_phrases, min(1, len(positive_phrases))) + \
                   random.sample(negative_phrases, min(1, len(negative_phrases)))

    def _generate_sec_filing_snippets(self) -> List[str]:
        """Generate simulated SEC filing excerpts"""
        risk_factors = [
            "The company faces increased cybersecurity risks that could materially affect operations",
            "Changes in interest rates may adversely impact our financial condition",
            "We depend on key personnel whose loss could harm our business",
            "Intense competition in our industry may result in reduced market share",
            "Economic uncertainty could reduce demand for our products and services"
        ]

        positive_disclosures = [
            "We have secured long-term contracts with several major customers",
            "Our patent portfolio provides strong competitive advantages",
            "Recent acquisitions are expected to be accretive to earnings",
            "We maintain a strong balance sheet with minimal debt",
            "Our diversified revenue streams provide resilience against market volatility"
        ]

        return random.sample(risk_factors, min(2, len(risk_factors))) + \
               random.sample(positive_disclosures, min(2, len(positive_disclosures)))

    def _generate_news_snippets(self) -> List[str]:
        """Generate simulated financial news headlines"""
        headlines = [
            "Company announces breakthrough technology in core product line",
            "Analysts upgrade stock following strong quarterly results",
            "New CEO brings fresh perspective and growth strategy",
            "Competitor's product recall may benefit company's market share",
            "Industry report shows growing demand for company's services",
            "Regulatory approval received for expansion into new markets",
            "Company faces lawsuit over alleged patent infringement",
            "Major customer switches to competitor's platform",
            "Economic indicators suggest challenging environment ahead"
        ]

        return random.sample(headlines, min(5, len(headlines)))

    def _generate_social_media_snippets(self) -> List[str]:
        """Generate simulated social media sentiment"""
        posts = [
            "Love the new features in the latest product update! #innovation",
            "Customer service has really improved lately, impressed!",
            "Stock looking oversold here, might be a buying opportunity",
            "Disappointed with the recent earnings miss, concerning trend",
            "Management seems to be making all the right moves",
            "Product quality has declined, considering alternatives",
            "Excited about the company's expansion plans",
            "Valuation seems stretched at current levels"
        ]

        return random.sample(posts, min(4, len(posts)))

    def analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze sentiment of a single text"""
        if self.sentiment_analyzer is None:
            # Keyword-based fallback when FinBERT is unavailable
            positive_words = ['strong', 'exceed', 'growth', 'positive', 'improve', 'breakthrough']
            negative_words = ['decline', 'loss', 'risk', 'challenge', 'lawsuit', 'disappoint']

            text_lower = text.lower()
            pos_count = sum(1 for word in positive_words if word in text_lower)
            neg_count = sum(1 for word in negative_words if word in text_lower)

            if pos_count > neg_count:
                return {'label': 'positive', 'score': 0.7}
            elif neg_count > pos_count:
                return {'label': 'negative', 'score': 0.7}
            else:
                return {'label': 'neutral', 'score': 0.5}

        try:
            # Truncate to the model's maximum input length
            result = self.sentiment_analyzer(text[:512])[0]
            return result
        except Exception:
            return {'label': 'neutral', 'score': 0.5}

    def extract_sentiment_scores(self, source: str = 'all') -> Dict[str, Dict[str, float]]:
        """Extract sentiment scores from the specified data source"""
        sentiment_scores = {}

        if source == 'all':
            sources_to_analyze = self.data_sources.keys()
        else:
            sources_to_analyze = [source] if source in self.data_sources else []

        for src in sources_to_analyze:
            snippets = self.data_sources[src]()

            positive_count = 0
            negative_count = 0
            total_score = 0

            for snippet in snippets:
                try:
                    result = self.analyze_sentiment(snippet)

                    if result['label'] == 'positive':
                        positive_count += 1
                        total_score += result['score']
                    elif result['label'] == 'negative':
                        negative_count += 1
                        total_score -= result['score']

                except Exception:
                    continue

            if len(snippets) > 0:
                sentiment_scores[src] = {
                    'positive_ratio': positive_count / len(snippets),
                    'negative_ratio': negative_count / len(snippets),
                    'net_sentiment': total_score / len(snippets),
                    'snippets_analyzed': len(snippets)
                }
            else:
                sentiment_scores[src] = {
                    'positive_ratio': 0,
                    'negative_ratio': 0,
                    'net_sentiment': 0,
                    'snippets_analyzed': 0
                }

        return sentiment_scores

    def create_sentiment_alpha_factors(self, sentiment_scores: Dict[str, Dict[str, float]]) -> List[AlphaFactor]:
        """Create alpha factors based on sentiment scores"""
        factors = []

        # Earnings-call sentiment combined with volume
        if 'earnings_calls' in sentiment_scores:
            factor = AlphaFactor(
                name="sentiment_earnings_momentum",
                formula="earnings_sentiment * volume_ratio",
                category="alternative",
                lookback_period=20,
                metadata={'sentiment_data': sentiment_scores['earnings_calls']}
            )
            factors.append(factor)

        # News sentiment as a contrarian signal
        if 'news' in sentiment_scores:
            factor = AlphaFactor(
                name="sentiment_news_reversal",
                formula="-1 * news_sentiment * (close - ma20) / std20",
                category="alternative",
                lookback_period=20,
                metadata={'sentiment_data': sentiment_scores['news']}
            )
            factors.append(factor)

        # Composite sentiment across all available sources
        if len(sentiment_scores) > 1:
            avg_sentiment = np.mean([s['net_sentiment'] for s in sentiment_scores.values()])

            factor = AlphaFactor(
                name="sentiment_composite",
                formula="composite_sentiment * rank(volume)",
                category="alternative",
                lookback_period=10,
                metadata={
                    'avg_sentiment': avg_sentiment,
                    'sources': list(sentiment_scores.keys())
                }
            )
            factors.append(factor)

        return factors

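# Illustrative usage (a minimal sketch; the sources here are simulated, so
# scores vary run to run):
#
#   alt = AlternativeDataPipeline()
#   scores = alt.extract_sentiment_scores('news')   # {'news': {'net_sentiment': ..., ...}}
#   sentiment_factors = alt.create_sentiment_alpha_factors(scores)
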
class MarketRegimeDetector:
    """Detect market regimes using statistical methods"""

    def __init__(self):
        self.regime_history = []
        self.current_regime = None

    def detect_regime(self, data: pd.DataFrame, lookback: int = 60) -> MarketRegime:
        """Detect the current market regime"""

        # Not enough history: default to a low-confidence 'volatile' regime
        if len(data) < 20:
            return MarketRegime(
                regime_type='volatile',
                confidence=0.5,
                characteristics={
                    'trend_strength': 0,
                    'volatility': 0.02,
                    'hurst_exponent': 0.5,
                    'volume_trend': 0,
                    'avg_return': 0
                },
                start_date=data.index[0] if len(data) > 0 else datetime.now()
            )

        if len(data) < lookback:
            lookback = len(data)

        returns = data['close'].pct_change().fillna(0)
        recent_returns = returns.iloc[-lookback:]

        # Trend strength via R-squared of a linear fit
        trend_strength = self._calculate_trend_strength(data['close'].iloc[-lookback:])

        # Annualized volatility
        volatility = recent_returns.std() * np.sqrt(252)

        # Hurst exponent for trending vs. mean-reverting behavior
        hurst_exponent = self._calculate_hurst_exponent(data['close'].iloc[-lookback:])

        # Linear trend in volume
        volume_data = data['volume'].iloc[-lookback:].fillna(0)
        if len(volume_data) > 1:
            try:
                volume_trend = np.polyfit(range(len(volume_data)), volume_data, 1)[0]
            except Exception:
                volume_trend = 0
        else:
            volume_trend = 0

        avg_return = recent_returns.mean()

        # Classify the regime from the computed characteristics
        if trend_strength > 0.6 and avg_return > 0.001:
            regime_type = 'trending_up'
        elif trend_strength > 0.6 and avg_return < -0.001:
            regime_type = 'trending_down'
        elif hurst_exponent < 0.45:
            regime_type = 'mean_reverting'
        else:
            regime_type = 'volatile'

        confidence = self._calculate_regime_confidence(
            trend_strength, volatility, hurst_exponent
        )

        regime = MarketRegime(
            regime_type=regime_type,
            confidence=confidence,
            characteristics={
                'trend_strength': trend_strength,
                'volatility': volatility,
                'hurst_exponent': hurst_exponent,
                'volume_trend': volume_trend,
                'avg_return': avg_return
            },
            start_date=data.index[-lookback] if lookback <= len(data) else data.index[0]
        )

        self.current_regime = regime
        return regime

    def _calculate_trend_strength(self, prices: pd.Series) -> float:
        """Calculate trend strength as the R-squared of a linear regression"""
        try:
            if len(prices) < 2:
                return 0

            x = np.arange(len(prices))
            y = prices.values

            # Drop NaNs before fitting
            mask = ~np.isnan(y)
            if mask.sum() < 2:
                return 0

            x = x[mask]
            y = y[mask]

            # Standardize to avoid numerical issues
            x_std = x.std()
            y_std = y.std()

            if x_std == 0 or y_std == 0:
                return 0

            x = (x - x.mean()) / x_std
            y = (y - y.mean()) / y_std

            slope, intercept = np.polyfit(x, y, 1)
            y_pred = slope * x + intercept

            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - y.mean()) ** 2)

            if ss_tot == 0:
                return 0

            r_squared = 1 - (ss_res / ss_tot)
            return abs(r_squared)

        except Exception:
            return 0

    def _calculate_hurst_exponent(self, prices: pd.Series) -> float:
        """Calculate the Hurst exponent for mean reversion detection"""
        try:
            if len(prices) < 20:
                return 0.5

            max_lag = min(20, len(prices) // 2)
            lags = range(2, max_lag)

            # Rescaled-range (R/S) statistics, tracking which lags were usable
            rs_values = []
            valid_lags = []

            for lag in lags:
                returns = prices.pct_change(lag).dropna()

                if len(returns) < 2:
                    continue

                mean_returns = returns.mean()
                adjusted = returns - mean_returns

                cumsum = adjusted.cumsum()

                R = cumsum.max() - cumsum.min()
                S = returns.std()

                if S > 0 and R > 0:
                    rs_values.append(R / S)
                    valid_lags.append(lag)

            if len(rs_values) >= 2:
                # The Hurst exponent is the slope of log(R/S) against log(lag)
                log_lags = np.log(valid_lags)
                log_rs = np.log(rs_values)

                mask = np.isfinite(log_lags) & np.isfinite(log_rs)
                if mask.sum() >= 2:
                    hurst, _ = np.polyfit(log_lags[mask], log_rs[mask], 1)
                    return max(0, min(1, hurst))

            return 0.5

        except Exception:
            return 0.5

    def _calculate_regime_confidence(self, trend_strength: float,
                                     volatility: float, hurst: float) -> float:
        """Calculate confidence in the regime classification"""
        confidence = 0.5

        # Strong trends are easier to classify
        if trend_strength > 0.7:
            confidence += 0.2
        elif trend_strength > 0.5:
            confidence += 0.1

        # A Hurst exponent far from 0.5 is a clearer signal
        if abs(hurst - 0.5) > 0.2:
            confidence += 0.15
        elif abs(hurst - 0.5) > 0.1:
            confidence += 0.075

        # Moderate volatility regimes are the most reliably identified
        if 0.1 < volatility < 0.4:
            confidence += 0.15
        elif 0.05 < volatility < 0.5:
            confidence += 0.075

        return min(confidence, 1.0)

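# Illustrative usage (a minimal sketch):
#
#   detector = MarketRegimeDetector()
#   regime = detector.detect_regime(ohlcv, lookback=60)
#   regime.regime_type   # 'trending_up' | 'trending_down' | 'mean_reverting' | 'volatile'
#   regime.confidence    # 0.5-1.0
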
class FactorEvaluator:
    """Evaluate alpha factors using various metrics"""

    def __init__(self):
        self.evaluation_history = defaultdict(list)

    def calculate_information_coefficient(self, factor_values: pd.Series,
                                          forward_returns: pd.Series) -> float:
        """Calculate the Information Coefficient (IC)"""
        try:
            # Align and drop missing observations
            mask = factor_values.notna() & forward_returns.notna()
            clean_factor = factor_values[mask]
            clean_returns = forward_returns[mask]

            if len(clean_factor) < 20:
                return 0.0

            if clean_factor.std() == 0 or clean_returns.std() == 0:
                return 0.0

            # Rank (Spearman) correlation between factor and forward returns
            ic = stats.spearmanr(clean_factor, clean_returns)[0]

            return ic if not np.isnan(ic) else 0.0

        except Exception:
            return 0.0

    def calculate_factor_turnover(self, factor_values: pd.Series,
                                  rebalance_freq: int = 20) -> float:
        """Calculate factor turnover"""
        try:
            if len(factor_values) < rebalance_freq * 2:
                return 0.0

            ranks = factor_values.rank(pct=True, na_option='keep').fillna(0.5)

            # Long the top quintile, short the bottom quintile
            long_positions = ranks > 0.8
            short_positions = ranks < 0.2

            turnover_rates = []

            for i in range(rebalance_freq, len(ranks), rebalance_freq):
                prev_long = long_positions.iloc[i - rebalance_freq]
                curr_long = long_positions.iloc[i]

                prev_short = short_positions.iloc[i - rebalance_freq]
                curr_short = short_positions.iloc[i]

                # Fraction of positions that changed between rebalances
                long_turnover = (prev_long != curr_long).mean()
                short_turnover = (prev_short != curr_short).mean()

                turnover_rates.append((long_turnover + short_turnover) / 2)

            return np.mean(turnover_rates) if turnover_rates else 0.0

        except Exception:
            return 0.0

    def calculate_factor_decay(self, factor: AlphaFactor,
                               market_data: pd.DataFrame,
                               max_lag: int = 20) -> Dict[int, float]:
        """Calculate IC decay over different prediction horizons"""
        ic_by_lag = {}

        try:
            factor_values = self._get_factor_values(factor, market_data)

            for lag in range(1, min(max_lag + 1, len(market_data) - 1)):
                forward_returns = market_data['close'].pct_change(lag).shift(-lag)
                ic = self.calculate_information_coefficient(factor_values, forward_returns)
                ic_by_lag[lag] = ic

        except Exception:
            for lag in range(1, max_lag + 1):
                ic_by_lag[lag] = 0.0

        return ic_by_lag

    def _get_factor_values(self, factor: AlphaFactor, market_data: pd.DataFrame) -> pd.Series:
        """Get factor values from a formula string or a factor function"""
        try:
            if isinstance(factor.formula, str):
                if 'sentiment' in factor.name:
                    # Sentiment factors: scale a volume-ratio proxy by net sentiment
                    if 'sentiment_data' in factor.metadata:
                        sentiment = factor.metadata['sentiment_data'].get('net_sentiment', 0)

                        base_values = market_data['volume'] / market_data['volume'].rolling(20, min_periods=1).mean()
                        factor_values = base_values * (1 + sentiment)
                    else:
                        # No sentiment data attached: use a random-walk placeholder
                        factor_values = pd.Series(
                            np.random.normal(0, 0.1, len(market_data)),
                            index=market_data.index
                        ).cumsum() * 0.01
                else:
                    # Formula string: evaluate in the sandboxed namespace
                    llm_gen = LLMAlphaGenerator()
                    factor_values = llm_gen.evaluate_formula(factor.formula, market_data)
            else:
                # Callable classical factor
                factor_values = factor.formula(market_data)

            factor_values = factor_values.replace([np.inf, -np.inf], np.nan)
            factor_values = factor_values.fillna(0)

            return factor_values

        except Exception:
            return pd.Series(0, index=market_data.index)

    def evaluate_factor_performance(self, factor: AlphaFactor,
                                    market_data: pd.DataFrame,
                                    regime: Optional[MarketRegime] = None) -> Dict[str, float]:
        """Comprehensive factor performance evaluation"""
        try:
            factor_values = self._get_factor_values(factor, market_data)

            # Next-day forward returns
            forward_returns = market_data['close'].pct_change().shift(-1)

            ic = self.calculate_information_coefficient(factor_values, forward_returns)
            turnover = self.calculate_factor_turnover(factor_values)

            # Long-short portfolio built from factor ranks
            factor_portfolio_returns = self._calculate_factor_portfolio_returns(
                factor_values, forward_returns
            )
            sharpe = self._calculate_sharpe_ratio(factor_portfolio_returns)

            max_dd = self._calculate_max_drawdown(factor_portfolio_returns)

            hit_rate = (factor_portfolio_returns > 0).mean() if len(factor_portfolio_returns) > 0 else 0.5

            metrics = {
                'ic': ic,
                'turnover': turnover,
                'sharpe_ratio': sharpe,
                'max_drawdown': max_dd,
                'hit_rate': hit_rate
            }

            self.evaluation_history[factor.name].append({
                'timestamp': datetime.now(),
                'metrics': metrics,
                'regime': regime.regime_type if regime else 'unknown'
            })

            return metrics

        except Exception:
            # Neutral defaults if evaluation fails
            return {
                'ic': 0.0,
                'turnover': 0.5,
                'sharpe_ratio': 0.0,
                'max_drawdown': 0.1,
                'hit_rate': 0.5
            }

    def _calculate_factor_portfolio_returns(self, factor_values: pd.Series,
                                            forward_returns: pd.Series) -> pd.Series:
        """Calculate returns of a long-short portfolio based on the factor"""
        try:
            ranks = factor_values.rank(pct=True, na_option='keep').fillna(0.5)

            # Long the top quintile, short the bottom quintile
            long_weight = (ranks > 0.8).astype(float)
            short_weight = (ranks < 0.2).astype(float)

            # Normalize each leg
            long_sum = long_weight.sum()
            short_sum = short_weight.sum()

            if long_sum > 0:
                long_weight = long_weight / long_sum
            if short_sum > 0:
                short_weight = short_weight / short_sum

            portfolio_returns = (long_weight - short_weight) * forward_returns
            portfolio_returns = portfolio_returns.fillna(0)

            return portfolio_returns

        except Exception:
            return pd.Series(0, index=forward_returns.index)

    def _calculate_sharpe_ratio(self, returns: pd.Series) -> float:
        """Calculate the annualized Sharpe ratio"""
        try:
            if len(returns) < 2:
                return 0.0

            clean_returns = returns.dropna()
            if len(clean_returns) < 2:
                return 0.0

            excess_returns = clean_returns - RISK_FREE_RATE / 252

            if clean_returns.std() > 0:
                return np.sqrt(252) * excess_returns.mean() / clean_returns.std()
            else:
                return 0.0

        except Exception:
            return 0.0

    def _calculate_max_drawdown(self, returns: pd.Series) -> float:
        """Calculate the maximum drawdown"""
        try:
            if len(returns) < 2:
                return 0.0

            cum_returns = (1 + returns.fillna(0)).cumprod()

            running_max = cum_returns.expanding().max()

            drawdown = (cum_returns - running_max) / running_max

            return abs(drawdown.min()) if len(drawdown) > 0 else 0.0

        except Exception:
            return 0.0

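# Illustrative usage (a minimal sketch; `factor` is any AlphaFactor instance):
#
#   evaluator = FactorEvaluator()
#   metrics = evaluator.evaluate_factor_performance(factor, ohlcv)
#   metrics['ic']   # Spearman rank IC against next-day returns
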
class HierarchicalRiskParity:
    """Hierarchical Risk Parity portfolio construction"""

    def __init__(self):
        self.linkage_method = 'single'
        self.distance_metric = 'euclidean'

    def calculate_weights(self, returns: pd.DataFrame,
                          factor_scores: pd.DataFrame) -> pd.Series:
        """Calculate HRP weights for factors"""

        if returns.empty or len(returns.columns) == 0:
            return pd.Series(dtype=float)

        if len(returns.columns) == 1:
            return pd.Series(1.0, index=returns.columns)

        try:
            corr_matrix = returns.corr()
            corr_matrix = corr_matrix.fillna(0)
            np.fill_diagonal(corr_matrix.values, 1)

            # Correlation-based distance
            dist_matrix = np.sqrt(2 * (1 - corr_matrix))

            # Condensed upper-triangular distance vector for scipy linkage
            condensed_dist = dist_matrix.values[np.triu_indices(len(dist_matrix), k=1)]
            linkage_matrix = self._tree_clustering(condensed_dist)

            quasi_diag = self._get_quasi_diag(linkage_matrix)

            weights = self._get_recursive_bisection(
                returns.cov().fillna(0),
                quasi_diag
            )

            return pd.Series(weights, index=returns.columns)

        except Exception:
            # Fall back to equal weights
            return pd.Series(1.0 / len(returns.columns), index=returns.columns)

    def _tree_clustering(self, dist_matrix: np.ndarray) -> np.ndarray:
        """Perform hierarchical clustering"""
        try:
            from scipy.cluster.hierarchy import linkage
            return linkage(dist_matrix, method=self.linkage_method)
        except Exception:
            # Recover n from the length of the condensed distance vector
            n = int((1 + np.sqrt(1 + 8 * len(dist_matrix))) / 2)
            return np.zeros((n - 1, 4))

    def _get_quasi_diag(self, linkage_matrix: np.ndarray) -> List[int]:
        """Get quasi-diagonal matrix ordering"""
        try:
            from scipy.cluster.hierarchy import dendrogram

            dendro = dendrogram(linkage_matrix, no_plot=True)
            return dendro['leaves']
        except Exception:
            n = linkage_matrix.shape[0] + 1
            return list(range(n))

    def _get_recursive_bisection(self, cov: pd.DataFrame,
                                 sort_idx: List[int]) -> np.ndarray:
        """Recursive bisection for weight calculation"""
        try:
            weights = pd.Series(1.0, index=cov.index)

            items = [sort_idx]

            while len(items) > 0:
                item = items.pop()

                if len(item) > 1:
                    # Split the cluster in half
                    n = len(item) // 2
                    left = item[:n]
                    right = item[n:]

                    var_left = self._get_cluster_var(cov, left)
                    var_right = self._get_cluster_var(cov, right)

                    # Allocate inversely to cluster variance
                    total_var = var_left + var_right
                    if total_var > 0:
                        alpha = var_right / total_var
                    else:
                        alpha = 0.5

                    weights.iloc[left] *= alpha
                    weights.iloc[right] *= (1 - alpha)

                    items.extend([left, right])

            return weights.values / (weights.sum() + 1e-8)

        except Exception:
            return np.ones(len(cov)) / len(cov)

    def _get_cluster_var(self, cov: pd.DataFrame, items: List[int]) -> float:
        """Calculate cluster variance under equal within-cluster weights"""
        try:
            if len(items) == 0:
                return 0
            elif len(items) == 1:
                return cov.iloc[items[0], items[0]]
            else:
                cluster_cov = cov.iloc[items, items]
                weights = pd.Series(1 / len(items), index=cluster_cov.index)

                return weights @ cluster_cov @ weights
        except Exception:
            return 1.0

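# Illustrative usage (a minimal sketch; `factor_returns` is a DataFrame with
# one column of daily returns per factor):
#
#   hrp = HierarchicalRiskParity()
#   w = hrp.calculate_weights(factor_returns, pd.DataFrame())   # pd.Series summing to ~1
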
class RegimeAwarePortfolioOptimizer:
    """Portfolio optimizer that adapts to market regimes"""

    def __init__(self):
        self.hrp = HierarchicalRiskParity()
        self.regime_weights = {
            'trending_up': {'momentum': 0.6, 'mean_reversion': 0.1,
                            'volatility': 0.1, 'alternative': 0.2},
            'trending_down': {'momentum': 0.2, 'mean_reversion': 0.3,
                              'volatility': 0.3, 'alternative': 0.2},
            'mean_reverting': {'momentum': 0.1, 'mean_reversion': 0.6,
                               'volatility': 0.1, 'alternative': 0.2},
            'volatile': {'momentum': 0.2, 'mean_reversion': 0.2,
                         'volatility': 0.4, 'alternative': 0.2}
        }

    def optimize_portfolio(self, factors: List[AlphaFactor],
                           factor_returns: pd.DataFrame,
                           regime: MarketRegime) -> Dict[str, float]:
        """Optimize portfolio weights based on the current regime"""

        if not factors or factor_returns.empty:
            return {}

        category_weights = self.regime_weights.get(
            regime.regime_type,
            self.regime_weights['volatile']
        )

        factors_by_category = defaultdict(list)
        for factor in factors:
            category = factor.category if factor.category in category_weights else 'alternative'
            factors_by_category[category].append(factor)

        final_weights = {}

        for category, cat_factors in factors_by_category.items():
            if not cat_factors:
                continue

            cat_factor_names = [f.name for f in cat_factors]
            available_factors = [name for name in cat_factor_names if name in factor_returns.columns]

            if not available_factors:
                continue

            cat_returns = factor_returns[available_factors]

            if len(cat_returns.columns) == 1:
                within_cat_weights = pd.Series(1.0, index=cat_returns.columns)
            else:
                within_cat_weights = self.hrp.calculate_weights(
                    cat_returns,
                    pd.DataFrame()
                )

            cat_weight = category_weights.get(category, 0.1)

            for factor_name, weight in within_cat_weights.items():
                final_weights[factor_name] = weight * cat_weight

        total_weight = sum(final_weights.values())
        if total_weight > 0:
            final_weights = {k: v / total_weight for k, v in final_weights.items()}

        return final_weights

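# Illustrative usage (a minimal sketch; assumes `factors`, `factor_returns`,
# and `regime` were produced by the components above):
#
#   optimizer = RegimeAwarePortfolioOptimizer()
#   weights = optimizer.optimize_portfolio(factors, factor_returns, regime)
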
class AlphaMiningPlatform:
    """Main platform for alpha factor discovery and evaluation"""

    def __init__(self, openai_api_key: Optional[str] = None):
        # Component modules
        self.llm_generator = LLMAlphaGenerator(api_key=openai_api_key)
        self.alt_data_pipeline = AlternativeDataPipeline()
        self.regime_detector = MarketRegimeDetector()
        self.factor_evaluator = FactorEvaluator()
        self.portfolio_optimizer = RegimeAwarePortfolioOptimizer()

        # Factor state
        self.discovered_factors = []
        self.active_factors = []
        self.factor_performance_history = defaultdict(list)

        # Portfolio state
        self.current_weights = {}
        self.portfolio_value = 100000
        self.portfolio_history = []

        # Cache of computed factor values, keyed by factor name
        self.factor_values_cache = {}

    def discover_factors(self, market_data: pd.DataFrame,
                         n_factors: int = 20) -> List[AlphaFactor]:
        """Discover new alpha factors using multiple methods"""

        discovered = []

        # Detect the current regime to contextualize LLM generation
        current_regime = self.regime_detector.detect_regime(market_data)
        market_context = {
            'regime': current_regime.regime_type,
            'volatility': current_regime.characteristics['volatility'],
            'trend_strength': current_regime.characteristics['trend_strength']
        }

        # 1. Classical factors (up to half of the requested budget)
        classical_funcs = ClassicalAlphaFactors.get_all_classical_factors()
        for func in classical_funcs[:n_factors // 2]:
            factor = AlphaFactor(
                name=f"classical_{func.__name__}",
                formula=func,
                category="price",
                lookback_period=20
            )
            discovered.append(factor)

        # 2. LLM-generated factors across several categories
        categories = ['momentum', 'mean_reversion', 'volatility', 'microstructure']
        for i in range(n_factors // 3):
            category = categories[i % len(categories)]
            name, formula = self.llm_generator.generate_llm_factor(
                market_context=market_context,
                category=category
            )

            factor = AlphaFactor(
                name=name,
                formula=formula,
                category=category,
                lookback_period=random.choice([10, 20, 30, 60])
            )
            discovered.append(factor)

        # 3. Sentiment factors from the alternative data pipeline
        sentiment_scores = self.alt_data_pipeline.extract_sentiment_scores()
        sentiment_factors = self.alt_data_pipeline.create_sentiment_alpha_factors(
            sentiment_scores
        )
        discovered.extend(sentiment_factors[:n_factors // 6])

        return discovered

    def evaluate_factors(self, factors: List[AlphaFactor],
                         market_data: pd.DataFrame) -> pd.DataFrame:
        """Evaluate all factors and return performance metrics"""

        regime = self.regime_detector.detect_regime(market_data)

        evaluation_results = []

        # Reset the factor value cache for this evaluation pass
        self.factor_values_cache = {}

        for factor in factors:
            metrics = self.factor_evaluator.evaluate_factor_performance(
                factor, market_data, regime
            )

            factor.ic_score = metrics['ic']
            factor.sharpe_ratio = metrics['sharpe_ratio']
            factor.turnover = metrics['turnover']

            decay_profile = self.factor_evaluator.calculate_factor_decay(
                factor, market_data
            )

            # Approximate the decay rate as the average IC loss per lag step
            if len(decay_profile) > 1:
                decay_values = list(decay_profile.values())
                factor.decay_rate = (decay_values[0] - decay_values[-1]) / len(decay_values)

            factor.regime_performance[regime.regime_type] = metrics['ic']

            # Cache factor values for later correlation checks and backtesting
            self.factor_values_cache[factor.name] = self.factor_evaluator._get_factor_values(factor, market_data)

            evaluation_results.append({
                'name': factor.name,
                'category': factor.category,
                'ic': metrics['ic'],
                'sharpe': metrics['sharpe_ratio'],
                'turnover': metrics['turnover'],
                'max_dd': metrics['max_drawdown'],
                'regime': regime.regime_type,
                'decay_rate': factor.decay_rate
            })

        return pd.DataFrame(evaluation_results)

    def select_active_factors(self, factors: List[AlphaFactor],
                              min_ic: float = MIN_FACTOR_IC,
                              max_correlation: float = MAX_FACTOR_CORRELATION) -> List[AlphaFactor]:
        """Select factors for active trading"""

        # Keep only factors whose |IC| clears the threshold
        qualified_factors = [f for f in factors if abs(f.ic_score) > min_ic]

        if not qualified_factors:
            return []

        # Greedy selection in descending |IC| order
        qualified_factors.sort(key=lambda x: abs(x.ic_score), reverse=True)

        selected = [qualified_factors[0]]

        for factor in qualified_factors[1:]:
            correlated = False

            # Prefer cached factor values for an exact correlation check
            if factor.name in self.factor_values_cache:
                for selected_factor in selected:
                    if selected_factor.name in self.factor_values_cache:
                        corr = self.factor_values_cache[factor.name].corr(
                            self.factor_values_cache[selected_factor.name]
                        )
                        if abs(corr) > max_correlation:
                            correlated = True
                            break
            else:
                # Without cached values, treat same-category factors as correlated
                for selected_factor in selected:
                    if factor.category == selected_factor.category:
                        correlated = True
                        break

            if not correlated:
                selected.append(factor)

            if len(selected) >= 10:
                break

        return selected

    def construct_portfolio(self, market_data: pd.DataFrame) -> Dict[str, Any]:
        """Construct a portfolio based on the active factors"""

        regime = self.regime_detector.detect_regime(market_data)

        # Build per-factor long-short return series from cached factor values
        factor_returns = pd.DataFrame()

        for factor in self.active_factors:
            if factor.name in self.factor_values_cache:
                factor_values = self.factor_values_cache[factor.name]

                ranks = factor_values.rank(pct=True, na_option='keep').fillna(0.5)
                long_weight = (ranks > 0.8).astype(float)
                short_weight = (ranks < 0.2).astype(float)

                # Normalize each leg
                long_sum = long_weight.sum()
                short_sum = short_weight.sum()

                if long_sum > 0:
                    long_weight = long_weight / long_sum
                if short_sum > 0:
                    short_weight = short_weight / short_sum

                market_returns = market_data['close'].pct_change()

                factor_return = (long_weight - short_weight) * market_returns
                factor_returns[factor.name] = factor_return

        # Regime-aware HRP optimization over the past year, else equal weights
        if not factor_returns.empty and len(factor_returns) > 252:
            weights = self.portfolio_optimizer.optimize_portfolio(
                self.active_factors,
                factor_returns.iloc[-252:],
                regime
            )
        else:
            weights = {f.name: 1.0 / len(self.active_factors) for f in self.active_factors}

        self.current_weights = weights

        category_counts = defaultdict(int)
        for f in self.active_factors:
            category_counts[f.category] += 1

        return {
            'weights': weights,
            'regime': regime.regime_type,
            'n_factors': len(weights),
            'categories': dict(category_counts)
        }

    def backtest_portfolio(self, market_data: pd.DataFrame,
                           initial_capital: float,
                           rebalance_freq: int) -> Tuple[pd.DataFrame, Dict[str, float]]:
        """Run a portfolio backtest with transaction costs"""

        portfolio_value = initial_capital
        portfolio_history = []
        positions = {}

        # Warm-up window before trading begins
        start_idx = min(100, len(market_data) // 3)

        for i in range(start_idx, len(market_data)):
            current_date = market_data.index[i]

            # Rebalance on schedule (and on the first trading day)
            if i % rebalance_freq == 0 or i == start_idx:
                current_data = market_data.iloc[:i]

                portfolio_info = self.construct_portfolio(current_data)

                new_positions = {}
                for factor_name, weight in portfolio_info['weights'].items():
                    new_positions[factor_name] = portfolio_value * weight

                # Charge transaction costs on the traded notional
                transaction_cost = 0
                for factor_name in set(list(positions.keys()) + list(new_positions.keys())):
                    old_value = positions.get(factor_name, 0)
                    new_value = new_positions.get(factor_name, 0)
                    transaction_cost += abs(new_value - old_value) * TRANSACTION_COST

                portfolio_value -= transaction_cost
                positions = new_positions

            # Mark positions to market for the day
            daily_pnl = 0

            for factor_name, position_value in positions.items():
                factor = next((f for f in self.active_factors if f.name == factor_name), None)

                if factor and factor_name in self.factor_values_cache:
                    factor_values = self.factor_values_cache[factor_name]

                    if i < len(factor_values):
                        ranks = factor_values.iloc[:i].rank(pct=True, na_option='keep').fillna(0.5)
                        if len(ranks) > 0:
                            current_rank = ranks.iloc[-1]

                            # Map the factor rank to a long/short/flat direction
                            if current_rank > 0.8:
                                position_direction = 1
                            elif current_rank < 0.2:
                                position_direction = -1
                            else:
                                position_direction = 0

                            if i > 0:
                                market_return = (market_data['close'].iloc[i] - market_data['close'].iloc[i - 1]) / market_data['close'].iloc[i - 1]
                            else:
                                market_return = 0

                            factor_pnl = position_value * position_direction * market_return
                            daily_pnl += factor_pnl

            portfolio_value += daily_pnl

            portfolio_history.append({
                'date': current_date,
                'value': portfolio_value,
                'pnl': daily_pnl,
                'positions': positions.copy()
            })

        history_df = pd.DataFrame(portfolio_history)

        if history_df.empty:
            return history_df, {
                'total_return': 0.0,
                'annual_return': 0.0,
                'sharpe_ratio': 0.0,
                'max_drawdown': 0.0,
                'win_rate': 0.5
            }

        # Daily returns from PnL relative to the prior portfolio value
        returns = history_df['pnl'] / history_df['value'].shift(1)
        returns = returns.fillna(0)

        total_return = (portfolio_value - initial_capital) / initial_capital
        annual_return = (portfolio_value / initial_capital) ** (252 / len(history_df)) - 1 if len(history_df) > 0 else 0

        if returns.std() > 0:
            sharpe = np.sqrt(252) * returns.mean() / returns.std()
        else:
            sharpe = 0

        cum_returns = (1 + returns).cumprod()
        running_max = cum_returns.expanding().max()
        drawdown = (running_max - cum_returns) / running_max
        max_drawdown = drawdown.max()

        metrics = {
            'total_return': total_return,
            'annual_return': annual_return,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': (returns > 0).mean()
        }

        return history_df, metrics

    def calculate_information_coefficient_decay(self,
                                                factor: AlphaFactor,
                                                market_data: pd.DataFrame) -> pd.DataFrame:
        """Calculate IC decay as a DataFrame for visualization"""

        decay_profile = self.factor_evaluator.calculate_factor_decay(
            factor, market_data, max_lag=30
        )

        decay_df = pd.DataFrame([
            {'lag': lag, 'ic': ic}
            for lag, ic in decay_profile.items()
        ])

        return decay_df

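# End-to-end usage (a minimal sketch of the full pipeline; MarketDataGenerator
# is defined below):
#
#   platform = AlphaMiningPlatform()                     # no API key: template factors
#   ohlcv = MarketDataGenerator.generate_market_data(500)
#   factors = platform.discover_factors(ohlcv, n_factors=20)
#   report = platform.evaluate_factors(factors, ohlcv)   # per-factor metrics DataFrame
#   platform.active_factors = platform.select_active_factors(factors)
#   history, metrics = platform.backtest_portfolio(ohlcv, 100000, REBALANCE_FREQUENCY)
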
class MarketDataGenerator:
    """Generate realistic market data for demonstration"""

    @staticmethod
    def generate_market_data(n_days: int = 1000) -> pd.DataFrame:
        """Generate OHLCV market data with distinct regime segments"""

        dates = pd.date_range(end=datetime.now(), periods=n_days, freq='D')

        # Base daily returns
        returns = np.random.normal(0.0005, 0.02, n_days)

        # Carve the sample into four regime segments
        regime_changes = [0, n_days // 4, n_days // 2, 3 * n_days // 4, n_days]

        for i in range(len(regime_changes) - 1):
            start, end = regime_changes[i], regime_changes[i + 1]

            if i % 4 == 0:        # upward drift
                returns[start:end] += np.random.normal(0.001, 0.001, end - start)
            elif i % 4 == 1:      # calm, directionless
                returns[start:end] = np.random.normal(0, 0.015, end - start)
            elif i % 4 == 2:      # downward drift
                returns[start:end] += np.random.normal(-0.001, 0.001, end - start)
            else:                 # high volatility
                returns[start:end] = np.random.normal(0, 0.03, end - start)

        # Geometric price path starting at 100
        prices = 100 * np.exp(np.cumsum(returns))

        data = pd.DataFrame({
            'open': prices * (1 + np.random.normal(0, 0.001, n_days)),
            'high': prices * (1 + np.abs(np.random.normal(0, 0.005, n_days))),
            'low': prices * (1 - np.abs(np.random.normal(0, 0.005, n_days))),
            'close': prices,
            'volume': np.random.lognormal(15, 0.5, n_days)
        }, index=dates)

        # Enforce OHLC consistency: high >= open/close, low <= open/close
        data['high'] = data[['open', 'high', 'close']].max(axis=1)
        data['low'] = data[['open', 'low', 'close']].min(axis=1)

        return data


def create_gradio_interface(): |
|
"""Create the main Gradio interface for the Alpha Mining Platform""" |
|
|
|
|
|
platform = None |
|
market_data_cache = {} |
|
|
|
def generate_and_evaluate_factors(n_days, n_factors, min_ic_threshold, openai_api_key): |
|
"""Main function to generate and evaluate alpha factors""" |
|
|
|
try: |
|
|
|
nonlocal platform |
|
platform = AlphaMiningPlatform(openai_api_key=openai_api_key if openai_api_key else None) |
|
|
|
|
|
market_data = MarketDataGenerator.generate_market_data(int(n_days)) |
|
market_data_cache['data'] = market_data |
|
|
|
|
|
discovered_factors = platform.discover_factors(market_data, int(n_factors)) |
|
platform.discovered_factors = discovered_factors |
|
|
|
|
|
evaluation_df = platform.evaluate_factors(discovered_factors, market_data) |
|
|
|
|
|
platform.active_factors = platform.select_active_factors( |
|
discovered_factors, |
|
min_ic=float(min_ic_threshold) |
|
) |
|
|
|
|
|
portfolio_info = platform.construct_portfolio(market_data) |
|
|
|
|
|
|
|
fig_heatmap = create_factor_heatmap(evaluation_df) |
|
|
|
|
|
fig_ic_dist = create_ic_distribution(evaluation_df) |
|
|
|
|
|
fig_weights = create_portfolio_weights_chart(portfolio_info['weights']) |
|
|
|
|
|
fig_regime = create_regime_timeline(market_data, platform.regime_detector) |
|
|
|
|
|
active_factor_names = [f.name for f in platform.active_factors] |
|
active_factors_df = evaluation_df[evaluation_df['name'].isin(active_factor_names)] |
|
avg_ic = active_factors_df['ic'].mean() if len(active_factors_df) > 0 else 0 |
|
|
|
summary_stats = f""" |
|
### Factor Discovery Summary |
|
- **Total Factors Discovered**: {len(discovered_factors)} |
|
- **Active Factors Selected**: {len(platform.active_factors)} |
|
- **Current Market Regime**: {portfolio_info['regime']} |
|
- **Average IC of Active Factors**: {avg_ic:.4f} |
|
- **Average Sharpe Ratio**: {evaluation_df['sharpe'].mean():.2f} |
|
|
|
### Portfolio Construction |
|
- **Number of Factors in Portfolio**: {portfolio_info['n_factors']} |
|
- **Category Distribution**: {portfolio_info['categories']} |
|
|
|
### LLM-Generated Factors |
|
- **Total LLM Factors**: {len([f for f in discovered_factors if 'LLM' in f.name or 'Fallback' in f.name])} |
|
- **LLM Factors Selected**: {len([f for f in platform.active_factors if 'LLM' in f.name or 'Fallback' in f.name])} |
|
""" |
|
|
|
|
|
top_factors_df = evaluation_df.nlargest(10, 'ic')[ |
|
['name', 'category', 'ic', 'sharpe', 'turnover', 'regime'] |
|
].round(3) |
|
|
|
return fig_heatmap, fig_ic_dist, fig_weights, fig_regime, summary_stats, top_factors_df |
|
|
|
except Exception as e: |
|
print(f"Error in generate_and_evaluate_factors: {e}") |
|
|
|
empty_fig = go.Figure() |
|
empty_fig.add_annotation(text="Error generating data", x=0.5, y=0.5, showarrow=False) |
|
return empty_fig, empty_fig, empty_fig, empty_fig, f"Error: {str(e)}", pd.DataFrame() |
|
|
|
    def analyze_factor_decay(factor_name):
        """Analyze IC decay for a specific factor."""
        try:
            if 'data' not in market_data_cache or platform is None:
                empty_fig = go.Figure()
                empty_fig.add_annotation(text="Please generate factors first", x=0.5, y=0.5, showarrow=False)
                return empty_fig, "Please generate factors first"

            market_data = market_data_cache['data']

            # Look up the selected factor by name
            factor = next((f for f in platform.discovered_factors if f.name == factor_name), None)

            if factor is None:
                empty_fig = go.Figure()
                empty_fig.add_annotation(text=f"Factor '{factor_name}' not found", x=0.5, y=0.5, showarrow=False)
                return empty_fig, f"Factor '{factor_name}' not found"

            # IC measured at increasing prediction horizons
            decay_df = platform.calculate_information_coefficient_decay(factor, market_data)

            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=decay_df['lag'],
                y=decay_df['ic'],
                mode='lines+markers',
                name='IC Decay',
                line=dict(color='blue', width=2),
                marker=dict(size=8)
            ))

            # Fit an exponential decay curve IC(lag) = a * exp(-b * lag)
            if len(decay_df) > 3:
                from scipy.optimize import curve_fit

                def exp_decay(x, a, b):
                    return a * np.exp(-b * x)

                try:
                    popt, _ = curve_fit(exp_decay, decay_df['lag'], np.abs(decay_df['ic']))
                    fit_y = exp_decay(decay_df['lag'], *popt)

                    fig.add_trace(go.Scatter(
                        x=decay_df['lag'],
                        y=fit_y,
                        mode='lines',
                        name='Exponential Fit',
                        line=dict(color='red', width=2, dash='dash')
                    ))

                    half_life = np.log(2) / popt[1] if popt[1] > 0 else np.inf
                    decay_stats = f"Half-life: {half_life:.1f} days"
                except (RuntimeError, ValueError):
                    # curve_fit raises RuntimeError when it fails to converge
                    decay_stats = "Could not fit exponential decay"
            else:
                decay_stats = "Insufficient data for decay analysis"
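            # Worked example with hypothetical numbers: a fitted decay rate
            # b = 0.1 gives a half-life of ln(2) / 0.1 ≈ 6.9 days, i.e. the
            # factor loses half its predictive power in about a week.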

            fig.update_layout(
                title=f"Information Coefficient Decay: {factor_name}",
                xaxis_title="Prediction Horizon (days)",
                yaxis_title="Information Coefficient",
                height=400
            )

            return fig, decay_stats

        except Exception as e:
            print(f"Error in analyze_factor_decay: {e}")
            empty_fig = go.Figure()
            empty_fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return empty_fig, f"Error: {str(e)}"

    def backtest_portfolio(initial_capital, rebalance_freq):
        """Run portfolio backtest with actual factor returns."""
        try:
            if 'data' not in market_data_cache or platform is None or not platform.active_factors:
                empty_fig = go.Figure()
                empty_fig.add_annotation(text="Please generate and select factors first", x=0.5, y=0.5, showarrow=False)
                return empty_fig, "Please generate and select factors first", ""

            market_data = market_data_cache['data']
            initial_capital = float(initial_capital)
            rebalance_freq = int(rebalance_freq)

            history_df, metrics = platform.backtest_portfolio(
                market_data, initial_capital, rebalance_freq
            )

            if history_df.empty:
                empty_fig = go.Figure()
                empty_fig.add_annotation(text="No backtest data generated", x=0.5, y=0.5, showarrow=False)
                return empty_fig, "No backtest data generated", ""

            fig = make_subplots(
                rows=2, cols=1,
                subplot_titles=('Portfolio Value', 'Rolling Sharpe Ratio'),
                row_heights=[0.7, 0.3],
                vertical_spacing=0.1
            )

            fig.add_trace(
                go.Scatter(
                    x=history_df['date'],
                    y=history_df['value'],
                    mode='lines',
                    name='Portfolio Value',
                    line=dict(color='blue', width=2)
                ),
                row=1, col=1
            )

            # Buy-and-hold benchmark: compound the market's daily returns
            # and align the result with the backtest dates
            market_returns = market_data['close'].pct_change().fillna(0)
            benchmark_value = initial_capital * (1 + market_returns).cumprod()
            benchmark_dates = market_data.index[market_data.index.isin(history_df['date'])]
            benchmark_value = benchmark_value[benchmark_dates]
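            # (1 + r).cumprod() turns daily returns into a growth index:
            # e.g. two consecutive days of +1% compound to 1.01 * 1.01 = 1.0201,
            # so $100,000 would grow to $102,010.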

            fig.add_trace(
                go.Scatter(
                    x=benchmark_dates,
                    y=benchmark_value,
                    mode='lines',
                    name='Buy & Hold Benchmark',
                    line=dict(color='gray', width=1, dash='dash')
                ),
                row=1, col=1
            )

            # Daily portfolio returns: P&L relative to the prior day's value
            returns = history_df['pnl'] / history_df['value'].shift(1)
            returns = returns.fillna(0)

            if len(returns) > 60:
                # Annualized 60-day rolling Sharpe (252 trading days/year);
                # the small epsilon guards against division by zero
                rolling_returns = returns.rolling(window=60)
                rolling_sharpe = np.sqrt(252) * rolling_returns.mean() / (rolling_returns.std() + 1e-8)
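                # Example with hypothetical numbers: a mean daily return of
                # 0.05% with daily volatility of 1% gives an annualized
                # Sharpe of sqrt(252) * 0.0005 / 0.01 ≈ 0.79.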

                fig.add_trace(
                    go.Scatter(
                        x=history_df['date'],
                        y=rolling_sharpe,
                        mode='lines',
                        name='60-Day Sharpe',
                        line=dict(color='green', width=2)
                    ),
                    row=2, col=1
                )

            fig.update_layout(height=700, showlegend=True)
            fig.update_xaxes(title_text="Date", row=2, col=1)
            fig.update_yaxes(title_text="Portfolio Value", row=1, col=1)
            fig.update_yaxes(title_text="Sharpe Ratio", row=2, col=1)

            metrics_text = f"""
### Backtest Performance Metrics
- **Total Return**: {metrics['total_return']*100:.2f}%
- **Annualized Return**: {metrics['annual_return']*100:.2f}%
- **Sharpe Ratio**: {metrics['sharpe_ratio']:.2f}
- **Maximum Drawdown**: {metrics['max_drawdown']*100:.2f}%
- **Win Rate**: {metrics['win_rate']*100:.1f}%
- **Number of Rebalances (approx.)**: {len(history_df) // rebalance_freq}

### Active Factors Used
{', '.join([f.name for f in platform.active_factors])}
"""

            return fig, metrics_text, ""

        except Exception as e:
            print(f"Error in backtest_portfolio: {e}")
            empty_fig = go.Figure()
            empty_fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return empty_fig, f"Error: {str(e)}", ""

    def create_factor_heatmap(eval_df):
        """Create heatmap of factor performance by category."""
        try:
            if eval_df.empty:
                fig = go.Figure()
                fig.add_annotation(text="No data to display", x=0.5, y=0.5, showarrow=False)
                return fig

            # Average IC per (category, regime) cell
            pivot_df = pd.pivot_table(
                eval_df,
                values='ic',
                index='category',
                columns='regime',
                aggfunc='mean',
                fill_value=0
            )
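            # The pivot is a small category-by-regime grid, e.g. (values and
            # category names hypothetical):
            #                trending_up  mean_reverting  volatile
            # momentum             0.042          -0.011     0.005
            # value               -0.003           0.028     0.010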

            if pivot_df.empty:
                fig = go.Figure()
                fig.add_annotation(text="No data to display", x=0.5, y=0.5, showarrow=False)
                return fig

            fig = go.Figure(data=go.Heatmap(
                z=pivot_df.values,
                x=pivot_df.columns,
                y=pivot_df.index,
                colorscale='RdBu',
                zmid=0,
                text=np.round(pivot_df.values, 3),
                texttemplate='%{text}',
                textfont={"size": 10}
            ))

            fig.update_layout(
                title="Average IC by Factor Category and Market Regime",
                xaxis_title="Market Regime",
                yaxis_title="Factor Category",
                height=400
            )

            return fig
        except Exception as e:
            print(f"Error in create_factor_heatmap: {e}")
            fig = go.Figure()
            fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return fig

    def create_ic_distribution(eval_df):
        """Create IC distribution plot."""
        try:
            if eval_df.empty:
                fig = go.Figure()
                fig.add_annotation(text="No data to display", x=0.5, y=0.5, showarrow=False)
                return fig

            fig = go.Figure()

            # One box per category, with every factor shown as a jittered point
            for category in eval_df['category'].unique():
                cat_data = eval_df[eval_df['category'] == category]

                fig.add_trace(go.Box(
                    y=cat_data['ic'],
                    name=category,
                    boxpoints='all',
                    jitter=0.3,
                    pointpos=-1.8
                ))

            fig.update_layout(
                title="Information Coefficient Distribution by Category",
                yaxis_title="Information Coefficient",
                showlegend=False,
                height=400
            )

            # Zero reference line: factors above it have positive predictive power
            fig.add_hline(y=0, line_dash="dash", line_color="gray")

            return fig
        except Exception as e:
            print(f"Error in create_ic_distribution: {e}")
            fig = go.Figure()
            fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return fig

    def create_portfolio_weights_chart(weights):
        """Create portfolio weights pie chart."""
        try:
            if not weights:
                fig = go.Figure()
                fig.add_annotation(
                    text="No active factors selected",
                    xref="paper", yref="paper",
                    x=0.5, y=0.5,
                    showarrow=False
                )
                fig.update_layout(height=400)
                return fig

            fig = go.Figure(data=[go.Pie(
                labels=list(weights.keys()),
                values=list(weights.values()),
                textposition='inside',
                textinfo='percent+label',
                hole=0.3
            )])

            fig.update_layout(
                title="Portfolio Factor Weights",
                height=400
            )

            return fig
        except Exception as e:
            print(f"Error in create_portfolio_weights_chart: {e}")
            fig = go.Figure()
            fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return fig

    def create_regime_timeline(market_data, regime_detector):
        """Create market regime timeline."""
        try:
            # Re-run the detector on expanding windows, sampled every `step`
            # days (at most ~50 evaluations) to keep the timeline cheap
            regime_history = []
            step = max(20, len(market_data) // 50)
            for i in range(60, len(market_data), step):
                regime = regime_detector.detect_regime(market_data.iloc[:i])
                regime_history.append({
                    'date': market_data.index[i-1],
                    'regime': regime.regime_type,
                    'confidence': regime.confidence
                })
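            # Example: with 1000 days of data, step = max(20, 1000 // 50) = 20,
            # so the detector runs on days 60, 80, 100, ... — 47 snapshots in all.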

            regime_df = pd.DataFrame(regime_history)

            if regime_df.empty:
                fig = go.Figure()
                fig.add_annotation(text="No regime data", x=0.5, y=0.5, showarrow=False)
                return fig

            color_map = {
                'trending_up': 'green',
                'trending_down': 'red',
                'mean_reverting': 'blue',
                'volatile': 'orange'
            }

            fig = go.Figure()

            for regime in color_map.keys():
                regime_data = regime_df[regime_df['regime'] == regime]

                if len(regime_data) > 0:
                    fig.add_trace(go.Scatter(
                        x=regime_data['date'],
                        y=regime_data['confidence'],
                        mode='markers',
                        name=regime,
                        marker=dict(
                            color=color_map[regime],
                            size=10,
                            symbol='square'
                        )
                    ))

            fig.update_layout(
                title="Market Regime Detection Timeline",
                xaxis_title="Date",
                yaxis_title="Confidence",
                height=300,
                yaxis_range=[0, 1]
            )

            return fig
        except Exception as e:
            print(f"Error in create_regime_timeline: {e}")
            fig = go.Figure()
            fig.add_annotation(text=f"Error: {str(e)}", x=0.5, y=0.5, showarrow=False)
            return fig

    with gr.Blocks(title="Quantitative Alpha Mining Platform") as interface:
        gr.Markdown("""
        # Quantitative Alpha Mining Platform with LLM Discovery

        This platform leverages LLMs and machine learning to discover novel alpha factors from multiple data sources:
        - **Classical Factors**: Implementation of quantitative factors inspired by WorldQuant's research
        - **LLM-Generated Factors**: Novel factor formulas created using OpenAI's GPT models
        - **Alternative Data**: Sentiment analysis from earnings calls, SEC filings, news, and social media
        - **Regime-Aware Portfolio**: Hierarchical Risk Parity with dynamic regime adaptation

        Author: Spencer Purdy
        """)

with gr.Tab("Factor Discovery"): |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
gr.Markdown("### Configuration") |
|
n_days = gr.Slider( |
|
minimum=500, maximum=2000, value=1000, step=100, |
|
label="Market Data Days" |
|
) |
|
n_factors = gr.Slider( |
|
minimum=10, maximum=50, value=20, step=5, |
|
label="Number of Factors to Generate" |
|
) |
|
min_ic = gr.Slider( |
|
minimum=0.01, maximum=0.1, value=0.02, step=0.01, |
|
label="Minimum IC Threshold" |
|
) |
|
|
|
gr.Markdown("### API Configuration") |
|
openai_api_key = gr.Textbox( |
|
label="OpenAI API Key", |
|
placeholder="sk-...", |
|
type="password", |
|
info="Optional: For LLM-generated factors (leave empty for fallback)" |
|
) |
|
|
|
generate_btn = gr.Button("Generate & Evaluate Factors", variant="primary") |
|
|
|
with gr.Row(): |
|
factor_heatmap = gr.Plot(label="Factor Performance Heatmap") |
|
ic_distribution = gr.Plot(label="IC Distribution") |
|
|
|
with gr.Row(): |
|
portfolio_weights = gr.Plot(label="Portfolio Weights") |
|
regime_timeline = gr.Plot(label="Market Regime Timeline") |
|
|
|
with gr.Row(): |
|
summary_stats = gr.Markdown(label="Summary Statistics") |
|
top_factors_table = gr.DataFrame(label="Top Factors by IC") |
|
|
|
with gr.Tab("Factor Analysis"): |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
factor_selector = gr.Dropdown( |
|
choices=[], |
|
label="Select Factor to Analyze" |
|
) |
|
analyze_btn = gr.Button("Analyze Factor Decay") |
|
|
|
with gr.Column(scale=2): |
|
decay_plot = gr.Plot(label="IC Decay Analysis") |
|
decay_stats = gr.Markdown(label="Decay Statistics") |
|
|
|
with gr.Tab("Portfolio Backtest"): |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
initial_capital_input = gr.Number( |
|
value=100000, label="Initial Capital", minimum=10000 |
|
) |
|
rebalance_freq_input = gr.Slider( |
|
minimum=5, maximum=60, value=20, step=5, |
|
label="Rebalance Frequency (days)" |
|
) |
|
|
|
backtest_btn = gr.Button("Run Backtest", variant="primary") |
|
|
|
with gr.Column(scale=2): |
|
backtest_plot = gr.Plot(label="Backtest Performance") |
|
backtest_metrics = gr.Markdown(label="Performance Metrics") |
|
backtest_error = gr.Markdown(visible=False) |
|
|
|
|
|
        def update_factor_selector():
            """Update factor selector with discovered factors."""
            if platform and platform.discovered_factors:
                choices = [f.name for f in platform.discovered_factors]
                return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
            return gr.Dropdown(choices=[])
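
        # Event wiring: .click() runs the main pipeline, and .then() chains a
        # follow-up step that refreshes the dropdown once the first callback
        # has finished. update_factor_selector reads the shared `platform`
        # state directly, so it needs no inputs of its own.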
        generate_btn.click(
            fn=generate_and_evaluate_factors,
            inputs=[n_days, n_factors, min_ic, openai_api_key],
            outputs=[factor_heatmap, ic_distribution, portfolio_weights,
                     regime_timeline, summary_stats, top_factors_table]
        ).then(
            fn=update_factor_selector,
            inputs=None,
            outputs=[factor_selector]
        )

        analyze_btn.click(
            fn=analyze_factor_decay,
            inputs=[factor_selector],
            outputs=[decay_plot, decay_stats]
        )

        backtest_btn.click(
            fn=backtest_portfolio,
            inputs=[initial_capital_input, rebalance_freq_input],
            outputs=[backtest_plot, backtest_metrics, backtest_error]
        )

        gr.Examples(
            examples=[
                [1000, 20, 0.02],
                [1500, 30, 0.03],
                [2000, 40, 0.025]
            ],
            inputs=[n_days, n_factors, min_ic]
        )

gr.Markdown(""" |
|
--- |
|
**Note**: This system uses sophisticated machine learning models including optional LLM integration for factor discovery. |
|
For best results, provide an OpenAI API key for genuine LLM-generated factors. Without an API key, the system will use |
|
fallback factor generation methods. The simulation and analysis features work with or without the API key. |
|
All trading strategies are for demonstration purposes only. |
|
|
|
**API Key Information**: |
|
- OpenAI API Key: Get yours at https://platform.openai.com/api-keys |
|
""") |
|
|
|
    return interface


if __name__ == "__main__": |
|
interface = create_gradio_interface() |
|
interface.launch() |
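    # launch() serves the app locally (http://127.0.0.1:7860 by default).
    # To expose a temporary public link, e.g. from a notebook, Gradio also
    # supports interface.launch(share=True).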