|
import re
|
|
import json
|
|
import argparse
|
|
from typing import List, Dict
|
|
import bisect
|
|
|
|
class ClaudeTokenizer:
    """Greedy longest-match tokenizer driven by a JSON configuration file.

    The configuration must contain the keys ``vocab`` (list of token
    strings), ``n_vocab_size``, ``pat_str`` (regular expression used to
    pre-split the input) and ``special_tokens`` (token string -> id).

    Token ids are the positions of the tokens in the *sorted* vocabulary,
    not their positions in the config file; special tokens are layered on
    top and override any vocabulary entry with the same string or id.
    NOTE(review): confirm that sorted-order ids are what consumers expect.

    Text is first split into parts with ``pat_str``; each part is then
    tokenized greedily (longest vocabulary entry first, single character
    as fallback). ``algorithm`` only selects how the longest match is
    found ("trie" walk or binary search over the sorted vocabulary); both
    produce the same tokens. Text not matched by ``pat_str`` is dropped.
    """

    # Key marking "a token ends at this node" inside the trie. Child keys
    # are always single characters, so the empty string can never collide
    # with a real character (unlike a printable sentinel such as "*").
    _END = ""

    def __init__(self, config_file: str, algorithm: str = "trie"):
        """Load the configuration and build the lookup structures.

        Args:
            config_file: Path to the JSON configuration (read as UTF-8).
            algorithm: "trie" or "linear".

        Raises:
            ValueError: If ``algorithm`` is not one of the supported names.
            KeyError: If a required configuration key is missing.
        """
        # Fail fast, before doing any I/O or building the trie.
        if algorithm not in ("trie", "linear"):
            raise ValueError("Invalid algorithm. Choose 'trie' or 'linear'.")
        self.algorithm = algorithm

        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(config_file, "r", encoding="utf-8") as f:
            config = json.load(f)

        self.vocab = sorted(config["vocab"])
        self.vocab_size = config["n_vocab_size"]
        self.pat_str = config["pat_str"]
        self.special_tokens = config["special_tokens"]

        self.token_to_id = {token: i for i, token in enumerate(self.vocab)}
        self.id_to_token = {i: token for token, i in self.token_to_id.items()}

        # Special tokens take precedence over regular vocabulary entries.
        for token, token_id in self.special_tokens.items():
            self.token_to_id[token] = token_id
            self.id_to_token[token_id] = token

        self.pat = re.compile(self.pat_str)
        self.vocab_trie = self._build_trie(self.vocab)

        # Upper bound on how many characters a single match can consume;
        # lets the binary-search path inspect a bounded window only.
        self._max_token_len = max((len(token) for token in self.vocab), default=0)

    def _build_trie(self, vocab: List[str]) -> Dict:
        """Build a character trie of ``vocab``.

        Each node is a dict mapping a character to its child node; a node
        at which a token ends additionally maps ``_END`` to that token.
        Empty tokens are skipped because they can never be matched.
        """
        end = self._END
        trie: Dict = {}
        for token in vocab:
            if not token:
                continue
            node = trie
            for char in token:
                node = node.setdefault(char, {})
            node[end] = token
        return trie

    def tokenize(self, text: str) -> List[str]:
        """Split ``text`` into tokens.

        The text is pre-split with the configured pattern and every part
        is tokenized independently with the selected algorithm.
        """
        if self.algorithm == "trie":
            tokenize_part = self._tokenize_part_trie
        else:
            tokenize_part = self._tokenize_part_linear

        tokens: List[str] = []
        # finditer + group(0) always yields the whole match, whereas
        # findall would return groups/tuples if the pattern captures.
        for match in self.pat.finditer(text):
            tokens.extend(tokenize_part(match.group(0)))
        return tokens

    def encode(self, text: str) -> List[int]:
        """Tokenize ``text`` and map every token to its id.

        Tokens without an id are mapped to the id of the ``<META>``
        special token.

        Raises:
            KeyError: If an unknown token is met and the configuration
                defines no ``<META>`` special token.
        """
        ids: List[int] = []
        for token in self.tokenize(text):
            token_id = self.token_to_id.get(token)
            if token_id is None:
                # Looked up lazily so a config without "<META>" still
                # works for text that only contains known tokens.
                token_id = self.special_tokens["<META>"]
            ids.append(token_id)
        return ids

    def decode(self, ids: List[int]) -> str:
        """Concatenate the tokens for ``ids``; unknown ids are skipped."""
        return "".join(self.id_to_token.get(token_id, "") for token_id in ids)

    def _tokenize_part_trie(self, text: str) -> List[str]:
        """Greedy longest-match tokenization of ``text`` using the trie.

        Characters that start no vocabulary token are emitted as
        single-character tokens.
        """
        end = self._END
        root = self.vocab_trie
        tokens: List[str] = []
        pos, size = 0, len(text)
        while pos < size:
            node = root
            match_end = pos
            for idx in range(pos, size):
                node = node.get(text[idx])
                if node is None:
                    break
                if end in node:
                    # Remember the longest token seen so far; keep going
                    # in case an even longer one follows.
                    match_end = idx + 1
            if match_end == pos:
                match_end = pos + 1  # no token matched: emit one character
            tokens.append(text[pos:match_end])
            pos = match_end
        return tokens

    def _tokenize_part_linear(self, text: str) -> List[str]:
        """Greedy longest-match tokenization using binary search.

        Produces the same tokens as ``_tokenize_part_trie``.
        """
        tokens: List[str] = []
        pos, size = 0, len(text)
        window = self._max_token_len
        while pos < size:
            # No token is longer than ``window``, so a bounded slice is
            # enough and avoids re-copying the whole remaining text.
            match = self._binary_search_prefix(text[pos:pos + window])
            if not match:
                match = text[pos]  # no token matched: emit one character
            tokens.append(match)
            pos += len(match)
        return tokens

    def _binary_search_prefix(self, text: str) -> str:
        """Return the longest vocabulary token that is a prefix of ``text``.

        Returns "" when no non-empty token is a prefix. Candidate prefixes
        are probed from longest to shortest, each with one binary search
        over the sorted vocabulary. (A single binary search over the
        vocabulary is not sufficient: the prefixes of ``text`` are not
        contiguous in sorted order.)
        """
        vocab = self.vocab
        # Every prefix of ``text`` sorts at or before ``text`` itself, so
        # nothing at index >= upper can be a prefix.
        upper = bisect.bisect_right(vocab, text)
        for end in range(min(len(text), self._max_token_len), 0, -1):
            candidate = text[:end]
            idx = bisect.bisect_left(vocab, candidate, 0, upper)
            if idx < upper and vocab[idx] == candidate:
                return candidate
        return ""
