import time
import json
import requests
import tqdm
import os
import string
from collections import defaultdict
from docx import Document
from docx.text.hyperlink import Hyperlink
from docx.text.run import Run
import nltk
import platform

nltk.download('punkt')
nltk.download('punkt_tab')

from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.tokenize.treebank import TreebankWordDetokenizer
from subprocess import Popen, PIPE
from itertools import groupby
import fileinput

ip = "192.168.20.216"
port = "8000"
def translate(text, ip, port):
    # send the text to the translation server and return the translated string
    myobj = {
        'id': '1',
        'src': text,
    }
    port = str(int(port))
    url = 'http://' + ip + ':' + port + '/translate'
    x = requests.post(url, json=myobj)
    json_response = json.loads(x.text)
    return json_response['tgt']
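# Sketch of the exchange with the /translate endpoint, inferred from the code
# above (the exact server schema is an assumption):
#   request body:  {"id": "1", "src": "This is a sentence."}
#   response body: {"tgt": "..."}  -> translate() returns the "tgt" field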
# Class to align original and translated sentences
# based on https://github.com/mtuoc/MTUOC-server/blob/main/GetWordAlignments_fast_align.py
class Aligner:
    def __init__(self, config_folder, source_lang, target_lang, temp_folder):
        forward_params_path = os.path.join(config_folder, f"{source_lang}-{target_lang}.params")
        reverse_params_path = os.path.join(config_folder, f"{target_lang}-{source_lang}.params")
        fwd_T, fwd_m = self.__read_err(os.path.join(config_folder, f"{source_lang}-{target_lang}.err"))
        rev_T, rev_m = self.__read_err(os.path.join(config_folder, f"{target_lang}-{source_lang}.err"))
        self.forward_alignment_file_path = os.path.join(temp_folder, "forward.align")
        self.reverse_alignment_file_path = os.path.join(temp_folder, "reverse.align")
        if platform.system().lower() == "windows":
            fastalign_bin = "fast_align.exe"
            atools_bin = "atools.exe"
        else:
            fastalign_bin = "./fast_align"
            atools_bin = "./atools"
        self.temp_file_path = os.path.join(temp_folder, "tokenized_sentences.txt")
        self.forward_command = [fastalign_bin, "-i", self.temp_file_path, "-d", "-T", fwd_T, "-m", fwd_m,
                                "-f", forward_params_path]
        self.reverse_command = [fastalign_bin, "-i", self.temp_file_path, "-d", "-T", rev_T, "-m", rev_m,
                                "-f", reverse_params_path, "-r"]
        self.symmetric_command = [atools_bin, "-i", self.forward_alignment_file_path,
                                  "-j", self.reverse_alignment_file_path, "-c", "grow-diag-final-and"]
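    # The flags above follow fast_align's forced-alignment usage (as in its
    # force_align.py script): -i input file, -d favor points near the diagonal,
    # -T/-m tension and mean-length values recovered from the training .err log,
    # -f a pre-trained parameters file, -r the reverse (target-to-source) direction.
    # atools then symmetrizes the two directions with grow-diag-final-and.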
    def __simplify_alignment_file(self, file):
        with fileinput.FileInput(file, inplace=True, backup='.bak') as f:
            for line in f:
                print(line.split('|||')[2].strip())

    def __read_err(self, err):
        (T, m) = ('', '')
        with open(err) as err_file:
            for line in err_file:
                # expected target length = source length * N
                if 'expected target length' in line:
                    m = line.split()[-1]
                # final tension: N
                elif 'final tension' in line:
                    T = line.split()[-1]
        return T, m
    def align(self, original_sentences, translated_sentences):
        # create the temporary file which fast_align will read
        with open(self.temp_file_path, "w") as temp_file:
            for original, translated in zip(original_sentences, translated_sentences):
                temp_file.write(f"{original} ||| {translated}\n")
        with open(self.forward_alignment_file_path, 'w') as f_out, open(self.reverse_alignment_file_path, 'w') as r_out:
            # generate the forward and reverse alignments in parallel
            fw_process = Popen(self.forward_command, stdout=f_out)
            r_process = Popen(self.reverse_command, stdout=r_out)
            # wait for both to finish
            fw_process.wait()
            r_process.wait()
        # the output lines also contain the sentence pair ("src ||| tgt ||| alignment"),
        # so keep only the alignment column
        self.__simplify_alignment_file(self.forward_alignment_file_path)
        self.__simplify_alignment_file(self.reverse_alignment_file_path)
        # generate the symmetrical alignment; communicate() waits for the process
        # without the deadlock risk of calling wait() on a full stdout pipe
        process = Popen(self.symmetric_command, stdout=PIPE, stderr=PIPE)
        alignments_str = process.communicate()[0].decode('utf-8')
        # parse the final alignments into lists of (source_idx, target_idx) pairs
        alignments = []
        for line in alignments_str.splitlines():
            pairs = [pair.split("-") for pair in line.split() if "-" in pair]
            alignments.append([(int(i), int(j)) for i, j in pairs])
        return alignments
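# Sketch of what Aligner.align expects and returns (made-up sentences):
#   aligner.align(["esta es una frase"], ["this is a sentence"])
#   -> [[(0, 0), (1, 1), (2, 2), (3, 3)]]
# Each (i, j) pair links token i of the original sentence to token j of the
# translation; both sides must already be whitespace-tokenized strings.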
# Function to extract paragraphs with their runs (text plus style attributes)
def extract_paragraphs_with_runs(doc):
    paragraphs_with_runs = []
    for idx, paragraph in enumerate(doc.paragraphs):
        runs = []
        for item in paragraph.iter_inner_content():
            if isinstance(item, Run):
                runs.append({
                    'text': item.text,
                    'bold': item.bold,
                    'italic': item.italic,
                    'underline': item.underline,
                    'font_name': item.font.name,
                    'font_size': item.font.size,
                    'font_color': item.font.color.rgb,
                    'paragraph_index': idx
                })
            elif isinstance(item, Hyperlink):
                # hyperlinks are represented by the text and style of their first run
                runs.append({
                    'text': item.runs[0].text,
                    'bold': item.runs[0].bold,
                    'italic': item.runs[0].italic,
                    'underline': item.runs[0].underline,
                    'font_name': item.runs[0].font.name,
                    'font_size': item.runs[0].font.size,
                    'font_color': item.runs[0].font.color.rgb,
                    'paragraph_index': idx
                })
        paragraphs_with_runs.append(runs)
    return paragraphs_with_runs
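# The structure returned by extract_paragraphs_with_runs looks like this
# (values are illustrative):
# [
#     [  # paragraph 0
#         {'text': 'Hello ', 'bold': True, 'italic': None, 'underline': None,
#          'font_name': 'Calibri', 'font_size': 152400, 'font_color': None,
#          'paragraph_index': 0},
#         ...
#     ],
#     ...  # one list of run dicts per paragraph
# ]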
def tokenize_with_runs(runs, detokenizer):
    # split the paragraph into sentences and word tokens while remembering which run each token came from
    text_paragraph = detokenizer.detokenize([run["text"] for run in runs])
    sentences = sent_tokenize(text_paragraph)
    tokenized_sentences = [word_tokenize(sentence) for sentence in sentences]
    tokens_with_style = []
    for run in runs:
        tokens = word_tokenize(run["text"])
        for token in tokens:
            tokens_with_style.append(run.copy())
            tokens_with_style[-1]["text"] = token
    token_index = 0
    tokenized_sentences_with_style = []
    for sentence in tokenized_sentences:
        sentence_with_style = []
        for word in sentence:
            if word == tokens_with_style[token_index]["text"]:
                sentence_with_style.append(tokens_with_style[token_index])
                token_index += 1
            elif word.startswith(tokens_with_style[token_index]["text"]):
                # this token might be split across several runs
                word_left = word
                while word_left:
                    sentence_with_style.append(tokens_with_style[token_index])
                    word_left = word_left.removeprefix(tokens_with_style[token_index]["text"])
                    token_index += 1
            else:
                raise ValueError(f"Could not match token {word!r} against run token "
                                 f"{tokens_with_style[token_index]['text']!r}")
        tokenized_sentences_with_style.append(sentence_with_style)
    return tokenized_sentences_with_style
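# Sketch of tokenize_with_runs for a paragraph with two runs (only 'text' and
# 'bold' shown):
#   runs = [{'text': 'Hello', 'bold': True, ...}, {'text': 'world.', 'bold': False, ...}]
#   -> [[{'text': 'Hello', 'bold': True, ...}, {'text': 'world', 'bold': False, ...},
#        {'text': '.', 'bold': False, ...}]]
# i.e. one list per sentence, one style dict per token.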
def generate_alignments(original_paragraphs_with_runs, translated_paragraphs, aligner, temp_folder, detokenizer):
    # clean the temp folder
    for f in os.listdir(temp_folder):
        os.remove(os.path.join(temp_folder, f))
    # tokenize the original text by sentence and word while keeping the style
    original_tokenized_sentences_with_style = [tokenize_with_runs(runs, detokenizer)
                                               for runs in original_paragraphs_with_runs]
    # flatten all the runs so we can align with just one call instead of one per paragraph
    original_tokenized_sentences_with_style = [item for sublist in original_tokenized_sentences_with_style
                                               for item in sublist]
    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = [word_tokenize(sentence)
                                      for translated_paragraph in translated_paragraphs
                                      for sentence in sent_tokenize(translated_paragraph)]
    original_sentences = []
    translated_sentences = []
    for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
        original_sentences.append(' '.join(item['text'] for item in original))
        translated_sentences.append(' '.join(translated))
    alignments = aligner.align(original_sentences, translated_sentences)
    # using the alignments generated by fast_align, copy the style of each original token
    # onto the translated token it is aligned with
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        # invert the (source, target) pairs into a target -> source lookup
        sentence_alignments = {target: source for source, target in sentence_alignments}
        translated_sentence_with_style = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in sentence_alignments:
                # fast_align aligned this translated token with an original token: copy its style
                original_idx = sentence_alignments[token_idx]
                new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
            elif translated_sentence_with_style:
                # unaligned token: fall back to the style of the previous translated token
                new_entry = translated_sentence_with_style[-1].copy()
            else:
                # unaligned first token: fall back to the style of the first original token
                new_entry = original_tokenized_sentences_with_style[sentence_idx][0].copy()
            new_entry["text"] = translated_token
            translated_sentence_with_style.append(new_entry)
        translated_sentences_with_style.append(translated_sentence_with_style)
    return translated_sentences_with_style
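# Note on generate_alignments: pairing original and translated sentences with
# zip() assumes the translation keeps the same sentence segmentation as the
# original; if the counts differ, trailing sentences are silently dropped.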
# group contiguous tokens that share the same style attributes into a single run
def group_by_style(values, detokenizer):
    groups = []
    for key, group in groupby(values, key=lambda x: (
            x['bold'], x['italic'], x['underline'], x['font_name'], x['font_size'], x['font_color'],
            x['paragraph_index'])):
        text = detokenizer.detokenize([item['text'] for item in group])
        # keep a space between groups unless the new group starts with closing punctuation
        if groups and not text.startswith((",", ";", ":", ".", ")")):
            text = " " + text
        groups.append({"text": text,
                       "bold": key[0],
                       "italic": key[1],
                       "underline": key[2],
                       "font_name": key[3],
                       "font_size": key[4],
                       "font_color": key[5],
                       "paragraph_index": key[6]})
    return groups
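# For example (a sketch; only the relevant keys are shown), the tokens
#   [{'text': 'Hello', 'bold': True, ...}, {'text': 'world', 'bold': True, ...},
#    {'text': '.', 'bold': False, ...}]
# are grouped into two runs: {'text': 'Hello world', 'bold': True, ...}
# and {'text': '.', 'bold': False, ...}.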
def preprocess_runs(runs_in_paragraph):
    new_runs = []
    for run in runs_in_paragraph:
        # sometimes the style parameters are False and sometimes they are None, so set them all to False
        for key, value in run.items():
            if value is None and not key.startswith("font"):
                run[key] = False
        if not new_runs:
            new_runs.append(run)
        else:
            # if the previous run has the same format as the current run, merge the two runs together
            if (new_runs[-1]["bold"] == run["bold"] and new_runs[-1]["font_color"] == run["font_color"]
                    and new_runs[-1]["font_name"] == run["font_name"] and new_runs[-1]["font_size"] == run["font_size"]
                    and new_runs[-1]["italic"] == run["italic"] and new_runs[-1]["underline"] == run["underline"]
                    and new_runs[-1]["paragraph_index"] == run["paragraph_index"]):
                new_runs[-1]["text"] += run["text"]
            else:
                new_runs.append(run)
        # split runs that contain more than one sentence to avoid problems later when aligning styles
        sentences = sent_tokenize(new_runs[-1]["text"])
        if len(sentences) > 1:
            new_runs[-1]["text"] = sentences[0]
            for sentence in sentences[1:]:
                new_run = new_runs[-1].copy()
                new_run["text"] = sentence
                new_runs.append(new_run)
    return new_runs
def translate_document(input_file,
                       aligner,
                       detokenizer,
                       ip="192.168.20.216",
                       temp_folder="tmp",
                       port="8000"):
    os.makedirs(temp_folder, exist_ok=True)
    # load the original file and extract the paragraphs with their runs (which carry style and formatting)
    doc = Document(input_file)
    paragraphs_with_runs = extract_paragraphs_with_runs(doc)
    # translate each paragraph
    translated_paragraphs = []
    for paragraph in tqdm.tqdm(paragraphs_with_runs, desc="Translating paragraphs..."):
        paragraph_text = detokenizer.detokenize([run["text"] for run in paragraph])
        translated_paragraphs.append(translate(paragraph_text, ip, port))
    out_doc = Document()
    processed_original_paragraphs_with_runs = [preprocess_runs(runs) for runs in paragraphs_with_runs]
    print("Generating alignments...")
    translated_sentences_with_style = generate_alignments(processed_original_paragraphs_with_runs,
                                                          translated_paragraphs, aligner,
                                                          temp_folder, detokenizer)
    print("Finished alignments")
    # flatten the sentences into a list of tokens
    translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
    # group the tokens by style/run
    translated_runs_with_style = group_by_style(translated_tokens_with_style, detokenizer)
    print("Grouped by style")
    # group the runs by original paragraph
    translated_paragraphs_with_style = defaultdict(list)
    for item in translated_runs_with_style:
        translated_paragraphs_with_style[item['paragraph_index']].append(item)
    for paragraph_index, original_paragraph in enumerate(doc.paragraphs):
        # keep empty paragraphs so the layout of the original document is preserved
        if not original_paragraph.text:
            out_doc.add_paragraph(style=original_paragraph.style)
            continue
        para = out_doc.add_paragraph(style=original_paragraph.style)
        for item in translated_paragraphs_with_style[paragraph_index]:
            run = para.add_run(item["text"])
            # preserve the original run formatting
            run.bold = item['bold']
            run.italic = item['italic']
            run.underline = item['underline']
            run.font.name = item['font_name']
            run.font.size = item['font_size']
            run.font.color.rgb = item['font_color']
    out_doc.save("translated.docx")
    print("Saved file")
    return "translated.docx"
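# Minimal usage sketch. The file paths, language codes and folder names below are
# assumptions for illustration; point them at your own fast_align models (the
# .params/.err files produced when training) and at a running translation server.
if __name__ == "__main__":
    detokenizer = TreebankWordDetokenizer()
    aligner = Aligner(config_folder="fast_align_config",  # hypothetical folder with en-es.params / en-es.err etc.
                      source_lang="en",
                      target_lang="es",
                      temp_folder="tmp")
    output_path = translate_document("example.docx",  # hypothetical input document
                                     aligner,
                                     detokenizer,
                                     ip=ip,
                                     port=port,
                                     temp_folder="tmp")
    print(f"Translated document written to {output_path}")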