"""
Competency question analysis functions.

Partially inherited from [idea](https://github.com/polifonia-project/idea).
"""

import ast
import io
import re
from collections import defaultdict

import numpy as np
from PIL import Image
from matplotlib import pyplot as plt

from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering, HDBSCAN
from scipy.cluster.hierarchy import dendrogram

from ontochat.chatbot import chat_completion


def preprocess_competency_questions(cqs):
    """
    Split a newline-separated string of competency questions and normalise
    the whitespace and separators in each question.

    :param cqs: competency questions as a single newline-separated string
    :return: list of cleaned competency questions
    """
    cqs = cqs.split("\n")

    cleaned_cqs = []
    for q in cqs:
        q = q.replace("\n", "; ")  # defensive; questions contain no newlines after the split above
        q = q.replace("\t", " ")
        q = re.sub(r"[ ]+", " ", q)  # collapse runs of spaces
        q = re.sub(r";[ ]*;", ";", q)  # collapse repeated separators
        cleaned_cqs.append(q)

    return cleaned_cqs


def compute_embeddings(cqs, model="all-MiniLM-L6-v2", device="cpu"):
    """
    Compute sentence-level embeddings of competency questions.

    :param cqs: competency questions as a single newline-separated string
    :param model: name of the SentenceTransformer model to use
    :param device: device on which to run the model, e.g. "cpu" or "cuda"
    :return: tuple of (cleaned competency questions, L2-normalised embeddings)
    """
    cleaned_cqs = preprocess_competency_questions(cqs)

    model = SentenceTransformer(model, device=device)
    embeddings = model.encode(cleaned_cqs)

    # normalise embeddings to unit length so that Euclidean distances used by
    # the clustering are monotonically related to cosine similarity
    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    return cleaned_cqs, embeddings
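

# Example usage (sketch; the competency questions below are made up for
# illustration and any SentenceTransformer model name can be substituted):
#
#   cq_text = "Which agents are involved in a music performance?\n" \
#             "What instruments does a musician play?"
#   cleaned, embeddings = compute_embeddings(cq_text)
#   # embeddings has one 384-dimensional row per question for all-MiniLM-L6-v2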


def agglomerative_clustering(cqs, embeddings, n_clusters=None, metric="euclidean", distance_threshold=None):
    """
    Cluster competency questions with agglomerative (hierarchical) clustering
    and render the corresponding dendrogram.

    Exactly one of ``n_clusters`` and ``distance_threshold`` should be set.

    :param cqs: list of cleaned competency questions
    :param embeddings: sentence embeddings aligned with ``cqs``
    :param n_clusters: number of clusters to find, or None to use the threshold
    :param metric: distance metric used to compute the linkage
    :param distance_threshold: linkage distance above which clusters are not merged
    :return: tuple of (dict mapping cluster id to competency questions, dendrogram as a PIL image)
    """
    clustering_model = AgglomerativeClustering(
        n_clusters=n_clusters,
        metric=metric,
        distance_threshold=distance_threshold,
        compute_distances=True
    )
    clustering_model.fit(embeddings)
    cluster_assignment = clustering_model.labels_

    # group the questions by the cluster they were assigned to
    clustered_cqs = defaultdict(list)
    for sentence_id, cluster_id in enumerate(cluster_assignment):
        clustered_cqs[str(cluster_id)].append(cqs[sentence_id])

    pil_image = plot_dendrogram(
        clustering_model,
        orientation="right",
        labels=list(range(1, len(cqs) + 1)),
        truncate_mode=None,
        show_leaf_counts=False,
    )

    return clustered_cqs, pil_image
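

# Example usage (sketch, reusing the outputs of compute_embeddings above;
# exactly one of n_clusters and distance_threshold should be provided, and the
# values shown are illustrative rather than recommended defaults):
#
#   clusters, dendrogram_img = agglomerative_clustering(
#       cleaned, embeddings, n_clusters=5
#   )
#   dendrogram_img.save("cq_dendrogram.png")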


def plot_dendrogram(model, **kwargs):
    """
    Create the linkage matrix of a fitted clustering model and plot the dendrogram.
    Source: https://scikit-learn.org/stable/auto_examples/cluster/plot_agglomerative_dendrogram.html

    :param model: fitted AgglomerativeClustering model (with compute_distances=True)
    :param kwargs: keyword arguments forwarded to scipy.cluster.hierarchy.dendrogram
    :return: the rendered dendrogram as a PIL image
    """
    # count the number of original samples under each node of the merge tree
    counts = np.zeros(model.children_.shape[0])
    n_samples = len(model.labels_)
    for i, merge in enumerate(model.children_):
        current_count = 0
        for child_idx in merge:
            if child_idx < n_samples:
                current_count += 1  # leaf node
            else:
                current_count += counts[child_idx - n_samples]
        counts[i] = current_count

    # scipy linkage matrix columns: child_1, child_2, distance, sample count
    linkage_matrix = np.column_stack(
        [model.children_, model.distances_, counts]
    ).astype(float)

    dendrogram(linkage_matrix, **kwargs)
    plt.tight_layout()

    # render the current figure into an in-memory PNG and wrap it as a PIL image
    fig = plt.gcf()
    buf = io.BytesIO()
    fig.savefig(buf, format="png")
    buf.seek(0)
    pil_image = Image.open(buf)
    plt.close(fig)  # avoid drawing onto the same figure on subsequent calls
    return pil_image


def response_parser(response):
    """
    Parse an LLM response that is expected to contain a Python literal
    (e.g. a list or dict); return an empty string if parsing fails.

    :param response: raw response string returned by the LLM
    :return: the parsed Python object, or "" if the response cannot be parsed
    """
    try:
        response = ast.literal_eval(response)
    except (ValueError, TypeError, SyntaxError):
        response = ""
    return response
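

# Example (sketch): response_parser turns a literal-like LLM reply into a
# Python object and degrades gracefully on malformed output.
#
#   response_parser("['Which artists composed a piece?']")  # -> list with one CQ
#   response_parser("not a literal")                        # -> ""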


def llm_cq_clustering(cqs, n_clusters, api_key, paraphrase_detection=False):
    """
    Cluster competency questions by topic using an LLM, optionally removing
    paraphrased duplicates first.

    :param cqs: competency questions to cluster
    :param n_clusters: desired number of clusters, or a falsy value to let the LLM decide
    :param api_key: API key passed to the chat_completion backend
    :param paraphrase_detection: whether to run paraphrase detection before clustering
    :return: tuple of (parsed clustering result, blank placeholder image)
    """
    conversation_history = [
        {"role": "system", "content": "You are an ontology engineer."}
    ]

    if paraphrase_detection:
        prompt_1 = "Perform paraphrase detection for the following competency questions: {}. " \
                   "Return a Python list of duplicate competency questions.".format(cqs)
        conversation_history.append({"role": "user", "content": prompt_1})
        response = chat_completion(api_key, conversation_history)
        print("{} CQs remaining after paraphrase detection.".format(len(cqs) - len(response_parser(response))))

        if n_clusters:
            prompt_2 = f"Cluster the competency questions into {n_clusters} clusters based on their topics. " \
                       "Keep the granularity of the topic in each cluster at a similar level. " \
                       "Return in JSON format, such as: {'cluster 1 topic': " \
                       "['competency question 1', 'competency question 2']}:"
        else:
            prompt_2 = "Cluster the competency questions into clusters based on their topics. " \
                       "Keep the granularity of the topic in each cluster at a similar level. " \
                       "Return in JSON format, such as: {'cluster 1 topic': " \
                       "['competency question 1', 'competency question 2']}:"
        conversation_history.append({"role": "assistant", "content": response})
        conversation_history.append({"role": "user", "content": prompt_2})
        response = chat_completion(api_key, conversation_history)
    else:
        if n_clusters:
            prompt_2 = f"Given the competency questions: {cqs}, cluster them into {n_clusters} clusters based on " \
                       "the topics."
        else:
            prompt_2 = f"Given the competency questions: {cqs}, cluster them into clusters based on the topics."
        prompt_2 += " Keep the granularity of the topic in each cluster at a similar level. " \
                    "Return in JSON format, such as: {'cluster 1 topic': " \
                    "['competency question 1', 'competency question 2']}:"
        conversation_history.append({"role": "user", "content": prompt_2})
        response = chat_completion(api_key, conversation_history)

    # a blank placeholder image is returned so the interface matches
    # agglomerative_clustering, which returns a dendrogram image
    return response_parser(response), Image.new("RGB", (640, 480), (255, 255, 255))
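

if __name__ == "__main__":
    # Minimal smoke-test sketch of the embedding-based pipeline.
    # The competency questions below are made-up examples; replace them with
    # real CQs (one per line) and adjust the distance threshold as needed.
    example_cqs = (
        "Which musicians performed at a given concert?\n"
        "What instruments are played in a composition?\n"
        "Which composer wrote a given piece?"
    )
    cleaned, embeddings = compute_embeddings(example_cqs)
    clusters, dendrogram_img = agglomerative_clustering(
        cleaned, embeddings, n_clusters=None, distance_threshold=1.0
    )
    print(dict(clusters))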