import logging
import os
import re

import numpy as np
from elasticsearch import Elasticsearch
from minsearch import Index
from sentence_transformers import SentenceTransformer

from transcript_extractor import get_transcript

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_text(text):
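    """Keep word characters, whitespace, and basic punctuation (.,!?), replace
    everything else with spaces, and collapse runs of whitespace."""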
    if not isinstance(text, str):
        logger.warning(f"Non-string input to clean_text: {type(text)}")
        return ""
    cleaned = re.sub(r'[^\w\s.,!?]', ' ', text)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    logger.info(f"Cleaned text: '{cleaned[:100]}...'")
    return cleaned

class DataProcessor:
    """Indexes YouTube transcripts for text, embedding, and hybrid search
    backed by Elasticsearch, alongside an in-memory minsearch text index."""

    def __init__(self, text_fields=None, keyword_fields=None,
                 embedding_model="all-MiniLM-L6-v2"):
        # Use None defaults to avoid sharing mutable default arguments.
        text_fields = text_fields or ["content", "title", "description"]
        keyword_fields = keyword_fields or ["video_id", "author", "upload_date"]
        self.text_index = Index(text_fields=text_fields, keyword_fields=keyword_fields)
        # Keep the model name so index names can be derived consistently later.
        self.embedding_model_name = embedding_model
        self.embedding_model = SentenceTransformer(embedding_model)
        self.documents = []
        self.embeddings = []
        self.index_built = False
        self.current_index_name = None
        
        elasticsearch_host = os.getenv('ELASTICSEARCH_HOST', 'localhost')
        elasticsearch_port = int(os.getenv('ELASTICSEARCH_PORT', 9200))
        
        self.es = Elasticsearch([f'http://{elasticsearch_host}:{elasticsearch_port}'])
        logger.info(f"DataProcessor initialized with Elasticsearch at {elasticsearch_host}:{elasticsearch_port}")

    def process_transcript(self, video_id, transcript_data):
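        """Flatten a video's transcript into one cleaned document, embed it, and
        queue both for indexing. Returns the derived index name, or None if the
        transcript data is invalid or empty after cleaning."""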
        if not transcript_data or 'metadata' not in transcript_data or 'transcript' not in transcript_data:
            logger.error(f"Invalid transcript data for video {video_id}")
            return None

        metadata = transcript_data['metadata']
        transcript = transcript_data['transcript']

        logger.info(f"Processing transcript for video {video_id}")
        logger.info(f"Number of transcript segments: {len(transcript)}")

        full_transcript = " ".join([segment.get('text', '') for segment in transcript])
        cleaned_transcript = clean_text(full_transcript)

        if not cleaned_transcript:
            logger.warning(f"Empty cleaned transcript for video {video_id}")
            return None

        doc = {
            "video_id": video_id,
            "content": cleaned_transcript,
            "segment_id": f"{video_id}_full",
            "title": clean_text(metadata.get('title', '')),
            "author": metadata.get('author', ''),
            "upload_date": metadata.get('upload_date', ''),
            "view_count": metadata.get('view_count', 0),
            "like_count": metadata.get('like_count', 0),
            "comment_count": metadata.get('comment_count', 0),
            "video_duration": metadata.get('duration', '')
        }
        self.documents.append(doc)
        self.embeddings.append(self.embedding_model.encode(cleaned_transcript + " " + metadata.get('title', '')))

        logger.info(f"Processed transcript for video {video_id}")
        return f"video_{video_id}_{self.embedding_model.get_sentence_embedding_dimension()}"

    def build_index(self, index_name):
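        """Fit the in-memory text index and write every queued document, with its
        embedding, into the named Elasticsearch index (creating it if needed)."""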
        if not self.documents:
            logger.error("No documents to index")
            return None

        logger.info(f"Building index with {len(self.documents)} documents")
        try:
            self.text_index.fit(self.documents)
            self.index_built = True
            logger.info("Text index built successfully")
        except Exception as e:
            logger.error(f"Error building text index: {str(e)}")
            raise

        self.embeddings = np.array(self.embeddings)
        
        try:
            if not self.es.indices.exists(index=index_name):
                self.es.indices.create(index=index_name, body={
                    "mappings": {
                        "properties": {
                            "embedding": {"type": "dense_vector", "dims": self.embeddings.shape[1]},
                            "content": {"type": "text"},
                            "video_id": {"type": "keyword"},
                            "segment_id": {"type": "keyword"},
                            "title": {"type": "text"},
                            "author": {"type": "keyword"},
                            "upload_date": {"type": "date"},
                            "view_count": {"type": "integer"},
                            "like_count": {"type": "integer"},
                            "comment_count": {"type": "integer"},
                            "video_duration": {"type": "text"}
                        }
                    }
                })
                logger.info(f"Created Elasticsearch index: {index_name}")

            for doc, embedding in zip(self.documents, self.embeddings):
                doc_with_embedding = doc.copy()
                doc_with_embedding['embedding'] = embedding.tolist()
                self.es.index(index=index_name, body=doc_with_embedding, id=doc['segment_id'])
            
            logger.info(f"Successfully indexed {len(self.documents)} documents in Elasticsearch")
            self.current_index_name = index_name
            # Clear the queue so one video's documents are not re-indexed into
            # the next video's index on a later build.
            self.documents = []
            self.embeddings = []
            return index_name
        except Exception as e:
            logger.error(f"Error building Elasticsearch index: {str(e)}")
            raise

    def ensure_index_built(self, video_id, embedding_model):
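        """Return the Elasticsearch index name for a video, fetching its transcript
        and building the index first if the index does not already exist."""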
        index_name = f"video_{video_id}_{embedding_model.replace('-', '_')}".lower()
        if not self.es.indices.exists(index=index_name):
            logger.info(f"Index {index_name} does not exist. Building now...")
            transcript_data = get_transcript(video_id)
            if transcript_data:
                self.process_transcript(video_id, transcript_data)
                return self.build_index(index_name)
            else:
                logger.error(f"Failed to retrieve transcript for video {video_id}")
                return None
        return index_name

    def search(self, query, filter_dict=None, boost_dict=None, num_results=10, method='hybrid', index_name=None):
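        """Dispatch a query to text, embedding, or hybrid (interleaved) search.
        Raises ValueError if no index name is given or the index is missing."""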
        if not index_name:
            logger.error("No index name provided for search.")
            raise ValueError("No index name provided for search.")
        
        if not self.es.indices.exists(index=index_name):
            logger.error(f"Index {index_name} does not exist.")
            raise ValueError(f"Index {index_name} does not exist.")
        
        logger.info(f"Performing {method} search for query: {query} in index: {index_name}")
        
        if method == 'text':
            return self.text_search(query, filter_dict, boost_dict, num_results, index_name)
        elif method == 'embedding':
            return self.embedding_search(query, num_results, index_name)
        else:  # hybrid search
            text_results = self.text_search(query, filter_dict, boost_dict, num_results, index_name)
            embedding_results = self.embedding_search(query, num_results, index_name)
            return self.combine_results(text_results, embedding_results, num_results)

    def text_search(self, query, filter_dict=None, boost_dict=None, num_results=10, index_name=None):
        """Full-text search over content and title, honoring optional keyword
        filters and per-field boosts."""
        if not index_name:
            logger.error("No index name provided for text search.")
            raise ValueError("No index name provided for text search.")

        filter_dict = filter_dict or {}
        boost_dict = boost_dict or {}

        # Per-field boosts use Elasticsearch's field^boost syntax; keyword
        # filters become exact term clauses.
        fields = [f"{field}^{boost}" for field, boost in boost_dict.items()] or ["content", "title"]
        search_body = {
            "query": {
                "bool": {
                    "must": {
                        "multi_match": {
                            "query": query,
                            "fields": fields
                        }
                    },
                    "filter": [{"term": {field: value}} for field, value in filter_dict.items()]
                }
            },
            "size": num_results
        }
        response = self.es.search(index=index_name, body=search_body)
        return [hit['_source'] for hit in response['hits']['hits']]

    def embedding_search(self, query, num_results=10, index_name=None):
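        """Rank documents by cosine similarity between the query embedding and each
        stored 'embedding' vector, using an Elasticsearch script_score query."""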
        if not index_name:
            logger.error("No index name provided for embedding search.")
            raise ValueError("No index name provided for embedding search.")
        
        query_vector = self.embedding_model.encode(query).tolist()
        script_query = {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_vector}
                }
            }
        }
        response = self.es.search(
            index=index_name,
            body={
                "size": num_results,
                "query": script_query,
                "_source": {"excludes": ["embedding"]}
            }
        )
        return [hit['_source'] for hit in response['hits']['hits']]

    def combine_results(self, text_results, embedding_results, num_results):
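        """Interleave text and embedding hits so both rankings contribute, then
        deduplicate by segment_id, keeping each document's earliest position."""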
        combined = []
        for i in range(max(len(text_results), len(embedding_results))):
            if i < len(text_results):
                combined.append(text_results[i])
            if i < len(embedding_results):
                combined.append(embedding_results[i])
        
        seen = set()
        deduped = []
        for doc in combined:
            if doc['segment_id'] not in seen:
                seen.add(doc['segment_id'])
                deduped.append(doc)
        
        return deduped[:num_results]

    def process_query(self, query):
        return clean_text(query)

    def set_embedding_model(self, model_name):
        self.embedding_model_name = model_name
        self.embedding_model = SentenceTransformer(model_name)
        logger.info(f"Embedding model set to: {model_name}")