|
|
|
import hashlib
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from flask import current_app
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from flask import current_app
|
|
|
|
import os
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from flask import current_app
|
|
|
|
|
|
class FileManager:
    """Static helpers for storing, naming and validating uploaded files.

    Methods that build paths read ``current_app.config['UPLOAD_BASE_DIR']``
    and therefore must be called inside a Flask application context.
    """

    # Lower-case extensions (without the dot) accepted by ``allowed_file``.
    ALLOWED_EXTENSIONS = frozenset(
        {'docx', 'xlsx', 'pptx', 'pdf', 'txt', 'md', 'csv', 'xls', 'doc'}
    )

    # Default size limit in bytes (10 MiB) used by ``validate_file_size``.
    MAX_FILE_SIZE = 10 * 1024 * 1024

    @staticmethod
    def get_upload_dir():
        """
        Return today's upload directory, creating it if necessary.

        The directory is ``<UPLOAD_BASE_DIR>/uploads/<YYYY-MM-DD>``.

        :return: path of the upload directory as a string
        """
        base_dir = Path(current_app.config['UPLOAD_BASE_DIR'])
        date_str = datetime.now().strftime('%Y-%m-%d')
        upload_dir = base_dir / 'uploads' / date_str
        upload_dir.mkdir(parents=True, exist_ok=True)
        return str(upload_dir)

    @staticmethod
    def generate_filename(filename):
        """
        Build a storage file name of the form ``<name>_<YYYYmmddHHMMSS><ext>``.

        Only the final path component of ``filename`` is used, so directory
        parts supplied by the client (``../``, ``C:\\dir\\``) cannot leak into
        the stored name.

        NOTE: the timestamp has one-second resolution, so two calls with the
        same name within the same second yield the same result.

        :param filename: original (client-supplied) file name
        :return: file name with a timestamp inserted before the extension
        """
        # Normalise Windows separators first so basename() also strips
        # "C:\\dir\\" style prefixes when running on POSIX.
        base_name = os.path.basename(filename.replace('\\', '/'))
        name, ext = os.path.splitext(base_name)
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        return f"{name}_{timestamp}{ext}"

    @staticmethod
    def get_relative_path(full_path):
        """
        Return ``full_path`` relative to ``UPLOAD_BASE_DIR``.

        Backslashes in the result are replaced with forward slashes.

        :param full_path: path located under ``UPLOAD_BASE_DIR``
        :return: relative path using ``/`` as separator
        :raises ValueError: if ``full_path`` is not under ``UPLOAD_BASE_DIR``
        """
        base_dir = Path(current_app.config['UPLOAD_BASE_DIR'])
        return str(Path(full_path).relative_to(base_dir)).replace('\\', '/')

    @staticmethod
    def exists(file_path):
        """
        Check whether a stored path exists.

        Leading ``/`` characters are stripped from ``file_path`` and the
        remainder is joined onto ``UPLOAD_BASE_DIR`` before the check.

        :param file_path: stored path, with or without a leading ``/``
        :return: ``True`` if the joined path exists, ``False`` otherwise
                 (including when ``file_path`` is empty or ``None``)
        """
        if not file_path:
            return False
        full_path = os.path.join(
            current_app.config['UPLOAD_BASE_DIR'], file_path.lstrip('/')
        )
        return os.path.exists(full_path)

    @staticmethod
    def calculate_md5(file_path):
        """
        Compute the MD5 hex digest of a file.

        The file is read in fixed-size chunks so it is never loaded into
        memory as a whole.

        :param file_path: path of the file to hash
        :return: hexadecimal MD5 digest string
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    @staticmethod
    def allowed_file(filename):
        """
        Check whether the file extension is in ``ALLOWED_EXTENSIONS``.

        The comparison is case-insensitive and uses the text after the last
        dot in ``filename``.

        :param filename: file name to check
        :return: ``True`` if the extension is allowed, ``False`` otherwise
                 (including for empty/``None`` names and names without a dot)
        """
        if not filename or '.' not in filename:
            return False
        extension = filename.rsplit('.', 1)[1].lower()
        return extension in FileManager.ALLOWED_EXTENSIONS

    @staticmethod
    def validate_file_size(file_stream, max_size=None):
        """
        Check that a seekable stream does not exceed the size limit.

        The stream is rewound to offset 0 afterwards so it can be read or
        saved from the beginning.

        :param file_stream: seekable binary stream
        :param max_size: limit in bytes; defaults to ``MAX_FILE_SIZE``
        :return: ``True`` if the stream size is within the limit
        """
        limit = FileManager.MAX_FILE_SIZE if max_size is None else max_size
        file_stream.seek(0, os.SEEK_END)
        file_size = file_stream.tell()
        file_stream.seek(0)
        return file_size <= limit

    @staticmethod
    def get_translate_absolute_path(filename):
        """
        Return the path for a translation result, keeping the file name.

        The containing directory ``<UPLOAD_BASE_DIR>/translate/<YYYY-MM-DD>``
        is created if necessary.

        :param filename: file name to place in the translate directory
        :return: path of the translation result as a string
        """
        base_dir = Path(current_app.config['UPLOAD_BASE_DIR'])
        date_str = datetime.now().strftime('%Y-%m-%d')
        translate_dir = base_dir / 'translate' / date_str
        translate_dir.mkdir(parents=True, exist_ok=True)
        return str(translate_dir / filename)
|
|
|
|
|
|
|
|
|
|
class FileManager11:
    """Alternative set of upload helpers: validation, naming, path checks.

    Methods that read ``current_app.config`` or log through
    ``current_app.logger`` must be called inside a Flask application context.
    """

    # Lower-case extensions (without the dot) accepted by ``allowed_file``.
    ALLOWED_EXTENSIONS = frozenset(
        {'docx', 'xlsx', 'pptx', 'pdf', 'txt', 'md', 'csv', 'xls', 'doc'}
    )

    # Default size limit in bytes (10 MiB) used by ``validate_file_size``.
    MAX_FILE_SIZE = 10 * 1024 * 1024

    @staticmethod
    def allowed_file(filename):
        """Return True if the extension after the last dot is allowed.

        The comparison is case-insensitive. Empty/None names and names
        without a dot are rejected.
        """
        if not filename or '.' not in filename:
            return False
        extension = filename.rsplit('.', 1)[1].lower()
        return extension in FileManager11.ALLOWED_EXTENSIONS

    @staticmethod
    def validate_file_size(file_stream, max_size=None):
        """Return True if the seekable stream is within the size limit.

        Args:
            file_stream: seekable binary stream.
            max_size: limit in bytes; defaults to ``MAX_FILE_SIZE``.

        The stream is rewound to offset 0 afterwards.
        """
        limit = FileManager11.MAX_FILE_SIZE if max_size is None else max_size
        file_stream.seek(0, os.SEEK_END)
        file_size = file_stream.tell()
        file_stream.seek(0)
        return file_size <= limit

    @staticmethod
    def get_upload_dir():
        """Return ``<UPLOAD_FOLDER>/<YYYY-MM-DD>``, creating it if needed."""
        upload_dir = os.path.join(
            current_app.config['UPLOAD_FOLDER'],
            datetime.now().strftime('%Y-%m-%d')
        )
        # exist_ok makes a separate existence check unnecessary.
        os.makedirs(upload_dir, exist_ok=True)
        return upload_dir

    def get_upload_dir1111(self):
        """Return today's upload directory derived from this file's location.

        The base is the directory two levels above the one containing this
        file; the result is ``<base>/uploads/<YYYY-MM-DD>`` and is created
        if needed.
        """
        base_dir = os.path.abspath(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        )
        upload_dir = os.path.join(
            base_dir, 'uploads', datetime.now().strftime('%Y-%m-%d')
        )
        # exist_ok avoids FileExistsError when another request creates the
        # directory between a check and the makedirs call.
        os.makedirs(upload_dir, exist_ok=True)
        return upload_dir

    @staticmethod
    def generate_filename(filename):
        """Return a sanitised file name with a 6-hex-char random suffix.

        The name is passed through ``secure_filename`` and the suffix is
        inserted before the extension to avoid collisions.
        """
        safe_name = secure_filename(filename)
        name_part, ext_part = os.path.splitext(safe_name)
        random_str = uuid.uuid4().hex[:6]
        return f"{name_part}_{random_str}{ext_part}"

    @staticmethod
    def generate_filename111(filename):
        """Return a sanitised file name with a 5-char random suffix.

        The suffix (first five characters of a UUID4 string) is always
        appended before the extension, whether or not a file with that name
        already exists.
        """
        safe_filename = secure_filename(filename)
        name, ext = os.path.splitext(safe_filename)
        return f"{name}_{str(uuid.uuid4())[:5]}{ext}"

    @staticmethod
    def safe_remove(filepath):
        """Delete a file, printing the outcome instead of raising OSError.

        A missing file and other OS-level failures are reported on stdout.
        """
        # Attempt the removal directly rather than checking existence first,
        # so a file vanishing in between is still reported as "does not exist".
        try:
            os.remove(filepath)
        except FileNotFoundError:
            print(f"File {filepath} does not exist.")
        except OSError as e:
            print(f"Error occurred while deleting file {filepath}: {e}")
        else:
            print(f"File {filepath} has been deleted.")

    @staticmethod
    def exists(file_path: str) -> bool:
        """Check that a path is inside ``UPLOAD_FOLDER`` and is a file.

        Args:
            file_path: relative or absolute path; it is resolved before the
                containment check.

        Returns:
            bool: True only if the resolved path lies under the resolved
            ``UPLOAD_FOLDER`` and is an existing regular file. Any exception
            is logged and reported as False.
        """
        try:
            normalized_path = Path(file_path).resolve(strict=False)
            upload_dir = Path(current_app.config['UPLOAD_FOLDER']).resolve()
            if not normalized_path.is_relative_to(upload_dir):
                return False
            # is_file() is already False for paths that do not exist.
            return normalized_path.is_file()
        except Exception as e:
            current_app.logger.error(f"文件路径验证失败: {str(e)}")
            return False

    @staticmethod
    def get_storage_dir():
        """Return ``<STORAGE_FOLDER>/<YYYY-MM-DD>``, creating it if needed."""
        base_dir = Path(current_app.config['STORAGE_FOLDER'])
        storage_dir = base_dir / datetime.now().strftime('%Y-%m-%d')
        storage_dir.mkdir(parents=True, exist_ok=True)
        return str(storage_dir)

    @staticmethod
    def is_secure_path(file_path: str, base_dir: str) -> bool:
        """Check that ``file_path`` resolves to a location under ``base_dir``.

        Args:
            file_path: path to validate.
            base_dir: directory the path must stay within.

        Returns:
            bool: True if the resolved path is inside the resolved base
            directory. Any exception is logged and reported as False.
        """
        try:
            normalized_path = Path(file_path).resolve(strict=False)
            base_dir_path = Path(base_dir).resolve()
            return normalized_path.is_relative_to(base_dir_path)
        except Exception as e:
            current_app.logger.error(f"路径安全验证失败: {str(e)}")
            return False

    @staticmethod
    def exists111xin(file_path: str, base_dir: str) -> bool:
        """Check that a path is inside ``base_dir`` and is an existing file.

        Args:
            file_path: path to validate.
            base_dir: directory the path must stay within.

        Returns:
            bool: True only if the path passes ``is_secure_path`` and is an
            existing regular file.
        """
        # is_secure_path is defined on this class, so it is looked up here.
        if not FileManager11.is_secure_path(file_path, base_dir):
            return False
        normalized_path = Path(file_path).resolve(strict=False)
        return normalized_path.is_file()

    @staticmethod
    def calculate_md5(file_path):
        """Return the MD5 hex digest of a file, reading it in chunks."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
|
|
|
|
|
|
def get_upload_dir():
    """Return today's upload directory derived from this file's location.

    The base is the directory two levels above the one containing this file;
    the result is ``<base>/uploads/<YYYY-MM-DD>`` and is created if needed.

    :return: path of the upload directory as a string
    """
    base_dir = os.path.abspath(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    )
    upload_dir = os.path.join(
        base_dir, 'uploads', datetime.now().strftime('%Y-%m-%d')
    )
    # exist_ok avoids FileExistsError when another request creates the
    # directory between a check and the makedirs call.
    os.makedirs(upload_dir, exist_ok=True)
    return upload_dir
|
|
|