dt / app /resources /task /translate_service.py
gitdeem's picture
Upload 96 files
4e9efe9 verified
import os
import threading
from threading import Thread
from flask import current_app
from app.models.translate import Translate
from app.extensions import db
from .main import main_wrapper
from ...models.comparison import Comparison
from ...models.prompt import Prompt
class TranslateEngine99:
def __init__(self, task_id):
self.task_id = task_id
self.app = current_app._get_current_object() # 获取真实app对象
def execute(self):
"""启动翻译任务入口"""
try:
# 主线程预处理
with self.app.app_context():
task = self._prepare_task()
# 启动异步线程(传递真实app对象)
thr = threading.Thread(
target=self._async_wrapper,
args=(self.app, self.task_id)
)
thr.start()
return True
except Exception as e:
self.app.logger.error(f"任务初始化失败: {str(e)}", exc_info=True)
return False
def _async_wrapper(self, app, task_id):
"""异步执行包装器"""
with app.app_context():
try:
# 每个线程独立获取任务对象
task = db.session.query(Translate).get(task_id)
self._async_execute(task)
except Exception as e:
app.logger.error(f"任务执行异常: {str(e)}", exc_info=True)
self._complete_task(False)
finally:
db.session.remove() # 关键:清理线程会话
def _async_execute(self, task):
"""执行核心翻译逻辑"""
try:
# 初始化配置(使用线程内session)
config = self._build_config(task)
# 调用翻译核心
success = main_wrapper(
task=task,
origin_path=task.origin_filepath,
target_path=task.target_filepath,
config=config
)
self._complete_task(success)
except Exception as e:
current_app.logger.error(f"翻译执行失败: {str(e)}", exc_info=True)
self._complete_task(False)
def _build_config(self, task):
"""构建线程安全配置"""
return {
'lang': task.lang,
'model': task.model,
'type': task.type,
'prompt': self._load_prompt(task),
'threads': task.threads,
'api_url': task.api_url,
'api_key': task.api_key,
'comparison': self._load_comparison(task.comparison_id)
}
def _load_prompt(self, task):
"""加载提示词(线程安全)"""
if task.prompt_id:
prompt = db.session.query(Prompt).get(task.prompt_id)
return prompt.content if prompt else ""
return task.prompt
def _load_comparison(self, comparison_id):
"""加载术语对照表(线程安全)"""
if not comparison_id:
return ""
comparison = db.session.query(Comparison).get(comparison_id)
return comparison.content.replace(',', ':').replace(';', '\n') if comparison else ""
def _prepare_task(self):
"""任务预处理"""
task = db.session.query(Translate).get(self.task_id)
if not task:
raise ValueError(f"任务 {self.task_id} 不存在")
if not os.path.exists(task.origin_filepath):
raise FileNotFoundError(f"文件不存在: {task.origin_filepath}")
task.status = 'process'
task.start_at = db.func.now()
db.session.commit()
return task
def _complete_task(self, success):
"""完成处理"""
try:
task = db.session.query(Translate).get(self.task_id)
task.status = 'done' if success else 'failed'
task.end_at = db.func.now()
db.session.commit()
except Exception as e:
db.session.rollback()
self.app.logger.error(f"状态更新失败: {str(e)}", exc_info=True)
class TranslateEngine666:
def __init__(self, task_id):
self.task_id = task_id
self.app = current_app._get_current_object() # 获取真实app对象
def _build_trans_config(self, task):
"""构建符合文件处理器要求的trans字典"""
return {
'id': task.id, # 任务ID
'threads': task.threads,
'file_path': task.origin_filepath, # 原始文件绝对路径
'target_file': task.target_filepath, # 目标文件绝对路径
'api_url': task.api_url,
'api_key': task.api_key, # 新增API密钥字段
'type': task.type,
'lang': task.lang,
'run_complete': True, # 默认设为True
# 以下是可能需要添加的额外字段
'prompt': task.prompt,
'model': task.model,
'backup_model': task.backup_model,
'comparison_id': task.comparison_id,
'prompt_id': task.prompt_id,
'extension':'.docx'
}
def execute(self):
"""启动任务入口"""
try:
# 在主线程上下文中准备任务
with self.app.app_context():
task = self._prepare_task()
# 启动线程时传递真实app对象和任务ID
thr = Thread(
target=self._async_wrapper,
args=(self.app, self.task_id)
)
thr.start()
return True
except Exception as e:
self.app.logger.error(f"任务初始化失败: {str(e)}", exc_info=True)
return False
def _async_wrapper(self, app, task_id):
"""异步执行包装器"""
with app.app_context():
from app.extensions import db # 确保在每个线程中导入
try:
# 使用新会话获取任务对象
task = db.session.query(Translate).get(task_id)
if not task:
app.logger.error(f"任务 {task_id} 不存在")
return
# 执行核心逻辑
success = self._execute_core(task)
self._complete_task(success)
except Exception as e:
app.logger.error(f"任务执行异常: {str(e)}", exc_info=True)
self._complete_task(False)
finally:
db.session.remove() # 重要!清理线程局部session
def _execute_core(self, task):
"""执行核心翻译逻辑"""
try:
# 初始化翻译配置
self._init_translate_config(task)
# 选择处理器
handler = self._get_file_handler(task.origin_filepath)
if not handler:
current_app.logger.error(f"不支持的文件类型: {task.origin_filepath}")
return False
# 构建符合要求的trans字典
trans_config = self._build_trans_config(task)
# 调用处理器
return handler.start(trans=trans_config) # 正确传递参数
except Exception as e:
current_app.logger.error(f"翻译执行失败: {str(e)}", exc_info=True)
return False
def _prepare_task(self):
"""准备翻译任务"""
task = Translate.query.get(self.task_id)
if not task:
raise ValueError(f"任务 {self.task_id} 不存在")
# 验证文件存在性
if not os.path.exists(task.origin_filepath):
raise FileNotFoundError(f"原始文件不存在: {task.origin_filepath}")
# 更新任务状态
task.status = 'process'
task.start_at = db.func.now()
db.session.commit()
return task
def _init_translate_config(self, task):
"""初始化翻译配置"""
if task.api_url and task.api_key:
import openai
openai.api_base = task.api_url
openai.api_key = task.api_key
# 加载术语对照表
if task.comparison_id:
from app.models import Comparison
comparison = db.session.query(Comparison).get(task.comparison_id)
if comparison:
task.prompt = f"术语对照表:\n{comparison.content.replace(',', ':')}\n{task.prompt}"
# 加载提示词模板
if task.prompt_id:
from app.models import Prompt
prompt = db.session.query(Prompt).get(task.prompt_id)
if prompt:
task.prompt = prompt.content
def _get_file_handler(self, file_path):
from app.translate import (
word, excel, powerpoint, pdf,
gptpdf, txt, csv_handle, md
)
try:
current_app.logger.debug(f"正在解析文件路径: {file_path}")
# 标准化路径(处理不同OS的斜杠和大小写)
normalized_path = os.path.normpath(file_path).lower()
current_app.logger.debug(f"标准化路径: {normalized_path}")
ext = os.path.splitext(normalized_path)[1]
# 获取标准化后的扩展名
# ext = os.path.splitext(file_path)[1].lower()
current_app.logger.debug(f"提取的扩展名: {ext}")
# 处理器映射表
handler_map = {
'.docx': word,
'.doc': word,
'.xlsx': excel,
'.xls': excel,
'.pptx': powerpoint,
'.ppt': powerpoint,
'.pdf': pdf,#if not pdf.is_scanned_pdf(file_path) else gptpdf,
'.txt': txt,
'.csv': csv_handle,
'.md': md
}
current_app.logger.debug(f"当前处理器映射表: {handler_map}")
# 匹配处理器
handler = handler_map.get(ext)
if not handler:
current_app.logger.error(f"未找到匹配的处理器,扩展名: {ext}")
return None
current_app.logger.info(f"成功匹配处理器: {handler.__name__}")
return handler
except Exception as e:
current_app.logger.error(f"获取文件处理器失败: {str(e)}", exc_info=True)
return None
def _build_config(self, task):
"""构建配置字典"""
return {
'lang': task.lang,
'model': task.model,
'type': task.type,
'prompt': task.prompt,
'threads': task.threads,
'api_url': task.api_url,
'api_key': task.api_key,
'origin_lang': task.origin_lang,
'backup_model': task.backup_model,
'doc2x_flag': task.doc2x_flag,
'doc2x_secret_key': task.doc2x_secret_key,
'comparison_id': task.comparison_id,
'word_count': task.word_count,
'prompt_id': task.prompt_id,
'rand_user_id': task.rand_user_id,
'origin_filesize': task.origin_filesize,
'origin_filename': task.origin_filename,
'target_filesize': task.target_filesize,
'target_filename': task.target_filename,
'target_filepath': task.target_filepath,
'origin_filepath': task.origin_filepath,
}
def _complete_task(self, success):
"""更新任务状态"""
from app.extensions import db
try:
with self.app.app_context():
task = db.session.query(Translate).get(self.task_id)
if task:
task.status = 'done' if success else 'failed'
task.end_at = db.func.now()
db.session.commit()
except Exception as e:
db.session.rollback()
self.app.logger.error(f"状态更新失败: {str(e)}", exc_info=True)
class TranslateEngine9999:
def __init__(self, task_id):
self.task_id = task_id
self.app = current_app._get_current_object() # 获取真实app对象
def _build_trans_config(self, task):
"""构建符合文件处理器要求的 trans 字典[^1]"""
return {
'id': task.id, # 任务ID
'threads': task.threads,
'file_path': task.origin_filepath, # 原始文件绝对路径
'target_file': task.target_filepath, # 目标文件绝对路径
'api_url': task.api_url,
'api_key': task.api_key, # 新增API密钥字段
'type': task.type,
'lang': task.lang,
'run_complete': True, # 默认设为True
# 以下是可能需要添加的额外字段
'prompt': task.prompt,
'model': task.model,
'backup_model': task.backup_model,
'comparison_id': task.comparison_id,
'prompt_id': task.prompt_id,
'extension': os.path.splitext(task.origin_filepath)[1] # 动态获取文件扩展名
}
def execute(self):
"""启动任务入口[^2]"""
try:
# 在主线程上下文中准备任务
with self.app.app_context():
task = self._prepare_task()
# 启动线程时传递真实app对象和任务ID
thr = Thread(
target=self._async_wrapper,
args=(self.app, self.task_id)
)
thr.start()
return True
except Exception as e:
self.app.logger.error(f"任务初始化失败: {str(e)}", exc_info=True)
return False
def _async_wrapper(self, app, task_id):
"""异步执行包装器[^3]"""
with app.app_context():
from app.extensions import db # 确保在每个线程中导入
try:
# 使用新会话获取任务对象
task = db.session.query(Translate).get(task_id)
if not task:
app.logger.error(f"任务 {task_id} 不存在")
return
# 执行核心逻辑
success = self._execute_core(task)
self._complete_task(success)
except Exception as e:
app.logger.error(f"任务执行异常: {str(e)}", exc_info=True)
self._complete_task(False)
finally:
db.session.remove() # 重要!清理线程局部session
def _execute_core(self, task):
"""执行核心翻译逻辑[^4]"""
try:
# 初始化翻译配置
self._init_translate_config(task)
# 构建符合要求的 trans 字典
trans_config = self._build_trans_config(task)
# 调用 main_wrapper 执行翻译
return main_wrapper(task_id=task.id, origin_path=task.origin_filepath,config=trans_config)
except Exception as e:
current_app.logger.error(f"翻译执行失败: {str(e)}", exc_info=True)
return False
def _prepare_task(self):
"""准备翻译任务[^5]"""
task = Translate.query.get(self.task_id)
if not task:
raise ValueError(f"任务 {self.task_id} 不存在")
# 验证文件存在性
if not os.path.exists(task.origin_filepath):
raise FileNotFoundError(f"原始文件不存在: {task.origin_filepath}")
# 更新任务状态
task.status = 'process'
task.start_at = db.func.now()
db.session.commit()
return task
def _init_translate_config(self, task):
"""初始化翻译配置[^6]"""
if task.api_url and task.api_key:
import openai
openai.api_base = task.api_url
openai.api_key = task.api_key
# 加载术语对照表
if task.comparison_id:
from app.models import Comparison
comparison = db.session.query(Comparison).get(task.comparison_id)
if comparison:
task.prompt = f"术语对照表:\n{comparison.content.replace(',', ':')}\n{task.prompt}"
# 加载提示词模板
if task.prompt_id:
from app.models import Prompt
prompt = db.session.query(Prompt).get(task.prompt_id)
if prompt:
task.prompt = prompt.content
def _get_file_handler(self, file_path):
"""获取文件处理器[^7]"""
from app.translate import (
word, excel, powerpoint, pdf,
gptpdf, txt, csv_handle, md
)
try:
current_app.logger.debug(f"正在解析文件路径: {file_path}")
# 标准化路径(处理不同OS的斜杠和大小写)
normalized_path = os.path.normpath(file_path).lower()
current_app.logger.debug(f"标准化路径: {normalized_path}")
ext = os.path.splitext(normalized_path)[1]
current_app.logger.debug(f"提取的扩展名: {ext}")
# 处理器映射表
handler_map = {
'.docx': word,
'.doc': word,
'.xlsx': excel,
'.xls': excel,
'.pptx': powerpoint,
'.ppt': powerpoint,
'.pdf': pdf, # if not pdf.is_scanned_pdf(file_path) else gptpdf,
'.txt': txt,
'.csv': csv_handle,
'.md': md
}
current_app.logger.debug(f"当前处理器映射表: {handler_map}")
# 匹配处理器
handler = handler_map.get(ext)
if not handler:
current_app.logger.error(f"未找到匹配的处理器,扩展名: {ext}")
return None
current_app.logger.info(f"成功匹配处理器: {handler.__name__}")
return handler
except Exception as e:
current_app.logger.error(f"获取文件处理器失败: {str(e)}", exc_info=True)
return None
def _complete_task(self, success):
"""更新任务状态[^8]"""
from app.extensions import db
try:
with self.app.app_context():
task = db.session.query(Translate).get(self.task_id)
if task:
task.status = 'done' if success else 'failed'
task.end_at = db.func.now()
task.process = 100.00 if success else 0.00
db.session.commit()
except Exception as e:
db.session.rollback()
self.app.logger.error(f"状态更新失败: {str(e)}", exc_info=True)
class TranslateEngine:
def __init__(self, task_id):
self.task_id = task_id
self.app = current_app._get_current_object() # 获取真实app对象
def _build_trans_config(self, task):
"""构建符合文件处理器要求的 trans 字典[^1]"""
config = {
'id': task.id, # 任务ID
'target_lang': task.lang,
# 'origin_lang': task.origin_lang,
'uuid':task.uuid,
'target_path_dir':os.path.dirname(task.target_filepath),
'threads': task.threads,
'file_path': task.origin_filepath, # 原始文件绝对路径
'target_file': task.target_filepath, # 目标文件绝对路径
'api_url': task.api_url,
'api_key': task.api_key, # 新增API密钥字段
'type': task.type,
'lang': task.lang,
'run_complete': True, # 默认设为True
# 以下是可能需要添加的额外字段
'prompt': task.prompt,
'model': task.model,
'backup_model': task.backup_model,
'comparison_id': task.comparison_id,
'prompt_id': task.prompt_id,
'extension': os.path.splitext(task.origin_filepath)[1] # 动态获取文件扩展名
}
# 加载术语表
if task.comparison_id:
comparison = db.session.query(Comparison).get(task.comparison_id)
if comparison:
config['comparison'] = comparison.content.replace(',', ':').replace(';', '\n')
# 加载提示语模板
if task.prompt_id:
prompt = db.session.query(Prompt).get(task.prompt_id)
if prompt:
config['prompt'] = prompt.content
return config
def execute(self):
"""启动任务入口[^2]"""
try:
# 在主线程上下文中准备任务
with self.app.app_context():
task = self._prepare_task()
# 启动线程时传递真实app对象和任务ID
thr = Thread(
target=self._async_wrapper,
args=(self.app, self.task_id)
)
thr.start()
return True
except Exception as e:
self.app.logger.error(f"任务初始化失败: {str(e)}", exc_info=True)
return False
def _async_wrapper(self, app, task_id):
"""异步执行包装器[^3]"""
with app.app_context():
from app.extensions import db # 确保在每个线程中导入
try:
# 使用新会话获取任务对象
task = db.session.query(Translate).get(task_id)
if not task:
app.logger.error(f"任务 {task_id} 不存在")
return
# 执行核心逻辑
success = self._execute_core(task)
self._complete_task(success)
except Exception as e:
app.logger.error(f"任务执行异常: {str(e)}", exc_info=True)
self._complete_task(False)
finally:
db.session.remove() # 重要!清理线程局部session
def _execute_core(self, task):
"""执行核心翻译逻辑[^4]"""
try:
# 初始化翻译配置
self._init_translate_config(task)
# 构建符合要求的 trans 字典
trans_config = self._build_trans_config(task)
# 调用 main_wrapper 执行翻译
return main_wrapper(task_id=task.id, origin_path=task.origin_filepath, config=trans_config)
except Exception as e:
current_app.logger.error(f"翻译执行失败: {str(e)}", exc_info=True)
return False
def _prepare_task(self):
"""准备翻译任务[^5]"""
task = Translate.query.get(self.task_id)
if not task:
raise ValueError(f"任务 {self.task_id} 不存在")
# 验证文件存在性
if not os.path.exists(task.origin_filepath):
raise FileNotFoundError(f"原始文件不存在: {task.origin_filepath}")
# 更新任务状态
task.status = 'process'
task.start_at = db.func.now()
db.session.commit()
return task
def _init_translate_config(self, task):
"""初始化翻译配置[^6]"""
if task.api_url and task.api_key:
import openai
openai.api_base = task.api_url
openai.api_key = task.api_key
def _complete_task(self, success):
"""更新任务状态[^7]"""
from app.extensions import db
try:
with self.app.app_context():
task = db.session.query(Translate).get(self.task_id)
if task:
task.status = 'done' if success else 'failed'
task.end_at = db.func.now()
task.process = 100.00 if success else 0.00
db.session.commit()
except Exception as e:
db.session.rollback()
self.app.logger.error(f"状态更新失败: {str(e)}", exc_info=True)