Update chunker.py
chunker.py CHANGED (+54 -14)
@@ -1,3 +1,4 @@
+# chunker.py - Document chunking with token-based splitting
 from transformers import AutoTokenizer
 from typing import List
 import logging
@@ -13,28 +14,67 @@ class DocumentChunker:
         """Initialize tokenizer if not already done"""
         if self.tokenizer is None:
             print(f"🤖 Loading tokenizer: {model_name}")
-
+            try:
+                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
+                print(f"✅ Tokenizer loaded successfully")
+            except Exception as e:
+                print(f"⚠️ Failed to load tokenizer {model_name}, using backup...")
+                # Fallback to a more common tokenizer
+                self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

     def chunk_by_tokens(self, text: str, max_tokens: int = 1600, stride: int = 50) -> List[str]:
         """Fast token-window chunking without NLTK dependency"""
         self.initialize_tokenizer()

-
-
-
-
+        try:
+            # Encode text to tokens
+            ids = self.tokenizer.encode(text, add_special_tokens=False)
+            chunks = []
+            i, n = 0, len(ids)
+
+            logger.info(f"📊 Total tokens: {n}, creating chunks with max_tokens: {max_tokens}")
+
+            while i < n:
+                j = min(i + max_tokens, n)
+                chunk_ids = ids[i:j]
+                chunk_text = self.tokenizer.decode(chunk_ids, skip_special_tokens=True)
+
+                # Only add non-empty chunks
+                if chunk_text.strip():
+                    chunks.append(chunk_text.strip())
+
+                if j == n:
+                    break
+                i = max(j - stride, 0)  # Overlap to avoid cutting mid-sentence
+
+            logger.info(f"✂️ Created {len(chunks)} chunks")
+            return chunks
+
+        except Exception as e:
+            logger.error(f"❌ Chunking failed: {e}")
+            # Fallback to simple text splitting if tokenization fails
+            return self._fallback_chunk(text, max_tokens, stride)
+
+    def _fallback_chunk(self, text: str, max_tokens: int, stride: int) -> List[str]:
+        """Fallback chunking method using character-based estimation"""
+        # Rough estimation: 1 token ≈ 4 characters for English text
+        max_chars = max_tokens * 4
+        stride_chars = stride * 4

-
+        chunks = []
+        i = 0
+        text_length = len(text)

-        while i < n:
-            j = min(i + max_tokens, n)
-
-
-
+        while i < text_length:
+            j = min(i + max_chars, text_length)
+            chunk = text[i:j].strip()
+
+            if chunk:
+                chunks.append(chunk)

-            if j == n:
+            if j == text_length:
                 break
-            i = max(j - stride, 0)
+            i = max(j - stride_chars, 0)

-        logger.info(f"✂️ Created {len(chunks)} chunks")
+        logger.info(f"✂️ Created {len(chunks)} chunks using fallback method")
         return chunks
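
For context, a minimal usage sketch of the updated chunker follows. It assumes chunker.py is importable as chunker, that DocumentChunker can be constructed without arguments, and that initialize_tokenizer() falls back to a default model_name; none of these details appear in this diff, so treat them as assumptions. The input path is a placeholder.

import logging

from chunker import DocumentChunker

# Surface the chunker's logger.info output (token totals, chunk counts)
logging.basicConfig(level=logging.INFO)

chunker = DocumentChunker()  # assumed no-arg constructor; not shown in this diff

with open("document.txt", encoding="utf-8") as f:  # placeholder path
    text = f.read()

# 1600-token windows, overlapping by 50 tokens
chunks = chunker.chunk_by_tokens(text, max_tokens=1600, stride=50)

for idx, chunk in enumerate(chunks):
    print(f"chunk {idx}: {len(chunk)} characters")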
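The window arithmetic in chunk_by_tokens advances the start index by max_tokens - stride on each pass, so consecutive chunks share stride tokens of overlap. A standalone sketch of the same loop over dummy token ids (no tokenizer required) makes the boundaries concrete:

# Same windowing arithmetic as chunk_by_tokens, applied to dummy token ids
ids = list(range(4000))          # stand-in for 4000 token ids
max_tokens, stride = 1600, 50

i, windows = 0, []
while i < len(ids):
    j = min(i + max_tokens, len(ids))
    windows.append((i, j))       # each window overlaps the previous one by `stride` tokens
    if j == len(ids):
        break
    i = max(j - stride, 0)

print(windows)                   # [(0, 1600), (1550, 3150), (3100, 4000)]

The character-based _fallback_chunk follows the same pattern under its 4-characters-per-token estimate, i.e. 6400-character windows with a 200-character overlap for the default max_tokens=1600 and stride=50.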