dangthr commited on
Commit
7c4e616
·
verified ·
1 Parent(s): d61a965

Update remote_uploader.py

Browse files
Files changed (1) hide show
  1. remote_uploader.py +11 -54
remote_uploader.py CHANGED
@@ -7,8 +7,7 @@ import json
7
  import base64
8
  import logging
9
  import sys
10
- import time # 修复点 1: 导入 time 模块以使用 time.sleep()
11
- from urllib.parse import urlparse # 修复点 2: 导入 urlparse 以便解析 URL
12
 
13
  # 配置日志
14
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -23,36 +22,19 @@ class WebSocketFileUploader:
23
  async def connect_and_upload(self, upload_dir):
24
  """连接到 WebSocket 服务器并上传文件"""
25
 
26
- # --- 修复点 3: 智能地构建 WebSocket URL ---
27
- # 目标: 将主应用传来的 HTTP URL (如 http://...:5001) 转换为正确的 WebSocket URL (ws://...:8765)
28
- try:
29
- parsed_http_url = urlparse(self.server_url)
30
- # 根据 HTTP 协议确定 WebSocket 协议 (http -> ws, https -> wss)
31
- ws_scheme = 'wss' if parsed_http_url.scheme == 'https' else 'ws'
32
-
33
- # 使用从 HTTP URL 获取的主机名,但硬编码正确的 WebSocket 端口 8765
34
- ws_netloc = f"{parsed_http_url.hostname}:8765"
35
-
36
- # 重新构建完整的 WebSocket URL
37
- ws_url = f"{ws_scheme}://{ws_netloc}/ws/upload/{self.space_id}?token={self.api_key}"
38
- except Exception as e:
39
- logger.error(f"解析服务器 URL '{self.server_url}' 时出错: {e}")
40
- logger.info("回退到 HTTP API 上传...")
41
- return self._fallback_http_upload(upload_dir)
42
- # --- 修复结束 ---
43
 
44
  logger.info(f"正在连接到 WebSocket: {ws_url}")
45
 
46
  try:
47
- async with websockets.connect(ws_url, ping_interval=20, ping_timeout=60) as websocket:
 
48
  logger.info("WebSocket 连接成功")
49
 
50
- # 注意:原始代码中的认证流程是先连接再发认证消息,这很好。
51
- # 我们将遵循这个流程。客户端不需要发送 auth 消息,
52
- # 因为服务器端的 handle_websocket_upload 函数会直接从连接路径和参数中验证。
53
- # 我们只需要等待服务器的 "auth_success" 响应。
54
-
55
- # 等待认证响应
56
  response = await websocket.recv()
57
  auth_response = json.loads(response)
58
 
@@ -61,23 +43,19 @@ class WebSocketFileUploader:
61
  return await self._upload_files(websocket, upload_dir)
62
  else:
63
  logger.error(f"认证失败: {auth_response.get('message', '未知错误')}")
64
- # 认证失败也回退到 HTTP
65
  logger.info("回退到 HTTP API 上传...")
66
  return self._fallback_http_upload(upload_dir)
67
 
68
  except Exception as e:
69
  logger.error(f"WebSocket 连接失败: {e}")
70
- # 如果 WebSocket 失败,回退到 HTTP API
71
  logger.info("回退到 HTTP API 上传...")
72
  return self._fallback_http_upload(upload_dir)
73
 
74
  async def _upload_files(self, websocket, upload_dir):
75
- """通过 WebSocket 上传目录中的所有文件"""
76
  if not os.path.exists(upload_dir):
77
  logger.error(f"目录不存在: {upload_dir}")
78
  return False
79
 
80
- # 获取所有文件
81
  all_files = []
82
  for root, dirs, files in os.walk(upload_dir):
83
  for file in files:
@@ -103,7 +81,6 @@ class WebSocketFileUploader:
103
  failed_count += 1
104
  logger.error(f"❌ 文件 '{os.path.basename(file_path)}' 上传失败")
105
 
106
- # 稍微延迟一下
107
  await asyncio.sleep(0.5)
