ttm-webapp-hf / pipeline /metrics.py
daniel-wojahn's picture
cleanup and expansion of structural analysis
edd4b9d
import numpy as np
import pandas as pd
from typing import List, Dict, Union
from itertools import combinations
from sklearn.metrics.pairwise import cosine_similarity
from .fasttext_embedding import generate_embeddings as generate_fasttext_embeddings
from .hf_embedding import generate_embeddings as generate_hf_embeddings
import logging
# Attempt to import the Cython-compiled fast_lcs module
try:
from .fast_lcs import compute_lcs_fast
USE_CYTHON_LCS = True
except ImportError:
# print("Cython fast_lcs not found, using Python LCS. For better performance, compile the Cython module.")
USE_CYTHON_LCS = False
logger = logging.getLogger(__name__)
def compute_normalized_lcs(words1: List[str], words2: List[str]) -> float:
# Calculate m and n (lengths) here, so they are available for normalization
# regardless of which LCS implementation is used.
m, n = len(words1), len(words2)
if USE_CYTHON_LCS:
# Use the Cython-compiled version if available
lcs_length = compute_lcs_fast(words1, words2)
else:
# Fallback to pure Python implementation
# m, n = len(words1), len(words2) # Moved to the beginning of the function
# Using numpy array for dp table can be slightly faster than list of lists for large inputs
# but the primary bottleneck is the Python loop itself compared to Cython.
dp = np.zeros((m + 1, n + 1), dtype=np.int32)
for i in range(1, m + 1):
for j in range(1, n + 1):
if words1[i - 1] == words2[j - 1]:
dp[i, j] = dp[i - 1, j - 1] + 1
else:
dp[i, j] = max(dp[i - 1, j], dp[i, j - 1])
lcs_length = int(dp[m, n])
avg_length = (m + n) / 2
return lcs_length / avg_length if avg_length > 0 else 0.0
def compute_semantic_similarity(
text1_segment: str,
text2_segment: str,
tokens1: List[str],
tokens2: List[str],
model,
model_type: str = "fasttext",
use_stopwords: bool = True,
use_lite_stopwords: bool = False,
fasttext_tokenize_fn=None,
term_freq_corpus=None,
doc_freq_map=None,
total_docs_in_corpus=0,
batch_size: int = 32,
show_progress_bar: bool = False
) -> float:
"""Computes semantic similarity using either a FastText or Sentence Transformer model."""
if model_type not in ["fasttext", "sentence-transformer"]:
logger.error(f"compute_semantic_similarity called with unexpected model_type: {model_type}")
return np.nan
if model is None:
logger.warning(
"FastText model not available for semantic similarity. Skipping calculation."
)
return np.nan
if not text1_segment or not text2_segment:
logger.info(
"One or both texts are empty for semantic similarity. Returning 0.0."
)
return 0.0
def _get_aggregated_embedding(
raw_text_segment: str,
_botok_tokens: List[str], # Parameter name prefixed with _ to indicate it's not used
model_obj,
use_stopwords_param: bool,
use_lite_stopwords_param: bool,
tokenize_fn_param,
term_freq_corpus_param,
doc_freq_map_param,
total_docs_in_corpus_param,
batch_size_param: int,
show_progress_bar_param: bool
) -> Union[np.ndarray, None]:
"""Helper to get a single embedding for a text using FastText."""
if not raw_text_segment.strip():
logger.info(
f"Text segment is empty or only whitespace: {raw_text_segment[:100]}... Returning None for embedding."
)
return None
if model_type == "fasttext":
embedding = generate_fasttext_embeddings(
texts=[raw_text_segment],
model=model_obj,
use_stopwords=use_stopwords_param,
use_lite_stopwords=use_lite_stopwords_param
)
elif model_type == "sentence-transformer":
embedding = generate_hf_embeddings(
texts=[raw_text_segment],
model=model_obj,
batch_size=batch_size_param,
show_progress_bar=show_progress_bar_param
)
if embedding is None or embedding.size == 0:
logger.error(
f"Failed to generate embedding for text: {raw_text_segment[:100]}..."
)
return None
return embedding
try:
# Pass all relevant parameters to _get_aggregated_embedding
emb1 = _get_aggregated_embedding(text1_segment, tokens1, model, use_stopwords, use_lite_stopwords, fasttext_tokenize_fn, term_freq_corpus, doc_freq_map, total_docs_in_corpus, batch_size, show_progress_bar)
emb2 = _get_aggregated_embedding(text2_segment, tokens2, model, use_stopwords, use_lite_stopwords, fasttext_tokenize_fn, term_freq_corpus, doc_freq_map, total_docs_in_corpus, batch_size, show_progress_bar)
if emb1 is None or emb2 is None or emb1.size == 0 or emb2.size == 0:
logger.error(
"Failed to obtain one or both embeddings for semantic similarity."
)
return np.nan
# Ensure embeddings are numpy arrays (should be, but defensive)
if not isinstance(emb1, np.ndarray):
emb1 = np.array(emb1)
if not isinstance(emb2, np.ndarray):
emb2 = np.array(emb2)
# Handle cases where embeddings are all zeros
if np.all(emb1 == 0) and np.all(emb2 == 0):
logger.info("Both embeddings are zero. Semantic similarity is 0.0.")
return 0.0
if np.all(emb1 == 0) or np.all(emb2 == 0):
logger.info("One of the embeddings is zero. Semantic similarity is 0.0.")
return 0.0
# Handle NaN or Inf in embeddings
if np.isnan(emb1).any() or np.isinf(emb1).any() or \
np.isnan(emb2).any() or np.isinf(emb2).any():
logger.warning("NaN or Inf found in embeddings. Semantic similarity set to 0.0.")
return 0.0
# Ensure embeddings are 2D for cosine_similarity: [1, dim]
if emb1.ndim == 1:
emb1 = emb1.reshape(1, -1)
if emb2.ndim == 1:
emb2 = emb2.reshape(1, -1)
similarity_score = cosine_similarity(emb1, emb2)[0][0]
return max(0.0, float(similarity_score))
except Exception as e:
safe_text1 = str(text1_segment)[:100] if text1_segment is not None else "N/A"
safe_text2 = str(text2_segment)[:100] if text2_segment is not None else "N/A"
logger.error(
f"Error during semantic similarity calculation:\nText1: {safe_text1}...\nText2: {safe_text2}...\nError: {e}"
)
logger.exception("Traceback for semantic similarity calculation error:")
return np.nan
def compute_all_metrics(
texts: Dict[str, str],
token_lists: Dict[str, List[str]],
model=None,
enable_semantic: bool = True,
model_type: str = "fasttext",
use_stopwords: bool = True,
use_lite_stopwords: bool = False,
fasttext_tokenize_fn=None,
batch_size: int = 32,
show_progress_bar: bool = False
) -> pd.DataFrame:
"""
Computes all selected similarity metrics between pairs of texts.
Args:
texts (Dict[str, str]): A dictionary where keys are text identifiers (e.g., filenames or segment IDs)
and values are the text content strings.
model (SentenceTransformer, optional): The pre-loaded sentence transformer model.
Defaults to None.
device (str, optional): The device the model is on ('cuda' or 'cpu').
Defaults to None.
Returns:
pd.DataFrame: A DataFrame where each row contains the metrics for a pair of texts,
including 'Text Pair', 'Jaccard Similarity (%)', 'Normalized LCS',
and 'Semantic Similarity'.
"""
files = list(texts.keys())
results = []
corpus_for_sklearn_tfidf = [] # For storing space-joined tokens for scikit-learn's TF-IDF
# For FastText TF-IDF related statistics
term_freq_corpus_for_fasttext = {} # Renamed from global_corpus_token_freq_for_fasttext
document_frequency_map_for_fasttext = {}
total_num_documents_for_fasttext = len(texts)
stopwords_set_for_fasttext_stats_calc = set()
if use_stopwords: # This 'use_stopwords' is an arg to compute_all_metrics
if use_lite_stopwords:
from .stopwords_lite_bo import TIBETAN_STOPWORDS_LITE_SET
stopwords_set_for_fasttext_stats_calc = TIBETAN_STOPWORDS_LITE_SET
else:
from .stopwords_bo import TIBETAN_STOPWORDS_SET
stopwords_set_for_fasttext_stats_calc = TIBETAN_STOPWORDS_SET
for fname, content in texts.items():
# Use the pre-computed tokens from the token_lists dictionary
current_tokens_for_file = token_lists.get(fname, [])
corpus_for_sklearn_tfidf.append(" ".join(current_tokens_for_file) if current_tokens_for_file else "")
if model_type == "fasttext":
tokens_for_fasttext_stats = []
if fasttext_tokenize_fn is not None:
tokens_for_fasttext_stats = fasttext_tokenize_fn(content)
else:
tokens_for_fasttext_stats = current_tokens_for_file
filtered_tokens_for_stats = [
token for token in tokens_for_fasttext_stats if token not in stopwords_set_for_fasttext_stats_calc
] if use_stopwords else tokens_for_fasttext_stats
# Update corpus-wide term frequencies
for token in filtered_tokens_for_stats:
if token.strip():
term_freq_corpus_for_fasttext[token] = term_freq_corpus_for_fasttext.get(token, 0) + 1
# Update document frequencies
unique_filtered_tokens_in_doc = set(filtered_tokens_for_stats)
for token in unique_filtered_tokens_in_doc:
if token.strip():
document_frequency_map_for_fasttext[token] = document_frequency_map_for_fasttext.get(token, 0) + 1
if model_type == "fasttext":
logger.info(f"Built FastText corpus term frequency map with {len(term_freq_corpus_for_fasttext)} unique tokens.")
logger.info(f"Built FastText document frequency map with {len(document_frequency_map_for_fasttext)} unique tokens across {total_num_documents_for_fasttext} documents.")
# Handle case with no texts or all empty texts
_ = len(files) if files else 0 # n unused, replaced with _
for i, j in combinations(range(len(files)), 2):
f1, f2 = files[i], files[j]
words1_raw, words2_raw = token_lists[f1], token_lists[f2]
# Select appropriate stopwords set based on user preference
if use_stopwords:
# Choose between regular and lite stopwords sets
if use_lite_stopwords:
stopwords_set_to_use = TIBETAN_STOPWORDS_LITE_SET
else:
stopwords_set_to_use = TIBETAN_STOPWORDS_SET
else:
# If stopwords are disabled, use an empty set
stopwords_set_to_use = set()
# Filter stopwords for Jaccard calculation
words1_jaccard = [word for word in words1_raw if word not in stopwords_set_to_use]
words2_jaccard = [word for word in words2_raw if word not in stopwords_set_to_use]
jaccard = (
len(set(words1_jaccard) & set(words2_jaccard)) / len(set(words1_jaccard) | set(words2_jaccard))
if set(words1_jaccard) | set(words2_jaccard) # Ensure denominator is not zero
else 0.0
)
# LCS uses raw tokens (words1_raw, words2_raw) to provide a complementary metric.
# Semantic similarity also uses raw text and its botok tokens for chunking decisions.
jaccard_percent = jaccard * 100.0
norm_lcs = compute_normalized_lcs(words1_raw, words2_raw)
# Semantic Similarity Calculation
if enable_semantic:
# Pass raw texts and their pre-computed botok tokens
semantic_sim = compute_semantic_similarity(
texts[f1], texts[f2], words1_raw, words2_raw, model, model_type, use_stopwords, use_lite_stopwords,
fasttext_tokenize_fn=fasttext_tokenize_fn,
term_freq_corpus=term_freq_corpus_for_fasttext if model_type == "fasttext" else None,
doc_freq_map=document_frequency_map_for_fasttext if model_type == "fasttext" else None,
total_docs_in_corpus=total_num_documents_for_fasttext if model_type == "fasttext" else 0,
batch_size=batch_size,
show_progress_bar=show_progress_bar
)
else:
semantic_sim = np.nan
results.append(
{
"Text Pair": f"{f1} vs {f2}",
"Jaccard Similarity (%)": jaccard_percent,
"Normalized LCS": norm_lcs,
# Pass tokens1 and tokens2 to compute_semantic_similarity
"Semantic Similarity": semantic_sim
}
)
return pd.DataFrame(results)