import logging
import os
import re

import numpy as np
from elasticsearch import Elasticsearch
from minsearch import Index
from sentence_transformers import SentenceTransformer

from transcript_extractor import get_transcript

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def clean_text(text):
    """Drop characters other than word characters, whitespace and basic
    punctuation (. , ! ?), then collapse runs of whitespace."""
    if not isinstance(text, str):
        logger.warning(f"Non-string input to clean_text: {type(text)}")
        return ""
    cleaned = re.sub(r'[^\w\s.,!?]', ' ', text)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    logger.info(f"Cleaned text: '{cleaned[:100]}...'")
    return cleaned
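
# A worked example of the cleaning behavior (illustrative input):
#   clean_text("Price: $5 (approx.)")  ->  "Price 5 approx."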


class DataProcessor:
    def __init__(self, text_fields=["content", "title", "description"],
                 keyword_fields=["video_id", "author", "upload_date"],
                 embedding_model="all-MiniLM-L6-v2"):
        self.text_index = Index(text_fields=text_fields, keyword_fields=keyword_fields)
        self.embedding_model = SentenceTransformer(embedding_model)
        self.documents = []
        self.embeddings = []
        self.index_built = False
        self.current_index_name = None
        elasticsearch_host = os.getenv('ELASTICSEARCH_HOST', 'localhost')
        elasticsearch_port = int(os.getenv('ELASTICSEARCH_PORT', 9200))
        self.es = Elasticsearch([f'http://{elasticsearch_host}:{elasticsearch_port}'])
        logger.info(f"DataProcessor initialized with Elasticsearch at {elasticsearch_host}:{elasticsearch_port}")

    def process_transcript(self, video_id, transcript_data):
        if not transcript_data or 'metadata' not in transcript_data or 'transcript' not in transcript_data:
            logger.error(f"Invalid transcript data for video {video_id}")
            return None
        metadata = transcript_data['metadata']
        transcript = transcript_data['transcript']
        logger.info(f"Processing transcript for video {video_id}")
        logger.info(f"Number of transcript segments: {len(transcript)}")
        full_transcript = " ".join([segment.get('text', '') for segment in transcript])
        cleaned_transcript = clean_text(full_transcript)
        if not cleaned_transcript:
            logger.warning(f"Empty cleaned transcript for video {video_id}")
            return None
        doc = {
            "video_id": video_id,
            "content": cleaned_transcript,
            "segment_id": f"{video_id}_full",
            "title": clean_text(metadata.get('title', '')),
            "author": metadata.get('author', ''),
            "upload_date": metadata.get('upload_date', ''),
            "view_count": metadata.get('view_count', 0),
            "like_count": metadata.get('like_count', 0),
            "comment_count": metadata.get('comment_count', 0),
            "video_duration": metadata.get('duration', '')
        }
        self.documents.append(doc)
        # Embed the transcript together with the title so title terms also
        # influence vector similarity.
        self.embeddings.append(self.embedding_model.encode(cleaned_transcript + " " + metadata.get('title', '')))
        logger.info(f"Processed transcript for video {video_id}")
        # Return a name hint that encodes the embedding dimensionality; note
        # that ensure_index_built derives the actual index name from the model
        # name instead.
        return f"video_{video_id}_{self.embedding_model.get_sentence_embedding_dimension()}"

    def build_index(self, index_name):
        if not self.documents:
            logger.error("No documents to index")
            return None
        logger.info(f"Building index with {len(self.documents)} documents")
        try:
            self.text_index.fit(self.documents)
            self.index_built = True
            logger.info("Text index built successfully")
        except Exception as e:
            logger.error(f"Error building text index: {str(e)}")
            raise
        self.embeddings = np.array(self.embeddings)
        try:
            if not self.es.indices.exists(index=index_name):
                # dense_vector queried via cosineSimilarity in a script_score
                # query requires Elasticsearch 7.3 or later.
                self.es.indices.create(index=index_name, body={
                    "mappings": {
                        "properties": {
                            "embedding": {"type": "dense_vector", "dims": self.embeddings.shape[1]},
                            "content": {"type": "text"},
                            "video_id": {"type": "keyword"},
                            "segment_id": {"type": "keyword"},
                            "title": {"type": "text"},
                            "author": {"type": "keyword"},
                            "upload_date": {"type": "date"},
                            "view_count": {"type": "integer"},
                            "like_count": {"type": "integer"},
                            "comment_count": {"type": "integer"},
                            "video_duration": {"type": "text"}
                        }
                    }
                })
                logger.info(f"Created Elasticsearch index: {index_name}")
            for doc, embedding in zip(self.documents, self.embeddings):
                doc_with_embedding = doc.copy()
                doc_with_embedding['embedding'] = embedding.tolist()
                self.es.index(index=index_name, body=doc_with_embedding, id=doc['segment_id'])
            logger.info(f"Successfully indexed {len(self.documents)} documents in Elasticsearch")
            self.current_index_name = index_name
            return index_name
        except Exception as e:
            logger.error(f"Error building Elasticsearch index: {str(e)}")
            raise

    def ensure_index_built(self, video_id, embedding_model):
        index_name = f"video_{video_id}_{embedding_model.replace('-', '_')}".lower()
        if not self.es.indices.exists(index=index_name):
            logger.info(f"Index {index_name} does not exist. Building now...")
            transcript_data = get_transcript(video_id)
            if transcript_data:
                self.process_transcript(video_id, transcript_data)
                return self.build_index(index_name)
            else:
                logger.error(f"Failed to retrieve transcript for video {video_id}")
                return None
        return index_name
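
    # For example, a hypothetical video_id "abc123" with the default model
    # "all-MiniLM-L6-v2" yields the index name "video_abc123_all_minilm_l6_v2".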

    def search(self, query, filter_dict=None, boost_dict=None, num_results=10, method='hybrid', index_name=None):
        # Avoid mutable default arguments; normalize to fresh dicts per call.
        filter_dict = filter_dict or {}
        boost_dict = boost_dict or {}
        if not index_name:
            logger.error("No index name provided for search.")
            raise ValueError("No index name provided for search.")
        if not self.es.indices.exists(index=index_name):
            logger.error(f"Index {index_name} does not exist.")
            raise ValueError(f"Index {index_name} does not exist.")
        logger.info(f"Performing {method} search for query: {query} in index: {index_name}")
        if method == 'text':
            return self.text_search(query, filter_dict, boost_dict, num_results, index_name)
        elif method == 'embedding':
            return self.embedding_search(query, num_results, index_name)
        else:  # hybrid search: interleave text and embedding results
            text_results = self.text_search(query, filter_dict, boost_dict, num_results, index_name)
            embedding_results = self.embedding_search(query, num_results, index_name)
            return self.combine_results(text_results, embedding_results, num_results)

    def text_search(self, query, filter_dict=None, boost_dict=None, num_results=10, index_name=None):
        if not index_name:
            logger.error("No index name provided for text search.")
            raise ValueError("No index name provided for text search.")
        # Perform text search using Elasticsearch. Note: filter_dict and
        # boost_dict are accepted for interface symmetry with search() but are
        # not yet applied to the query body.
        search_body = {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["content", "title"]
                }
            },
            "size": num_results
        }
        response = self.es.search(index=index_name, body=search_body)
        return [hit['_source'] for hit in response['hits']['hits']]

    def embedding_search(self, query, num_results=10, index_name=None):
        if not index_name:
            logger.error("No index name provided for embedding search.")
            raise ValueError("No index name provided for embedding search.")
        query_vector = self.embedding_model.encode(query).tolist()
        # script_score must produce non-negative scores, so shift cosine
        # similarity from [-1, 1] into [0, 2] by adding 1.0.
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_vector}
                }
            }
        }
        response = self.es.search(
            index=index_name,
            body={
                "size": num_results,
                "query": script_query,
                "_source": {"excludes": ["embedding"]}
            }
        )
        return [hit['_source'] for hit in response['hits']['hits']]

    def combine_results(self, text_results, embedding_results, num_results):
        # Interleave the two ranked lists, then drop duplicate segments,
        # keeping each document's first (highest-ranked) appearance.
        combined = []
        for i in range(max(len(text_results), len(embedding_results))):
            if i < len(text_results):
                combined.append(text_results[i])
            if i < len(embedding_results):
                combined.append(embedding_results[i])
        seen = set()
        deduped = []
        for doc in combined:
            if doc['segment_id'] not in seen:
                seen.add(doc['segment_id'])
                deduped.append(doc)
        return deduped[:num_results]
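
    # Illustration (hypothetical segment_ids): text results [A, B] and
    # embedding results [B, C] interleave to [A, B, B, C] and dedupe to
    # [A, B, C].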

    def process_query(self, query):
        return clean_text(query)

    def set_embedding_model(self, model_name):
        # Note: embeddings already collected with the previous model are kept
        # as-is and are not re-encoded.
        self.embedding_model = SentenceTransformer(model_name)
        logger.info(f"Embedding model set to: {model_name}")