import os
import fileinput
from collections import defaultdict
from datetime import datetime
from itertools import groupby
from subprocess import Popen, PIPE

import nltk
import torch
import tqdm
from docx import Document
from docx.text.hyperlink import Hyperlink
from docx.text.run import Run
from iso639 import languages
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.tokenize.treebank import TreebankWordDetokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM

nltk.download('punkt')
nltk.download('punkt_tab')


class Translator:
    def __init__(self, model_path, source_lang, target_lang):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            device_map="auto",
            torch_dtype=torch.bfloat16
        )
        self.prompt_f = lambda x: (f"Translate the following text from {source_lang} into "
                                   f"{target_lang}.\n{source_lang}: {x} \n{target_lang}:")

    def translate(self, text):
        message = [{"role": "user", "content": self.prompt_f(text)}]
        date_string = datetime.today().strftime('%Y-%m-%d')
        prompt = self.tokenizer.apply_chat_template(
            message,
            tokenize=False,
            add_generation_prompt=True,
            date_string=date_string
        )
        inputs = self.tokenizer.encode(prompt, add_special_tokens=False, return_tensors="pt")
        input_length = inputs.shape[1]
        outputs = self.model.generate(input_ids=inputs.to(self.model.device),
                                      max_new_tokens=400,
                                      early_stopping=True,
                                      num_beams=5)
        return self.tokenizer.decode(outputs[0, input_length:], skip_special_tokens=True)


