from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from typing import List, Optional, Dict
from pydantic import BaseModel
import os
import requests
import json
import traceback
import io
import concurrent.futures
import subprocess
import sys
import time
# Define the TranslationRequest model
class TranslationRequest(BaseModel):
    text: str
    source_lang: str
    target_lang: str
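# For reference, a minimal sketch of the JSON body this model accepts
# (field values are illustrative, not taken from the project):
#   {"text": "Hello world", "source_lang": "en", "target_lang": "ar"}
# "source_lang" may also be "auto", which triggers detect_language() below.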
# --- Set cache directories to writeable locations BEFORE importing transformers ---
# This is crucial for Hugging Face Spaces, where /app/.cache is not writable;
# /tmp is typically writable in most environments. transformers resolves its
# cache paths at import time, so setting these afterwards can be too late.
os.environ['TRANSFORMERS_CACHE'] = '/tmp/transformers_cache'
os.environ['HF_HOME'] = '/tmp/hf_home'
os.environ['XDG_CACHE_HOME'] = '/tmp/cache'

# Import transformers for local model inference
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
import torch
# --- Configuration ---
# Determine the base directory of the main.py script
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Adjust paths to go one level up from backend to find templates/static
TEMPLATE_DIR = os.path.join(os.path.dirname(BASE_DIR), "templates")
STATIC_DIR = os.path.join(os.path.dirname(BASE_DIR), "static")
UPLOADS_DIR = os.path.join(os.path.dirname(BASE_DIR), "uploads")
# Ensure uploads directory exists
os.makedirs(UPLOADS_DIR, exist_ok=True)
# --- Initialize FastAPI ---
app = FastAPI(title="Tarjama Translation API")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
templates = Jinja2Templates(directory=TEMPLATE_DIR)
# --- Language mapping ---
LANGUAGE_MAP = {
    "ar": "Arabic",
    "en": "English",
    "fr": "French",
    "es": "Spanish",
    "de": "German",
    "zh": "Chinese",
    "ru": "Russian",
    "ja": "Japanese",
    "hi": "Hindi",
    "pt": "Portuguese",
    "tr": "Turkish",
    "ko": "Korean",
    "it": "Italian",
    "nl": "Dutch",
    "sv": "Swedish",
    "fi": "Finnish",
    "pl": "Polish",
    "he": "Hebrew",
    "id": "Indonesian",
    "uk": "Ukrainian",
    "cs": "Czech",
    "auto": "Detect Language"
}
# --- Global model variables ---
# Store multiple translation models to support various language pairs
translation_models: Dict[str, Dict] = {
    "en-ar": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-en-ar",
    },
    "ar-en": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-ar-en",
    },
    # Additional language pair models
    "en-fr": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-en-fr",
    },
    "fr-en": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-fr-en",
    },
    "en-es": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-en-es",
    },
    "es-en": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-es-en",
    },
    "en-de": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-en-de",
    },
    "de-en": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-de-en",
    },
    "ar-fr": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-ar-fr",
    },
    "fr-ar": {
        "model": None,
        "tokenizer": None,
        "translator": None,
        "model_name": "Helsinki-NLP/opus-mt-fr-ar",
    },
    # More language pairs can be added here as needed
}
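# To support another pair, it should suffice to register an entry keyed
# "<source>-<target>" whose model_name points at the matching OPUS-MT
# checkpoint. A hypothetical example (checkpoint name assumed, not verified
# against the Hub):
#   translation_models["en-ru"] = {"model": None, "tokenizer": None,
#       "translator": None, "model_name": "Helsinki-NLP/opus-mt-en-ru"}
# Models are lazy-loaded by initialize_model() on first use, so no other
# change is required.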
model_initialization_attempts = 0
max_model_initialization_attempts = 3
last_initialization_attempt = 0
initialization_cooldown = 300 # 5 minutes cooldown between retry attempts
# --- Model initialization function ---
def initialize_model(language_pair: str):
    """Initialize a specific translation model and tokenizer for a language pair."""
    global translation_models, model_initialization_attempts, last_initialization_attempt

    # If the language pair doesn't exist, return False
    if language_pair not in translation_models:
        print(f"Unsupported language pair: {language_pair}")
        return False

    # Check if we've exceeded maximum attempts and if enough time has passed since the last attempt
    current_time = time.time()
    if (model_initialization_attempts >= max_model_initialization_attempts and
            current_time - last_initialization_attempt < initialization_cooldown):
        print("Maximum initialization attempts reached. Waiting for cooldown period.")
        return False

    # Update attempt counter and timestamp
    model_initialization_attempts += 1
    last_initialization_attempt = current_time

    try:
        model_info = translation_models[language_pair]
        model_name = model_info["model_name"]
        print(f"Initializing model and tokenizer for {language_pair} using {model_name} (attempt {model_initialization_attempts})...")

        # Check for available device - properly detect CPU/GPU
        device = "cpu"  # Default to CPU, which is more reliable
        if torch.cuda.is_available():
            device = "cuda"
            print(f"CUDA is available: {torch.cuda.get_device_name(0)}")
        print(f"Device set to use: {device}")

        # Load the tokenizer with an explicit cache directory
        try:
            tokenizer = AutoTokenizer.from_pretrained(
                model_name,
                cache_dir="/tmp/transformers_cache",
                use_fast=True,
                local_files_only=False
            )
            if tokenizer is None:
                print(f"Failed to load tokenizer for {language_pair}")
                return False
            print(f"Tokenizer for {language_pair} loaded successfully")
            translation_models[language_pair]["tokenizer"] = tokenizer
        except Exception as e:
            print(f"Error loading tokenizer for {language_pair}: {e}")
            return False

        # Load the model with explicit device placement
        try:
            model = AutoModelForSeq2SeqLM.from_pretrained(
                model_name,
                cache_dir="/tmp/transformers_cache",
                low_cpu_mem_usage=True,  # Better memory usage
                torch_dtype=torch.float32  # Explicit dtype for better compatibility
            )
            # Move the model to the device after loading
            model = model.to(device)
            print(f"Model for {language_pair} loaded with PyTorch and moved to {device}")
            translation_models[language_pair]["model"] = model
        except Exception as e:
            print(f"Error loading model for {language_pair}: {e}")
            print(f"Model initialization for {language_pair} failed")
            return False

        # Create a pipeline with the loaded model and tokenizer
        try:
            translator = pipeline(
                "translation",
                model=model,
                tokenizer=tokenizer,
                device=0 if device == "cuda" else -1,  # Proper device mapping
                framework="pt"  # Explicitly use PyTorch
            )
            if translator is None:
                print(f"Failed to create translator pipeline for {language_pair}")
                return False

            # Test the model with a simple translation to verify it works
            source_lang, target_lang = language_pair.split('-')
            test_text = "hello world" if source_lang == "en" else "مرحبا بالعالم"
            test_result = translator(test_text, max_length=128)
            print(f"Model test result for {language_pair}: {test_result}")
            if not test_result or not isinstance(test_result, list) or len(test_result) == 0:
                print(f"Model test for {language_pair} failed: Invalid output format")
                return False

            translation_models[language_pair]["translator"] = translator
            # Success - reset the attempt counter
            model_initialization_attempts = 0
            print(f"Model {model_name} for {language_pair} successfully initialized and tested")
            return True
        except Exception as inner_e:
            print(f"Error creating translation pipeline for {language_pair}: {inner_e}")
            traceback.print_exc()
            return False
    except Exception as e:
        print(f"Critical error initializing model for {language_pair}: {e}")
        traceback.print_exc()
        return False
# --- Get appropriate language pair for translation ---
def get_language_pair(source_lang: str, target_lang: str):
    """Determine the appropriate language pair and direction for translation."""
    # Handle the auto-detection case (fall back to online services)
    if source_lang == "auto":
        return None

    # Check if we have a direct model for this language pair
    pair_key = f"{source_lang}-{target_lang}"
    if pair_key in translation_models:
        return pair_key

    # No direct model available
    return None
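# This is a pure dictionary lookup, so its behaviour is easy to trace against
# the table above:
#   get_language_pair("en", "ar")   -> "en-ar"  (direct local model)
#   get_language_pair("auto", "ar") -> None     (auto-detect defers to fallbacks)
#   get_language_pair("en", "zh")   -> None     (no local model registered)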
# --- Language detection function ---
def detect_language(text: str) -> str:
    """Detect the language of the input text and return the language code."""
    try:
        # Try to use the langdetect library if available
        from langdetect import detect
        try:
            detected_lang = detect(text)
            print(f"Language detected using langdetect: {detected_lang}")
            # Map langdetect-specific codes to our standard codes
            lang_map = {
                "ar": "ar", "en": "en", "fr": "fr", "es": "es", "de": "de",
                "zh-cn": "zh", "zh-tw": "zh", "ru": "ru", "ja": "ja",
                "hi": "hi", "pt": "pt", "tr": "tr", "ko": "ko",
                "it": "it", "nl": "nl", "sv": "sv", "fi": "fi",
                "pl": "pl", "he": "he", "id": "id", "uk": "uk", "cs": "cs"
            }
            # Return the mapped language, or default to English if not in our supported languages
            return lang_map.get(detected_lang, "en")
        except Exception as e:
            print(f"Error with langdetect: {e}")
            # Fall through to basic detection
    except ImportError:
        print("langdetect library not available, using basic detection")

    # Basic fallback detection based on character ranges
    if len(text) < 10:  # Need a reasonable amount of text
        return "en"  # Default to English for very short texts

    # Count characters in different Unicode ranges
    arabic_count = sum(1 for c in text if '\u0600' <= c <= '\u06FF')
    chinese_count = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
    japanese_count = sum(1 for c in text if '\u3040' <= c <= '\u30ff')
    cyrillic_count = sum(1 for c in text if '\u0400' <= c <= '\u04FF')
    hebrew_count = sum(1 for c in text if '\u0590' <= c <= '\u05FF')

    # Determine ratios
    text_len = len(text)
    arabic_ratio = arabic_count / text_len
    chinese_ratio = chinese_count / text_len
    japanese_ratio = japanese_count / text_len
    cyrillic_ratio = cyrillic_count / text_len
    hebrew_ratio = hebrew_count / text_len

    # Decide based on the highest ratio
    if arabic_ratio > 0.3:
        return "ar"
    elif chinese_ratio > 0.3:
        return "zh"
    elif japanese_ratio > 0.3:
        return "ja"
    elif cyrillic_ratio > 0.3:
        return "ru"
    elif hebrew_ratio > 0.3:
        return "he"
    # Default to English for Latin scripts (could be any European language)
    return "en"
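# Sketch of the two code paths (example strings are illustrative):
#   detect_language("مرحبا بكم في الموقع")  -> "ar"
#     With langdetect installed the library decides; without it, the Arabic
#     character ratio (> 0.3) triggers the fallback branch.
#   Without langdetect, any input shorter than 10 characters (e.g. "hi")
#     defaults to "en" before any ratio is computed.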
# --- Translation Function ---
def translate_text(text, source_lang, target_lang):
    """Translate text using a local model, or fall back to online services."""
    if not text:
        return ""

    print(f"Translation Request - Source Lang: {source_lang}, Target Lang: {target_lang}")

    # Get the appropriate language pair for local translation
    language_pair = get_language_pair(source_lang, target_lang)

    # If we have a supported local model for this language pair
    if language_pair and language_pair in translation_models:
        model_info = translation_models[language_pair]
        translator = model_info["translator"]

        # If the model isn't initialized yet, try to initialize it
        if not translator:
            success = initialize_model(language_pair)
            if not success:
                print(f"Local model initialization for {language_pair} failed, using fallback translation")
                return use_fallback_translation(text, source_lang, target_lang)
            # Get the translator after initialization
            translator = translation_models[language_pair]["translator"]

        try:
            # Ensure only the raw text is sent to the model
            text_to_translate = text
            print(f"Translating text with local model (first 50 chars): {text_to_translate[:50]}...")

            # Use a more reliable timeout approach with concurrent.futures
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(
                    lambda: translator(
                        text_to_translate,
                        max_length=768
                    )[0]["translation_text"]
                )
                try:
                    # Set a reasonable timeout
                    result = future.result(timeout=15)
                    # Post-process the result for cultural adaptation if needed
                    if target_lang == "ar":
                        result = culturally_adapt_arabic(result)
                    print(f"Translation successful (first 50 chars): {result[:50]}...")
                    return result
                except concurrent.futures.TimeoutError:
                    print("Model inference timed out after 15 seconds, falling back to online translation")
                    return use_fallback_translation(text, source_lang, target_lang)
                except Exception as e:
                    print(f"Error during model inference: {e}")
                    # If the model failed during inference, try to re-initialize it for next time,
                    # but use the fallback for this request
                    initialize_model(language_pair)
                    return use_fallback_translation(text, source_lang, target_lang)
        except Exception as e:
            print(f"Error using local model for {language_pair}: {e}")
            traceback.print_exc()
            return use_fallback_translation(text, source_lang, target_lang)
    else:
        # No local model for this language pair, use online services
        print(f"No local model for {source_lang} to {target_lang}, using fallback translation")
        return use_fallback_translation(text, source_lang, target_lang)
def culturally_adapt_arabic(text: str) -> str:
    """Apply post-processing rules to enhance Arabic translation with cultural sensitivity."""
    # Replace Latin punctuation with Arabic equivalents
    text = text.replace('?', '؟').replace(';', '؛').replace(',', '،')

    # If the text starts with common translation artifacts such as "Translation:"
    # or prompt instructions, remove them
    common_prefixes = [
        "الترجمة:", "ترجمة:", "النص المترجم:",
        "Translation:", "Arabic translation:"
    ]
    for prefix in common_prefixes:
        if text.startswith(prefix):
            text = text[len(prefix):].strip()

    # Additional cultural adaptations can be added here
    return text
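# Worked example (order matters: punctuation is swapped before prefixes are
# stripped):
#   culturally_adapt_arabic("Translation: هل أنت بخير?")
#     -> punctuation pass yields "Translation: هل أنت بخير؟"
#     -> prefix strip then yields "هل أنت بخير؟"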
# --- Function to check model status and trigger re-initialization if needed ---
def check_and_reinitialize_model(language_pair: str):
    """Check whether the model needs to be reinitialized, and do so if necessary."""
    global translation_models

    if language_pair not in translation_models:
        print(f"Unsupported language pair: {language_pair}")
        return False

    model_info = translation_models[language_pair]
    translator = model_info["translator"]

    try:
        # If the model isn't initialized yet, try to initialize it
        if not translator:
            print(f"Model for {language_pair} not initialized. Attempting initialization...")
            return initialize_model(language_pair)

        # Test the existing model with a simple translation
        source_lang, target_lang = language_pair.split('-')
        test_text = "hello" if source_lang == "en" else "مرحبا"
        result = translator(test_text, max_length=128)

        # If we got a valid result, the model is working fine
        if result and isinstance(result, list) and len(result) > 0:
            print(f"Model check for {language_pair}: Model is functioning correctly.")
            return True
        else:
            print(f"Model check for {language_pair}: Model returned invalid result. Reinitializing...")
            return initialize_model(language_pair)
    except Exception as e:
        print(f"Error checking model status for {language_pair}: {e}")
        print("Model may be in a bad state. Attempting reinitialization...")
        return initialize_model(language_pair)
def use_fallback_translation(text, source_lang, target_lang):
    """Use various fallback online translation services."""
    print("Using fallback translation...")

    # Try Google Translate via an unofficial wrapper first (most reliable)
    try:
        print("Attempting fallback with Google Translate (no API key)")
        from googletrans import Translator
        google_translator = Translator(service_urls=['translate.google.com', 'translate.google.co.kr'])
        result = google_translator.translate(text, src=source_lang, dest=target_lang)
        if result and result.text:
            print("Google Translate successful!")
            return result.text
    except Exception as e:
        print(f"Error with Google Translate fallback: {str(e)}")

    # List of LibreTranslate servers to try
    libre_servers = [
        "https://translate.terraprint.co/translate",
        "https://libretranslate.de/translate",
        "https://translate.argosopentech.com/translate",
        "https://translate.fedilab.app/translate",
        "https://trans.zillyhuhn.com/translate"
    ]

    # Try each LibreTranslate server with an increased timeout
    for server in libre_servers:
        try:
            print(f"Attempting fallback translation using LibreTranslate: {server}")
            headers = {
                "Content-Type": "application/json"
            }
            payload = {
                "q": text,
                "source": source_lang,
                "target": target_lang
            }
            # Use a longer timeout for the request
            response = requests.post(server, json=payload, headers=headers, timeout=10)
            if response.status_code == 200:
                result = response.json()
                if "translatedText" in result:
                    print(f"LibreTranslate successful using {server}")
                    return result["translatedText"]
        except Exception as e:
            print(f"Error with LibreTranslate {server}: {str(e)}")
            continue

    # Try MyMemory as another fallback
    try:
        print("Attempting fallback with MyMemory Translation API")
        url = "https://api.mymemory.translated.net/get"
        params = {
            "q": text,
            "langpair": f"{source_lang}|{target_lang}",
        }
        response = requests.get(url, params=params, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data and data.get("responseData") and data["responseData"].get("translatedText"):
                print("MyMemory translation successful!")
                return data["responseData"]["translatedText"]
    except Exception as e:
        print(f"Error with MyMemory fallback: {str(e)}")

    # Final fallback - return the original text with an error marker
    print("All translation services failed. Returning error message.")
    return f"[Translation services unavailable] {text}"
# --- Helper Functions ---
async def extract_text_from_file(file: UploadFile) -> str:
    """Extracts text content from uploaded files without writing to disk."""
    content = await file.read()
    file_extension = os.path.splitext(file.filename)[1].lower()
    extracted_text = ""

    try:
        if file_extension == '.txt':
            # Process the text file directly from bytes
            try:
                extracted_text = content.decode('utf-8')
            except UnicodeDecodeError:
                # Try other common encodings if UTF-8 fails
                for encoding in ['latin-1', 'cp1252', 'utf-16']:
                    try:
                        extracted_text = content.decode(encoding)
                        break
                    except UnicodeDecodeError:
                        continue
        elif file_extension == '.docx':
            try:
                import docx
                from io import BytesIO
                # Load the DOCX from memory
                doc_stream = BytesIO(content)
                doc = docx.Document(doc_stream)
                extracted_text = '\n'.join([para.text for para in doc.paragraphs])
            except ImportError:
                raise HTTPException(status_code=501, detail="DOCX processing requires 'python-docx' library")
        elif file_extension == '.pdf':
            try:
                import fitz  # PyMuPDF
                from io import BytesIO
                # Load the PDF from memory
                pdf_stream = BytesIO(content)
                doc = fitz.open(stream=pdf_stream, filetype="pdf")
                page_texts = []
                for page in doc:
                    page_texts.append(page.get_text())
                extracted_text = "\n".join(page_texts)
                doc.close()
            except ImportError:
                raise HTTPException(status_code=501, detail="PDF processing requires 'PyMuPDF' library")
        else:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_extension}")

        print(f"Extracted text length: {len(extracted_text)}")
        return extracted_text
    except HTTPException:
        # Preserve deliberate HTTP errors (unsupported type, missing library)
        # instead of converting them into a generic 500 below
        raise
    except Exception as e:
        print(f"Error processing file {file.filename}: {e}")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}")
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
"""Serves the main HTML page."""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/languages")
async def get_languages():
"""Return the list of supported languages."""
return {"languages": LANGUAGE_MAP}
@app.post("/translate/text")
async def translate_text_endpoint(request: TranslationRequest):
    print("[DEBUG] /translate/text endpoint called")
    try:
        # Explicitly extract fields from the request to ensure they exist
        source_lang = request.source_lang
        target_lang = request.target_lang
        text = request.text
        print(f"[DEBUG] Received request: source_lang={source_lang}, target_lang={target_lang}, text={text[:50]}")

        # Handle automatic language detection
        detected_source_lang = None
        if source_lang == "auto":
            detected_source_lang = detect_language(text)
            print(f"[DEBUG] Detected language: {detected_source_lang}")
            source_lang = detected_source_lang

        # Call our culturally-aware translate_text function
        translation_result = translate_text(text, source_lang, target_lang)

        # Check for an empty result
        if not translation_result or translation_result.strip() == "":
            print("[DEBUG] Empty translation result received")
            return JSONResponse(
                status_code=500,
                content={"success": False, "error": "Translation returned empty result"}
            )

        print(f"[DEBUG] Translation successful: {translation_result[:100]}...")

        # Include the detected language in the response if auto-detection was used
        response_data = {
            "success": True,
            "translated_text": translation_result
        }
        if detected_source_lang:
            response_data["detected_source_lang"] = detected_source_lang
        return response_data
    except Exception as e:
        print(f"Critical error in translate_text_endpoint: {str(e)}")
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": f"Translation failed: {str(e)}"}
        )
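# Example requests against this endpoint (hypothetical host/port, matching the
# uvicorn call at the bottom of this file):
#   curl http://localhost:8000/api/languages
#   curl -X POST http://localhost:8000/translate/text \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Hello world", "source_lang": "en", "target_lang": "ar"}'
# A successful response looks like {"success": true, "translated_text": "..."},
# plus "detected_source_lang" when source_lang was "auto".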
@app.post("/translate/document")
async def translate_document_endpoint(
    file: UploadFile = File(...),
    source_lang: str = Form(...),
    target_lang: str = Form("ar")
):
    """Translates text extracted from an uploaded document."""
    print("[DEBUG] /translate/document endpoint called")
    try:
        print(f"[DEBUG] Processing file: {file.filename}, Source: {source_lang}, Target: {target_lang}")

        # Extract text directly from the uploaded file
        extracted_text = await extract_text_from_file(file)
        if not extracted_text or extracted_text.strip() == "":
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "Could not extract text from document"}
            )

        # Handle automatic language detection
        detected_source_lang = None
        if source_lang == "auto":
            detected_source_lang = detect_language(extracted_text)
            print(f"[DEBUG] Detected document language: {detected_source_lang}")
            source_lang = detected_source_lang

        # Translate the extracted text
        translated_text = translate_text(extracted_text, source_lang, target_lang)

        # Prepare the response
        response = {
            "success": True,
            "original_filename": file.filename,
            "original_text": extracted_text[:2000] + ("..." if len(extracted_text) > 2000 else ""),
            "translated_text": translated_text
        }
        # Include the detected language in the response if auto-detection was used
        if detected_source_lang:
            response["detected_source_lang"] = detected_source_lang
        return response
    except HTTPException as e:
        # Re-raise HTTP exceptions
        raise e
    except Exception as e:
        print(f"Error in document translation: {str(e)}")
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": f"Document translation failed: {str(e)}"}
        )
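# Example multipart request (hypothetical filename):
#   curl -X POST http://localhost:8000/translate/document \
#        -F "file=@report.docx" -F "source_lang=auto" -F "target_lang=ar"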
@app.post("/download/translated-document")
async def download_translated_document(request: Request):
    """Creates and returns a downloadable version of the translated document."""
    # Import Response at the start of the function so it is in scope for all code paths
    from fastapi.responses import Response
    try:
        # Parse the request body
        data = await request.json()
        content = data.get("content")
        filename = data.get("filename")
        original_type = data.get("original_type")

        if not content or not filename:
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "Missing required parameters"}
            )

        # Handle different file types
        if filename.endswith('.txt'):
            # Simple text file with UTF-8 encoding
            return Response(
                content=content.encode('utf-8'),
                media_type="text/plain; charset=utf-8",
                headers={
                    "Content-Disposition": f"attachment; filename={filename}",
                    "Content-Type": "text/plain; charset=utf-8"
                }
            )
        elif filename.endswith('.pdf'):
            try:
                # For PDF files, try multiple approaches
                try:
                    # Try ReportLab first (which handles Arabic better)
                    from reportlab.pdfgen import canvas
                    from reportlab.lib.pagesizes import letter
                    from io import BytesIO

                    print("Using ReportLab for PDF generation")
                    # Create a PDF in memory
                    buffer = BytesIO()
                    c = canvas.Canvas(buffer, pagesize=letter)
                    # Use a basic font that should work with most installations
                    font_name = 'Helvetica'
                    c.setFont(font_name, 12)
                    # Check whether the text contains Arabic
                    has_arabic = any('\u0600' <= ch <= '\u06FF' for ch in content)
                    # Split the text into lines
                    lines = content.split('\n')
                    y_position = 750  # Start from the top

                    # Draw the text line by line
                    for line in lines:
                        if line.strip():
                            # For Arabic, right-align the text
                            if has_arabic:
                                # Get the width to calculate right alignment
                                text_width = c.stringWidth(line, font_name, 12)
                                # Position from the right margin
                                c.drawString(letter[0] - 72 - text_width, y_position, line)
                            else:
                                # Left-align non-Arabic text
                                c.drawString(72, y_position, line)
                        # Move down for the next line
                        y_position -= 14
                        # Add a new page if needed
                        if y_position < 72:
                            c.showPage()
                            c.setFont(font_name, 12)
                            y_position = 750

                    # Save the PDF to the buffer
                    c.save()
                    # Get the PDF content
                    pdf_content = buffer.getvalue()
                    buffer.close()

                    # Return the PDF
                    return Response(
                        content=pdf_content,
                        media_type="application/pdf",
                        headers={"Content-Disposition": f"attachment; filename={filename}"}
                    )
                except ImportError:
                    # Fall back to PyMuPDF with an improved approach for Arabic
                    print("ReportLab not available, using PyMuPDF with improved Arabic handling")
                    import fitz
                    import tempfile

                    # For PyMuPDF, we take a different approach for Arabic text:
                    # 1. Create a temporary HTML file with the Arabic text and proper RTL styling
                    # 2. Convert it to PDF via PyMuPDF
                    # Determine whether we have Arabic text
                    has_arabic = any('\u0600' <= ch <= '\u06FF' for ch in content)

                    if has_arabic:
                        # Build the HTML body outside the f-string: backslashes are
                        # not allowed inside f-string expressions before Python 3.12
                        body_html = content.replace('\n', '<br>')
                        # Create a temporary HTML file with RTL direction for Arabic
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.html', mode='w', encoding='utf-8') as temp_file:
                            html_content = f"""<!DOCTYPE html>
<html dir="rtl" lang="ar">
<head>
<meta charset="UTF-8">
<title>Translated Document</title>
<style>
body {{
    font-family: Arial, sans-serif;
    direction: rtl;
    text-align: right;
    margin: 1.5cm;
    font-size: 12pt;
    line-height: 1.5;
}}
</style>
</head>
<body>
{body_html}
</body>
</html>"""
                            temp_file.write(html_content)
                            temp_html_path = temp_file.name

                        try:
                            # Convert the HTML to PDF. Document.insert_pdf() only
                            # accepts PDF documents, so convert the HTML document
                            # to PDF bytes first.
                            html_doc = fitz.open(temp_html_path)
                            pdf_data = html_doc.convert_to_pdf()
                            html_doc.close()

                            # Clean up the temporary file
                            try:
                                os.unlink(temp_html_path)
                            except OSError:
                                pass

                            # Return the PDF
                            return Response(
                                content=pdf_data,
                                media_type="application/pdf",
                                headers={"Content-Disposition": f"attachment; filename={filename}"}
                            )
                        except Exception as html_err:
                            print(f"HTML conversion failed: {html_err}")
                            # Clean up the temp file if it exists
                            try:
                                os.unlink(temp_html_path)
                            except OSError:
                                pass
                            # Fall back to a text file since all PDF attempts failed
                            return Response(
                                content=content.encode('utf-8'),
                                media_type="text/plain; charset=utf-8",
                                headers={
                                    "Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}",
                                    "Content-Type": "text/plain; charset=utf-8"
                                }
                            )
                    else:
                        # For non-Arabic text, use the simpler PDF creation method
                        doc = fitz.open()
                        page = doc.new_page()
                        # Add the text content, wrapped inside the page margins
                        rect = fitz.Rect(72, 72, page.rect.width - 72, page.rect.height - 72)
                        page.insert_textbox(rect, content, fontsize=11)

                        # Serialize the PDF to memory
                        pdf_data = doc.tobytes()
                        doc.close()

                        # Return the PDF
                        return Response(
                            content=pdf_data,
                            media_type="application/pdf",
                            headers={"Content-Disposition": f"attachment; filename={filename}"}
                        )
                except Exception as e:
                    print(f"PDF creation error with advanced methods: {e}")
                    traceback.print_exc()
                    # Fall back to a text file if all PDF attempts fail
                    return Response(
                        content=content.encode('utf-8'),
                        media_type="text/plain; charset=utf-8",
                        headers={
                            "Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}",
                            "Content-Type": "text/plain; charset=utf-8"
                        }
                    )
            except Exception as e:
                print(f"Overall PDF creation error: {e}")
                traceback.print_exc()
                # Return a text file as the fallback
                return Response(
                    content=content.encode('utf-8'),
                    media_type="text/plain; charset=utf-8",
                    headers={
                        "Content-Disposition": f"attachment; filename={filename.replace('.pdf', '.txt')}",
                        "Content-Type": "text/plain; charset=utf-8"
                    }
                )
        elif filename.endswith('.docx'):
            # Create a DOCX file with proper encoding for Arabic
            try:
                import docx
                from io import BytesIO

                # Create a new document with the translated content
                doc = docx.Document()
                # Add a paragraph with the translated content
                p = doc.add_paragraph()

                # Set the paragraph direction to right-to-left for Arabic if needed
                is_arabic = any('\u0600' <= c <= '\u06FF' for c in content)
                if is_arabic:
                    try:
                        # RTL is a <w:bidi/> child element of the paragraph
                        # properties, not an attribute, so append an element
                        # rather than calling set() with a boolean
                        from docx.oxml import OxmlElement
                        p._p.get_or_add_pPr().append(OxmlElement('w:bidi'))
                    except Exception:
                        pass  # If this fails, continue with the default direction

                p.add_run(content)

                # Save to bytes
                docx_bytes = BytesIO()
                doc.save(docx_bytes)
                docx_bytes.seek(0)

                # Return as an attachment with proper encoding
                return Response(
                    content=docx_bytes.getvalue(),
                    media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                    headers={"Content-Disposition": f"attachment; filename={filename}"}
                )
            except ImportError:
                return JSONResponse(
                    status_code=501,
                    content={"success": False, "error": "DOCX creation requires python-docx library"}
                )
            except Exception as e:
                print(f"DOCX creation error: {str(e)}")
                traceback.print_exc()
                return JSONResponse(
                    status_code=500,
                    content={"success": False, "error": f"DOCX creation error: {str(e)}"}
                )
        else:
            # Fall back to a text file
            return Response(
                content=content.encode('utf-8'),
                media_type="text/plain; charset=utf-8",
                headers={
                    "Content-Disposition": f"attachment; filename={filename}.txt",
                    "Content-Type": "text/plain; charset=utf-8"
                }
            )
    except Exception as e:
        print(f"Error creating downloadable document: {str(e)}")
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": f"Failed to create document: {str(e)}"}
        )
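# Example request body for this endpoint (values illustrative; "original_type"
# is accepted but not currently used by the handler):
#   {"content": "النص المترجم...", "filename": "report.pdf", "original_type": "pdf"}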
# Initialize models during startup
@app.on_event("startup")
async def startup_event():
"""Initialize models during application startup."""
# Initial model loading for the most common language pairs
# We load them asynchronously to not block the startup
try:
# Try to initialize English-to-Arabic model
initialize_model("en-ar")
except Exception as e:
print(f"Error initializing en-ar model at startup: {e}")
try:
# Try to initialize Arabic-to-English model
initialize_model("ar-en")
except Exception as e:
print(f"Error initializing ar-en model at startup: {e}")
# Initialize additional models for common language pairs
# These will be initialized in the background without blocking startup
common_pairs = ["en-fr", "fr-en", "en-es", "es-en"]
for pair in common_pairs:
try:
initialize_model(pair)
except Exception as e:
print(f"Error initializing {pair} model at startup: {e}")
# --- Run the server (for local development) ---
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)