|
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request |
|
from fastapi.responses import HTMLResponse, JSONResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.templating import Jinja2Templates |
|
from typing import Dict
|
from pydantic import BaseModel |
|
import os |
|
import requests
import traceback
import concurrent.futures
import time
|
|
|
|
|
class TranslationRequest(BaseModel): |
|
text: str |
|
source_lang: str |
|
target_lang: str |
|
|
|
|
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline |
|
import torch |
|
|
|
|
|
|
|
# Project layout: templates/, static/ and uploads/ are expected to live in the
# parent directory of the directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEMPLATE_DIR = os.path.join(os.path.dirname(BASE_DIR), "templates")
STATIC_DIR = os.path.join(os.path.dirname(BASE_DIR), "static")
UPLOADS_DIR = os.path.join(os.path.dirname(BASE_DIR), "uploads")
|
|
|
|
|
os.makedirs(UPLOADS_DIR, exist_ok=True) |
|
|
|
|
|
app = FastAPI(title="Tarjama Translation API") |
|
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") |
|
templates = Jinja2Templates(directory=TEMPLATE_DIR) |
|
|
|
|
|
LANGUAGE_MAP = { |
|
"ar": "Arabic", |
|
"en": "English", |
|
"fr": "French", |
|
"es": "Spanish", |
|
"de": "German", |
|
"zh": "Chinese", |
|
"ru": "Russian", |
|
"ja": "Japanese", |
|
"hi": "Hindi", |
|
"pt": "Portuguese", |
|
"tr": "Turkish", |
|
"ko": "Korean", |
|
"it": "Italian", |
|
"nl": "Dutch", |
|
"sv": "Swedish", |
|
"fi": "Finnish", |
|
"pl": "Polish", |
|
"he": "Hebrew", |
|
"id": "Indonesian", |
|
"uk": "Ukrainian", |
|
"cs": "Czech", |
|
"auto": "Detect Language" |
|
} |
|
|
|
|
|
|
|
|
|
# Redirect Hugging Face / Transformers caches to /tmp (useful on hosts where the
# application directory is read-only). from_pretrained() below also passes
# cache_dir explicitly; ideally these variables would be set before the
# `transformers` import so its defaults pick them up as well.
os.environ['TRANSFORMERS_CACHE'] = '/tmp/transformers_cache'
os.environ['HF_HOME'] = '/tmp/hf_home'
os.environ['XDG_CACHE_HOME'] = '/tmp/cache'
|
|
|
|
|
|
|
# Registry of supported translation directions. The model, tokenizer and
# pipeline objects are loaded lazily by initialize_model() the first time a
# pair is needed (the most common pairs are also pre-loaded at startup).
translation_models: Dict[str, Dict] = {
|
"en-ar": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-en-ar", |
|
}, |
|
"ar-en": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-ar-en", |
|
}, |
|
|
|
"en-fr": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-en-fr", |
|
}, |
|
"fr-en": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-fr-en", |
|
}, |
|
"en-es": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-en-es", |
|
}, |
|
"es-en": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-es-en", |
|
}, |
|
"en-de": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-en-de", |
|
}, |
|
"de-en": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-de-en", |
|
}, |
|
"ar-fr": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-ar-fr", |
|
}, |
|
"fr-ar": { |
|
"model": None, |
|
"tokenizer": None, |
|
"translator": None, |
|
"model_name": "Helsinki-NLP/opus-mt-fr-ar", |
|
}, |
|
|
|
|
|
} |
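# Additional Helsinki-NLP Opus-MT directions can be registered the same way,
# e.g. (assuming the checkpoint exists on the Hugging Face Hub):
#   "en-ru": {"model": None, "tokenizer": None, "translator": None,
#             "model_name": "Helsinki-NLP/opus-mt-en-ru"},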
|
|
|
# Guard against repeated failed model downloads: after the maximum number of
# attempts, wait out the cooldown before trying to initialize again.
model_initialization_attempts = 0
max_model_initialization_attempts = 3
last_initialization_attempt = 0
initialization_cooldown = 300  # seconds
|
|
|
|
|
def initialize_model(language_pair: str): |
|
"""Initialize a specific translation model and tokenizer for a language pair.""" |
|
global translation_models, model_initialization_attempts, last_initialization_attempt |
|
|
|
|
|
if language_pair not in translation_models: |
|
print(f"Unsupported language pair: {language_pair}") |
|
return False |
|
|
|
|
|
current_time = time.time() |
|
if (model_initialization_attempts >= max_model_initialization_attempts and |
|
current_time - last_initialization_attempt < initialization_cooldown): |
|
print(f"Maximum initialization attempts reached. Waiting for cooldown period.") |
|
return False |
|
|
|
|
|
model_initialization_attempts += 1 |
|
last_initialization_attempt = current_time |
|
|
|
try: |
|
model_info = translation_models[language_pair] |
|
model_name = model_info["model_name"] |
|
|
|
print(f"Initializing model and tokenizer for {language_pair} using {model_name} (attempt {model_initialization_attempts})...") |
|
|
|
|
|
device = "cpu" |
|
if torch.cuda.is_available(): |
|
device = "cuda" |
|
print(f"CUDA is available: {torch.cuda.get_device_name(0)}") |
|
print(f"Device set to use: {device}") |
|
|
|
|
|
try: |
|
tokenizer = AutoTokenizer.from_pretrained( |
|
model_name, |
|
cache_dir="/tmp/transformers_cache", |
|
use_fast=True, |
|
local_files_only=False |
|
) |
|
if tokenizer is None: |
|
print(f"Failed to load tokenizer for {language_pair}") |
|
return False |
|
print(f"Tokenizer for {language_pair} loaded successfully") |
|
translation_models[language_pair]["tokenizer"] = tokenizer |
|
except Exception as e: |
|
print(f"Error loading tokenizer for {language_pair}: {e}") |
|
return False |
|
|
|
|
|
try: |
|
model = AutoModelForSeq2SeqLM.from_pretrained( |
|
model_name, |
|
cache_dir="/tmp/transformers_cache", |
|
low_cpu_mem_usage=True, |
|
torch_dtype=torch.float32 |
|
) |
|
|
|
model = model.to(device) |
|
print(f"Model for {language_pair} loaded with PyTorch and moved to {device}") |
|
translation_models[language_pair]["model"] = model |
|
except Exception as e: |
|
print(f"Error loading model for {language_pair}: {e}") |
|
print(f"Model initialization for {language_pair} failed") |
|
return False |
|
|
|
|
|
try: |
|
|
|
translator = pipeline( |
|
"translation", |
|
model=model, |
|
tokenizer=tokenizer, |
|
device=0 if device == "cuda" else -1, |
|
framework="pt" |
|
) |
|
|
|
if translator is None: |
|
print(f"Failed to create translator pipeline for {language_pair}") |
|
return False |
|
|
|
|
|
            # Smoke-test the freshly built pipeline with a short phrase in the
            # source language (English is used for pairs without a sample below).
            source_lang, target_lang = language_pair.split('-')
            sample_texts = {
                "en": "hello world", "ar": "مرحبا بالعالم",
                "fr": "bonjour le monde", "es": "hola mundo", "de": "hallo welt",
            }
            test_text = sample_texts.get(source_lang, "hello world")
            test_result = translator(test_text, max_length=128)
|
print(f"Model test result for {language_pair}: {test_result}") |
|
if not test_result or not isinstance(test_result, list) or len(test_result) == 0: |
|
print(f"Model test for {language_pair} failed: Invalid output format") |
|
return False |
|
|
|
translation_models[language_pair]["translator"] = translator |
|
|
|
|
|
model_initialization_attempts = 0 |
|
print(f"Model {model_name} for {language_pair} successfully initialized and tested") |
|
return True |
|
except Exception as inner_e: |
|
print(f"Error creating translation pipeline for {language_pair}: {inner_e}") |
|
traceback.print_exc() |
|
return False |
|
except Exception as e: |
|
print(f"Critical error initializing model for {language_pair}: {e}") |
|
traceback.print_exc() |
|
return False |
|
|
|
|
|
def get_language_pair(source_lang: str, target_lang: str): |
|
"""Determine the appropriate language pair and direction for translation.""" |
|
|
|
if source_lang == "auto": |
|
return None |
|
|
|
|
|
pair_key = f"{source_lang}-{target_lang}" |
|
if pair_key in translation_models: |
|
return pair_key |
|
|
|
|
|
return None |
|
|
|
|
|
def detect_language(text: str) -> str: |
|
"""Detect the language of the input text and return the language code.""" |
|
try: |
|
|
|
from langdetect import detect |
|
|
|
try: |
|
detected_lang = detect(text) |
|
print(f"Language detected using langdetect: {detected_lang}") |
|
|
|
|
|
            # langdetect returns ISO 639-1 codes (with regional variants for
            # Chinese); normalise them to the codes used in LANGUAGE_MAP and
            # default to English for anything unsupported.
            if detected_lang in ("zh-cn", "zh-tw"):
                return "zh"
            return detected_lang if detected_lang in LANGUAGE_MAP else "en"
|
except Exception as e: |
|
print(f"Error with langdetect: {e}") |
|
|
|
except ImportError: |
|
print("langdetect library not available, using basic detection") |
|
|
|
|
|
if len(text) < 10: |
|
return "en" |
|
|
|
|
|
arabic_count = sum(1 for c in text if '\u0600' <= c <= '\u06FF') |
|
chinese_count = sum(1 for c in text if '\u4e00' <= c <= '\u9fff') |
|
japanese_count = sum(1 for c in text if '\u3040' <= c <= '\u30ff') |
|
cyrillic_count = sum(1 for c in text if '\u0400' <= c <= '\u04FF') |
|
hebrew_count = sum(1 for c in text if '\u0590' <= c <= '\u05FF') |
|
|
|
|
|
text_len = len(text) |
|
arabic_ratio = arabic_count / text_len |
|
chinese_ratio = chinese_count / text_len |
|
japanese_ratio = japanese_count / text_len |
|
cyrillic_ratio = cyrillic_count / text_len |
|
hebrew_ratio = hebrew_count / text_len |
|
|
|
|
|
if arabic_ratio > 0.3: |
|
return "ar" |
|
elif chinese_ratio > 0.3: |
|
return "zh" |
|
elif japanese_ratio > 0.3: |
|
return "ja" |
|
elif cyrillic_ratio > 0.3: |
|
return "ru" |
|
elif hebrew_ratio > 0.3: |
|
return "he" |
|
|
|
|
|
return "en" |
|
|
|
|
|
def translate_text(text, source_lang, target_lang): |
|
"""Translate text using local model or fallback to online services.""" |
|
if not text: |
|
return "" |
|
|
|
print(f"Translation Request - Source Lang: {source_lang}, Target Lang: {target_lang}") |
|
|
|
|
|
language_pair = get_language_pair(source_lang, target_lang) |
|
|
|
|
|
if language_pair and language_pair in translation_models: |
|
model_info = translation_models[language_pair] |
|
translator = model_info["translator"] |
|
|
|
|
|
if not translator: |
|
success = initialize_model(language_pair) |
|
if not success: |
|
print(f"Local model initialization for {language_pair} failed, using fallback translation") |
|
return use_fallback_translation(text, source_lang, target_lang) |
|
|
|
translator = translation_models[language_pair]["translator"] |
|
|
|
try: |
|
|
|
text_to_translate = text |
|
print(f"Translating text with local model (first 50 chars): {text_to_translate[:50]}...") |
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
future = executor.submit( |
|
lambda: translator( |
|
text_to_translate, |
|
max_length=768 |
|
)[0]["translation_text"] |
|
) |
|
|
|
try: |
|
|
|
result = future.result(timeout=15) |
|
|
|
|
|
if target_lang == "ar": |
|
result = culturally_adapt_arabic(result) |
|
|
|
print(f"Translation successful (first 50 chars): {result[:50]}...") |
|
return result |
|
except concurrent.futures.TimeoutError: |
|
print(f"Model inference timed out after 15 seconds, falling back to online translation") |
|
return use_fallback_translation(text, source_lang, target_lang) |
|
except Exception as e: |
|
print(f"Error during model inference: {e}") |
|
|
|
|
|
initialize_model(language_pair) |
|
return use_fallback_translation(text, source_lang, target_lang) |
|
except Exception as e: |
|
print(f"Error using local model for {language_pair}: {e}") |
|
traceback.print_exc() |
|
return use_fallback_translation(text, source_lang, target_lang) |
|
else: |
|
|
|
print(f"No local model for {source_lang} to {target_lang}, using fallback translation") |
|
return use_fallback_translation(text, source_lang, target_lang) |
|
|
|
def culturally_adapt_arabic(text: str) -> str: |
|
"""Apply post-processing rules to enhance Arabic translation with cultural sensitivity.""" |
|
|
|
text = text.replace('?', '؟').replace(';', '؛').replace(',', '،') |
|
|
|
|
|
common_prefixes = [ |
|
"الترجمة:", "ترجمة:", "النص المترجم:", |
|
"Translation:", "Arabic translation:" |
|
] |
|
for prefix in common_prefixes: |
|
if text.startswith(prefix): |
|
text = text[len(prefix):].strip() |
|
|
|
|
|
|
|
return text |
|
|
|
|
|
def check_and_reinitialize_model(language_pair: str): |
|
"""Check if model needs to be reinitialized and do so if necessary""" |
|
global translation_models |
|
|
|
if language_pair not in translation_models: |
|
print(f"Unsupported language pair: {language_pair}") |
|
return False |
|
|
|
model_info = translation_models[language_pair] |
|
translator = model_info["translator"] |
|
|
|
try: |
|
|
|
if not translator: |
|
print(f"Model for {language_pair} not initialized. Attempting initialization...") |
|
return initialize_model(language_pair) |
|
|
|
|
|
source_lang, target_lang = language_pair.split('-') |
|
test_text = "hello" if source_lang == "en" else "مرحبا" |
|
result = translator(test_text, max_length=128) |
|
|
|
|
|
if result and isinstance(result, list) and len(result) > 0: |
|
print(f"Model check for {language_pair}: Model is functioning correctly.") |
|
return True |
|
else: |
|
print(f"Model check for {language_pair}: Model returned invalid result. Reinitializing...") |
|
return initialize_model(language_pair) |
|
except Exception as e: |
|
print(f"Error checking model status for {language_pair}: {e}") |
|
print("Model may be in a bad state. Attempting reinitialization...") |
|
return initialize_model(language_pair) |
|
|
|
def use_fallback_translation(text, source_lang, target_lang): |
|
"""Use various fallback online translation services.""" |
|
print("Using fallback translation...") |
|
|
|
|
|
try: |
|
print("Attempting fallback with Google Translate (no API key)") |
|
from googletrans import Translator |
|
google_translator = Translator(service_urls=['translate.google.com', 'translate.google.co.kr']) |
|
result = google_translator.translate(text, src=source_lang, dest=target_lang) |
|
if result and result.text: |
|
print("Google Translate successful!") |
|
return result.text |
|
except Exception as e: |
|
print(f"Error with Google Translate fallback: {str(e)}") |
|
|
|
|
|
libre_servers = [ |
|
"https://translate.terraprint.co/translate", |
|
"https://libretranslate.de/translate", |
|
"https://translate.argosopentech.com/translate", |
|
"https://translate.fedilab.app/translate", |
|
"https://trans.zillyhuhn.com/translate" |
|
] |
|
|
|
|
|
for server in libre_servers: |
|
try: |
|
print(f"Attempting fallback translation using LibreTranslate: {server}") |
|
headers = { |
|
"Content-Type": "application/json" |
|
} |
|
payload = { |
|
"q": text, |
|
"source": source_lang, |
|
"target": target_lang |
|
} |
|
|
|
|
|
response = requests.post(server, json=payload, headers=headers, timeout=10) |
|
|
|
if response.status_code == 200: |
|
result = response.json() |
|
if "translatedText" in result: |
|
print(f"LibreTranslate successful using {server}") |
|
return result["translatedText"] |
|
except Exception as e: |
|
print(f"Error with LibreTranslate {server}: {str(e)}") |
|
continue |
|
|
|
|
|
try: |
|
print("Attempting fallback with MyMemory Translation API") |
|
url = "https://api.mymemory.translated.net/get" |
|
params = { |
|
"q": text, |
|
"langpair": f"{source_lang}|{target_lang}", |
|
} |
|
response = requests.get(url, params=params, timeout=10) |
|
if response.status_code == 200: |
|
data = response.json() |
|
if data and data.get("responseData") and data["responseData"].get("translatedText"): |
|
print("MyMemory translation successful!") |
|
return data["responseData"]["translatedText"] |
|
except Exception as e: |
|
print(f"Error with MyMemory fallback: {str(e)}") |
|
|
|
|
|
print("All translation services failed. Returning error message.") |
|
return f"[Translation services unavailable] {text}" |
|
|
|
|
|
async def extract_text_from_file(file: UploadFile) -> str: |
|
"""Extracts text content from uploaded files without writing to disk.""" |
|
content = await file.read() |
|
file_extension = os.path.splitext(file.filename)[1].lower() |
|
extracted_text = "" |
|
|
|
try: |
|
if file_extension == '.txt': |
|
|
|
try: |
|
extracted_text = content.decode('utf-8') |
|
except UnicodeDecodeError: |
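                # Try a few common legacy encodings; if none succeed the text
                # stays empty and the endpoint reports an extraction failure.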
|
|
|
for encoding in ['latin-1', 'cp1252', 'utf-16']: |
|
try: |
|
extracted_text = content.decode(encoding) |
|
break |
|
except UnicodeDecodeError: |
|
continue |
|
elif file_extension == '.docx': |
|
try: |
|
import docx |
|
from io import BytesIO |
|
|
|
|
|
doc_stream = BytesIO(content) |
|
doc = docx.Document(doc_stream) |
|
extracted_text = '\n'.join([para.text for para in doc.paragraphs]) |
|
except ImportError: |
|
raise HTTPException(status_code=501, detail="DOCX processing requires 'python-docx' library") |
|
elif file_extension == '.pdf': |
|
try: |
|
import fitz |
|
from io import BytesIO |
|
|
|
|
|
pdf_stream = BytesIO(content) |
|
doc = fitz.open(stream=pdf_stream, filetype="pdf") |
|
|
|
page_texts = [] |
|
for page in doc: |
|
page_texts.append(page.get_text()) |
|
extracted_text = "\n".join(page_texts) |
|
doc.close() |
|
except ImportError: |
|
raise HTTPException(status_code=501, detail="PDF processing requires 'PyMuPDF' library") |
|
else: |
|
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}") |
|
|
|
print(f"Extracted text length: {len(extracted_text)}") |
|
return extracted_text |
|
|
|
    except HTTPException:
        raise  # propagate deliberate 400/501 errors rather than wrapping them in a 500
    except Exception as e:
|
print(f"Error processing file {file.filename}: {e}") |
|
traceback.print_exc() |
|
raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}") |
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
async def read_root(request: Request): |
|
"""Serves the main HTML page.""" |
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
@app.get("/api/languages") |
|
async def get_languages(): |
|
"""Return the list of supported languages.""" |
|
return {"languages": LANGUAGE_MAP} |
|
|
|
@app.post("/translate/text") |
|
async def translate_text_endpoint(request: TranslationRequest): |
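    """Translate a raw text snippet.

    Expects a JSON body matching TranslationRequest, e.g.
    {"text": "hello", "source_lang": "auto", "target_lang": "ar"};
    returns {"success": true, "translated_text": "..."} plus
    "detected_source_lang" when auto-detection was used.
    """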
|
print("[DEBUG] /translate/text endpoint called") |
|
try: |
|
|
|
source_lang = request.source_lang |
|
target_lang = request.target_lang |
|
text = request.text |
|
|
|
print(f"[DEBUG] Received request: source_lang={source_lang}, target_lang={target_lang}, text={text[:50]}") |
|
|
|
|
|
detected_source_lang = None |
|
if source_lang == "auto": |
|
detected_source_lang = detect_language(text) |
|
print(f"[DEBUG] Detected language: {detected_source_lang}") |
|
source_lang = detected_source_lang |
|
|
|
|
|
translation_result = translate_text(text, source_lang, target_lang) |
|
|
|
|
|
if not translation_result or translation_result.strip() == "": |
|
print("[DEBUG] Empty translation result received") |
|
return JSONResponse( |
|
status_code=500, |
|
content={"success": False, "error": "Translation returned empty result"} |
|
) |
|
|
|
print(f"[DEBUG] Translation successful: {translation_result[:100]}...") |
|
|
|
|
|
response_data = { |
|
"success": True, |
|
"translated_text": translation_result |
|
} |
|
|
|
if detected_source_lang: |
|
response_data["detected_source_lang"] = detected_source_lang |
|
|
|
return response_data |
|
|
|
except Exception as e: |
|
print(f"Critical error in translate_text_endpoint: {str(e)}") |
|
traceback.print_exc() |
|
return JSONResponse( |
|
status_code=500, |
|
content={"success": False, "error": f"Translation failed: {str(e)}"} |
|
) |
|
|
|
@app.post("/translate/document") |
|
async def translate_document_endpoint( |
|
file: UploadFile = File(...), |
|
source_lang: str = Form(...), |
|
target_lang: str = Form("ar") |
|
): |
|
"""Translates text extracted from an uploaded document.""" |
|
print("[DEBUG] /translate/document endpoint called") |
|
try: |
|
|
|
print(f"[DEBUG] Processing file: {file.filename}, Source: {source_lang}, Target: {target_lang}") |
|
|
|
|
|
extracted_text = await extract_text_from_file(file) |
|
if not extracted_text or extracted_text.strip() == "": |
|
return JSONResponse( |
|
status_code=400, |
|
content={"success": False, "error": "Could not extract text from document"} |
|
) |
|
|
|
|
|
detected_source_lang = None |
|
if source_lang == "auto": |
|
detected_source_lang = detect_language(extracted_text) |
|
print(f"[DEBUG] Detected document language: {detected_source_lang}") |
|
source_lang = detected_source_lang |
|
|
|
|
|
translated_text = translate_text(extracted_text, source_lang, target_lang) |
|
|
|
|
|
response = { |
|
"success": True, |
|
"original_filename": file.filename, |
|
"original_text": extracted_text[:2000] + ("..." if len(extracted_text) > 2000 else ""), |
|
"translated_text": translated_text |
|
} |
|
|
|
|
|
if detected_source_lang: |
|
response["detected_source_lang"] = detected_source_lang |
|
|
|
return response |
|
|
|
except HTTPException as e: |
|
|
|
raise e |
|
except Exception as e: |
|
print(f"Error in document translation: {str(e)}") |
|
traceback.print_exc() |
|
return JSONResponse( |
|
status_code=500, |
|
content={"success": False, "error": f"Document translation failed: {str(e)}"} |
|
) |
|
|
|
@app.post("/download/translated-document") |
|
async def download_translated_document(request: Request): |
|
"""Creates and returns a downloadable version of the translated document.""" |
|
|
|
from fastapi.responses import Response |
|
|
|
try: |
|
|
|
data = await request.json() |
|
content = data.get("content") |
|
filename = data.get("filename") |
|
original_type = data.get("original_type") |
|
|
|
if not content or not filename: |
|
return JSONResponse( |
|
status_code=400, |
|
content={"success": False, "error": "Missing required parameters"} |
|
) |
|
|
|
|
|
if filename.endswith('.txt'): |
|
|
|
return Response( |
|
content=content.encode('utf-8'), |
|
media_type="text/plain; charset=utf-8", |
|
headers={ |
|
"Content-Disposition": f"attachment; filename={filename}", |
|
"Content-Type": "text/plain; charset=utf-8" |
|
} |
|
) |
|
|
|
elif filename.endswith('.pdf'): |
|
try: |
|
|
|
try: |
|
|
|
from reportlab.pdfgen import canvas |
|
from reportlab.lib.pagesizes import letter |
|
from io import BytesIO |
|
|
|
print("Using ReportLab for PDF generation") |
|
|
|
|
|
buffer = BytesIO() |
|
c = canvas.Canvas(buffer, pagesize=letter) |
|
|
|
|
|
font_name = 'Helvetica' |
|
c.setFont(font_name, 12) |
|
|
|
|
|
has_arabic = any('\u0600' <= ch <= '\u06FF' for ch in content) |
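                        # Note: the built-in Helvetica font cannot shape Arabic glyphs;
                        # correct Arabic output would need a registered Unicode TTF plus
                        # reshaping/bidi handling. The right-alignment below is best-effort.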
|
|
|
|
|
lines = content.split('\n') |
|
y_position = 750 |
|
|
|
|
|
for line in lines: |
|
if line.strip(): |
|
|
|
if has_arabic: |
|
|
|
text_width = c.stringWidth(line, font_name, 12) |
|
|
|
c.drawString(letter[0] - 72 - text_width, y_position, line) |
|
else: |
|
|
|
c.drawString(72, y_position, line) |
|
|
|
|
|
y_position -= 14 |
|
|
|
|
|
if y_position < 72: |
|
c.showPage() |
|
c.setFont(font_name, 12) |
|
y_position = 750 |
|
|
|
|
|
c.save() |
|
|
|
|
|
pdf_content = buffer.getvalue() |
|
buffer.close() |
|
|
|
|
|
return Response( |
|
content=pdf_content, |
|
media_type="application/pdf", |
|
headers={"Content-Disposition": f"attachment; filename={filename}"} |
|
) |
|
|
|
except ImportError: |
|
|
|
print("ReportLab not available, using PyMuPDF with improved Arabic handling") |
|
import fitz |
|
from io import BytesIO |
|
import uuid |
|
import os |
|
import tempfile |
|
|
|
|
|
|
|
|
|
|
|
|
|
has_arabic = any('\u0600' <= ch <= '\u06FF' for ch in content) |
|
|
|
if has_arabic: |
|
|
|
                        # Backslashes are not allowed inside f-string expressions on
                        # Python < 3.12, so build the HTML body up front.
                        body_html = content.replace('\n', '<br>')
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.html', mode='w', encoding='utf-8') as temp_file:
|
html_content = f"""<!DOCTYPE html> |
|
<html dir="rtl" lang="ar"> |
|
<head> |
|
<meta charset="UTF-8"> |
|
<title>Translated Document</title> |
|
<style> |
|
body {{ |
|
font-family: Arial, sans-serif; |
|
direction: rtl; |
|
text-align: right; |
|
margin: 1.5cm; |
|
font-size: 12pt; |
|
line-height: 1.5; |
|
}} |
|
</style> |
|
</head> |
|
<body> |
|
{body_html}
|
</body> |
|
</html>""" |
|
temp_file.write(html_content) |
|
temp_html_path = temp_file.name |
|
|
|
try: |
|
|
|
                            # PyMuPDF's insert_pdf() only accepts PDF sources, so open
                            # the temporary HTML file and convert it to PDF bytes instead.
                            html_doc = fitz.open(temp_html_path)
                            pdf_data = html_doc.convert_to_pdf()
                            html_doc.close()
                            pdf_bytes = BytesIO(pdf_data)
|
|
|
|
|
try: |
|
os.unlink(temp_html_path) |
|
                            except OSError:
|
pass |
|
|
|
|
|
return Response( |
|
content=pdf_bytes.getvalue(), |
|
media_type="application/pdf", |
|
headers={"Content-Disposition": f"attachment; filename={filename}"} |
|
) |
|
except Exception as html_err: |
|
print(f"HTML conversion failed: {html_err}") |
|
|
|
try: |
|
os.unlink(temp_html_path) |
|
                            except OSError:
|
pass |
|
|
|
|
|
return Response( |
|
content=content.encode('utf-8'), |
|
media_type="text/plain; charset=utf-8", |
|
headers={ |
|
"Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}", |
|
"Content-Type": "text/plain; charset=utf-8" |
|
} |
|
) |
|
else: |
|
|
|
doc = fitz.open() |
|
page = doc.new_page() |
|
|
|
|
|
                        # Write into a text box so long lines wrap inside the margins
                        # instead of running off the right edge of the page.
                        rect = fitz.Rect(72, 72, page.rect.width - 72, page.rect.height - 72)
                        page.insert_textbox(rect, content, fontsize=11)
|
|
|
|
|
pdf_bytes = BytesIO() |
|
doc.save(pdf_bytes) |
|
pdf_bytes.seek(0) |
|
doc.close() |
|
|
|
|
|
return Response( |
|
content=pdf_bytes.getvalue(), |
|
media_type="application/pdf", |
|
headers={"Content-Disposition": f"attachment; filename={filename}"} |
|
) |
|
|
|
except Exception as e: |
|
print(f"PDF creation error with advanced methods: {e}") |
|
traceback.print_exc() |
|
|
|
|
|
return Response( |
|
content=content.encode('utf-8'), |
|
media_type="text/plain; charset=utf-8", |
|
headers={ |
|
"Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}", |
|
"Content-Type": "text/plain; charset=utf-8" |
|
} |
|
) |
|
|
|
except Exception as e: |
|
print(f"Overall PDF creation error: {e}") |
|
traceback.print_exc() |
|
|
|
return Response( |
|
content=content.encode('utf-8'), |
|
media_type="text/plain; charset=utf-8", |
|
headers={ |
|
"Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}", |
|
"Content-Type": "text/plain; charset=utf-8" |
|
} |
|
) |
|
|
|
elif filename.endswith('.docx'): |
|
|
|
try: |
|
import docx |
|
from io import BytesIO |
|
|
|
|
|
doc = docx.Document() |
|
|
|
|
|
p = doc.add_paragraph() |
|
|
|
is_arabic = any('\u0600' <= c <= '\u06FF' for c in content) |
|
if is_arabic: |
|
                    try:
                        # Right-to-left direction needs a <w:bidi/> element inside the
                        # paragraph properties (there is no simple attribute for it).
                        from docx.oxml import OxmlElement
                        p._element.get_or_add_pPr().append(OxmlElement('w:bidi'))
                    except Exception:
                        pass
|
p.add_run(content) |
|
|
|
|
|
docx_bytes = BytesIO() |
|
doc.save(docx_bytes) |
|
docx_bytes.seek(0) |
|
|
|
|
|
return Response( |
|
content=docx_bytes.getvalue(), |
|
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document", |
|
headers={"Content-Disposition": f"attachment; filename={filename}"} |
|
) |
|
except ImportError: |
|
return JSONResponse( |
|
status_code=501, |
|
content={"success": False, "error": "DOCX creation requires python-docx library"} |
|
) |
|
except Exception as e: |
|
print(f"DOCX creation error: {str(e)}") |
|
traceback.print_exc() |
|
return JSONResponse( |
|
status_code=500, |
|
content={"success": False, "error": f"DOCX creation error: {str(e)}"} |
|
) |
|
|
|
else: |
|
|
|
return Response( |
|
content=content.encode('utf-8'), |
|
media_type="text/plain; charset=utf-8", |
|
headers={ |
|
"Content-Disposition": f"attachment; filename={filename}.txt", |
|
"Content-Type": "text/plain; charset=utf-8" |
|
} |
|
) |
|
|
|
except Exception as e: |
|
print(f"Error creating downloadable document: {str(e)}") |
|
traceback.print_exc() |
|
return JSONResponse( |
|
status_code=500, |
|
content={"success": False, "error": f"Failed to create document: {str(e)}"} |
|
) |
|
|
|
|
|
@app.on_event("startup") |
|
async def startup_event(): |
|
"""Initialize models during application startup.""" |
|
|
|
|
|
try: |
|
|
|
initialize_model("en-ar") |
|
except Exception as e: |
|
print(f"Error initializing en-ar model at startup: {e}") |
|
|
|
try: |
|
|
|
initialize_model("ar-en") |
|
except Exception as e: |
|
print(f"Error initializing ar-en model at startup: {e}") |
|
|
|
|
|
|
|
common_pairs = ["en-fr", "fr-en", "en-es", "es-en"] |
|
for pair in common_pairs: |
|
try: |
|
initialize_model(pair) |
|
except Exception as e: |
|
print(f"Error initializing {pair} model at startup: {e}") |
|
|
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) |
|
|