Spaces:
Sleeping
Sleeping
import time

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.figure import Figure
from sklearn.base import BaseEstimator, clone
from sklearn.cluster import AgglomerativeClustering
from sklearn.datasets import make_blobs
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.utils.metaestimators import available_if
from sklearn.utils.validation import check_is_fitted
# Markdown shown at the top of the demo page. Kept as a plain (non-f) string:
# it has no placeholders, and an f-string would try to evaluate any "{...}"
# that is later added to the text.
model_card = """
## Description
**Clustering** can be costly, especially when we have a lot of data.
Some clustering algorithms cannot be used with new data without redoing the clustering, which can be difficult.
Instead, we can use clustering to create a model with a classifier, it calls **Inductive Clustering**
This demo illustrates a generic implementation of a meta-estimator which extends clustering by inducing a classifier from the cluster labels, and compares the running time.
You can play around with different ``number of samples`` and ``number of new data`` to see the effect
## Dataset
Simulation dataset
"""
def _classifier_has(attr):
    """Build a predicate telling whether ``attr`` exists on the wrapped classifier.

    The returned callable receives the meta-estimator. It inspects the fitted
    ``classifier_`` when that attribute exists, and falls back to the unfitted
    ``classifier`` otherwise.
    """

    def _check(estimator):
        # Prefer the fitted clone; before fitting only the template exists.
        if hasattr(estimator, "classifier_"):
            target = estimator.classifier_
        else:
            target = estimator.classifier
        return hasattr(target, attr)

    return _check
class InductiveClusterer(BaseEstimator):
    """Meta-estimator that induces a classifier from cluster labels.

    ``fit`` clusters the training data with a clone of ``clusterer`` and then
    trains a clone of ``classifier`` on the resulting labels. ``predict`` and
    ``decision_function`` delegate to that fitted classifier and are only
    exposed when the classifier itself provides the corresponding method.

    Parameters
    ----------
    clusterer : estimator
        Unfitted clusterer; its clone must provide ``fit_predict``.
    classifier : estimator
        Unfitted classifier; its clone must provide ``fit``.
    """

    def __init__(self, clusterer, classifier):
        self.clusterer = clusterer
        self.classifier = classifier

    def fit(self, X, y=None):
        """Cluster ``X`` and train the classifier on the cluster labels.

        ``y`` is accepted for API compatibility and is not used.
        Returns ``self``.
        """
        # Work on clones so the estimators passed to __init__ stay unfitted.
        self.clusterer_ = clone(self.clusterer)
        self.classifier_ = clone(self.classifier)
        cluster_labels = self.clusterer_.fit_predict(X)
        self.classifier_.fit(X, cluster_labels)
        return self

    # Hide the method (attribute lookup fails) when the classifier lacks it,
    # so hasattr()-based capability checks see the real capabilities.
    @available_if(_classifier_has("predict"))
    def predict(self, X):
        """Return the fitted classifier's prediction for ``X``."""
        check_is_fitted(self)
        return self.classifier_.predict(X)

    @available_if(_classifier_has("decision_function"))
    def decision_function(self, X):
        """Return the fitted classifier's decision function for ``X``."""
        check_is_fitted(self)
        return self.classifier_.decision_function(X)
def _new_axes():
    """Create a standalone figure and its single axes.

    The figure is built directly instead of through ``plt.subplots()`` so it is
    not registered with pyplot's global figure list, where it would stay alive
    after every call.
    """
    figure = Figure()
    return figure, figure.subplots()