|
|
|
|
def process_file(file_path: str, tokenizer: "ClaudeTokenizer") -> List[Dict]:
    """Read a text file, tokenize it and pair every token with its id.

    The file is decoded with the first encoding that works out of UTF-8,
    UTF-16 and Latin-1, tried in that order.

    Args:
        file_path: Path of the file to read.
        tokenizer: Object providing ``tokenize(text)`` and ``encode(text)``.

    Returns:
        One ``{"token": ..., "id": ...}`` dict per token, followed by a
        final ``{"total": <number of tokens>}`` entry.

    Raises:
        ValueError: If the file cannot be decoded with any encoding.
    """
    # "iso-8859-1" is just an alias of "latin-1", so it is not listed twice.
    encodings = ("utf-8", "utf-16", "latin-1")

    for encoding in encodings:
        try:
            with open(file_path, "r", encoding=encoding) as f:
                text = f.read()
            break
        except UnicodeError:
            # Catch the base class: the utf-16 codec raises a plain
            # UnicodeError (not UnicodeDecodeError) when the data has no
            # BOM, which would otherwise skip the latin-1 fallback.
            continue
    else:
        raise ValueError(f"Unable to decode the file {file_path} with any of the attempted encodings.")

    tokens = tokenizer.tokenize(text)
    encoded = tokenizer.encode(text)

    result = [
        {"token": token, "id": token_id}
        for token, token_id in zip(tokens, encoded)
    ]
    result.append({"total": len(tokens)})
    return result
|
|
|
|
def main():
    """Command-line entry point.

    Tokenizes either the ``--text`` argument (result printed as JSON on
    stdout) or the ``--file`` argument (result written next to the input
    as ``<file>.tokens``) using the vocabulary in ``tokenizer_config.json``
    from the current working directory. When both are given, ``--file``
    wins.

    Exits with status 1 if tokenization fails, so that callers and shell
    pipelines can detect the failure.
    """
    import sys
    import traceback

    parser = argparse.ArgumentParser(description="Tokenize text using Claude Tokenizer")
    parser.add_argument("--text", type=str, help="Text to tokenize")
    parser.add_argument("--file", type=str, help="File to tokenize")
    parser.add_argument("--algo", type=str, choices=["linear", "trie"], required=True, help="Tokenization algorithm")
    args = parser.parse_args()

    # An explicitly passed empty --text is valid input (zero tokens); only a
    # missing --text combined with a missing/empty --file is an error.
    if args.text is None and not args.file:
        parser.error("Either --text or --file must be specified")

    try:
        tokenizer = ClaudeTokenizer("tokenizer_config.json", algorithm=args.algo)

        if args.file:
            result = process_file(args.file, tokenizer)
            output_file = args.file + ".tokens"
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            print(f"Tokenization results saved to {output_file}")
        else:
            tokens = tokenizer.tokenize(args.text)
            encoded = tokenizer.encode(args.text)
            result = [
                {"token": token, "id": token_id}
                for token, token_id in zip(tokens, encoded)
            ]
            result.append({"total": len(tokens)})
            print(json.dumps(result, indent=2, ensure_ascii=False))
    except Exception as e:
        # Top-level boundary: report on stderr (stdout carries the JSON
        # result) and signal the failure through the exit status.
        print(f"An error occurred: {str(e)}", file=sys.stderr)
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script; importing it as a module must not trigger any tokenization.
if __name__ == "__main__":
    main()
|
|
|