# Class to align original and translated sentences,
# based on https://github.com/mtuoc/MTUOC-server/blob/main/GetWordAlignments_fast_align.py
class Aligner:
    def __init__(self, config_folder, source_lang, target_lang, temp_folder):
        forward_params_path = os.path.join(config_folder, f"{source_lang}-{target_lang}.params")
        reverse_params_path = os.path.join(config_folder, f"{target_lang}-{source_lang}.params")
        fwd_T, fwd_m = self.__read_err(os.path.join(config_folder, f"{source_lang}-{target_lang}.err"))
        rev_T, rev_m = self.__read_err(os.path.join(config_folder, f"{target_lang}-{source_lang}.err"))

        self.forward_alignment_file_path = os.path.join(temp_folder, "forward.align")
        self.reverse_alignment_file_path = os.path.join(temp_folder, "reverse.align")

        self.forward_command = lambda x: (f'./fast_align -i {x} -d -T {fwd_T} -m {fwd_m} '
                                          f'-f {forward_params_path} > {self.forward_alignment_file_path}')
        self.reverse_command = lambda x: (f'./fast_align -i {x} -d -T {rev_T} -m {rev_m} '
                                          f'-f {reverse_params_path} -r > {self.reverse_alignment_file_path}')
        self.symmetric_command = (f'./atools -i {self.forward_alignment_file_path} '
                                  f'-j {self.reverse_alignment_file_path} -c grow-diag-final-and')

    def __simplify_alignment_file(self, file):
        with fileinput.FileInput(file, inplace=True, backup='.bak') as f:
            for line in f:
                print(line.split('|||')[2].strip())

    def __read_err(self, err):
        T, m = '', ''
        with open(err) as f:
            for line in f:
                # expected target length = source length * N
                if 'expected target length' in line:
                    m = line.split()[-1]
                # final tension: N
                elif 'final tension' in line:
                    T = line.split()[-1]
        return T, m

    def align(self, file):
        # generate forward alignment
        process = Popen(self.forward_command(file), shell=True)
        process.wait()
        # generate reverse alignment
        process = Popen(self.reverse_command(file), shell=True)
        process.wait()
        # the output file contains more information than needed, remove it
        self.__simplify_alignment_file(self.forward_alignment_file_path)
        self.__simplify_alignment_file(self.reverse_alignment_file_path)
        # generate symmetrical alignment
        process = Popen(self.symmetric_command, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        # get final alignments and format them
        # (communicate() waits for the process and reads stdout, so a separate wait() is not
        # needed and could deadlock if the pipe buffer fills up)
        alignments_str = process.communicate()[0].decode('utf-8')
        alignments = []
        for line in alignments_str.splitlines():
            alignments.append([(int(i), int(j)) for i, j in
                               [pair.split("-") for pair in line.strip("\n").split(" ")]])
        return alignments


# Function to extract paragraphs with their runs
def extract_paragraphs_with_runs(doc):
    paragraphs_with_runs = []
    for idx, paragraph in enumerate(doc.paragraphs):
        runs = []
        for item in paragraph.iter_inner_content():
            if isinstance(item, Run):
                runs.append({
                    'text': item.text,
                    'bold': item.bold,
                    'italic': item.italic,
                    'underline': item.underline,
                    'font_name': item.font.name,
                    'font_size': item.font.size,
                    'font_color': item.font.color.rgb,
                    'paragraph_index': idx
                })
            elif isinstance(item, Hyperlink):
                runs.append({
                    'text': item.runs[0].text,
                    'bold': item.runs[0].bold,
                    'italic': item.runs[0].italic,
                    'underline': item.runs[0].underline,
                    'font_name': item.runs[0].font.name,
                    'font_size': item.runs[0].font.size,
                    'font_color': item.runs[0].font.color.rgb,
                    'paragraph_index': idx
                })
        paragraphs_with_runs.append(runs)
    return paragraphs_with_runs


def tokenize_paragraph_with_runs2(runs_in_paragraph):
    text_paragraph = " ".join(run["text"] for run in runs_in_paragraph)
    sentences = sent_tokenize(text_paragraph)
    tokenized_sentences = [word_tokenize(sentence) for sentence in sentences]
    tokenized_sentences_with_style = []
    for tokenized_sentence in tokenized_sentences:
        tokenized_sentence_with_style = []
        token_idx = 0
        for run in runs_in_paragraph:
            text_in_run = run["text"].strip()
            if text_in_run == tokenized_sentence[token_idx]:
                new_run = run.copy()
                new_run["text"] = text_in_run
                tokenized_sentence_with_style.append(new_run)
                token_idx += 1
                if token_idx >= len(tokenized_sentence):
                    break
            elif len(text_in_run) > len(tokenized_sentence[token_idx]):
                if text_in_run.startswith(tokenized_sentence[token_idx]):
                    for token in word_tokenize(text_in_run):
                        if token == tokenized_sentence[token_idx]:
                            new_run = run.copy()
                            new_run["text"] = token
                            tokenized_sentence_with_style.append(new_run)
                            token_idx += 1
                else:
                    raise ValueError("Run text does not match the tokenized sentence")
        tokenized_sentences_with_style.append(tokenized_sentence_with_style)
    return tokenized_sentences_with_style


def tokenize_with_runs(runs, detokenizer):
    text_paragraph = detokenizer.detokenize([run["text"] for run in runs])
    sentences = sent_tokenize(text_paragraph)
    tokenized_sentences = [word_tokenize(sentence) for sentence in sentences]

    tokens_with_style = []
    for run in runs:
        tokens = word_tokenize(run["text"])
        for token in tokens:
            tokens_with_style.append(run.copy())
            tokens_with_style[-1]["text"] = token

    token_index = 0
    tokenized_sentences_with_style = []
    for sentence in tokenized_sentences:
        sentence_with_style = []
        for word in sentence:
            if word == tokens_with_style[token_index]["text"]:
                sentence_with_style.append(tokens_with_style[token_index])
                token_index += 1
            else:
                if word.startswith(tokens_with_style[token_index]["text"]):
                    # this token might be split into several runs
                    word_left = word
                    while word_left:
                        sentence_with_style.append(tokens_with_style[token_index])
                        word_left = word_left.removeprefix(tokens_with_style[token_index]["text"])
                        token_index += 1
                else:
                    raise ValueError("Token does not match any run text")
        tokenized_sentences_with_style.append(sentence_with_style)
    return tokenized_sentences_with_style


def generate_alignments(original_paragraphs_with_runs,
                        translated_paragraphs, aligner, temp_folder, detokenizer):
    # clean temp folder
    for f in os.listdir(temp_folder):
        os.remove(os.path.join(temp_folder, f))
    temp_file_path = os.path.join(temp_folder, "tokenized_sentences.txt")

    # tokenize the original text by sentence and words while keeping the style
    original_tokenized_sentences_with_style = [tokenize_with_runs(runs, detokenizer)
                                               for runs in original_paragraphs_with_runs]
    # flatten all the runs so we can align with just one call instead of one per paragraph
    original_tokenized_sentences_with_style = [item for sublist in original_tokenized_sentences_with_style
                                               for item in sublist]
    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = [word_tokenize(sentence)
                                      for translated_paragraph in translated_paragraphs
                                      for sentence in sent_tokenize(translated_paragraph)]

    # write the file that fast_align will use
    with open(temp_file_path, "w") as out_file:
        for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
            original_text = " ".join(item["text"] for item in original)
            translated_text = " ".join(translated)
            out_file.write(f"{original_text} ||| {translated_text}\n")

    alignments = aligner.align(temp_file_path)

    # using the alignments generated by fast_align, copy the style of each original token to the translated one
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        # reverse the order of the alignments and build a dict with it
        sentence_alignments = {target: source for source, target in sentence_alignments}
        translated_sentence_with_style = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in sentence_alignments:
                # fast_align found an original token aligned with this translated one: copy its style
                original_idx = sentence_alignments[token_idx]
                new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)
            else:
                # WARNING this is a test
                # since fast_align doesn't know which original word this token comes from,
                # copy the style of the previous word
                new_entry = translated_sentence_with_style[-1].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)
        translated_sentences_with_style.append(translated_sentence_with_style)
    return translated_sentences_with_style


