"""
Competency questions analysis functions
Partially inherited from [idea](https://github.com/polifonia-project/idea)
"""
import ast
import io
import re
from collections import defaultdict
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram
from ontochat.chatbot import chat_completion


def preprocess_competency_questions(cqs):
    """
    Preprocess competency questions: split a newline-separated string into a list of cleaned strings.

    :param cqs: competency questions as a single newline-separated string
    :return: list of cleaned competency question strings
    """
    cqs = cqs.split("\n")
    # # keep index
    # cqs = [re.split(r'\.\s', cq, 1) for cq in cqs]
    # cqs = [{cq[0]: cq[1]} for cq in cqs]
    # cqs = [re.split(r'\.\s', cq, 1)[1] for cq in cqs]
    # clean
    cleaned_cqs = []
    for q in cqs:  # FIXME to move
        # Collapse complex questions into a single sentence
        q = q.replace("\n", "; ")
        # Remove tab characters used for metadata
        q = q.replace("\t", " ")
        # Collapse multiple spaces
        q = re.sub(r"[ ]+", " ", q)
        # Discard inconsistent punctuation
        q = re.sub(r";[ ]*;", ";", q)
        cleaned_cqs.append(q)
    return cleaned_cqs
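
# Minimal usage sketch; the example questions are illustrative and not part of the project:
# >>> preprocess_competency_questions("1. What is the tempo of a piece?\n2. Who composed a piece?")
# ['1. What is the tempo of a piece?', '2. Who composed a piece?']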


def compute_embeddings(cqs, model="all-MiniLM-L6-v2", device="cpu"):
    """
    Compute sentence-level embeddings of competency questions.

    :param cqs: competency questions as a single newline-separated string
    :param model: name of the sentence-transformers model checkpoint
    :param device: device to run the model on, e.g. "cpu" or "cuda"
    :return: tuple of (cleaned competency questions, unit-normalised embedding matrix)
    """
    cleaned_cqs = preprocess_competency_questions(cqs)
    model = SentenceTransformer(model, device=device)
    embeddings = model.encode(cleaned_cqs)
    # Normalisation of CQ embeddings to unit length
    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    return cleaned_cqs, embeddings
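
# Minimal usage sketch, assuming the default all-MiniLM-L6-v2 checkpoint (384-dimensional embeddings)
# can be downloaded:
# >>> cleaned, embeddings = compute_embeddings("Who composed a piece?\nWhat is the tempo of a piece?")
# >>> embeddings.shape      # (2, 384); each row is normalised to unit length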


def agglomerative_clustering(cqs, embeddings, n_clusters=None, metric="euclidean", distance_threshold=None):
    """
    Cluster competency questions with hierarchical (agglomerative) clustering and plot the dendrogram.

    :param cqs: list of cleaned competency questions
    :param embeddings: sentence embeddings of the competency questions
    :param n_clusters: number of clusters; must be None when distance_threshold is set
    :param metric: distance metric used by AgglomerativeClustering
    :param distance_threshold: linkage distance above which clusters are not merged
    :return: tuple of (dict mapping cluster id to its competency questions, dendrogram as a PIL image)
    """
    clustering_model = AgglomerativeClustering(
        n_clusters=n_clusters,
        metric=metric,
        distance_threshold=distance_threshold,
        compute_distances=True
    )
    clustering_model.fit(embeddings)
    cluster_assignment = clustering_model.labels_
    clustered_cqs = defaultdict(list)
    for sentence_id, cluster_id in enumerate(cluster_assignment):
        clustered_cqs[str(cluster_id)].append(cqs[sentence_id])
    pil_image = plot_dendrogram(
        clustering_model,
        orientation='right',
        labels=list(range(1, len(cqs) + 1)),
        # labels=cqs,
        truncate_mode=None,
        # p=3,
        show_leaf_counts=False,
    )
    return clustered_cqs, pil_image
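
# Minimal usage sketch. scikit-learn requires exactly one of n_clusters / distance_threshold
# to be set, with the other left as None:
# >>> cqs_list, emb = compute_embeddings("Who composed a piece?\nWhat is the tempo of a piece?\nWho performed a piece?")
# >>> clusters, dendrogram_img = agglomerative_clustering(cqs_list, emb, n_clusters=2)
# >>> clusters                        # e.g. defaultdict(list, {'0': [...], '1': [...]})
# >>> dendrogram_img.save("dendrogram.png")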


def plot_dendrogram(model, **kwargs):
    """
    Create the linkage matrix of a fitted clustering model and plot the dendrogram.
    Source: https://scikit-learn.org/stable/auto_examples/cluster/plot_agglomerative_dendrogram.html

    :param model: fitted AgglomerativeClustering model (with compute_distances=True)
    :param kwargs: keyword arguments forwarded to scipy.cluster.hierarchy.dendrogram
    :return: dendrogram figure as a PIL image
    """
    # create the counts of samples under each node
    counts = np.zeros(model.children_.shape[0])
    n_samples = len(model.labels_)
    for i, merge in enumerate(model.children_):
        current_count = 0
        for child_idx in merge:
            if child_idx < n_samples:
                current_count += 1  # leaf node
            else:
                current_count += counts[child_idx - n_samples]
        counts[i] = current_count
    linkage_matrix = np.column_stack(
        [model.children_, model.distances_, counts]
    ).astype(float)
    # Plot the corresponding dendrogram
    plt.tight_layout()
    # plt.figure(figsize=(40, 20))
    dendrogram(linkage_matrix, **kwargs)
    # plt.subplots_adjust(left=0.25, right=1.025, top=0.9, bottom=0.075)
    # plt.savefig(figsave_path)
    # plt.show()
    # convert the figure into a PIL image
    fig = plt.gcf()
    buf = io.BytesIO()
    fig.savefig(buf)
    buf.seek(0)
    # close the pyplot figure so repeated calls do not keep drawing on the same canvas
    plt.close(fig)
    return Image.open(buf)
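
# plot_dendrogram is called internally by agglomerative_clustering; each row of the linkage matrix
# it builds follows the scipy convention [child_1, child_2, merge_distance, n_original_observations],
# which is the format scipy.cluster.hierarchy.dendrogram expects.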


def response_parser(response):
    """Parse an LLM response expected to be a Python literal; return "" if parsing fails."""
    try:
        response = ast.literal_eval(response)
    except (ValueError, TypeError, SyntaxError):
        response = ""
    return response
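
# Minimal sketch of the intended round-trip; the LLM is asked to answer with a literal
# that ast.literal_eval can turn back into a Python object:
# >>> response_parser("{'tempo': ['What is the tempo of a piece?']}")
# {'tempo': ['What is the tempo of a piece?']}
# >>> response_parser("Sorry, I cannot do that.")
# ''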


def llm_cq_clustering(cqs, n_clusters, api_key, paraphrase_detection=False):
    """
    Cluster competency questions by topic with an LLM, optionally preceded by paraphrase detection.

    :param cqs: competency questions to embed in the prompts
    :param n_clusters: desired number of clusters, or None to let the model decide
    :param api_key: API key forwarded to chat_completion
    :param paraphrase_detection: whether to run paraphrase detection before clustering
    :return: tuple of (parsed clusters, blank placeholder image matching the interface of agglomerative_clustering)
    """
    conversation_history = [
        {"role": "system", "content": "You are an ontology engineer."}
    ]
    # paraphrase detection before clustering
    if paraphrase_detection:
        # 1. paraphrase detection
        prompt_1 = "Perform paraphrase detection for the following competency questions: {}. " \
                   "Return a Python list of duplicate competency questions.".format(cqs)
        conversation_history.append({"role": "user", "content": prompt_1})
        response = chat_completion(api_key, conversation_history)
        print("{} CQs remaining after paraphrase detection.".format(len(cqs) - len(response_parser(response))))
        # 2. clustering
        if n_clusters:
            prompt_2 = f"Cluster the competency questions into {n_clusters} clusters based on their topics. " \
                       "Keep the granularity of the topic in each cluster at a similar level. " \
                       "Return in JSON format, such as: {'cluster 1 topic': " \
                       "['competency question 1', 'competency question 2']}:"
        else:
            prompt_2 = "Cluster the competency questions into clusters based on their topics. " \
                       "Keep the granularity of the topic in each cluster at a similar level. " \
                       "Return in JSON format, such as: {'cluster 1 topic': " \
                       "['competency question 1', 'competency question 2']}:"
        conversation_history.append({"role": "assistant", "content": response})  # previous response
        conversation_history.append({"role": "user", "content": prompt_2})
        response = chat_completion(api_key, conversation_history)
        # print("Output is: \"{}\"".format(response))
    else:  # clustering only
        if n_clusters:
            prompt_2 = f"Given the competency questions: {cqs}, cluster them into {n_clusters} clusters based on " \
                       f"the topics."
        else:
            prompt_2 = f"Given the competency questions: {cqs}, cluster them into clusters based on the topics."
        prompt_2 += " Keep the granularity of the topic in each cluster at a similar level. " \
                    "Return in JSON format, such as: {'cluster 1 topic': " \
                    "['competency question 1', 'competency question 2']}:"
        conversation_history.append({"role": "user", "content": prompt_2})
        response = chat_completion(api_key, conversation_history)
        # print("Output is: \"{}\"".format(response))
    # # 3. assign labels
    # prompt_2 = "Clustering the competency questions based on their topics. Return in JSON format, " \
    #            "such as: {'cluster 1 topic': ['competency question 1', 'competency question 2']}:"
    # conversation_history.append({"role": "assistant", "content": response})  # previous response
    # conversation_history.append({"role": "user", "content": prompt_2})
    # response = chat_completion(conversation_history)
    # response = response.choices[0].message.content
    # print("Output is: \"{}\"".format(response))
    # return the parsed clusters and a blank placeholder image, so the return signature
    # matches that of agglomerative_clustering
    return response_parser(response), Image.new("RGB", (640, 480), (255, 255, 255))
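
# Minimal usage sketch; the API key is a placeholder, and the parsed clusters depend on the
# model returning a literal that response_parser can evaluate:
# >>> cqs_text = "What is the tempo of a piece?\nWho composed a piece?"
# >>> clusters, placeholder_img = llm_cq_clustering(cqs_text, n_clusters=2, api_key="sk-...")
# >>> clusters    # e.g. {'tempo': ['What is the tempo of a piece?'], 'composer': ['Who composed a piece?']}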