Spaces:
Running
Running
File size: 7,660 Bytes
3b13b0e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
import os
import glob
import time
import platform
import shutil
from uuid import uuid4
from loguru import logger
from app.utils import utils
def open_task_folder(root_dir, task_id):
"""打开任务文件夹
Args:
root_dir: 项目根目录
task_id: 任务ID
"""
try:
sys = platform.system()
path = os.path.join(root_dir, "storage", "tasks", task_id)
if os.path.exists(path):
if sys == 'Windows':
os.system(f"start {path}")
if sys == 'Darwin':
os.system(f"open {path}")
if sys == 'Linux':
os.system(f"xdg-open {path}")
except Exception as e:
logger.error(f"打开任务文件夹失败: {e}")
def cleanup_temp_files(temp_dir, max_age=3600):
"""清理临时文件
Args:
temp_dir: 临时文件目录
max_age: 文件最大保存时间(秒)
"""
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
try:
if os.path.getctime(file_path) < time.time() - max_age:
if os.path.isfile(file_path):
os.remove(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logger.debug(f"已清理临时文件: {file_path}")
except Exception as e:
logger.error(f"清理临时文件失败: {file_path}, 错误: {e}")
def get_file_list(directory, file_types=None, sort_by='ctime', reverse=True):
"""获取指定目录下的文件列表
Args:
directory: 目录路径
file_types: 文件类型列表,如 ['.mp4', '.mov']
sort_by: 排序方式,支持 'ctime'(创建时间), 'mtime'(修改时间), 'size'(文件大小), 'name'(文件名)
reverse: 是否倒序排序
Returns:
list: 文件信息列表
"""
if not os.path.exists(directory):
return []
files = []
if file_types:
for file_type in file_types:
files.extend(glob.glob(os.path.join(directory, f"*{file_type}")))
else:
files = glob.glob(os.path.join(directory, "*"))
file_list = []
for file_path in files:
try:
file_stat = os.stat(file_path)
file_info = {
"name": os.path.basename(file_path),
"path": file_path,
"size": file_stat.st_size,
"ctime": file_stat.st_ctime,
"mtime": file_stat.st_mtime
}
file_list.append(file_info)
except Exception as e:
logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
# 排序
if sort_by in ['ctime', 'mtime', 'size', 'name']:
file_list.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse)
return file_list
def save_uploaded_file(uploaded_file, save_dir, allowed_types=None):
"""保存上传的文件
Args:
uploaded_file: StreamlitUploadedFile对象
save_dir: 保存目录
allowed_types: 允许的文件类型列表,如 ['.mp4', '.mov']
Returns:
str: 保存后的文件路径,失败返回None
"""
try:
if not os.path.exists(save_dir):
os.makedirs(save_dir)
file_name, file_extension = os.path.splitext(uploaded_file.name)
# 检查文件类型
if allowed_types and file_extension.lower() not in allowed_types:
logger.error(f"不支持的文件类型: {file_extension}")
return None
# 如果文件已存在,添加时间戳
save_path = os.path.join(save_dir, uploaded_file.name)
if os.path.exists(save_path):
timestamp = time.strftime("%Y%m%d%H%M%S")
new_file_name = f"{file_name}_{timestamp}{file_extension}"
save_path = os.path.join(save_dir, new_file_name)
# 保存文件
with open(save_path, "wb") as f:
f.write(uploaded_file.read())
logger.info(f"文件保存成功: {save_path}")
return save_path
except Exception as e:
logger.error(f"保存上传文件失败: {e}")
return None
def create_temp_file(prefix='tmp', suffix='', directory=None):
"""创建临时文件
Args:
prefix: 文件名前缀
suffix: 文件扩展名
directory: 临时文件目录,默认使用系统临时目录
Returns:
str: 临时文件路径
"""
try:
if directory is None:
directory = utils.storage_dir("temp", create=True)
if not os.path.exists(directory):
os.makedirs(directory)
temp_file = os.path.join(directory, f"{prefix}-{str(uuid4())}{suffix}")
return temp_file
except Exception as e:
logger.error(f"创建临时文件失败: {e}")
return None
def get_file_size(file_path, format='MB'):
"""获取文件大小
Args:
file_path: 文件路径
format: 返回格式,支持 'B', 'KB', 'MB', 'GB'
Returns:
float: 文件大小
"""
try:
size_bytes = os.path.getsize(file_path)
if format.upper() == 'B':
return size_bytes
elif format.upper() == 'KB':
return size_bytes / 1024
elif format.upper() == 'MB':
return size_bytes / (1024 * 1024)
elif format.upper() == 'GB':
return size_bytes / (1024 * 1024 * 1024)
else:
return size_bytes
except Exception as e:
logger.error(f"获取文件大小失败: {file_path}, 错误: {e}")
return 0
def ensure_directory(directory):
"""确保目录存在,如果不存在则创建
Args:
directory: 目录路径
Returns:
bool: 是否成功
"""
try:
if not os.path.exists(directory):
os.makedirs(directory)
return True
except Exception as e:
logger.error(f"创建目录失败: {directory}, 错误: {e}")
return False
def create_zip(files: list, zip_path: str, base_dir: str = None, folder_name: str = "demo") -> bool:
"""
创建zip文件
Args:
files: 要打包的文件列表
zip_path: zip文件保存路径
base_dir: 基础目录,用于保持目录结构
folder_name: zip解压后的文件夹名称,默认为frames
Returns:
bool: 是否成功
"""
try:
import zipfile
# 确保目标目录存在
os.makedirs(os.path.dirname(zip_path), exist_ok=True)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file in files:
if not os.path.exists(file):
logger.warning(f"文件不存在,跳过: {file}")
continue
# 计算文件在zip中的路径,添加folder_name作为前缀目录
if base_dir:
arcname = os.path.join(folder_name, os.path.relpath(file, base_dir))
else:
arcname = os.path.join(folder_name, os.path.basename(file))
try:
zipf.write(file, arcname)
except Exception as e:
logger.error(f"添加文件到zip失败: {file}, 错误: {e}")
continue
return True
except Exception as e:
logger.error(f"创建zip文件失败: {e}")
return False |