def do_train(n_samples, n_new_data):
    """Run the inductive-clustering demo and build its plots.

    Parameters
    ----------
    n_samples : int or float
        Number of training points to generate; coerced to ``int``.
    n_new_data : int or float
        Number of new (unknown) points to generate; coerced to ``int``.

    Returns
    -------
    tuple
        ``(fig1, fig2, fig3, fig4, text)``: the clustering of the training
        data, the training data with the new points, the inductive model's
        decision regions, the decision regions after re-clustering all data,
        and a string reporting both running times.
    """
    # Callers such as UI widgets may pass 1000.0; make_blobs needs integers.
    n_samples = int(n_samples)
    n_new_data = int(n_new_data)
    random_state = 42

    # Generate some training data from clustering
    X, _ = make_blobs(
        n_samples=n_samples,
        cluster_std=[1.0, 1.0, 0.5],
        centers=[(-5, -5), (0, 0), (5, 5)],
        random_state=random_state,
    )

    # Train a clustering algorithm on the training data and get the cluster labels
    clusterer = AgglomerativeClustering(n_clusters=3)
    cluster_labels = clusterer.fit_predict(X)

    fig1, axes1 = _new_axes()
    axes1.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
    axes1.set_title("Ward Linkage")

    # Generate new samples and plot them along with the original dataset
    X_new, _ = make_blobs(
        n_samples=n_new_data,
        centers=[(-7, -1), (-2, 4), (3, 6)],
        random_state=random_state,
    )
    X_all = np.concatenate((X, X_new), axis=0)

    fig2, axes2 = _new_axes()
    axes2.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
    axes2.scatter(X_new[:, 0], X_new[:, 1], c="black", alpha=1, edgecolor="k")
    axes2.set_title("Unknown instances")

    # Inductive approach: fit on the training data only, then predict the
    # cluster of the unknown instances. The timed span covers fitting,
    # prediction and drawing; perf_counter is monotonic, unlike time.time().
    t1 = time.perf_counter()
    classifier = RandomForestClassifier(random_state=random_state)
    inductive_learner = InductiveClusterer(clusterer, classifier).fit(X)
    probable_clusters = inductive_learner.predict(X_new)

    fig3, axes3 = _new_axes()
    disp = DecisionBoundaryDisplay.from_estimator(
        inductive_learner, X, response_method="predict", alpha=0.4, ax=axes3
    )
    disp.ax_.set_title("Classify unknown instances with known clusters")
    disp.ax_.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
    disp.ax_.scatter(
        X_new[:, 0], X_new[:, 1], c=probable_clusters, alpha=0.5, edgecolor="k"
    )
    t1_running = time.perf_counter() - t1

    # Baseline: recompute the clustering on old + new data and fit a
    # classifier on it, timed over the same kind of span.
    t2 = time.perf_counter()
    clusterer = AgglomerativeClustering(n_clusters=3)
    all_labels = clusterer.fit_predict(X_all)
    classifier = RandomForestClassifier(random_state=random_state).fit(X_all, all_labels)

    fig4, axes4 = _new_axes()
    disp = DecisionBoundaryDisplay.from_estimator(
        classifier, X_all, response_method="predict", alpha=0.4, ax=axes4
    )
    disp.ax_.set_title("Classify unknown instance with recomputing clusters")
    disp.ax_.scatter(X_all[:, 0], X_all[:, 1], c=all_labels, alpha=0.5, edgecolor="k")
    t2_running = time.perf_counter() - t2

    text = f"Inductive Clustering running time: {t1_running:.4f}s. Recomputing clusters running time: {t2_running:.4f}s"
    return fig1, fig2, fig3, fig4, text
# Page layout: title, description and credits, two sliders, a 2x2 grid of
# plots and a textbox for the timing summary.
with gr.Blocks() as demo:
    gr.Markdown('''
<div>
<h1 style='text-align: center'>Inductive Clustering</h1>
</div>
''')
    gr.Markdown(model_card)
    gr.Markdown("Author: <a href=\"https://huggingface.co/vumichien\">Vu Minh Chien</a>. Based on the example from <a href=\"https://scikit-learn.org/stable/auto_examples/cluster/plot_inductive_clustering.html#sphx-glr-auto-examples-cluster-plot-inductive-clustering-py\">scikit-learn</a>")

    n_samples = gr.Slider(minimum=1000, maximum=5000, step=500, value=1000, label="Number of samples")
    n_new_data = gr.Slider(minimum=10, maximum=100, step=10, value=10, label="Number of new data")

    # First row: clustering of the training data, then with the new points.
    with gr.Row():
        with gr.Column():
            plot1 = gr.Plot(label="Clustering")
        with gr.Column():
            plot2 = gr.Plot(label="Clustering with new data")

    # Second row: decision regions of the two approaches.
    with gr.Row():
        with gr.Column():
            plot3 = gr.Plot(label="Inductive clustering")
        with gr.Column():
            plot4 = gr.Plot(label="Recomputing clustering")

    with gr.Row():
        results = gr.Textbox(label="Results")

    # Moving either slider reruns the demo with the current value of both.
    slider_inputs = [n_samples, n_new_data]
    demo_outputs = [plot1, plot2, plot3, plot4, results]
    for slider in slider_inputs:
        slider.change(fn=do_train, inputs=slider_inputs, outputs=demo_outputs)

demo.launch()