108
 
109
  except Exception as e:
@@ -114,17 +91,14 @@ class WebSocketFileUploader:
114
  return success_count > 0
115
 
116
  async def _upload_single_file(self, websocket, file_path):
117
- """上传单个文件通过 WebSocket"""
118
  try:
119
  filename = os.path.basename(file_path)
120
  logger.info(f"正在上传文件: {filename}")
121
 
122
- # 读取文件内容并编码为 base64
123
  with open(file_path, 'rb') as f:
124
  file_content = f.read()
125
  file_b64 = base64.b64encode(file_content).decode('utf-8')
126
 
127
- # 构建上传消息
128
  upload_msg = {
129
  "type": "file_upload",
130
  "filename": filename,
@@ -132,10 +106,8 @@ class WebSocketFileUploader:
132
  "space_id": self.space_id
133
  }
134
 
135
- # 发送文件
136
  await websocket.send(json.dumps(upload_msg))
137
 
138
- # 等待响应
139
  response = await websocket.recv()
140
  result = json.loads(response)
141
 
@@ -150,25 +122,17 @@ class WebSocketFileUploader:
150
  return False
151
 
152
  def _fallback_http_upload(self, upload_dir):
153
- """HTTP API 回退方案"""
154
  logger.info("使用 HTTP API 回退方案上传文件...")
155
 
156
  if not os.path.exists(upload_dir):
157
  logger.error(f"目录不存在: {upload_dir}")
158
  return False
159
 
160
- # 构建完整的 API 端点 URL
161
  upload_url = f"{self.server_url.rstrip('/')}/api/remote_upload"
162
 
163
- # 准备请求头
164
- headers = {
165
- 'X-API-Key': self.api_key
166
- }
167
- data = {
168
- 'space_id': self.space_id
169
- }
170
 
171
- # 获取所有文件
172
  all_files = []
173
  for root, dirs, files in os.walk(upload_dir):
174
  for file in files:
@@ -191,11 +155,7 @@ class WebSocketFileUploader:
191
  logger.info(f"正在上传文件: {filename}")
192
 
193
  with open(file_path, 'rb') as f:
194
- files = {
195
- 'file': (filename, f)
196
- }
197
-
198
- # 发送 POST 请求
199
  response = requests.post(upload_url, headers=headers, data=data, files=files, timeout=60)
200
 
201
  if response.status_code == 200:
@@ -205,7 +165,6 @@ class WebSocketFileUploader:
205
  logger.error(f"❌ 上传失败: HTTP {response.status_code} - {response.text}")
206
  failed_count += 1
207
 
208
- # 使用 time.sleep()
209
  time.sleep(0.5)
210
 
211
  except Exception as e:
@@ -216,7 +175,6 @@ class WebSocketFileUploader:
216
  return success_count > 0
217
 
218
  async def upload_directory_websocket(upload_dir, server_url, api_key, space_id):
219
- """使用 WebSocket 方式上传目录中的所有文件"""
220
  logger.info(f"🔍 开始扫描目录: {upload_dir}")
221
  logger.info(f"📡 服务器地址: {server_url}")
222
  logger.info(f"🔑 Space ID: {space_id}")
@@ -241,5 +199,4 @@ if __name__ == "__main__":
241
 
242
  args = parser.parse_args()
243
 
244
- # 运行异步上传
245
  asyncio.run(upload_directory_websocket(args.upload_dir, args.server, args.api_key, args.space_id))
 
7
  import base64
8
  import logging
9
  import sys
10
+ import time
 
11
 
12
  # 配置日志
13
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
22
  async def connect_and_upload(self, upload_dir):
23
  """连接到 WebSocket 服务器并上传文件"""
24
 
25
+ # 修复点: 简化 WebSocket URL 的构建。
26
+ # 服务器在同一端口处理 HTTP WS,只需替换协议。
27
+ ws_url = self.server_url.replace('https://', 'wss://').replace('http://', 'ws://').rstrip('/')
28
+ ws_url += f'/ws/upload/{self.space_id}?token={self.api_key}'
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
  logger.info(f"正在连接到 WebSocket: {ws_url}")
