File size: 7,660 Bytes
3b13b0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import os
import glob
import time
import platform
import shutil
from uuid import uuid4
from loguru import logger
from app.utils import utils

def open_task_folder(root_dir, task_id):
    """打开任务文件夹
    Args:
        root_dir: 项目根目录
        task_id: 任务ID
    """
    try:
        sys = platform.system()
        path = os.path.join(root_dir, "storage", "tasks", task_id)
        if os.path.exists(path):
            if sys == 'Windows':
                os.system(f"start {path}")
            if sys == 'Darwin':
                os.system(f"open {path}")
            if sys == 'Linux':
                os.system(f"xdg-open {path}")
    except Exception as e:
        logger.error(f"打开任务文件夹失败: {e}")

def cleanup_temp_files(temp_dir, max_age=3600):
    """清理临时文件
    Args:
        temp_dir: 临时文件目录
        max_age: 文件最大保存时间(秒)
    """
    if os.path.exists(temp_dir):
        for file in os.listdir(temp_dir):
            file_path = os.path.join(temp_dir, file)
            try:
                if os.path.getctime(file_path) < time.time() - max_age:
                    if os.path.isfile(file_path):
                        os.remove(file_path)
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)
                    logger.debug(f"已清理临时文件: {file_path}")
            except Exception as e:
                logger.error(f"清理临时文件失败: {file_path}, 错误: {e}")

def get_file_list(directory, file_types=None, sort_by='ctime', reverse=True):
    """获取指定目录下的文件列表
    Args:
        directory: 目录路径
        file_types: 文件类型列表,如 ['.mp4', '.mov']
        sort_by: 排序方式,支持 'ctime'(创建时间), 'mtime'(修改时间), 'size'(文件大小), 'name'(文件名)
        reverse: 是否倒序排序
    Returns:
        list: 文件信息列表
    """
    if not os.path.exists(directory):
        return []
    
    files = []
    if file_types:
        for file_type in file_types:
            files.extend(glob.glob(os.path.join(directory, f"*{file_type}")))
    else:
        files = glob.glob(os.path.join(directory, "*"))
    
    file_list = []
    for file_path in files:
        try:
            file_stat = os.stat(file_path)
            file_info = {
                "name": os.path.basename(file_path),
                "path": file_path,
                "size": file_stat.st_size,
                "ctime": file_stat.st_ctime,
                "mtime": file_stat.st_mtime
            }
            file_list.append(file_info)
        except Exception as e:
            logger.error(f"获取文件信息失败: {file_path}, 错误: {e}")
    
    # 排序
    if sort_by in ['ctime', 'mtime', 'size', 'name']:
        file_list.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse)
    
    return file_list

def save_uploaded_file(uploaded_file, save_dir, allowed_types=None):
    """保存上传的文件
    Args:
        uploaded_file: StreamlitUploadedFile对象
        save_dir: 保存目录
        allowed_types: 允许的文件类型列表,如 ['.mp4', '.mov']
    Returns:
        str: 保存后的文件路径,失败返回None
    """
    try:
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        
        file_name, file_extension = os.path.splitext(uploaded_file.name)
        
        # 检查文件类型
        if allowed_types and file_extension.lower() not in allowed_types:
            logger.error(f"不支持的文件类型: {file_extension}")
            return None
        
        # 如果文件已存在,添加时间戳
        save_path = os.path.join(save_dir, uploaded_file.name)
        if os.path.exists(save_path):
            timestamp = time.strftime("%Y%m%d%H%M%S")
            new_file_name = f"{file_name}_{timestamp}{file_extension}"
            save_path = os.path.join(save_dir, new_file_name)
        
        # 保存文件
        with open(save_path, "wb") as f:
            f.write(uploaded_file.read())
        
        logger.info(f"文件保存成功: {save_path}")
        return save_path
    
    except Exception as e:
        logger.error(f"保存上传文件失败: {e}")
        return None

def create_temp_file(prefix='tmp', suffix='', directory=None):
    """创建临时文件
    Args:
        prefix: 文件名前缀
        suffix: 文件扩展名
        directory: 临时文件目录,默认使用系统临时目录
    Returns:
        str: 临时文件路径
    """
    try:
        if directory is None:
            directory = utils.storage_dir("temp", create=True)
        
        if not os.path.exists(directory):
            os.makedirs(directory)
        
        temp_file = os.path.join(directory, f"{prefix}-{str(uuid4())}{suffix}")
        return temp_file
    
    except Exception as e:
        logger.error(f"创建临时文件失败: {e}")
        return None

def get_file_size(file_path, format='MB'):
    """获取文件大小
    Args:
        file_path: 文件路径
        format: 返回格式,支持 'B', 'KB', 'MB', 'GB'
    Returns:
        float: 文件大小
    """
    try:
        size_bytes = os.path.getsize(file_path)
        
        if format.upper() == 'B':
            return size_bytes
        elif format.upper() == 'KB':
            return size_bytes / 1024
        elif format.upper() == 'MB':
            return size_bytes / (1024 * 1024)
        elif format.upper() == 'GB':
            return size_bytes / (1024 * 1024 * 1024)
        else:
            return size_bytes
    
    except Exception as e:
        logger.error(f"获取文件大小失败: {file_path}, 错误: {e}")
        return 0

def ensure_directory(directory):
    """确保目录存在,如果不存在则创建
    Args:
        directory: 目录路径
    Returns:
        bool: 是否成功
    """
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
        return True
    except Exception as e:
        logger.error(f"创建目录失败: {directory}, 错误: {e}")
        return False

def create_zip(files: list, zip_path: str, base_dir: str = None, folder_name: str = "demo") -> bool:
    """
    创建zip文件
    Args:
        files: 要打包的文件列表
        zip_path: zip文件保存路径
        base_dir: 基础目录,用于保持目录结构
        folder_name: zip解压后的文件夹名称,默认为frames
    Returns:
        bool: 是否成功
    """
    try:
        import zipfile
        
        # 确保目标目录存在
        os.makedirs(os.path.dirname(zip_path), exist_ok=True)
        
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file in files:
                if not os.path.exists(file):
                    logger.warning(f"文件不存在,跳过: {file}")
                    continue
                    
                # 计算文件在zip中的路径,添加folder_name作为前缀目录
                if base_dir:
                    arcname = os.path.join(folder_name, os.path.relpath(file, base_dir))
                else:
                    arcname = os.path.join(folder_name, os.path.basename(file))
                
                try:
                    zipf.write(file, arcname)
                except Exception as e:
                    logger.error(f"添加文件到zip失败: {file}, 错误: {e}")
                    continue

        return True
        
    except Exception as e:
        logger.error(f"创建zip文件失败: {e}")
        return False