FLUX.1-Krea-dev / remote_uploader.py
dangthr's picture
Update remote_uploader.py
76b5b20 verified
raw
history blame
5.44 kB
import asyncio
import websockets
import json
import logging
import sys
import base64
import os
from urllib.parse import urlparse
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class WebSocketFileUploader:
def __init__(self, server_url, api_key, space_id):
# 将HTTP URL转换为WebSocket URL
parsed_url = urlparse(server_url)
scheme = 'wss' if parsed_url.scheme == 'https' else 'ws'
self.ws_url = f"{scheme}://{parsed_url.netloc}/ws/upload/{space_id}"
self.api_key = api_key
self.space_id = space_id
async def upload_file(self, file_path):
"""通过WebSocket上传单个文件"""
if not os.path.exists(file_path):
print(f"文件不存在: {file_path}")
return False
filename = os.path.basename(file_path)
print(f"正在上传文件: {filename}")
try:
# 读取文件内容并转换为base64
with open(file_path, 'rb') as f:
file_content = f.read()
file_b64 = base64.b64encode(file_content).decode('utf-8')
# 连接WebSocket并上传文件
async with websockets.connect(self.ws_url, ping_interval=20, ping_timeout=60) as websocket:
# 发送认证信息
auth_msg = {
"type": "auth",
"api_key": self.api_key,
"space_id": self.space_id
}
await websocket.send(json.dumps(auth_msg))
# 等待认证响应
response = await websocket.recv()
auth_response = json.loads(response)
if auth_response.get("type") != "auth_success":
print(f"❌ 认证失败: {auth_response.get('message', '未知错误')}")
return False
# 发送文件
file_msg = {
"type": "file_upload",
"filename": filename,
"content": file_b64,
"space_id": self.space_id
}
await websocket.send(json.dumps(file_msg))
# 等待上传响应
response = await websocket.recv()
upload_response = json.loads(response)
if upload_response.get("type") == "upload_success":
print(f"✅ 文件 '{filename}' 上传成功!")
return True
else:
print(f"❌ 上传失败: {upload_response.get('message', '未知错误')}")
return False
except Exception as e:
print(f"上传文件时出错: {e}")
return False
async def upload_directory_once(upload_dir, server_url, api_key, space_id):
"""一次性扫描并上传目录中的所有文件"""
if not os.path.exists(upload_dir):
print(f"目录不存在: {upload_dir}")
return
print(f"🔍 开始扫描目录: {upload_dir}")
print(f"📡 服务器地址: {server_url}")
print(f"🔑 Space ID: {space_id}")
print("-" * 50)
uploader = WebSocketFileUploader(server_url, api_key, space_id)
# 获取所有文件
all_files = []
for root, dirs, files in os.walk(upload_dir):
for file in files:
file_path = os.path.join(root, file)
if os.path.isfile(file_path):
all_files.append(file_path)
if not all_files:
print("📁 目录中没有找到任何文件")
return
print(f"📁 找到 {len(all_files)} 个文件,开始上传...")
success_count = 0
failed_count = 0
for file_path in all_files:
try:
if await uploader.upload_file(file_path):
success_count += 1
else:
failed_count += 1
# 稍微延迟一下,避免服务器压力过大
await asyncio.sleep(0.5)
except Exception as e:
print(f"上传文件 {file_path} 时发生异常: {e}")
failed_count += 1
print("-" * 50)
print(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}")
if success_count > 0:
print("🎉 文件已成功上传到您的网盘!")
return success_count, failed_count
async def main():
if len(sys.argv) < 3:
print("使用方法: python3 remote_uploader.py <api_key> <space_id> [--server <server_url>] [--upload-dir <directory>]")
sys.exit(1)
api_key = sys.argv[1]
space_id = sys.argv[2]
# 解析可选参数
server_url = "https://gkbtyo-rqvays-5001.preview.cloudstudio.work"
upload_dir = "output"
i = 3
while i < len(sys.argv):
if sys.argv[i] == "--server" and i + 1 < len(sys.argv):
server_url = sys.argv[i + 1]
i += 2
elif sys.argv[i] == "--upload-dir" and i + 1 < len(sys.argv):
upload_dir = sys.argv[i + 1]
i += 2
else:
i += 1
# 开始异步上传
await upload_directory_once(upload_dir, server_url, api_key, space_id)
if __name__ == "__main__":
asyncio.run(main())