31
 
32
  try:
33
+ # 增加连接超时以更快地失败
34
+ async with websockets.connect(ws_url, open_timeout=10, ping_interval=20, ping_timeout=60) as websocket:
35
  logger.info("WebSocket 连接成功")
36
 
37
+ # 等待服务器的认证成功响应
 
 
 
 
 
38
  response = await websocket.recv()
39
  auth_response = json.loads(response)
40
 
 
43
  return await self._upload_files(websocket, upload_dir)
44
  else:
45
  logger.error(f"认证失败: {auth_response.get('message', '未知错误')}")
 
46
  logger.info("回退到 HTTP API 上传...")
47
  return self._fallback_http_upload(upload_dir)
48
 
49
  except Exception as e:
50
  logger.error(f"WebSocket 连接失败: {e}")
 
51
  logger.info("回退到 HTTP API 上传...")
52
  return self._fallback_http_upload(upload_dir)
53
 
54
  async def _upload_files(self, websocket, upload_dir):
 
55
  if not os.path.exists(upload_dir):
56
  logger.error(f"目录不存在: {upload_dir}")
57
  return False
58
 
 
59
  all_files = []
60
  for root, dirs, files in os.walk(upload_dir):
61
  for file in files:
 
81
  failed_count += 1
82
  logger.error(f"❌ 文件 '{os.path.basename(file_path)}' 上传失败")
83
 
 
84
  await asyncio.sleep(0.5)
85
 
86
  except Exception as e:
 
91
  return success_count > 0
92
 
93
  async def _upload_single_file(self, websocket, file_path):
 
94
  try:
95
  filename = os.path.basename(file_path)
96
  logger.info(f"正在上传文件: {filename}")
97
 
 
98
  with open(file_path, 'rb') as f:
99
  file_content = f.read()
100
  file_b64 = base64.b64encode(file_content).decode('utf-8')
101
 
 
102
  upload_msg = {
103
  "type": "file_upload",
104
  "filename": filename,
 
106
  "space_id": self.space_id
107
  }
108
 
 
109
  await websocket.send(json.dumps(upload_msg))
110
 
 
111
  response = await websocket.recv()
112
  result = json.loads(response)
113
 
 
122
  return False
123
 
124
  def _fallback_http_upload(self, upload_dir):
 
125
  logger.info("使用 HTTP API 回退方案上传文件...")
126
 
127
  if not os.path.exists(upload_dir):
128
  logger.error(f"目录不存在: {upload_dir}")
129
  return False
130
 
 
131
  upload_url = f"{self.server_url.rstrip('/')}/api/remote_upload"
132
 
133
+ headers = {'X-API-Key': self.api_key}
134
+ data = {'space_id': self.space_id}
 
 
 
 
 
135
 
 
136
  all_files = []
137
  for root, dirs, files in os.walk(upload_dir):
138
  for file in files:
 
155
  logger.info(f"正在上传文件: {filename}")
156
 
157
  with open(file_path, 'rb') as f:
158
+ files = {'file': (filename, f)}
 
 
 
 
159
  response = requests.post(upload_url, headers=headers, data=data, files=files, timeout=60)
160
 
161
  if response.status_code == 200:
 
165
  logger.error(f"❌ 上传失败: HTTP {response.status_code} - {response.text}")
166
  failed_count += 1
167
 
 
168
  time.sleep(0.5)
169
 
170
  except Exception as e:
 
175
  return success_count > 0
176
 
177
  async def upload_directory_websocket(upload_dir, server_url, api_key, space_id):
 
178
  logger.info(f"🔍 开始扫描目录: {upload_dir}")
179
  logger.info(f"📡 服务器地址: {server_url}")
180
  logger.info(f"🔑 Space ID: {space_id}")
 
199
 
200
  args = parser.parse_args()
201
 
 
202
  asyncio.run(upload_directory_websocket(args.upload_dir, args.server, args.api_key, args.space_id))