import glob
import os
import re
import shutil
import time
from itertools import groupby
from subprocess import Popen, PIPE

import nltk
import tqdm
from nltk.tokenize import sent_tokenize, word_tokenize

from src.aligner import Aligner

nltk.download('punkt')
nltk.download('punkt_tab')


def doc_to_plain_text(input_file: str, source_lang: str, target_lang: str, tikal_folder: str,
                      original_xliff_file_path: str) -> str:
    """
    Given a document, generate an xliff file and then a plain text file with the text contents,
    keeping style and formatting information in inline tags like <g id="1"> </g>.

    Parameters:
        input_file: Path to the document to process
        source_lang: Source language of the document
        target_lang: Target language of the document
        tikal_folder: Folder where tikal.sh is located
        original_xliff_file_path: Path of the xliff file to generate, which will be used later

    Returns:
        string: Path to the plain text file
    """
    tikal_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-x", input_file, "-nocopy", "-sl", source_lang,
                           "-tl", target_lang]
    Popen(tikal_xliff_command).wait()

    tikal_moses_command = [os.path.join(tikal_folder, "tikal.sh"), "-xm", original_xliff_file_path, "-sl", source_lang,
                           "-tl", target_lang]
    Popen(tikal_moses_command).wait()

    return original_xliff_file_path + f".{source_lang}"
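
# Illustrative usage (hypothetical paths): for "tmp/report.docx" with source_lang="en", Tikal writes
# "tmp/report.docx.xlf", and the function returns "tmp/report.docx.xlf.en", a plain text file with
# one paragraph per line and inline <g> tags marking formatting.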


def get_runs_from_paragraph(text: str, paragraph_index: int) -> list[dict[str, str]]:
    """
    Given some text that may contain chunks tagged with something like <g id="1"> </g>, extract each
    run of text and convert it into a dictionary that keeps this information.

    Parameters:
        text: Text to process
        paragraph_index: Index of the paragraph in the file

    Returns:
        list[dict]: Where each element is a run with its text, tag id (None if untagged) and paragraph_index
    """
    tag_stack = []
    runs = []
    pos = 0
    tag_pattern = re.compile(r'<(/?)g(?: id="(\d+)")?>')

    while pos < len(text):
        match = tag_pattern.search(text, pos)
        if match:
            start, end = match.span()
            # Add any text before this tag as a run
            if start > pos:
                buffer = text[pos:start]
                if buffer:
                    runs.append({"text": buffer, "id": tuple(tag_stack) if tag_stack else None,
                                 "paragraph_index": paragraph_index})
            is_closing, tag_id = match.groups()
            if is_closing:
                # Pop the most recently opened tag
                if tag_stack:
                    tag_stack.pop()
            else:
                # Opening tag
                tag_stack.append(tag_id)
            pos = end  # Move position past this tag
        else:
            # No more tags, capture the rest
            buffer = text[pos:]
            if buffer:
                runs.append(
                    {"text": buffer, "id": tuple(tag_stack) if tag_stack else None,
                     "paragraph_index": paragraph_index})
            break
    return runs
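
# Illustrative example (input in the Tikal/Moses plain text format):
#   get_runs_from_paragraph('Hello <g id="1">world</g>', 0)
# would return:
#   [{"text": "Hello ", "id": None, "paragraph_index": 0},
#    {"text": "world", "id": ("1",), "paragraph_index": 0}]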


def tokenize_with_runs(runs: list[dict[str, str]], detokenizer) -> list[list[dict[str, str]]]:
    """
    Given a list of runs, tokenize them by sentence and token while keeping, for each token, the style
    of the run it came from.

    Parameters:
        runs: List of runs, where each item is a chunk of text (possibly several tokens) plus style/formatting information
        detokenizer: Detokenizer object to merge tokens back together

    Returns:
        list[list[dict]]: A list of tokenized sentences where each token carries the style of its original run
    """
    text_paragraph = detokenizer.detokenize([run["text"] for run in runs])
    sentences = sent_tokenize(text_paragraph)
    tokenized_sentences = [word_tokenize(sentence) for sentence in sentences]

    # expand each run into one entry per token, all sharing the run's style
    tokens_with_style = []
    for run in runs:
        tokens = word_tokenize(run["text"])
        for token in tokens:
            tokens_with_style.append(run.copy())
            tokens_with_style[-1]["text"] = token

    # walk the sentence tokens and the run tokens in parallel, assigning a style to each sentence token
    token_index = 0
    tokenized_sentences_with_style = []
    for sentence in tokenized_sentences:
        sentence_with_style = []
        for word in sentence:
            if word == tokens_with_style[token_index]["text"]:
                sentence_with_style.append(tokens_with_style[token_index])
                token_index += 1
            elif word.startswith(tokens_with_style[token_index]["text"]):
                # this token might be split across several runs
                word_left = word
                while word_left:
                    sentence_with_style.append(tokens_with_style[token_index])
                    word_left = word_left.removeprefix(tokens_with_style[token_index]["text"])
                    token_index += 1
            else:
                raise ValueError(f"Could not match sentence token '{word}' against the run tokens")
        tokenized_sentences_with_style.append(sentence_with_style)
    return tokenized_sentences_with_style
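
# Illustrative example (runs as produced by get_runs_from_paragraph; the detokenizer is assumed to be
# nltk's TreebankWordDetokenizer or similar):
#   runs = [{"text": "Hello world. Bye", "id": None, "paragraph_index": 0}]
#   tokenize_with_runs(runs, detokenizer)
# would return two sentences, roughly:
#   [[{"text": "Hello", ...}, {"text": "world", ...}, {"text": ".", ...}],
#    [{"text": "Bye", ...}]]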


def generate_alignments(original_paragraphs_with_runs: list[list[dict[str, str]]],
                        translated_paragraphs: list[str], aligner, temp_folder: str,
                        detokenizer) -> list[list[dict[str, str]]]:
    """
    Given the original paragraphs with style and formatting and their translation without formatting, try to match
    the translated text with the original formatting. Since we only want to run fastalign once, we temporarily
    forget about paragraphs and work only with sentences, so the output is a list of sentences where each one keeps
    track of which paragraph it came from.

    Parameters:
        original_paragraphs_with_runs: Original text split into paragraphs and runs
        translated_paragraphs: Translated text, split into paragraphs
        aligner: Object of the Aligner class, uses fastalign
        temp_folder: Path to the folder where all the intermediate files are put
        detokenizer: Detokenizer object to merge tokens back together

    Returns:
        list[list[dict]]: A list of tokenized sentences where each translated token carries the style of the
        aligned original token
    """
    # clean temp folder
    for f in glob.glob(os.path.join(temp_folder, "*align*")):
        os.remove(f)

    # tokenize the original text by sentence and word while keeping the style
    original_tokenized_sentences_with_style = [tokenize_with_runs(runs, detokenizer) for runs in
                                               original_paragraphs_with_runs]
    # flatten all the runs so we can align with just one call instead of one per paragraph
    original_tokenized_sentences_with_style = [item for sublist in original_tokenized_sentences_with_style for item in
                                               sublist]

    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = [word_tokenize(sentence) for
                                      translated_paragraph in translated_paragraphs for sentence in
                                      sent_tokenize(translated_paragraph)]

    assert len(translated_tokenized_sentences) == len(
        original_tokenized_sentences_with_style), \
        "The original and translated texts contain a different number of sentences, likely due to a translation error"

    original_sentences = []
    translated_sentences = []
    for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
        original_sentences.append(' '.join(item['text'] for item in original))
        translated_sentences.append(' '.join(translated))

    alignments = aligner.align(original_sentences, translated_sentences)

    # using the alignments generated by fastalign, copy the style of each original token to the translated one
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        # reverse the order of the alignments and build a dict with it
        sentence_alignments = {target: source for source, target in sentence_alignments}

        translated_sentence_with_style: list[dict[str, str]] = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in sentence_alignments:
                # fastalign found an original token aligned with this translated one, copy its style
                original_idx = sentence_alignments[token_idx]
                new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
            elif translated_sentence_with_style:
                # fastalign doesn't know which original token this one comes from, copy the style of the previous token
                new_entry = translated_sentence_with_style[-1].copy()
            else:
                # unaligned token at the start of the sentence, fall back to the style of the first original token
                new_entry = original_tokenized_sentences_with_style[sentence_idx][0].copy()
            new_entry["text"] = translated_token
            translated_sentence_with_style.append(new_entry)
        translated_sentences_with_style.append(translated_sentence_with_style)

    return translated_sentences_with_style
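
# Note on the expected alignment format (an assumption based on how it is consumed above):
# aligner.align() is taken to return one list of (source_idx, target_idx) pairs per sentence,
# e.g. [(0, 0), (1, 2), (2, 1)], which is reversed into {0: 0, 2: 1, 1: 2} so that each
# translated token index maps back to an original token index.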


def group_by_style(tokens: list[dict[str, str]], detokenizer) -> list[dict[str, str]]:
    """
    Group contiguous tokens that share the same style; basically, reconstruct the runs.

    Parameters:
        tokens: Tokens with style information
        detokenizer: Detokenizer object to merge tokens back together

    Returns:
        list[dict]: A list of translated runs with format and style
    """
    groups = []
    for key, group in groupby(tokens, key=lambda x: (x["id"], x["paragraph_index"])):
        text = detokenizer.detokenize([item['text'] for item in group])
        # add a separating space between runs unless the new run starts with punctuation
        if groups and not text.startswith((",", ";", ":", ".", ")", "!", "?")):
            text = " " + text
        groups.append({"text": text,
                       "id": key[0],
                       "paragraph_index": key[1]})
    return groups
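
# Illustrative example: the tokens
#   [{"text": "Hola", "id": None, "paragraph_index": 0},
#    {"text": "mundo", "id": None, "paragraph_index": 0},
#    {"text": "!", "id": ("1",), "paragraph_index": 0}]
# would be grouped into two runs, "Hola mundo" (id None) and "!" (id ("1",)).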


def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict[str, str]]], out_file_path: str):
    """
    Generate a plain text file restoring the original tag structure like <g id="1"> </g>

    Parameters:
        paragraphs_with_style: Dictionary where each key is a paragraph_index and its value is the list of runs
        out_file_path: Path to the file where the plain text will be saved
    """
    with open(out_file_path, "w") as out_file:
        current_stack = []

        def close_tags(to_close):
            return ''.join('</g>' for _ in to_close)

        def open_tags(to_open):
            return ''.join(f'<g id="{gid}">' for gid in to_open)

        for key, paragraph in paragraphs_with_style.items():
            output = []
            for run in paragraph:
                ids = list(run["id"]) if run["id"] else []

                # Find the point where the current and new tag stacks diverge
                common_prefix_len = 0
                for a, b in zip(current_stack, ids):
                    if a == b:
                        common_prefix_len += 1
                    else:
                        break

                # Close tags not in the new stack
                to_close = current_stack[common_prefix_len:]
                if to_close:
                    output.append(close_tags(to_close))

                # Open new tags
                to_open = ids[common_prefix_len:]
                if to_open:
                    output.append(open_tags(to_open))

                # Add text
                output.append(run["text"])

                # Update the stack
                current_stack = ids

            # Close any remaining open tags and reset the stack before the next paragraph
            if current_stack:
                output.append(close_tags(current_stack))
                current_stack = []

            out_file.write("".join(output) + "\n")
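
# Illustrative example: the input
#   {0: [{"text": "Hello ", "id": None, "paragraph_index": 0},
#        {"text": "world", "id": ("1",), "paragraph_index": 0}]}
# is written out as the single line:
#   Hello <g id="1">world</g>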


def translate_document(input_file: str, source_lang: str, target_lang: str,
                       translator,
                       aligner: Aligner,
                       detokenizer,
                       temp_folder: str = "tmp",
                       tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0") -> str:
    """
    Translate a document while preserving its formatting and return the path to the translated file.
    """
    input_filename = os.path.basename(input_file)

    # copy the original file to the temporal folder to avoid common issues with tikal
    temp_input_file = os.path.join(temp_folder, input_filename)
    shutil.copy(input_file, temp_input_file)

    original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
    plain_text_file = doc_to_plain_text(temp_input_file, source_lang, target_lang, tikal_folder, original_xliff_file)

    # get paragraphs with runs
    paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in
                            enumerate(open(plain_text_file).readlines())]

    # translate using the plain text file
    translated_paragraphs = []
    for paragraph in tqdm.tqdm(paragraphs_with_runs, desc="Translating paragraphs..."):
        paragraph_text = detokenizer.detokenize([run["text"] for run in paragraph])
        translated_paragraphs.append(translator.translate(paragraph_text, source_lang, target_lang))

    # time to align the translation with the original
    print("Generating alignments...")
    start_time = time.time()
    translated_sentences_with_style = generate_alignments(paragraphs_with_runs, translated_paragraphs, aligner,
                                                          temp_folder, detokenizer)
    print(f"Finished alignments in {time.time() - start_time:.2f} seconds")

    # flatten the sentences into a list of tokens
    translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]

    # group the tokens by style/run
    translated_runs_with_style = group_by_style(translated_tokens_with_style, detokenizer)

    # group the runs by original paragraph
    translated_paragraphs_with_style = dict()
    for item in translated_runs_with_style:
        if item['paragraph_index'] in translated_paragraphs_with_style:
            translated_paragraphs_with_style[item['paragraph_index']].append(item)
        else:
            # first item in the paragraph, remove the leading space we introduced in group_by_style(), where we
            # didn't know where paragraphs started and ended
            first_item_in_paragraph = item.copy()
            first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
            translated_paragraphs_with_style[item['paragraph_index']] = [first_item_in_paragraph]

    # save to a new plain text file
    translated_moses_file = original_xliff_file + f".{target_lang}"
    runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)

    # put the translations into the xlf
    tikal_moses_to_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-lm", original_xliff_file, "-sl",
                                    source_lang, "-tl", target_lang, "-from", translated_moses_file, "-totrg",
                                    "-noalttrans", "-to", original_xliff_file]
    Popen(tikal_moses_to_xliff_command).wait()

    # merge into a docx again
    tikal_merge_doc_command = [os.path.join(tikal_folder, "tikal.sh"), "-m", original_xliff_file]
    final_process = Popen(tikal_merge_doc_command, stdout=PIPE, stderr=PIPE)
    stdout, stderr = final_process.communicate()

    # get the path to the output file from tikal's stdout
    output = stdout.decode('utf-8')
    translated_file_path = re.search(r'(?<=Output:\s)(.*)', output)[0]
    print(f"Saved translated file to {translated_file_path}")

    return translated_file_path
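
# Minimal usage sketch (hypothetical objects; the concrete translator, Aligner and detokenizer
# constructors depend on the rest of the project and are assumptions here):
#
#   from nltk.tokenize.treebank import TreebankWordDetokenizer
#   from src.aligner import Aligner
#
#   translated = translate_document("report.docx", "en", "es",
#                                   translator=my_translator,           # any object with .translate(text, src, tgt)
#                                   aligner=Aligner(...),               # fastalign wrapper from src.aligner
#                                   detokenizer=TreebankWordDetokenizer())
#   print(translated)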