FLUX.1-Krea-dev / remote_uploader.py
dangthr's picture
Update remote_uploader.py
4ea9e2d verified
raw
history blame
8.76 kB
import requests
import argparse
import os
import asyncio
import websockets
import json
import base64
import logging
import sys
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class WebSocketFileUploader:
def __init__(self, server_url, api_key, space_id):
self.server_url = server_url
self.api_key = api_key
self.space_id = space_id
async def connect_and_upload(self, upload_dir):
"""连接到 WebSocket 服务器并上传文件"""
# 将 HTTP URL 转换为 WebSocket URL
ws_url = self.server_url.replace('http://', 'ws://').replace('https://', 'wss://').rstrip('/')
ws_url += f'/ws/upload/{self.space_id}?token={self.api_key}'
logger.info(f"正在连接到 WebSocket: {ws_url}")
try:
async with websockets.connect(ws_url, ping_interval=20, ping_timeout=60) as websocket:
logger.info("WebSocket 连接成功")
# 发送认证信息
auth_msg = {
"type": "auth",
"api_key": self.api_key,
"space_id": self.space_id
}
await websocket.send(json.dumps(auth_msg))
# 等待认证响应
response = await websocket.recv()
auth_response = json.loads(response)
if auth_response.get("type") == "auth_success":
logger.info("认证成功,开始上传文件...")
return await self._upload_files(websocket, upload_dir)
else:
logger.error(f"认证失败: {auth_response.get('message', '未知错误')}")
return False
except Exception as e:
logger.error(f"WebSocket 连接失败: {e}")
# 如果 WebSocket 失败,回退到 HTTP API
logger.info("回退到 HTTP API 上传...")
return self._fallback_http_upload(upload_dir)
async def _upload_files(self, websocket, upload_dir):
"""通过 WebSocket 上传目录中的所有文件"""
if not os.path.exists(upload_dir):
logger.error(f"目录不存在: {upload_dir}")
return False
# 获取所有文件
all_files = []
for root, dirs, files in os.walk(upload_dir):
for file in files:
file_path = os.path.join(root, file)
if os.path.isfile(file_path):
all_files.append(file_path)
if not all_files:
logger.info("📁 目录中没有找到任何文件")
return True
logger.info(f"📁 找到 {len(all_files)} 个文件,开始上传...")
success_count = 0
failed_count = 0
for file_path in all_files:
try:
if await self._upload_single_file(websocket, file_path):
success_count += 1
logger.info(f"✅ 文件 '{os.path.basename(file_path)}' 上传成功!")
else:
failed_count += 1
logger.error(f"❌ 文件 '{os.path.basename(file_path)}' 上传失败")
# 稍微延迟一下
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"上传文件 {file_path} 时发生异常: {e}")
failed_count += 1
logger.info(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}")
return success_count > 0
async def _upload_single_file(self, websocket, file_path):
"""上传单个文件通过 WebSocket"""
try:
filename = os.path.basename(file_path)
logger.info(f"正在上传文件: {filename}")
# 读取文件内容并编码为 base64
with open(file_path, 'rb') as f:
file_content = f.read()
file_b64 = base64.b64encode(file_content).decode('utf-8')
# 构建上传消息
upload_msg = {
"type": "file_upload",
"filename": filename,
"content": file_b64,
"space_id": self.space_id
}
# 发送文件
await websocket.send(json.dumps(upload_msg))
# 等待响应
response = await websocket.recv()
result = json.loads(response)
if result.get("type") == "upload_success":
return True
else:
logger.error(f"上传失败: {result.get('message', '未知错误')}")
return False
except Exception as e:
logger.error(f"上传文件 {file_path} 时出错: {e}")
return False
def _fallback_http_upload(self, upload_dir):
"""HTTP API 回退方案"""
logger.info("使用 HTTP API 回退方案上传文件...")
if not os.path.exists(upload_dir):
logger.error(f"目录不存在: {upload_dir}")
return False
# 构建完整的 API 端点 URL
upload_url = f"{self.server_url.rstrip('/')}/api/remote_upload"
# 准备请求头
headers = {
'X-API-Key': self.api_key
}
data = {
'space_id': self.space_id
}
# 获取所有文件
all_files = []
for root, dirs, files in os.walk(upload_dir):
for file in files:
file_path = os.path.join(root, file)
if os.path.isfile(file_path):
all_files.append(file_path)
if not all_files:
logger.info("📁 目录中没有找到任何文件")
return True
logger.info(f"📁 找到 {len(all_files)} 个文件,开始HTTP上传...")
success_count = 0
failed_count = 0
for file_path in all_files:
try:
filename = os.path.basename(file_path)
logger.info(f"正在上传文件: {filename}")
with open(file_path, 'rb') as f:
files = {
'file': (filename, f)
}
# 发送 POST 请求
response = requests.post(upload_url, headers=headers, data=data, files=files, timeout=30)
if response.status_code == 200:
logger.info(f"✅ 文件 '{filename}' 上传成功!")
success_count += 1
else:
logger.error(f"❌ 上传失败: HTTP {response.status_code}")
failed_count += 1
time.sleep(0.5)
except Exception as e:
logger.error(f"上传文件 {file_path} 时发生异常: {e}")
failed_count += 1
logger.info(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}")
return success_count > 0
async def upload_directory_websocket(upload_dir, server_url, api_key, space_id):
"""使用 WebSocket 方式上传目录中的所有文件"""
logger.info(f"🔍 开始扫描目录: {upload_dir}")
logger.info(f"📡 服务器地址: {server_url}")
logger.info(f"🔑 Space ID: {space_id}")
logger.info("-" * 50)
uploader = WebSocketFileUploader(server_url, api_key, space_id)
success = await uploader.connect_and_upload(upload_dir)
if success:
logger.info("🎉 文件已成功上传到您的网盘!")
else:
logger.error("❌ 文件上传失败")
return success
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebSocket 文件上传器 - 扫描并上传指定文件夹中的所有文件")
parser.add_argument("api_key", help="您的 API 密钥")
parser.add_argument("space_id", help="Space ID")
parser.add_argument("--server", default="https://gkbtyo-rqvays-5001.preview.cloudstudio.work", help="服务器的 URL 地址")
parser.add_argument("--upload-dir", default="output", help="要上传的目录 (默认: output)")
args = parser.parse_args()
# 运行异步上传
asyncio.run(upload_directory_websocket(args.upload_dir, args.server, args.api_key, args.space_id))