# group contiguous tokens that share the same style attributes
def group_by_style(values, detokenizer):
    groups = []
    for key, group in groupby(values, key=lambda x: (x['bold'], x['italic'], x['underline'], x['font_name'],
                                                     x['font_size'], x['font_color'], x['paragraph_index'])):
        text = detokenizer.detokenize([item['text'] for item in group])
        # add a leading space unless the group starts with punctuation that attaches to the previous token
        if groups and not text.startswith((",", ";", ":", ".", ")")):
            text = " " + text
        groups.append({"text": text,
                       "bold": key[0],
                       "italic": key[1],
                       "underline": key[2],
                       "font_name": key[3],
                       "font_size": key[4],
                       "font_color": key[5],
                       "paragraph_index": key[6]})
    return groups


def preprocess_runs(runs_in_paragraph):
    new_runs = []
    for run in runs_in_paragraph:
        # sometimes the parameters are False and sometimes they are None, set them all to False
        for key, value in run.items():
            if value is None and not key.startswith("font"):
                run[key] = False
        if not new_runs:
            new_runs.append(run)
        else:
            # if the previous run has the same format as the current run, merge the two runs together
            if (new_runs[-1]["bold"] == run["bold"]
                    and new_runs[-1]["font_color"] == run["font_color"]
run["font_color"] and new_runs[-1]["font_name"] == run["font_name"] and new_runs[-1]["font_size"] == run["font_size"] and new_runs[-1]["italic"] == run["italic"] and new_runs[-1]["underline"] == run["underline"] and new_runs[-1]["paragraph_index"] == run["paragraph_index"]): new_runs[-1]["text"] += run["text"] else: new_runs.append(run) # we want to split runs that contain more than one sentence to avoid problems later when aligning styles sentences = sent_tokenize(new_runs[-1]["text"]) if len(sentences) > 1: new_runs[-1]["text"] = sentences[0] for sentence in sentences[1:]: new_run = new_runs[-1].copy() new_run["text"] = sentence new_runs.append(new_run) return new_runs if __name__ == "__main__": input_file = 'data/test3.docx' output_file = 'data/translated_output.docx' source_lang = 'ca' target_lang = 'en' config_folder = "fast_align_config" temp_folder = "tmp" aligner = Aligner(config_folder, source_lang, target_lang, temp_folder) os.makedirs(temp_folder, exist_ok=True) # load original file, extract the paragraphs with their runs (which include style and formatting) doc = Document(input_file) paragraphs_with_runs = extract_paragraphs_with_runs(doc) detokenizer = TreebankWordDetokenizer() translator = Translator("BSC-LT/salamandraTA-7b-instruct", languages.get(alpha2=source_lang).name, languages.get(alpha2=target_lang).name) # translate each paragraph translated_paragraphs = [] for paragraph in tqdm.tqdm(paragraphs_with_runs, desc="Translating paragraphs..."): paragraph_text = detokenizer.detokenize([run["text"] for run in paragraph]) translated_paragraphs.append(translator.translate(paragraph_text)) print(translated_paragraphs) out_doc = Document() processed_original_paragraphs_with_runs = [preprocess_runs(runs) for runs in paragraphs_with_runs] translated_sentences_with_style = generate_alignments(processed_original_paragraphs_with_runs, translated_paragraphs, aligner, temp_folder, detokenizer) # flatten the sentences into a list of tokens translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist] # group the tokens by style/run translated_runs_with_style = group_by_style(translated_tokens_with_style, detokenizer) # group the runs by original paragraph translated_paragraphs_with_style = defaultdict(list) for item in translated_runs_with_style: translated_paragraphs_with_style[item['paragraph_index']].append(item) for paragraph_index, original_paragraph in enumerate(doc.paragraphs): # in case there are empty paragraphs if not original_paragraph.text: out_doc.add_paragraph(style=original_paragraph.style) continue para = out_doc.add_paragraph(style=original_paragraph.style) for item in translated_paragraphs_with_style[paragraph_index]: run = para.add_run(item["text"]) # Preserve original run formatting run.bold = item['bold'] run.italic = item['italic'] run.underline = item['underline'] run.font.name = item['font_name'] run.font.size = item['font_size'] run.font.color.rgb = item['font_color'] out_doc.save(output_file)