#!/bin/sh

# Check required environment variables (APP_HOME is used throughout, so check it here too)
if [ -z "$HF_TOKEN" ] || [ -z "$DATASET_ID" ] || [ -z "$APP_HOME" ]; then
    echo "Error: HF_TOKEN, DATASET_ID, or APP_HOME environment variable is not set" >&2
    exit 1
fi
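
# Example invocation (illustrative values; the real token, dataset, and script name
# come from the deployment):
#   HF_TOKEN=hf_xxx DATASET_ID=user/openlist-backup APP_HOME=/app sh entrypoint.sh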

# Activate the Python virtual environment
if [ -f "$APP_HOME/venv/bin/activate" ]; then
    . "$APP_HOME/venv/bin/activate"
else
    echo "Error: virtual environment activation script not found" >&2
    exit 1
fi

# Default sync interval and storage locations
SYNC_INTERVAL=${SYNC_INTERVAL:-7200}  # default: 2 hours
STORAGE_PATH="$APP_HOME/data"
SQUASH_FLAG_FILE="/tmp/last_squash_time"  # epoch time of the last history squash
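
# To override the sync interval (illustrative; the value is in seconds):
#   SYNC_INTERVAL=3600 sh entrypoint.sh   # sync every hour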

# Generate the sync helper script
cat > hf_sync.py << 'EOL'
# HuggingFace sync helper
from huggingface_hub import HfApi
import sys
import os
import tarfile
import tempfile

# Keep at most `max_files` backups; delete the oldest ones beyond that limit.
def manage_backups(api, repo_id, max_files=50):
    files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
    backup_files = [f for f in files if f.startswith('backup_') and f.endswith('.tar.gz')]
    backup_files.sort()  # names embed YYYYMMDD_HHMMSS, so lexicographic order is chronological
    if len(backup_files) > max_files:
        files_to_delete = backup_files[:len(backup_files) - max_files]
        for file_to_delete in files_to_delete:
            try:
                api.delete_file(path_in_repo=file_to_delete, repo_id=repo_id, repo_type="dataset")
                print(f'Deleted old backup: {file_to_delete}')
            except Exception as e:
                print(f'Error deleting {file_to_delete}: {str(e)}')

# Upload a backup archive to the HuggingFace dataset repo
def upload_backup(file_path, file_name, token, repo_id):
    api = HfApi(token=token)
    try:
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=file_name,
            repo_id=repo_id,
            repo_type="dataset"
        )
        print(f"成功上传 {file_name}")
        manage_backups(api, repo_id)
    except Exception as e:
        print(f"上传文件出错: {str(e)}")
        return False
    return True

# Download and extract the most recent backup
def download_latest_backup(token, repo_id, extract_path):
    try:
        api = HfApi(token=token)
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = [f for f in files if f.startswith('backup_') and f.endswith('.tar.gz')]
        if not backup_files:
            print("No backup files found")
            return False
        latest_backup = sorted(backup_files)[-1]
        with tempfile.TemporaryDirectory() as temp_dir:
            filepath = api.hf_hub_download(
                repo_id=repo_id,
                filename=latest_backup,
                repo_type="dataset",
                local_dir=temp_dir
            )
            if filepath and os.path.exists(filepath):
                with tarfile.open(filepath, 'r:gz') as tar:
                    tar.extractall(extract_path)
                print(f"Successfully restored backup: {latest_backup}")
                return True
    except Exception as e:
        print(f"Error downloading backup: {str(e)}")
    return False

# Squash the repo's commit history into a single commit to keep the dataset small
def super_squash_history(token, repo_id):
    try:
        api = HfApi(token=token)
        api.super_squash_history(repo_id=repo_id, repo_type="dataset")
        print("历史合并完成。")
    except Exception as e:
        print(f"合并历史出错: {str(e)}")
        return False
    return True

# Main entry point: dispatch on the requested action
if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Error: not enough arguments")
        sys.exit(1)
        
    action = sys.argv[1]
    token = sys.argv[2]
    repo_id = sys.argv[3]
    
    if action == "upload":
        if len(sys.argv) < 6:
            print("错误: upload 操作需要文件路径和文件名参数")
            sys.exit(1)
        file_path = sys.argv[4]
        file_name = sys.argv[5]
        if not upload_backup(file_path, file_name, token, repo_id):
            sys.exit(1)
    elif action == "download":
        extract_path = sys.argv[4] if len(sys.argv) > 4 else '.'
        if not download_latest_backup(token, repo_id, extract_path):
            sys.exit(1)
    elif action == "super_squash":
        if not super_squash_history(token, repo_id):
            sys.exit(1)
    else:
        print(f"错误: 未知操作 '{action}'")
        sys.exit(1)
        
    sys.exit(0)
EOL
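
# hf_sync.py command forms (matching the argument parsing above):
#   python hf_sync.py upload <token> <repo_id> <file_path> <file_name>
#   python hf_sync.py download <token> <repo_id> [extract_path]
#   python hf_sync.py super_squash <token> <repo_id>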

# On first start, restore the latest backup from HuggingFace
echo "Downloading the latest backup from HuggingFace..."
if ! python hf_sync.py download "${HF_TOKEN}" "${DATASET_ID}" "${APP_HOME}"; then
    echo "Warning: backup download failed; starting with an empty data directory" >&2
fi

# Start the OpenList service in the background
echo "Starting the OpenList service..."
"$APP_HOME/openlist" server &
OPENLIST_PID=$!

# Periodic backup loop
sync_data() {
    while true; do
        echo "Sync cycle started at $(date)"
        
        # Make sure the data directory exists
        if [ -d "${STORAGE_PATH}" ]; then
            # Create a timestamped backup archive
            timestamp=$(date +%Y%m%d_%H%M%S)
            backup_file="backup_${timestamp}.tar.gz"
            temp_backup="/tmp/${backup_file}"
            
            # Compress the data directory
            echo "Creating data backup..."
            if tar -czf "${temp_backup}" -C "$(dirname "${STORAGE_PATH}")" "$(basename "${STORAGE_PATH}")"; then
                # Upload to HuggingFace
                echo "Uploading backup to HuggingFace..."
                if python hf_sync.py upload "${HF_TOKEN}" "${DATASET_ID}" "${temp_backup}" "${backup_file}"; then
                    echo "Backup uploaded successfully"
                else
                    echo "Error: backup upload failed" >&2
                fi
                
                # Squash commit history at most once every 7 days
                NOW=$(date +%s)
                SEVEN_DAYS=$((7*24*60*60))
                if [ ! -f "$SQUASH_FLAG_FILE" ]; then
                    echo "$NOW" > "$SQUASH_FLAG_FILE"
                    echo "Squashing commit history for the first time..."
                    python hf_sync.py super_squash "${HF_TOKEN}" "${DATASET_ID}"
                else
                    LAST=$(cat "$SQUASH_FLAG_FILE")
                    DIFF=$((NOW - LAST))
                    if [ "$DIFF" -ge "$SEVEN_DAYS" ]; then
                        echo "$NOW" > "$SQUASH_FLAG_FILE"
                        echo "More than 7 days since the last squash; squashing commit history..."
                        python hf_sync.py super_squash "${HF_TOKEN}" "${DATASET_ID}"
                    else
                        echo "Less than 7 days since the last squash; skipping this cycle."
                    fi
                fi
            else
                echo "Error: failed to create backup" >&2
            fi

            # Clean up the temporary archive
            rm -f "${temp_backup}"
        else
            echo "Warning: storage directory ${STORAGE_PATH} does not exist; waiting..." >&2
        fi
        
        # Exit if the OpenList service has stopped
        if ! kill -0 "$OPENLIST_PID" 2>/dev/null; then
            echo "Error: the OpenList service has stopped; exiting sync loop" >&2
            exit 1
        fi

        # Wait until the next sync cycle
        echo "Next sync in ${SYNC_INTERVAL} seconds..."
        sleep "$SYNC_INTERVAL"
    done
}

# Start the sync loop (runs in the foreground)
sync_data