# document-translator/src/translate_any_doc.py
import shutil
import time
import os
from itertools import groupby
from subprocess import Popen, PIPE
import re
from src.aligner import Aligner
import glob
from sacremoses import MosesTokenizer, MosesDetokenizer
import spacy
import tqdm
# Load multilingual model to use as sentence tokenizer
spacy_nlp = spacy.load("xx_ent_wiki_sm")
# Add the rule-based sentencizer
if "sentencizer" not in spacy_nlp.pipe_names:
spacy_nlp.add_pipe("sentencizer")
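# Note: xx_ent_wiki_sm ships without a dependency parser, so the rule-based sentencizer added above is
# what provides the sentence boundaries (doc.sents) used throughout this module.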
def doc_to_plain_text(input_file: str, source_lang: str, target_lang: str, tikal_folder: str,
original_xliff_file_path: str) -> str:
"""
Given a document, this function generates an xliff file and then a plain text file with the text contents
while keeping style and formatting using tags like <g id=1> </g>
Parameters:
input_file: Path to document to process
source_lang: Source language of the document
target_lang: Target language of the document
tikal_folder: Folder where tikal.sh is located
        original_xliff_file_path: Path to the xliff file to generate, which will be used later
Returns:
string: Path to plain text file
"""
tikal_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-x", input_file, "-nocopy", "-sl", source_lang,
"-tl", target_lang]
Popen(tikal_xliff_command).wait()
tikal_moses_command = [os.path.join(tikal_folder, "tikal.sh"), "-xm", original_xliff_file_path, "-sl", source_lang,
"-tl", target_lang]
Popen(tikal_moses_command).wait()
    return original_xliff_file_path + f".{source_lang}"
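# Example with hypothetical paths: for input_file="tmp/report.docx", source_lang="en" and
# original_xliff_file_path="tmp/report.docx.xlf", tikal first writes the XLIFF file and then extracts
# its source text in Moses plain-text format, so the function returns "tmp/report.docx.xlf.en"
# (roughly one paragraph per line, with inline formatting kept as <g id="1">...</g> tags).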
def get_runs_from_paragraph(paragraph: str, paragraph_index: int) -> list[dict[str, str | list[str] | int]]:
"""
Given some text that may or may not contain some chunks tagged with something like <g id=1> </g>, extract each
of the runs of text and convert them into dictionaries to keep this information
Parameters:
        paragraph: Paragraph text to process
paragraph_index: Index of the paragraph in the file
Returns:
        list[dict]: Where each element is a run with its text, the stack of open tag ids (empty list if none) and its paragraph_index
"""
tag_stack = []
runs = []
pos = 0
# Match any tag: <tag id="123"/>, </tag>, or <tag id="123">
tag_pattern = re.compile(r'<(/?)(\w+)(?:\s+id="(\d+)")?\s*(/?)>')
for match in tag_pattern.finditer(paragraph):
start, end = match.span()
is_closing = match.group(1) == "/"
tag_name = match.group(2)
tag_id = match.group(3)
is_self_closing = match.group(4) == "/"
# Text before this tag
if start > pos:
text = paragraph[pos:start]
if text:
runs.append({
"text": text,
"id": tag_stack.copy(),
"paragraph_index": paragraph_index
})
if is_closing:
# Closing tag </tag>
expected_prefix = f"{tag_name}_"
if tag_stack and tag_stack[-1].startswith(expected_prefix):
tag_stack.pop()
else:
raise ValueError(f"Mismatched closing tag </{tag_name}>")
elif is_self_closing:
# Self-closing tag like <x id="1"/>
if tag_id is None:
raise ValueError(f"Self-closing tag <{tag_name}/> missing id")
runs.append({
"text": "",
"id": [f"{tag_name}_{tag_id}"],
"paragraph_index": paragraph_index
})
else:
# Opening tag <tag id="...">
if tag_id is None:
raise ValueError(f"Opening tag <{tag_name}> missing id")
tag_stack.append(f"{tag_name}_{tag_id}")
pos = end
# Final trailing text
if pos < len(paragraph):
text = paragraph[pos:]
if text:
runs.append({
"text": text,
"id": tag_stack.copy(),
"paragraph_index": paragraph_index
})
return runs
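# For illustration, get_runs_from_paragraph('Hello <g id="1">world</g>!', 0) yields three runs:
#   {"text": "Hello ", "id": [], "paragraph_index": 0}
#   {"text": "world", "id": ["g_1"], "paragraph_index": 0}
#   {"text": "!", "id": [], "paragraph_index": 0}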
def tokenize_text(text, tokenizer):
    # Protect URLs so the tokenizer does not split them apart
def preserve_urls(text):
url_pattern = r'https?://[^\s\)\]\}\>]+|www\.[^\s\)\]\}\>]+'
# Find URLs using regex and replace them with a placeholder
urls = re.findall(url_pattern, text)
for idx, url in enumerate(urls):
placeholder = f"URL{idx}"
text = text.replace(url, placeholder)
return text, urls
# Replace URLs with placeholders
text, urls = preserve_urls(text)
# Tokenize using Sacremoses
tokens = tokenizer.tokenize(text)
# Revert placeholders back to original URLs
for idx, url in enumerate(urls):
placeholder = f"URL{idx}"
tokens = [token.replace(placeholder, url) for token in tokens]
return tokens
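# For instance, tokenizing "Visit https://example.com now." keeps the URL intact and returns
# roughly ["Visit", "https://example.com", "now", "."] instead of splitting the URL apart.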
def tokenize_with_runs(runs: list[dict[str, str]], tokenizer, detokenizer) -> list[list[dict[str, str]]]:
"""
Given a list of runs, we need to tokenize them by sentence and token while keeping the style of each token according
to its original run
Parameters:
        runs: List of runs, where each item is a chunk of text (possibly several tokens) plus some style/formatting information
        tokenizer: Moses tokenizer for the source language
        detokenizer: Moses detokenizer for the source language
Returns:
list[list[dict]]: A list of tokenized sentences where each token contains the style of its original run
"""
# it's a bit of a mess but first we get the tokenized sentences
text_paragraph = detokenizer.detokenize([run["text"] for run in runs])
sentences = spacy_nlp(text_paragraph).sents
tokenized_sentences = [tokenize_text(sentence.text, tokenizer) for sentence in sentences]
# then we assign a run/style to each token
tokens_with_style = []
for run in runs:
tokens = tokenize_text(run["text"], tokenizer)
if tokens:
for token in tokens:
tokens_with_style.append(run.copy())
tokens_with_style[-1]["text"] = token
else:
tokens_with_style.append(run.copy())
# and finally we combine both things, where each token of each sentence is assigned a run/style
token_index = 0
tokenized_sentences_with_style = []
for sentence in tokenized_sentences:
sentence_with_style = []
for word in sentence:
if word == tokens_with_style[token_index]["text"]:
sentence_with_style.append(tokens_with_style[token_index])
token_index += 1
else:
if word.startswith(tokens_with_style[token_index]["text"]):
# this token might be split into several runs
word_left = word
while word_left:
sentence_with_style.append(tokens_with_style[token_index])
word_left = word_left.removeprefix(tokens_with_style[token_index]["text"])
token_index += 1
else:
raise "Something unexpected happened I'm afraid"
tokenized_sentences_with_style.append(sentence_with_style)
return tokenized_sentences_with_style
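# Rough example: the runs [{"text": "Hello ", "id": []}, {"text": "world.", "id": ["g_1"]}] come back
# as a single tokenized sentence in which "Hello" keeps the empty style while "world" and "." both
# carry the ["g_1"] style of the run they came from.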
def generate_alignments(original_paragraphs_with_runs: list[list[dict[str, str]]],
translated_paragraphs: list[str], aligner, temp_folder: str,
source_tokenizer, source_detokenizer, target_tokenizer) -> list[list[dict[str, str]]]:
"""
    Given some original paragraphs with style and formatting and their translations without formatting,
    try to match the translated text formatting with the original. Since we only want to run fastalign
    once, we temporarily forget about paragraphs and work only with sentences, so the output is a list
    of sentences, each carrying the index of the paragraph it came from
Parameters:
original_paragraphs_with_runs: Original text split into paragraphs and runs
translated_paragraphs: Translated text, split into paragraphs
aligner: Object of the aligner class, uses fastalign
temp_folder: Path to folder where to put all the intermediate files
        source_tokenizer: Moses tokenizer for the source language
        source_detokenizer: Moses detokenizer for the source language
        target_tokenizer: Moses tokenizer for the target language
Returns:
list[list[dict]]: A list of tokenized sentences where each translated token contains the style of the associated
original token
"""
# clean temp folder
for f in glob.glob(os.path.join(temp_folder, "*align*")):
os.remove(f)
# tokenize the original text by sentence and words while keeping the style
original_tokenized_sentences_with_style = [tokenize_with_runs(runs, source_tokenizer, source_detokenizer) for runs in
original_paragraphs_with_runs]
# flatten all the runs so we can align with just one call instead of one per paragraph
original_tokenized_sentences_with_style = [item for sublist in original_tokenized_sentences_with_style for item in
sublist]
# tokenize the translated text by sentence and word
translated_tokenized_sentences = [tokenize_text(sentence.text, target_tokenizer) for
translated_paragraph in translated_paragraphs for sentence in
spacy_nlp(translated_paragraph).sents]
    assert len(translated_tokenized_sentences) == len(
        original_tokenized_sentences_with_style), "The original and translated texts contain a different number of sentences, likely due to a translation error"
original_sentences = []
translated_sentences = []
for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
original_sentences.append(' '.join(item['text'] for item in original))
translated_sentences.append(' '.join(translated))
alignments = aligner.align(original_sentences, translated_sentences)
# using the alignments generated by fastalign, we need to copy the style of the original token to the translated one
translated_sentences_with_style = []
for sentence_idx, sentence_alignments in enumerate(alignments):
# reverse the order of the alignments and build a dict with it
sentence_alignments = {target: source for source, target in sentence_alignments}
translated_sentence_with_style: list[dict[str, str]] = []
for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
# fastalign has found a token aligned with the translated one
if token_idx in sentence_alignments.keys():
# get the aligned token
original_idx = sentence_alignments[token_idx]
new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
new_entry["text"] = translated_token
translated_sentence_with_style.append(new_entry)
            else:
                # fastalign did not align this token with any source token, so fall back to the style
                # of the previous translated token (or of the first original token if this is the very
                # first token of the sentence)
                if translated_sentence_with_style:
                    new_entry = translated_sentence_with_style[-1].copy()
                else:
                    new_entry = original_tokenized_sentences_with_style[sentence_idx][0].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)
translated_sentences_with_style.append(translated_sentence_with_style)
return translated_sentences_with_style
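# Note on the aligner: aligner.align() is expected to return, for every sentence pair, a list of
# (source_idx, target_idx) pairs in fastalign style, e.g. [(0, 0), (1, 2), (2, 1)], meaning source
# token 0 produced target token 0, source token 1 produced target token 2, and so on;
# generate_alignments() uses those indices to copy each source token's style onto its translation.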
def group_by_style(tokens: list[dict[str, str]], detokenizer) -> list[dict[str, str]]:
"""
To avoid having issues in the future, we group the contiguous tokens that have the same style. Basically, we
reconstruct the runs.
Parameters:
tokens: Tokens with style information
detokenizer: Detokenizer object to merge tokens back together
Returns:
list[dict]: A list of translated runs with format and style
"""
groups = []
for key, group in groupby(tokens, key=lambda x: (x["id"], x["paragraph_index"])):
text = detokenizer.detokenize([item['text'] for item in group])
if groups and not text.startswith((",", ";", ":", ".", ")", "!", "?")):
text = " " + text
groups.append({"text": text,
"id": key[0],
"paragraph_index": key[1]})
return groups
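# For illustration, the tokens ("Hello", id=[]), ("world", id=["g_1"]), (".", id=["g_1"]) from the same
# paragraph collapse into two runs: {"text": "Hello", "id": []} and {"text": " world.", "id": ["g_1"]}
# (the leading space is added because the second group does not start with punctuation).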
def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict]], out_file_path: str):
"""
Generate a plain text file restoring the original tag structure like <g id=1> </g>
Parameters:
        paragraphs_with_style: Dictionary where each key is a paragraph_index and its value is the list of runs for that paragraph
out_file_path: Path to the file where the plain text will be saved
"""
with open(out_file_path, "w") as out_file:
current_stack = []
def close_tags(to_close):
            return ''.join(f'</{gid.split("_")[0]}>' for gid in reversed(to_close))
def open_tags(to_open):
tag = ""
for gid in to_open:
tag_type, tag_id = gid.split("_")
tag += f'<{tag_type} id="{tag_id}">'
return tag
for key, paragraph in paragraphs_with_style.items():
output = []
for run in paragraph:
ids = list(run["id"]) if run["id"] else []
# Find the point where current and new IDs diverge
common_prefix_len = 0
for a, b in zip(current_stack, ids):
if a == b:
common_prefix_len += 1
else:
break
# Close tags not in the new stack
to_close = current_stack[common_prefix_len:]
if to_close:
output.append(close_tags(to_close))
# Open new tags
to_open = ids[common_prefix_len:]
if to_open:
output.append(open_tags(to_open))
# Add text
output.append(run["text"])
# Update the stack
current_stack = ids
# Close any remaining open tags
if current_stack:
output.append(close_tags(current_stack))
out_file.write("".join(output) + "\n")
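# For illustration, a paragraph whose runs are {"text": "Hello ", "id": []} and
# {"text": "world", "id": ["g_1"]} is written out as the line: Hello <g id="1">world</g>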
def translate_document(input_file: str, source_lang: str, target_lang: str,
translator,
aligner: Aligner,
temp_folder: str = "tmp",
tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0") -> str:
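    """
    Translate a document end to end: extract its text with tikal, translate it paragraph by paragraph,
    transfer the original inline formatting onto the translation using word alignments, and merge the
    result back into a document of the original format.
    Parameters:
        input_file: Path to the document to translate
        source_lang: Source language of the document
        target_lang: Target language of the translation
        translator: Object exposing a translate(text, source_lang, target_lang) method
        aligner: Object of the Aligner class, uses fastalign
        temp_folder: Path to the folder where intermediate files are written
        tikal_folder: Folder where tikal.sh is located
    Returns:
        str: Path to the translated document
    """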
    input_filename = os.path.basename(input_file)
    # copy the original file to the temporary folder to avoid common issues with tikal
temp_input_file = os.path.join(temp_folder, input_filename)
shutil.copy(input_file, temp_input_file)
original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
plain_text_file = doc_to_plain_text(temp_input_file, source_lang, target_lang, tikal_folder, original_xliff_file)
source_tokenizer = MosesTokenizer(lang=source_lang)
source_detokenizer = MosesDetokenizer(lang=source_lang)
target_tokenizer = MosesTokenizer(lang=target_lang)
target_detokenizer = MosesDetokenizer(lang=target_lang)
# get paragraphs with runs
    with open(plain_text_file) as plain_text_lines:
        paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in
                                enumerate(plain_text_lines)]
# translate using plaintext file
translated_paragraphs = []
for paragraph in tqdm.tqdm(paragraphs_with_runs, desc="Translating paragraphs..."):
paragraph_text = source_detokenizer.detokenize([run["text"] for run in paragraph])
translated_paragraphs.append(translator.translate(paragraph_text, source_lang, target_lang))
# time to align the translation with the original
print("Generating alignments...")
start_time = time.time()
translated_sentences_with_style = generate_alignments(paragraphs_with_runs, translated_paragraphs, aligner,
temp_folder, source_tokenizer, source_detokenizer, target_tokenizer)
print(f"Finished alignments in {time.time() - start_time} seconds")
# flatten the sentences into a list of tokens
translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
# group the tokens by style/run
translated_runs_with_style = group_by_style(translated_tokens_with_style, target_detokenizer)
# group the runs by original paragraph
translated_paragraphs_with_style = {key: [{'id': None, 'paragraph_index': key, 'text': ""}] for key in
range(len(paragraphs_with_runs))}
for item in translated_runs_with_style:
# first item in the paragraph, remove starting blank space we introduced in group_by_style(), where we
# didn't know where paragraphs started and ended
if not translated_paragraphs_with_style[item['paragraph_index']][0]["text"]:
first_item_in_paragraph = item.copy()
first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
translated_paragraphs_with_style[item['paragraph_index']] = []
translated_paragraphs_with_style[item['paragraph_index']].append(first_item_in_paragraph)
else:
translated_paragraphs_with_style[item['paragraph_index']].append(item)
# save to new plain text file
    translated_moses_file = original_xliff_file + f".{target_lang}"
runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)
# put the translations into the xlf
tikal_moses_to_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-lm", original_xliff_file, "-sl",
source_lang, "-tl", target_lang, "-from", translated_moses_file, "-totrg",
"-noalttrans", "-to", original_xliff_file]
Popen(tikal_moses_to_xliff_command).wait()
# merge into a docx again
tikal_merge_doc_command = [os.path.join(tikal_folder, "tikal.sh"), "-m", original_xliff_file]
final_process = Popen(tikal_merge_doc_command, stdout=PIPE, stderr=PIPE)
stdout, stderr = final_process.communicate()
final_process.wait()
# get the path to the output file
output = stdout.decode('utf-8')
translated_file_path = re.search(r'(?<=Output:\s)(.*)', output)[0]
print(f"Saved file in {translated_file_path}")
return translated_file_path
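# Hypothetical usage sketch (not part of the original module): the translator wrapper and the Aligner
# constructor arguments below are assumptions, adapt them to the real classes before running.
#
# if __name__ == "__main__":
#     from src.aligner import Aligner
#     from src.translator import Translator  # hypothetical wrapper exposing translate(text, src, tgt)
#
#     translator = Translator()
#     aligner = Aligner()  # constructor arguments omitted, see src/aligner.py
#     output_path = translate_document("samples/report.docx", "en", "es", translator, aligner,
#                                      temp_folder="tmp")
#     print(output_path)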