Spaces:
Runtime error
Runtime error
import requests | |
import argparse | |
import os | |
import time | |
class FileUploader: | |
def __init__(self, server_url, api_key, space_id): | |
self.server_url = server_url | |
self.api_key = api_key | |
self.space_id = space_id | |
def upload_file(self, file_path): | |
"""上传单个文件""" | |
if not os.path.exists(file_path): | |
print(f"文件不存在: {file_path}") | |
return False | |
filename = os.path.basename(file_path) | |
print(f"正在上传文件: {filename}") | |
# 构建完整的 API 端点 URL | |
upload_url = f"{self.server_url.rstrip('/')}/api/remote_upload" | |
# 准备请求头和数据 | |
headers = { | |
'X-API-Key': self.api_key | |
} | |
data = { | |
'space_id': self.space_id | |
} | |
# 读取文件内容并上传 | |
try: | |
with open(file_path, 'rb') as f: | |
files = { | |
'file': (filename, f) | |
} | |
# 发送 POST 请求 | |
response = requests.post(upload_url, headers=headers, data=data, files=files) | |
# 打印服务器响应 | |
print(f"服务器响应 ({response.status_code}):") | |
try: | |
result = response.json() | |
print(result) | |
if response.status_code == 200: | |
print(f"✅ 文件 '{filename}' 上传成功!") | |
return True | |
else: | |
print(f"❌ 上传失败: {result.get('error', '未知错误')}") | |
return False | |
except requests.exceptions.JSONDecodeError: | |
print(response.text) | |
return False | |
except IOError as e: | |
print(f"读取文件时出错: {e}") | |
return False | |
except requests.exceptions.RequestException as e: | |
print(f"请求时出错: {e}") | |
return False | |
def upload_directory_once(watch_dir, server_url, api_key, space_id): | |
"""一次性扫描并上传目录中的所有文件""" | |
if not os.path.exists(watch_dir): | |
print(f"目录不存在: {watch_dir}") | |
return | |
print(f"🔍 开始扫描目录: {watch_dir}") | |
print(f"📡 服务器地址: {server_url}") | |
print(f"🔑 Space ID: {space_id}") | |
print("-" * 50) | |
uploader = FileUploader(server_url, api_key, space_id) | |
# 获取所有文件 | |
all_files = [] | |
for root, dirs, files in os.walk(watch_dir): | |
for file in files: | |
file_path = os.path.join(root, file) | |
if os.path.isfile(file_path): | |
all_files.append(file_path) | |
if not all_files: | |
print("📁 目录中没有找到任何文件") | |
return | |
print(f"📁 找到 {len(all_files)} 个文件,开始上传...") | |
success_count = 0 | |
failed_count = 0 | |
for file_path in all_files: | |
try: | |
if uploader.upload_file(file_path): | |
success_count += 1 | |
else: | |
failed_count += 1 | |
# 稍微延迟一下,避免服务器压力过大 | |
time.sleep(0.5) | |
except Exception as e: | |
print(f"上传文件 {file_path} 时发生异常: {e}") | |
failed_count += 1 | |
print("-" * 50) | |
print(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}") | |
if success_count > 0: | |
print("🎉 文件已成功上传到您的网盘!") | |
return success_count, failed_count | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="一次性文件上传器 - 扫描并上传指定文件夹中的所有文件") | |
parser.add_argument("api_key", help="您的 API 密钥") | |
parser.add_argument("space_id", help="Space ID") | |
parser.add_argument("--server", default="http://127.0.0.1:5001", help="服务器的 URL 地址 (默认: http://127.0.0.1:5001)") | |
parser.add_argument("--upload-dir", default="output", help="要上传的目录 (默认: output)") | |
args = parser.parse_args() | |
# 开始一次性上传 | |
upload_directory_once(args.upload_dir, args.server, args.api_key, args.space_id) |