import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AIModelError

logger = setup_logger(__name__)

MODEL_CACHE = {}

def get_embedding_model():
    """Load and cache the sentence embedding model."""
    model_name = "all-MiniLM-L6-v2"

    if model_name not in MODEL_CACHE:
        logger.info(f"Loading embedding model: {model_name}")
        try:
            # Use the GPU when available, otherwise fall back to CPU.
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = SentenceTransformer(model_name, device=device)
            MODEL_CACHE[model_name] = model
            logger.info(f"Embedding model loaded successfully on {device}")
        except Exception as e:
            logger.error(f"Error loading embedding model: {str(e)}")
            raise AIModelError("Error loading embedding model", {"original_error": str(e)}) from e

    return MODEL_CACHE[model_name]

def extract_text_from_item(item: Dict[str, Any]) -> str:
    """Extract searchable text from an item."""
    text_parts = []

    for field in ("title", "content", "description"):
        if item.get(field):
            text_parts.append(item[field])

    tags = item.get("tags")
    if tags:
        if isinstance(tags, list):
            text_parts.append(" ".join(tags))
        elif isinstance(tags, str):
            text_parts.append(tags)

    return " ".join(text_parts)

def get_item_embeddings(items: List[Dict[str, Any]]) -> Tuple[np.ndarray, List[Dict[str, Any]]]:
    """Get embeddings for a list of items, skipping items without searchable text."""
    model = get_embedding_model()
    texts = []
    valid_items = []

    for item in items:
        text = extract_text_from_item(item)
        if text.strip():
            texts.append(text)
            valid_items.append(item)

    if not texts:
        return np.array([]), []

    try:
        embeddings = model.encode(texts, convert_to_numpy=True)
        return embeddings, valid_items
    except Exception as e:
        logger.error(f"Error generating embeddings: {str(e)}")
        return np.array([]), []

def search_content(query: str, items: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
    """Search content using semantic search, with a fallback to keyword search.

    Args:
        query: Search query
        items: List of items to search
        top_k: Number of top results to return

    Returns:
        List of items sorted by relevance
    """
    if not query or not items:
        return []

    logger.info(f"Performing semantic search for query: {query}")

    try:
        item_embeddings, valid_items = get_item_embeddings(items)

        if len(valid_items) == 0:
            logger.warning("No valid items with text content found")
            return []

        model = get_embedding_model()
        query_embedding = model.encode([query], convert_to_numpy=True)

        # Cosine similarity between the query embedding and every item embedding.
        similarity_scores = cosine_similarity(query_embedding, item_embeddings)[0]

        results = []
        for item, score in zip(valid_items, similarity_scores):
            item_copy = item.copy()
            item_copy["relevance_score"] = float(score)
            results.append(item_copy)

        results.sort(key=lambda x: x["relevance_score"], reverse=True)
        return results[:top_k]

    except Exception as e:
        logger.error(f"Error in semantic search: {str(e)}. Falling back to keyword search.")
        return keyword_search(query, items, top_k)

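# A minimal usage sketch for search_content, assuming hypothetical item dicts; only the
# "id", "title", "content", and "tags" fields below are read by this module, and the
# sample values are illustrative:
#
#     items = [
#         {"id": 1, "title": "Reducing GPU memory usage", "content": "Tips for CUDA workloads.", "tags": ["cuda", "memory"]},
#         {"id": 2, "title": "Weekly planning notes", "content": "Agenda and follow-ups."},
#     ]
#     for hit in search_content("gpu memory optimization", items, top_k=2):
#         print(hit["title"], round(hit["relevance_score"], 3))
#
# Each result is a copy of the original item with a float "relevance_score" added.
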
def keyword_search(query: str, items: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
    """Fallback keyword search used when semantic search fails.

    Args:
        query: Search query
        items: List of items to search
        top_k: Number of top results to return

    Returns:
        List of items sorted by relevance
    """
    logger.info(f"Performing keyword search for query: {query}")

    query_terms = re.findall(r'\w+', query.lower())
    if not query_terms:
        return []

    results = []
    for item in items:
        text = extract_text_from_item(item).lower()

        # Count term occurrences, weighting title matches more heavily.
        score = 0
        for term in query_terms:
            term_count = text.count(term)
            if term_count > 0:
                title = (item.get("title") or "").lower()
                title_count = title.count(term)
                score += term_count + title_count * 2

        if score > 0:
            item_copy = item.copy()
            item_copy["relevance_score"] = score
            results.append(item_copy)

    results.sort(key=lambda x: x["relevance_score"], reverse=True)
    return results[:top_k]

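# Worked scoring example (hypothetical item): because extract_text_from_item already
# includes the title, a term that appears once in the title contributes 1 from the
# combined text plus a title bonus of 2, i.e. 3 in total. Querying "memory" against an
# item titled "Memory tips" whose content mentions "memory" twice gives term_count = 3
# (one title occurrence plus two in the content) and a title bonus of 2, so score = 5.
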
def find_similar_items(item: Dict[str, Any], items: List[Dict[str, Any]], top_k: int = 3) -> List[Dict[str, Any]]:
    """Find items similar to a given item.

    Args:
        item: Reference item
        items: List of items to search
        top_k: Number of top results to return

    Returns:
        List of similar items
    """
    if not item or not items:
        return []

    reference_text = extract_text_from_item(item)
    if not reference_text.strip():
        return []

    try:
        model = get_embedding_model()
        reference_embedding = model.encode([reference_text], convert_to_numpy=True)

        item_embeddings, valid_items = get_item_embeddings(items)

        if len(valid_items) == 0:
            return []

        similarity_scores = cosine_similarity(reference_embedding, item_embeddings)[0]

        results = []
        for similar_item, score in zip(valid_items, similarity_scores):
            # Skip the reference item itself.
            if similar_item.get("id") == item.get("id"):
                continue

            similar_item_copy = similar_item.copy()
            similar_item_copy["similarity_score"] = float(score)
            results.append(similar_item_copy)

        results.sort(key=lambda x: x["similarity_score"], reverse=True)
        return results[:top_k]

    except Exception as e:
        logger.error(f"Error finding similar items: {str(e)}. Falling back to keyword similarity.")
        return keyword_similarity(item, items, top_k)

def keyword_similarity(item: Dict[str, Any], items: List[Dict[str, Any]], top_k: int = 3) -> List[Dict[str, Any]]:
    """Fallback keyword-based similarity used when semantic similarity fails.

    Args:
        item: Reference item
        items: List of items to search
        top_k: Number of top results to return

    Returns:
        List of similar items
    """
    reference_text = extract_text_from_item(item).lower()
    if not reference_text.strip():
        return []

    reference_words = set(re.findall(r'\w+', reference_text))

    results = []
    for other_item in items:
        # Skip the reference item itself.
        if other_item.get("id") == item.get("id"):
            continue

        other_text = extract_text_from_item(other_item).lower()
        other_words = set(re.findall(r'\w+', other_text))

        if not other_words or not reference_words:
            continue

        # Jaccard similarity between the two word sets.
        intersection = len(reference_words.intersection(other_words))
        union = len(reference_words.union(other_words))
        similarity = intersection / union if union > 0 else 0

        if similarity > 0:
            other_item_copy = other_item.copy()
            other_item_copy["similarity_score"] = similarity
            results.append(other_item_copy)

    results.sort(key=lambda x: x["similarity_score"], reverse=True)
    return results[:top_k]

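# Worked example of the Jaccard score above (hypothetical texts): if the reference text
# tokenizes to {"gpu", "memory", "tips"} and another item tokenizes to
# {"gpu", "memory", "profiling", "notes"}, the intersection has 2 words and the union
# has 5, so similarity = 2 / 5 = 0.4.
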
def build_knowledge_graph(items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build a simple knowledge graph from items.

    Args:
        items: List of items to include in the graph

    Returns:
        Knowledge graph as a dictionary
    """
    graph = {
        "nodes": [],
        "edges": []
    }

    node_ids = set()

    # Create one node per item that has an id.
    for item in items:
        item_id = item.get("id")
        if not item_id or item_id in node_ids:
            continue

        node = {
            "id": item_id,
            "label": item.get("title") or "Untitled",
            "type": item.get("type", "unknown")
        }

        graph["nodes"].append(node)
        node_ids.add(item_id)

    # Add "similar" edges from each item to its most similar neighbours.
    for item1 in items:
        item1_id = item1.get("id")
        if not item1_id or item1_id not in node_ids:
            continue

        similar_items = find_similar_items(item1, items, top_k=5)

        for similar_item in similar_items:
            similar_id = similar_item.get("id")
            if not similar_id or similar_id not in node_ids or similar_id == item1_id:
                continue

            edge = {
                "source": item1_id,
                "target": similar_id,
                "weight": similar_item.get("similarity_score", 0.5),
                "type": "similar"
            }

            graph["edges"].append(edge)

    return graph

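# Shape of the returned graph (hypothetical ids, labels, and weights):
#
#     {
#         "nodes": [{"id": "a1", "label": "CUDA notes", "type": "note"}, ...],
#         "edges": [{"source": "a1", "target": "b2", "weight": 0.83, "type": "similar"}, ...],
#     }
#
# Edges are directed from each item to its nearest neighbours, so a pair of mutually
# similar items can appear as two edges (a1 -> b2 and b2 -> a1).
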
def detect_duplicates(items: List[Dict[str, Any]], threshold: float = 0.85) -> List[List[Dict[str, Any]]]:
    """Detect potential duplicate items.

    Args:
        items: List of items to check
        threshold: Similarity threshold for considering items as duplicates

    Returns:
        List of groups of duplicate items
    """
    if not items or len(items) < 2:
        return []

    try:
        item_embeddings, valid_items = get_item_embeddings(items)

        if len(valid_items) < 2:
            return []

        # Pairwise cosine similarity between all item embeddings.
        similarity_matrix = cosine_similarity(item_embeddings)

        # Greedily group items whose similarity to the group seed exceeds the threshold.
        duplicate_groups = []
        processed = set()

        for i in range(len(valid_items)):
            if i in processed:
                continue

            group = [valid_items[i]]
            processed.add(i)

            for j in range(i + 1, len(valid_items)):
                if j in processed:
                    continue

                if similarity_matrix[i, j] >= threshold:
                    group.append(valid_items[j])
                    processed.add(j)

            if len(group) > 1:
                duplicate_groups.append(group)

        return duplicate_groups

    except Exception as e:
        logger.error(f"Error detecting duplicates: {str(e)}")
        return []

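# Usage sketch (hypothetical items): detect_duplicates(items, threshold=0.9) returns a
# list of groups, each a list of the original item dicts judged near-identical; raising
# the threshold makes grouping stricter. Note the grouping is greedy: each candidate is
# compared only against the group's seed item, not against every member of the group.
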
def cluster_content(items: List[Dict[str, Any]], num_clusters: int = 5) -> Dict[str, List[Dict[str, Any]]]:
    """Cluster content into groups.

    Args:
        items: List of items to cluster
        num_clusters: Number of clusters to create

    Returns:
        Dictionary mapping cluster labels to lists of items
    """
    if not items or len(items) < num_clusters:
        return {}

    try:
        item_embeddings, valid_items = get_item_embeddings(items)

        if len(valid_items) < num_clusters:
            return {}

        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=min(num_clusters, len(valid_items)), random_state=42)
        cluster_labels = kmeans.fit_predict(item_embeddings)

        # Group items by cluster label.
        clusters = defaultdict(list)
        for i, label in enumerate(cluster_labels):
            clusters[str(label)].append(valid_items[i])

        # Name each cluster after its most frequent non-stopword terms.
        named_clusters = {}
        for label, cluster_items in clusters.items():
            cluster_text = " ".join(extract_text_from_item(item) for item in cluster_items)

            words = re.findall(r'\b[a-zA-Z]{3,}\b', cluster_text.lower())
            word_counts = defaultdict(int)

            stopwords = {"the", "and", "for", "with", "this", "that", "from", "have", "not"}

            for word in words:
                if word not in stopwords:
                    word_counts[word] += 1

            top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:3]

            if top_words:
                cluster_name = ", ".join(word for word, _ in top_words)
            else:
                cluster_name = f"Cluster {label}"

            # Avoid silently merging two clusters that happen to get the same name.
            if cluster_name in named_clusters:
                cluster_name = f"{cluster_name} ({label})"
            named_clusters[cluster_name] = cluster_items

        return named_clusters

    except Exception as e:
        logger.error(f"Error clustering content: {str(e)}")
        return {}

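# Usage sketch (hypothetical items): cluster_content(items, num_clusters=4) returns
# something like {"cuda, memory, gpu": [...], "planning, notes, weekly": [...]}, where
# each key is built from a cluster's three most frequent non-stopword terms and each
# value is the list of items assigned to that cluster.
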
def identify_trends(items: List[Dict[str, Any]], time_field: str = "created_at") -> Dict[str, Any]:
    """Identify trends in content over time.

    Args:
        items: List of items to analyze
        time_field: Field containing timestamp

    Returns:
        Dictionary with trend information
    """
    if not items:
        return {}

    try:
        import datetime
        from collections import Counter

        daily_counts = defaultdict(int)
        weekly_counts = defaultdict(int)
        monthly_counts = defaultdict(int)

        topics_by_month = defaultdict(Counter)

        for item in items:
            timestamp = item.get(time_field)
            if not timestamp:
                continue

            # Accept either numeric epoch timestamps or ISO 8601 strings.
            if isinstance(timestamp, (int, float)):
                dt = datetime.datetime.fromtimestamp(timestamp)
            elif isinstance(timestamp, str):
                try:
                    dt = datetime.datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                except ValueError:
                    continue
            else:
                continue

            date_str = dt.strftime("%Y-%m-%d")
            week_str = dt.strftime("%Y-%W")
            month_str = dt.strftime("%Y-%m")

            daily_counts[date_str] += 1
            weekly_counts[week_str] += 1
            monthly_counts[month_str] += 1

            # Collect topics from tags, falling back to keywords from the title.
            topics = []
            if item.get("tags"):
                if isinstance(item["tags"], list):
                    topics.extend(item["tags"])
                elif isinstance(item["tags"], str):
                    topics.extend(tag.strip() for tag in item["tags"].split(","))

            if not topics and item.get("title"):
                title_words = re.findall(r'\b[a-zA-Z]{3,}\b', item["title"].lower())
                stopwords = {"the", "and", "for", "with", "this", "that", "from", "have", "not"}
                topics = [word for word in title_words if word not in stopwords][:3]

            for topic in topics:
                topics_by_month[month_str][topic] += 1

        # Top topics per month.
        trending_topics = {}
        for month, counter in topics_by_month.items():
            trending_topics[month] = counter.most_common(5)

        # Month-over-month growth in item counts, as a percentage.
        growth_rates = {}
        if len(monthly_counts) >= 2:
            months = sorted(monthly_counts.keys())
            for i in range(1, len(months)):
                current_month = months[i]
                prev_month = months[i - 1]
                current_count = monthly_counts[current_month]
                prev_count = monthly_counts[prev_month]

                if prev_count > 0:
                    growth_rates[current_month] = (current_count - prev_count) / prev_count * 100

        return {
            "daily_counts": dict(daily_counts),
            "weekly_counts": dict(weekly_counts),
            "monthly_counts": dict(monthly_counts),
            "trending_topics": trending_topics,
            "growth_rates": growth_rates
        }

    except Exception as e:
        logger.error(f"Error identifying trends: {str(e)}")
        return {}

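# Shape of the returned trend summary (hypothetical values):
#
#     {
#         "daily_counts": {"2024-03-01": 4, ...},
#         "weekly_counts": {"2024-09": 12, ...},
#         "monthly_counts": {"2024-03": 40, "2024-04": 50},
#         "trending_topics": {"2024-03": [("cuda", 7), ("memory", 5), ...]},
#         "growth_rates": {"2024-04": 25.0},
#     }
#
# Growth rates are month-over-month percentage changes, e.g. 40 -> 50 items is +25.0%.
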
def identify_information_gaps(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Identify potential information gaps in the content.

    Args:
        items: List of items to analyze

    Returns:
        List of identified information gaps
    """
    if not items:
        return []

    try:
        clusters = cluster_content(items)

        gaps = []

        # Topics with very little content are flagged as underdeveloped.
        for cluster_name, cluster_items in clusters.items():
            if len(cluster_items) <= 2:
                gaps.append({
                    "type": "underdeveloped_topic",
                    "topic": cluster_name,
                    "description": f"Limited content on topic: {cluster_name}",
                    "item_count": len(cluster_items),
                    "sample_items": [item.get("title", "Untitled") for item in cluster_items]
                })

        # Cluster pairs with no sufficiently similar items between them are flagged
        # as potentially missing connections.
        if len(clusters) >= 2:
            cluster_names = list(clusters.keys())
            for i in range(len(cluster_names)):
                for j in range(i + 1, len(cluster_names)):
                    name1 = cluster_names[i]
                    name2 = cluster_names[j]

                    has_connection = False
                    for item1 in clusters[name1]:
                        similar_items = find_similar_items(item1, clusters[name2], top_k=1)
                        if similar_items and similar_items[0].get("similarity_score", 0) > 0.5:
                            has_connection = True
                            break

                    if not has_connection:
                        gaps.append({
                            "type": "missing_connection",
                            "topics": [name1, name2],
                            "description": f"Potential missing connection between {name1} and {name2}"
                        })

        return gaps

    except Exception as e:
        logger.error(f"Error identifying information gaps: {str(e)}")
        return []

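

if __name__ == "__main__":
    # Minimal smoke-test sketch with hypothetical sample items. Running it loads (and may
    # download) the "all-MiniLM-L6-v2" sentence-transformers model, so it needs that model
    # to be available locally or over the network.
    sample_items = [
        {"id": "1", "title": "Reducing GPU memory usage", "content": "Tips for CUDA workloads.", "tags": ["cuda", "memory"]},
        {"id": "2", "title": "CUDA out-of-memory errors", "content": "Debugging OOM crashes on the GPU.", "tags": ["cuda"]},
        {"id": "3", "title": "Weekly planning notes", "content": "Agenda and follow-ups.", "tags": ["planning"]},
    ]

    for hit in search_content("gpu memory", sample_items, top_k=2):
        print(f"{hit['title']}: {hit['relevance_score']:.3f}")

    for group in detect_duplicates(sample_items, threshold=0.9):
        print("Possible duplicates:", [member.get("title") for member in group])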