Spaces:
Running
Running
#!/usr/bin/env python3 | |
from huggingface_hub import HfApi | |
import sys | |
import os | |
import shutil | |
import time | |
import glob | |
from datetime import datetime | |
import tempfile | |
def manage_backups(api, repo_id, max_files=10):
    """Prune old backups so that at most ``max_files`` remain in the repo.

    Backup archives are the dataset-repo files whose names start with
    ``navidrome_backup_`` and end with ``.tar.gz``. They are sorted by name
    and the entries that come first in that order are deleted; with the
    ``%Y%m%d_%H%M%S`` timestamp that create_backup embeds in the name, that
    means the oldest archives go first.

    Args:
        api: Client object exposing ``list_repo_files`` and ``delete_file``.
        repo_id: Identifier of the dataset repository holding the backups.
        max_files: Maximum number of backup archives to keep.

    Returns:
        None. Failures are printed, never raised: a failed deletion does not
        stop the remaining deletions, and a failed listing aborts the prune.
    """
    try:
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = sorted(
            f for f in files
            if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')
        )
        # This runs after a new archive has been uploaded, so only the
        # surplus beyond max_files is removed (the previous formula removed
        # one extra archive and left max_files - 1 behind).
        excess = len(backup_files) - max_files
        if excess <= 0:
            return
        for file_to_delete in backup_files[:excess]:
            try:
                api.delete_file(
                    path_in_repo=file_to_delete,
                    repo_id=repo_id,
                    repo_type="dataset",
                )
                print(f'已删除旧备份: {file_to_delete}')
            except Exception as e:
                # Best effort: report and continue with the next archive.
                print(f'删除 {file_to_delete} 时出错: {str(e)}')
    except Exception as e:
        print(f'管理备份时出错: {str(e)}')
def upload_backup(file_path, file_name, token, repo_id):
    """Push one backup archive to the dataset repo, then prune old backups.

    Args:
        file_path: Local path of the archive to upload.
        file_name: Name the archive gets inside the repository.
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the target dataset repository.

    Returns:
        None. Any exception raised while uploading or pruning is caught and
        printed instead of being propagated.
    """
    client = HfApi(token=token)
    upload_args = {
        "path_or_fileobj": file_path,
        "path_in_repo": file_name,
        "repo_id": repo_id,
        "repo_type": "dataset",
    }
    try:
        client.upload_file(**upload_args)
        print(f"成功上传 {file_name}")
        # Pruning only happens once the new archive is safely uploaded.
        manage_backups(client, repo_id)
    except Exception as exc:
        print(f"文件上传出错: {exc!s}")
def download_latest_backup(token, repo_id, data_dir):
    """Download the newest backup archive and unpack it into ``data_dir``.

    The newest archive is the ``navidrome_backup_*.tar.gz`` file in the
    dataset repo whose name sorts last. It is downloaded into a temporary
    directory and extracted over ``data_dir``, which is created if missing
    and set to mode ``0o755``.

    Args:
        token: Access token handed to ``HfApi``.
        repo_id: Identifier of the dataset repository holding the backups.
        data_dir: Directory the archive contents are restored into.

    Returns:
        True if an archive was downloaded and extracted; False if no backup
        exists, the download produced no file, or any error occurred (errors
        are printed, not raised).
    """
    try:
        import tarfile

        # Make sure the restore target exists with the expected permissions.
        os.makedirs(data_dir, exist_ok=True)
        os.chmod(data_dir, 0o755)
        api = HfApi(token=token)
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        backup_files = [
            f for f in files
            if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')
        ]
        if not backup_files:
            print("未找到备份文件")
            return False
        latest_backup = max(backup_files)
        # Download into a throwaway directory so the archive itself never
        # lands inside data_dir.
        with tempfile.TemporaryDirectory() as temp_dir:
            filepath = api.hf_hub_download(
                repo_id=repo_id,
                filename=latest_backup,
                repo_type="dataset",
                local_dir=temp_dir,
            )
            if not (filepath and os.path.exists(filepath)):
                return False
            with tarfile.open(filepath, "r:gz") as tar:
                if hasattr(tarfile, "data_filter"):
                    # The archive comes from a remote repo: the "data"
                    # filter rejects members that would escape data_dir
                    # (absolute paths, "..", links pointing outside). Such
                    # an archive raises and the restore returns False.
                    tar.extractall(path=data_dir, filter="data")
                else:
                    # Interpreters without extraction filters keep the
                    # legacy unfiltered behavior.
                    tar.extractall(path=data_dir)
            print(f"成功从 {latest_backup} 恢复备份到 {data_dir}")
            return True
    except Exception as e:
        print(f"下载备份时出错: {str(e)}")
        return False
def create_backup(data_dir, exclude_dirs=None):
    """Create a gzip-compressed tar archive of ``data_dir`` in the temp dir.

    Every top-level entry of ``data_dir`` is added under its own name,
    except top-level directories whose name is listed in ``exclude_dirs``.
    The archive is named ``navidrome_backup_<YYYYmmdd_HHMMSS>.tar.gz`` from
    the current local time.

    Args:
        data_dir: Directory whose contents are archived.
        exclude_dirs: Names of top-level directories to leave out; ``None``
            means nothing is excluded.

    Returns:
        ``(backup_path, backup_filename)`` on success, ``(None, None)`` on
        failure. Errors are printed, not raised, and a partially written
        archive is removed.
    """
    import tarfile

    if exclude_dirs is None:
        exclude_dirs = []
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"navidrome_backup_{timestamp}.tar.gz"
    # Use the platform temp directory instead of a hard-coded /tmp.
    backup_path = os.path.join(tempfile.gettempdir(), backup_filename)
    try:
        with tarfile.open(backup_path, "w:gz") as tar:
            # Sorted so the member order does not depend on the filesystem.
            for item in sorted(os.listdir(data_dir)):
                item_path = os.path.join(data_dir, item)
                # Only directories are excluded; a regular file that shares
                # a name with an excluded directory is still archived.
                if os.path.isdir(item_path) and item in exclude_dirs:
                    continue
                tar.add(item_path, arcname=item)
        print(f"备份文件创建成功: {backup_path}")
        return backup_path, backup_filename
    except Exception as e:
        print(f"创建备份时出错: {str(e)}")
        # tarfile.open may already have created the file before the failure;
        # do not leave a truncated archive behind.
        try:
            os.remove(backup_path)
        except OSError:
            # Nothing to clean up (or it cannot be removed): best effort.
            pass
        return None, None
if __name__ == "__main__":
    # Command line: <script> <upload|download> <token> <repo_id> <data_dir>
    if len(sys.argv) < 5:
        # Report the expected arguments instead of failing with IndexError.
        print(
            f"用法: {sys.argv[0]} <upload|download> <token> <repo_id> <data_dir>",
            file=sys.stderr,
        )
        sys.exit(1)
    action, token, repo_id, data_dir = sys.argv[1:5]

    if action == "upload":
        # Skip the music library and cache; only data and config are backed up.
        exclude_dirs = ["cache", "music"]
        backup_path, backup_filename = create_backup(data_dir, exclude_dirs)
        if backup_path:
            try:
                upload_backup(backup_path, backup_filename, token, repo_id)
            finally:
                # Always remove the temporary archive, even if the upload
                # step raises unexpectedly.
                if os.path.exists(backup_path):
                    os.remove(backup_path)
    elif action == "download":
        # The result is intentionally not turned into an exit status, so a
        # missing backup does not fail the calling process.
        download_latest_backup(token, repo_id, data_dir)
    else:
        print(f"未知操作: {action}", file=sys.stderr)
        sys.exit(1)