"""
Economic Segmentation Module

Advanced clustering analysis for economic time series and time periods.
"""

import logging
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import pdist
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import calinski_harabasz_score, silhouette_score
from sklearn.preprocessing import StandardScaler

logger = logging.getLogger(__name__)


class EconomicSegmentation:
    """
    Advanced economic segmentation using clustering techniques,
    for both time periods and economic series.
    """

    def __init__(self, data: pd.DataFrame):
        """
        Initialize segmentation with economic data.

        Args:
            data: DataFrame of economic indicator level series, one numeric
                column per indicator (growth rates are derived internally)
        """
        self.data = data.copy()
        self.scaler = StandardScaler()
        self.clusters = {}
        self.cluster_analysis = {}

    def prepare_time_period_data(self, indicators: Optional[List[str]] = None,
                                 window_size: int = 4) -> pd.DataFrame:
        """
        Prepare time period data for clustering.

        Args:
            indicators: List of indicators to use. If None, use all numeric columns
            window_size: Rolling window size for feature extraction (keep >= 4,
                since rolling kurtosis needs at least four observations)

        Returns:
            DataFrame with time period features
        """
        if indicators is None:
            indicators = self.data.select_dtypes(include=[np.number]).columns.tolist()

        # Cluster on growth rates rather than levels so that series with very
        # different scales are comparable
        growth_data = self.data[indicators].pct_change().dropna()

        # Summarize each indicator's recent behavior with rolling-window
        # statistics (location, dispersion, range, and shape)
        features = []
        feature_names = []
        for indicator in indicators:
            features.extend([
                growth_data[indicator].rolling(window_size).mean(),
                growth_data[indicator].rolling(window_size).std(),
                growth_data[indicator].rolling(window_size).min(),
                growth_data[indicator].rolling(window_size).max(),
                growth_data[indicator].rolling(window_size).skew(),
                growth_data[indicator].rolling(window_size).kurt()
            ])
            feature_names.extend([
                f"{indicator}_mean", f"{indicator}_std", f"{indicator}_min",
                f"{indicator}_max", f"{indicator}_skew", f"{indicator}_kurt"
            ])

        feature_df = pd.concat(features, axis=1)
        feature_df.columns = feature_names
        feature_df = feature_df.dropna()

        return feature_df
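
    # For indicators ["gdp", "cpi"] (illustrative names) and window_size=4,
    # the frame returned above has one row per period and six columns per
    # indicator: gdp_mean, gdp_std, gdp_min, gdp_max, gdp_skew, gdp_kurt,
    # cpi_mean, ..., cpi_kurt. Each row is then one observation to cluster.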

    def prepare_series_data(self, indicators: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Prepare series data for clustering (clustering the indicators themselves).

        Args:
            indicators: List of indicators to use. If None, use all numeric columns

        Returns:
            DataFrame with one row of features per series
        """
        if indicators is None:
            indicators = self.data.select_dtypes(include=[np.number]).columns.tolist()

        growth_data = self.data[indicators].pct_change().dropna()

        # Describe each series by the distributional shape, persistence
        # (autocorrelation), volatility, and trend of its growth rates
        series_features = {}
        for indicator in indicators:
            series = growth_data[indicator].dropna()

            series_features[indicator] = {
                'mean': series.mean(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'skew': series.skew(),
                'kurt': series.kurtosis(),
                'autocorr_1': series.autocorr(lag=1),
                'autocorr_4': series.autocorr(lag=4),
                'volatility': series.rolling(12).std().mean(),
                'trend': np.polyfit(range(len(series)), series, 1)[0]
            }

        # Transpose so each row is a series and each column a feature
        return pd.DataFrame(series_features).T
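
    # The resulting matrix has shape (n_indicators, 10); e.g. for three
    # indicators it is a 3 x 10 table whose rows are the indicators and whose
    # columns are mean, std, min, max, skew, kurt, autocorr_1, autocorr_4,
    # volatility, and trend.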

    def find_optimal_clusters(self, data: pd.DataFrame, max_clusters: int = 10,
                              method: str = 'kmeans') -> Dict:
        """
        Find the optimal number of clusters using the elbow method and
        silhouette analysis.

        Args:
            data: Feature data for clustering
            max_clusters: Maximum number of clusters to test
            method: Clustering method ('kmeans' or 'hierarchical')

        Returns:
            Dictionary with optimal cluster analysis
        """
        # Silhouette analysis is only defined for at most len(data) - 1 clusters
        max_clusters = min(max_clusters, len(data) - 1)
        if max_clusters < 2:
            raise ValueError("Need at least three observations to compare cluster counts")

        inertias = []
        silhouette_scores = []
        calinski_scores = []

        for k in range(2, max_clusters + 1):
            try:
                if method == 'kmeans':
                    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
                    labels = kmeans.fit_predict(data)
                    inertias.append(kmeans.inertia_)
                else:
                    clustering = AgglomerativeClustering(n_clusters=k)
                    labels = clustering.fit_predict(data)
                    inertias.append(0)  # inertia is undefined for hierarchical linkage

                # Both scores require more than one populated cluster
                if len(np.unique(labels)) > 1:
                    silhouette_scores.append(silhouette_score(data, labels))
                    calinski_scores.append(calinski_harabasz_score(data, labels))
                else:
                    silhouette_scores.append(0)
                    calinski_scores.append(0)

            except Exception as e:
                logger.warning(f"Failed to cluster with k={k}: {e}")
                inertias.append(0)
                silhouette_scores.append(0)
                calinski_scores.append(0)

        # Scores are indexed from k=2, hence the +2 offset
        optimal_k_silhouette = np.argmax(silhouette_scores) + 2
        optimal_k_calinski = np.argmax(calinski_scores) + 2

        # Elbow: pick the k where the inertia curve bends most sharply, i.e.
        # where its second difference is largest; this needs at least three
        # inertia values
        if method == 'kmeans' and len(inertias) > 2:
            second_derivative = np.diff(np.diff(inertias))
            optimal_k_elbow = np.argmax(second_derivative) + 3
        else:
            optimal_k_elbow = optimal_k_silhouette

        return {
            'inertias': inertias,
            'silhouette_scores': silhouette_scores,
            'calinski_scores': calinski_scores,
            'optimal_k_silhouette': optimal_k_silhouette,
            'optimal_k_calinski': optimal_k_calinski,
            'optimal_k_elbow': optimal_k_elbow,
            'recommended_k': optimal_k_silhouette
        }
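
    # Reading the diagnostics returned above: silhouette lies in [-1, 1] and
    # higher is better; the Calinski-Harabasz score is unbounded, so compare
    # it only across k on the same data; inertia always falls as k grows,
    # which is why the elbow heuristic looks at curvature rather than level.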

    def cluster_time_periods(self, indicators: Optional[List[str]] = None,
                             n_clusters: Optional[int] = None, method: str = 'kmeans',
                             window_size: int = 4) -> Dict:
        """
        Cluster time periods based on economic activity patterns.

        Args:
            indicators: List of indicators to use
            n_clusters: Number of clusters. If None, auto-detect
            method: Clustering method ('kmeans' or 'hierarchical')
            window_size: Rolling window size for feature extraction

        Returns:
            Dictionary with clustering results
        """
        feature_df = self.prepare_time_period_data(indicators, window_size)

        # Standardize features so no single indicator dominates the distances
        scaled_data = self.scaler.fit_transform(feature_df)
        scaled_df = pd.DataFrame(scaled_data, index=feature_df.index,
                                 columns=feature_df.columns)

        if n_clusters is None:
            cluster_analysis = self.find_optimal_clusters(scaled_df, method=method)
            n_clusters = cluster_analysis['recommended_k']
            logger.info(f"Auto-detected optimal clusters: {n_clusters}")

        if method == 'kmeans':
            clustering = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        else:
            clustering = AgglomerativeClustering(n_clusters=n_clusters)

        cluster_labels = clustering.fit_predict(scaled_df)

        result_df = feature_df.copy()
        result_df['cluster'] = cluster_labels

        cluster_analysis = self.analyze_clusters(result_df, 'cluster')

        # 2-D projections for visualization; t-SNE perplexity must stay below
        # the number of samples
        pca = PCA(n_components=2)
        pca_data = pca.fit_transform(scaled_data)

        tsne = TSNE(n_components=2, random_state=42,
                    perplexity=min(30, len(scaled_data) - 1))
        tsne_data = tsne.fit_transform(scaled_data)

        return {
            'data': result_df,
            'cluster_labels': cluster_labels,
            'cluster_analysis': cluster_analysis,
            'pca_data': pca_data,
            'tsne_data': tsne_data,
            # Proxy for importance: absolute loading on the first principal component
            'feature_importance': dict(zip(feature_df.columns, np.abs(pca.components_[0]))),
            'n_clusters': n_clusters,
            'method': method
        }

    def cluster_economic_series(self, indicators: Optional[List[str]] = None,
                                n_clusters: Optional[int] = None,
                                method: str = 'kmeans') -> Dict:
        """
        Cluster economic series based on their characteristics.

        Args:
            indicators: List of indicators to use
            n_clusters: Number of clusters. If None, auto-detect
            method: Clustering method ('kmeans' or 'hierarchical')

        Returns:
            Dictionary with clustering results
        """
        series_df = self.prepare_series_data(indicators)

        # Standardize features so no single characteristic dominates the distances
        scaled_data = self.scaler.fit_transform(series_df)
        scaled_df = pd.DataFrame(scaled_data, index=series_df.index,
                                 columns=series_df.columns)

        if n_clusters is None:
            cluster_analysis = self.find_optimal_clusters(scaled_df, method=method)
            n_clusters = cluster_analysis['recommended_k']
            logger.info(f"Auto-detected optimal clusters: {n_clusters}")

        if method == 'kmeans':
            clustering = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        else:
            clustering = AgglomerativeClustering(n_clusters=n_clusters)

        cluster_labels = clustering.fit_predict(scaled_df)

        result_df = series_df.copy()
        result_df['cluster'] = cluster_labels

        cluster_analysis = self.analyze_clusters(result_df, 'cluster')

        # 2-D projections for visualization
        pca = PCA(n_components=2)
        pca_data = pca.fit_transform(scaled_data)

        tsne = TSNE(n_components=2, random_state=42,
                    perplexity=min(30, len(scaled_data) - 1))
        tsne_data = tsne.fit_transform(scaled_data)

        return {
            'data': result_df,
            'cluster_labels': cluster_labels,
            'cluster_analysis': cluster_analysis,
            'pca_data': pca_data,
            'tsne_data': tsne_data,
            'feature_importance': dict(zip(series_df.columns, np.abs(pca.components_[0]))),
            'n_clusters': n_clusters,
            'method': method
        }

    def analyze_clusters(self, data: pd.DataFrame, cluster_col: str) -> Dict:
        """
        Analyze cluster characteristics.

        Args:
            data: DataFrame with cluster labels
            cluster_col: Name of cluster column

        Returns:
            Dictionary with per-cluster summary statistics
        """
        feature_cols = [col for col in data.columns if col != cluster_col]
        cluster_analysis = {}

        # Iterate in sorted order so downstream reports list clusters 0, 1, ...
        for cluster_id in sorted(data[cluster_col].unique()):
            cluster_data = data[data[cluster_col] == cluster_id]

            cluster_analysis[cluster_id] = {
                'size': len(cluster_data),
                'percentage': len(cluster_data) / len(data) * 100,
                'features': {}
            }

            # Summary statistics of every feature within this cluster
            for feature in feature_cols:
                feature_data = cluster_data[feature]
                cluster_analysis[cluster_id]['features'][feature] = {
                    'mean': feature_data.mean(),
                    'std': feature_data.std(),
                    'min': feature_data.min(),
                    'max': feature_data.max(),
                    'median': feature_data.median()
                }

        return cluster_analysis

    def perform_hierarchical_clustering(self, data: pd.DataFrame,
                                        method: str = 'ward',
                                        distance_threshold: Optional[float] = None) -> Dict:
        """
        Perform hierarchical clustering with dendrogram analysis.

        Args:
            data: Feature data for clustering
            method: Linkage method ('ward', 'complete', 'average', 'single')
            distance_threshold: Distance threshold for cutting the dendrogram.
                If None, a threshold is chosen automatically

        Returns:
            Dictionary with hierarchical clustering results
        """
        scaled_data = self.scaler.fit_transform(data)

        # Ward linkage works on the raw observations; the other methods take
        # a condensed distance matrix
        if method == 'ward':
            linkage_matrix = linkage(scaled_data, method=method)
        else:
            distance_matrix = pdist(scaled_data)
            linkage_matrix = linkage(distance_matrix, method=method)

        if distance_threshold is None:
            # Cut where the merge distances bend most sharply; fall back to
            # the mean merge distance when the tree is too small for the
            # curvature heuristic
            distances = linkage_matrix[:, 2]
            if len(distances) > 2:
                second_derivative = np.diff(np.diff(distances))
                optimal_threshold = distances[np.argmax(second_derivative) + 1]
            else:
                optimal_threshold = distances.mean()
        else:
            optimal_threshold = distance_threshold

        cluster_labels = fcluster(linkage_matrix, optimal_threshold, criterion='distance')

        result_df = data.copy()
        result_df['cluster'] = cluster_labels
        cluster_analysis = self.analyze_clusters(result_df, 'cluster')

        return {
            'linkage_matrix': linkage_matrix,
            'cluster_labels': cluster_labels,
            'distance_threshold': optimal_threshold,
            'cluster_analysis': cluster_analysis,
            'data': result_df,
            'method': method
        }
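
    # The linkage_matrix returned above is in standard SciPy format, so it
    # can be plotted directly with scipy.cluster.hierarchy.dendrogram.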

    def generate_segmentation_report(self, time_period_clusters: Optional[Dict] = None,
                                     series_clusters: Optional[Dict] = None) -> str:
        """
        Generate a comprehensive segmentation report.

        Args:
            time_period_clusters: Results from time period clustering
            series_clusters: Results from series clustering

        Returns:
            Formatted report string
        """
        report = "ECONOMIC SEGMENTATION REPORT\n"
        report += "=" * 50 + "\n\n"

        if time_period_clusters:
            report += "TIME PERIOD CLUSTERING\n"
            report += "-" * 30 + "\n"
            report += f"Method: {time_period_clusters['method']}\n"
            report += f"Number of Clusters: {time_period_clusters['n_clusters']}\n"
            report += f"Total Periods: {len(time_period_clusters['data'])}\n\n"

            cluster_analysis = time_period_clusters['cluster_analysis']
            for cluster_id, analysis in cluster_analysis.items():
                report += f"Cluster {cluster_id}:\n"
                report += f"  Size: {analysis['size']} periods ({analysis['percentage']:.1f}%)\n"

            # Feature importance is global, so report it once rather than
            # repeating it for every cluster
            if 'feature_importance' in time_period_clusters:
                features = time_period_clusters['feature_importance']
                top_features = sorted(features.items(), key=lambda x: x[1], reverse=True)[:5]
                report += f"Top Features: {', '.join(f[0] for f in top_features)}\n"

            report += "\n"

        if series_clusters:
            report += "ECONOMIC SERIES CLUSTERING\n"
            report += "-" * 30 + "\n"
            report += f"Method: {series_clusters['method']}\n"
            report += f"Number of Clusters: {series_clusters['n_clusters']}\n"
            report += f"Total Series: {len(series_clusters['data'])}\n\n"

            cluster_analysis = series_clusters['cluster_analysis']
            for cluster_id, analysis in cluster_analysis.items():
                report += f"Cluster {cluster_id}:\n"
                report += f"  Size: {analysis['size']} series ({analysis['percentage']:.1f}%)\n"

                # List which series fall into this cluster
                cluster_series = series_clusters['data'][series_clusters['data']['cluster'] == cluster_id]
                report += f"  Series: {', '.join(cluster_series.index.tolist())}\n"

            if 'feature_importance' in series_clusters:
                features = series_clusters['feature_importance']
                top_features = sorted(features.items(), key=lambda x: x[1], reverse=True)[:5]
                report += f"Top Features: {', '.join(f[0] for f in top_features)}\n"

            report += "\n"

        return report
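

# Minimal smoke-test sketch, not part of the library API: it exercises the
# main entry points on synthetic random-walk "indicators". The column names,
# sizes, and parameters below are illustrative assumptions, not requirements.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    dates = pd.date_range("2000-01-01", periods=120, freq="MS")
    demo_data = pd.DataFrame(
        np.cumsum(rng.normal(0.5, 1.0, size=(120, 3)), axis=0) + 100.0,
        index=dates,
        columns=["gdp", "cpi", "unemployment"],  # hypothetical indicator names
    )

    seg = EconomicSegmentation(demo_data)
    period_result = seg.cluster_time_periods(n_clusters=3, method='kmeans')
    series_result = seg.cluster_economic_series(n_clusters=2, method='kmeans')
    print(seg.generate_segmentation_report(period_result, series_result))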