import requests import argparse import os import asyncio import websockets import json import base64 import logging import sys import time # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class WebSocketFileUploader: def __init__(self, server_url, api_key, space_id): self.server_url = server_url self.api_key = api_key self.space_id = space_id async def connect_and_upload(self, upload_dir): """连接到 WebSocket 服务器并上传文件""" # 将 HTTP URL 转换为 WebSocket URL # 清理URL,移除可能的换行符、空格和反引号 clean_server_url = self.server_url.strip().replace('`', '') # 移除反引号 ws_url = clean_server_url.replace('http://', 'ws://').replace('https://', 'wss://').rstrip('/') ws_url += f'/ws/upload/{self.space_id}?token={self.api_key}' logger.info(f"正在连接到 WebSocket: {ws_url}") try: async with websockets.connect(ws_url, ping_interval=20, ping_timeout=60) as websocket: logger.info("WebSocket 连接成功") # 发送认证信息 auth_msg = { "type": "auth", "api_key": self.api_key, "space_id": self.space_id } await websocket.send(json.dumps(auth_msg)) # 等待认证响应 response = await websocket.recv() auth_response = json.loads(response) if auth_response.get("type") == "auth_success": logger.info("认证成功,开始上传文件...") return await self._upload_files(websocket, upload_dir) else: logger.error(f"认证失败: {auth_response.get('message', '未知错误')}") return False except Exception as e: logger.error(f"WebSocket 连接失败: {e}") # 如果 WebSocket 失败,回退到 HTTP API logger.info("回退到 HTTP API 上传...") return self._fallback_http_upload(upload_dir) async def _upload_files(self, websocket, upload_dir): """通过 WebSocket 上传目录中的所有文件""" if not os.path.exists(upload_dir): logger.error(f"目录不存在: {upload_dir}") return False # 获取所有文件 all_files = [] for root, dirs, files in os.walk(upload_dir): for file in files: file_path = os.path.join(root, file) if os.path.isfile(file_path): all_files.append(file_path) if not all_files: logger.info("📁 目录中没有找到任何文件") return True logger.info(f"📁 找到 {len(all_files)} 个文件,开始上传...") success_count = 0 failed_count = 0 for file_path in all_files: try: if await self._upload_single_file(websocket, file_path): success_count += 1 logger.info(f"✅ 文件 '{os.path.basename(file_path)}' 上传成功!") else: failed_count += 1 logger.error(f"❌ 文件 '{os.path.basename(file_path)}' 上传失败") # 稍微延迟一下 await asyncio.sleep(0.5) except Exception as e: logger.error(f"上传文件 {file_path} 时发生异常: {e}") failed_count += 1 logger.info(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}") return success_count > 0 async def _upload_single_file(self, websocket, file_path): """上传单个文件通过 WebSocket""" try: filename = os.path.basename(file_path) logger.info(f"正在上传文件: {filename}") # 读取文件内容并编码为 base64 with open(file_path, 'rb') as f: file_content = f.read() file_b64 = base64.b64encode(file_content).decode('utf-8') # 构建上传消息 upload_msg = { "type": "file_upload", "filename": filename, "content": file_b64, "space_id": self.space_id } # 发送文件 await websocket.send(json.dumps(upload_msg)) # 等待响应 response = await websocket.recv() result = json.loads(response) if result.get("type") == "upload_success": return True else: logger.error(f"上传失败: {result.get('message', '未知错误')}") return False except Exception as e: logger.error(f"上传文件 {file_path} 时出错: {e}") return False def _fallback_http_upload(self, upload_dir): """HTTP API 回退方案""" logger.info("使用 HTTP API 回退方案上传文件...") if not os.path.exists(upload_dir): logger.error(f"目录不存在: {upload_dir}") return False # 构建完整的 API 端点 URL # 清理URL,移除可能的换行符、空格和反引号 clean_server_url = self.server_url.strip().replace('`', '') # 移除反引号 upload_url = f"{clean_server_url.rstrip('/')}/api/remote_upload" # 准备请求头 headers = { 'X-API-Key': self.api_key } data = { 'space_id': self.space_id } # 获取所有文件 all_files = [] for root, dirs, files in os.walk(upload_dir): for file in files: file_path = os.path.join(root, file) if os.path.isfile(file_path): all_files.append(file_path) if not all_files: logger.info("📁 目录中没有找到任何文件") return True logger.info(f"📁 找到 {len(all_files)} 个文件,开始HTTP上传...") success_count = 0 failed_count = 0 for file_path in all_files: try: filename = os.path.basename(file_path) logger.info(f"正在上传文件: {filename}") with open(file_path, 'rb') as f: files = { 'file': (filename, f) } # 发送 POST 请求 response = requests.post(upload_url, headers=headers, data=data, files=files, timeout=30) if response.status_code == 200: logger.info(f"✅ 文件 '{filename}' 上传成功!") success_count += 1 else: logger.error(f"❌ 上传失败: HTTP {response.status_code}") failed_count += 1 time.sleep(0.5) except Exception as e: logger.error(f"上传文件 {file_path} 时发生异常: {e}") failed_count += 1 logger.info(f"📊 上传完成! 成功: {success_count}, 失败: {failed_count}") return success_count > 0 async def upload_directory_websocket(upload_dir, server_url, api_key, space_id): """使用 WebSocket 方式上传目录中的所有文件""" logger.info(f"🔍 开始扫描目录: {upload_dir}") logger.info(f"📡 服务器地址: {server_url}") logger.info(f"🔑 Space ID: {space_id}") logger.info("-" * 50) uploader = WebSocketFileUploader(server_url, api_key, space_id) success = await uploader.connect_and_upload(upload_dir) if success: logger.info("🎉 文件已成功上传到您的网盘!") else: logger.error("❌ 文件上传失败") return success if __name__ == "__main__": parser = argparse.ArgumentParser(description="WebSocket 文件上传器 - 扫描并上传指定文件夹中的所有文件") parser.add_argument("api_key", help="您的 API 密钥") parser.add_argument("space_id", help="Space ID") parser.add_argument("--server", default="https://gkbtyo-rqvays-5001.preview.cloudstudio.work", help="服务器的 URL 地址") parser.add_argument("--upload-dir", default="output", help="要上传的目录 (默认: output)") args = parser.parse_args() # 运行异步上传 asyncio.run(upload_directory_websocket(args.upload_dir, args.server, args.api_key, args.space_id))