import os

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from config import NUM_CLUSTERS, OUTPUT_DIR


class ClusterAnalyzer:
    def __init__(self, n_clusters=NUM_CLUSTERS):
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.kmeans = None
        self.pca = None

    def fit_predict(self, features):
        """Fit KMeans on standardized, PCA-reduced features; return (labels, reduced features)."""
        # Standardize features
        features_scaled = self.scaler.fit_transform(features)

        # Adaptive PCA: use at most min(n_samples - 1, n_features, 50)
        # components, but never fewer than one
        n_components = max(1, min(features_scaled.shape[0] - 1, features_scaled.shape[1], 50))
        print(f"Using {n_components} PCA components (adapted to data size)")
        self.pca = PCA(n_components=n_components)
        features_reduced = self.pca.fit_transform(features_scaled)

        # Never request more clusters than there are samples
        n_clusters = max(1, min(self.n_clusters, len(features_reduced)))
        print(f"Using {n_clusters} clusters")
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        labels = self.kmeans.fit_predict(features_reduced)
        return labels, features_reduced

    def get_cluster_centers(self):
        """Return cluster centers, or None if the model has not been fit yet."""
        if self.kmeans is not None:
            return self.kmeans.cluster_centers_
        return None

    def visualize_clusters(self, features, labels, image_paths, save_path=None):
        """Visualize clusters in 2D using PCA."""
        # Reduce to 2D for visualization when PCA is feasible
        if features.shape[0] > 2 and features.shape[1] > 2:
            pca_2d = PCA(n_components=min(2, features.shape[0] - 1, features.shape[1]))
            features_2d = pca_2d.fit_transform(features)
        else:
            # Too few samples/features for PCA: use the first two features,
            # zero-padding if fewer than two are available
            if features.shape[1] >= 2:
                features_2d = features[:, :2]
            else:
                features_2d = np.hstack(
                    [features, np.zeros((features.shape[0], 2 - features.shape[1]))]
                )

        plt.figure(figsize=(12, 8))

        # Color by cluster when there is more than one; a colorbar only
        # makes sense in that case
        unique_labels = np.unique(labels)
        if len(unique_labels) > 1:
            scatter = plt.scatter(features_2d[:, 0], features_2d[:, 1],
                                  c=labels, cmap='tab10', alpha=0.7, s=100)
            plt.colorbar(scatter)
            plt.title('Drill Core Sample Clusters (PCA Visualization)', fontsize=16)
        else:
            plt.scatter(features_2d[:, 0], features_2d[:, 1],
                        c='blue', alpha=0.7, s=100)
            plt.title(f'Drill Core Sample Clusters: all samples in cluster {labels[0]}',
                      fontsize=16)
        plt.xlabel('Feature Dimension 1')
        plt.ylabel('Feature Dimension 2')

        # Annotate up to 15 points with truncated file names
        for i in range(min(15, len(features_2d), len(image_paths))):
            filename = os.path.basename(image_paths[i])[:15] + "..."
            plt.annotate(filename, (features_2d[i, 0], features_2d[i, 1]),
                         xytext=(5, 5), textcoords='offset points',
                         fontsize=8, alpha=0.7)

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Cluster visualization saved to {save_path}")
        plt.show()

    def create_cluster_map(self, image_paths, labels):
        """Create a mapping from cluster ID to the image paths assigned to it."""
        cluster_map = {}
        for path, label in zip(image_paths, labels):
            cluster_map.setdefault(label, []).append(path)
        return cluster_map

    def analyze_cluster_characteristics(self, features, labels, image_paths):
        """Compute per-cluster feature statistics and collect sample image paths."""
        cluster_stats = {}
        for cluster_id in np.unique(labels):
            mask = labels == cluster_id
            cluster_features = features[mask]

            # Per-dimension mean and spread of this cluster's features
            mean_features = np.mean(cluster_features, axis=0)
            std_features = np.std(cluster_features, axis=0)

            # Image paths belonging to this cluster
            cluster_images = [path for path, m in zip(image_paths, mask) if m]

            cluster_stats[cluster_id] = {
                'count': len(cluster_images),
                'mean_features': mean_features,
                'std_features': std_features,
                'sample_images': cluster_images[:5],  # first 5 samples
            }
        return cluster_stats

    def analyze_clusters(self, features, image_paths):
        """Run the complete clustering analysis: fit, map, characterize, visualize."""
        print(f"Performing clustering analysis on {len(image_paths)} samples...")
        print(f"Feature dimensions: {features.shape}")

        # Perform clustering
        labels, features_reduced = self.fit_predict(features)

        # Map cluster IDs to image paths
        cluster_map = self.create_cluster_map(image_paths, labels)

        # Analyze cluster characteristics
        cluster_stats = self.analyze_cluster_characteristics(features, labels, image_paths)

        # Visualize only when there are enough samples for a meaningful plot
        if len(image_paths) > 2:
            viz_path = os.path.join(OUTPUT_DIR, "clusters.png")
            self.visualize_clusters(features, labels, image_paths, viz_path)

        # Print a summary of each cluster
        print("\n" + "=" * 60)
        print("CLUSTER ANALYSIS RESULTS")
        print("=" * 60)
        for cluster_id, stats in cluster_stats.items():
            print(f"\nCluster {cluster_id}:")
            print(f"  Samples: {stats['count']} images")
            print("  Sample files:")
            for path in stats['sample_images']:
                print(f"    - {os.path.basename(path)}")
        return labels, cluster_map, cluster_stats


if __name__ == "__main__":
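    # Minimal smoke test (a sketch, not part of the original pipeline):
    # exercises the analyzer end-to-end on synthetic random features so the
    # module can be run standalone. The sample count, feature dimensionality,
    # and file names below are illustrative assumptions; config.py is assumed
    # to define NUM_CLUSTERS and OUTPUT_DIR as imported above.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    rng = np.random.default_rng(0)
    demo_features = rng.normal(size=(30, 128))             # 30 fake samples, 128-dim features
    demo_paths = [f"core_{i:03d}.png" for i in range(30)]  # hypothetical image file names
    analyzer = ClusterAnalyzer()
    labels, cluster_map, cluster_stats = analyzer.analyze_clusters(demo_features, demo_paths)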