dangthr commited on
Commit
28cf1dd
·
verified ·
1 Parent(s): 381a6c0

Update remote_client.py

Browse files
Files changed (1) hide show
  1. remote_client.py +204 -177
remote_client.py CHANGED
@@ -1,9 +1,16 @@
 
 
 
 
 
 
1
  import asyncio
2
- import socketio
3
  import subprocess
4
  import json
5
  import logging
6
  import sys
 
7
  import base64
8
  import os
9
  import threading
@@ -15,217 +22,237 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
15
  logger = logging.getLogger(__name__)
16
 
17
  class RemoteClient:
18
- def __init__(self, space_id, user_token, server_url):
19
- self.space_id = space_id
20
- self.user_token = user_token
21
  self.server_url = server_url
22
- self.sio = socketio.AsyncClient(ping_timeout=60, ping_interval=25)
23
- self.connected = False
 
 
24
 
25
- # 绑定事件
26
- self.sio.on('connect', self.on_connect)
27
- self.sio.on('disconnect', self.on_disconnect)
28
- self.sio.on('registered', self.on_registered)
29
- self.sio.on('execute_command', self.on_execute_command)
30
- self.sio.on('error', self.on_error)
31
- self.sio.on('upload_success', self.on_upload_success)
32
-
33
- async def on_connect(self):
34
- logger.info("Connected to server")
35
- # 注册客户端
36
- await self.sio.emit('register_client', {
37
- 'space_id': self.space_id,
38
- 'token': self.user_token
39
- })
40
-
41
- async def on_disconnect(self):
42
- logger.info("Disconnected from server")
43
- self.connected = False
44
-
45
- async def on_registered(self, data):
46
- logger.info(f"Successfully registered: {data}")
47
- self.connected = True
48
 
49
- async def on_error(self, data):
50
- logger.error(f"Server error: {data}")
 
 
 
 
 
 
 
 
 
 
 
51
 
52
- async def on_upload_success(self, data):
53
- logger.info(f"File upload successful: {data}")
 
 
 
 
 
 
 
 
 
 
 
 
54
 
55
- async def on_execute_command(self, data):
56
- """处理执行命令请求"""
57
- task_id = data.get('task_id')
58
- command = data.get('command')
59
- token = data.get('token')
60
-
61
- logger.info(f"Received command for task {task_id}: {command}")
62
 
63
- # 发送开始执行的状态
64
- await self.sio.emit('command_result', {
65
- 'task_id': task_id,
66
- 'status': 'running',
67
- 'output': f'开始执行命令: {command}'
68
- })
69
-
70
- # 在后台执行命令
71
- asyncio.create_task(self.execute_command_async(task_id, command, token))
72
 
73
- async def execute_command_async(self, task_id, command, token):
74
- """异步执行命令"""
 
 
 
75
  try:
76
- # 创建 output 目录
77
- os.makedirs('./output', exist_ok=True)
78
-
79
- logger.info(f"Executing: {command}")
80
-
81
  # 执行命令
82
- process = await asyncio.create_subprocess_shell(
83
  command,
84
- stdout=asyncio.subprocess.PIPE,
85
- stderr=asyncio.subprocess.STDOUT,
86
- text=True
 
87
  )
88
 
89
- # 实时读取输出
90
- output_buffer = ""
91
- while True:
92
- line = await process.stdout.readline()
93
- if not line:
94
- break
95
- line_str = line.decode() if isinstance(line, bytes) else line
96
- output_buffer += line_str
97
- logger.info(f"Command output: {line_str.strip()}")
98
-
99
- # 发送实时输出
100
- await self.sio.emit('command_result', {
101
- 'task_id': task_id,
102
- 'status': 'running',
103
- 'output': line_str
104
- })
105
 
106
- # 等待进程完成
107
- await process.wait()
108
 
109
  if process.returncode == 0:
110
- logger.info("Command executed successfully")
111
-
112
- # 扫描 output 目录并上传文件
113
- await self.upload_output_files(token)
114
-
115
- await self.sio.emit('command_result', {
116
- 'task_id': task_id,
117
- 'status': 'completed',
118
- 'output': '命令执行完成,文件已上传到网盘'
119
- })
120
  else:
121
- logger.error(f"Command failed with return code: {process.returncode}")
122
- await self.sio.emit('command_result', {
123
- 'task_id': task_id,
124
- 'status': 'failed',
125
- 'output': f'命令执行失败,退出码: {process.returncode}'
126
- })
127
 
 
 
 
 
 
 
 
 
 
 
128
  except Exception as e:
129
- logger.error(f"Error executing command: {e}")
130
- await self.sio.emit('command_result', {
131
- 'task_id': task_id,
132
- 'status': 'failed',
133
- 'output': f'执行命令时出错: {str(e)}'
134
- })
 
 
135
 
136
- async def upload_output_files(self, token):
137
- """上传 output 目录中的文件"""
138
- output_dir = Path('./output')
139
- if not output_dir.exists():
140
- logger.warning("Output directory does not exist")
141
- return
142
 
143
- uploaded_count = 0
144
- failed_count = 0
145
-
146
- for file_path in output_dir.iterdir():
147
- if file_path.is_file():
148
- try:
149
- logger.info(f"Uploading file: {file_path.name}")
150
-
151
- # 读取文件内容
152
- with open(file_path, 'rb') as f:
153
- file_content = f.read()
154
-
155
- # 编码为 base64
156
- file_b64 = base64.b64encode(file_content).decode('utf-8')
157
-
158
- # 发送文件上传请求
159
- await self.sio.emit('file_upload', {
160
- 'filename': file_path.name,
161
- 'content': file_b64,
162
- 'token': token
163
- })
164
-
165
- uploaded_count += 1
166
- logger.info(f"Successfully uploaded: {file_path.name}")
167
-
168
- # 删除已上传的文件
169
- file_path.unlink()
170
-
171
- # 避免发送过大的数据包,稍作延迟
172
- await asyncio.sleep(0.5)
173
-
174
- except Exception as e:
175
- logger.error(f"Failed to upload {file_path.name}: {e}")
176
- failed_count += 1
177
 
178
- logger.info(f"Upload completed. Success: {uploaded_count}, Failed: {failed_count}")
 
 
 
 
179
 
180
- async def connect(self):
181
- """连接到服务器"""
182
  try:
183
- # 构建 WebSocket URL
184
- ws_url = self.server_url.replace('https://', 'wss://').replace('http://', 'ws://')
185
- if not ws_url.endswith('/'):
186
- ws_url += '/'
187
- ws_url += 'socket.io/'
 
 
188
 
189
- logger.info(f"Connecting to {ws_url}")
190
- await self.sio.connect(ws_url, transports=['websocket'])
 
 
 
191
 
192
- # 保持连接
193
- while self.connected:
194
- await asyncio.sleep(1)
 
 
 
 
 
 
 
 
 
 
195
 
 
 
 
 
 
 
 
196
  except Exception as e:
197
- logger.error(f"Connection failed: {e}")
198
- raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
199
 
200
- async def disconnect(self):
201
- """断开连接"""
202
- if self.sio.connected:
203
- await self.sio.disconnect()
204
 
205
  async def main():
206
- if len(sys.argv) != 4:
207
- print("Usage: python3 remote_client.py <space_id> <user_token> <server_url>")
208
- print("Example: python3 remote_client.py 'abc123' 'token456' 'https://gkbtyo-rqvays-5001.preview.cloudstudio.work'")
209
  sys.exit(1)
210
 
211
- space_id = sys.argv[1]
212
  user_token = sys.argv[2]
213
- server_url = sys.argv[3]
214
 
215
- logger.info(f"Starting remote client for Space: {space_id}")
216
- logger.info(f"Server: {server_url}")
217
 
218
- client = RemoteClient(space_id, user_token, server_url)
219
-
220
- try:
221
- await client.connect()
222
- except KeyboardInterrupt:
223
- logger.info("Received interrupt signal")
224
- except Exception as e:
225
- logger.error(f"Client error: {e}")
226
- finally:
227
- await client.disconnect()
228
- logger.info("Client stopped")
229
 
230
  if __name__ == "__main__":
231
  asyncio.run(main())
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ WebSocket远程客户端 - 在inferless环境中运行
4
+ 用于连接到主服务器并执行远程命令和文件传输
5
+ """
6
+
7
  import asyncio
8
+ import websockets
9
  import subprocess
10
  import json
11
  import logging
12
  import sys
13
+ import urllib.parse
14
  import base64
15
  import os
16
  import threading
 
22
  logger = logging.getLogger(__name__)
23
 
24
  class RemoteClient:
25
+ def __init__(self, server_url, user_token, space_id):
 
 
26
  self.server_url = server_url
27
+ self.user_token = user_token
28
+ self.space_id = space_id
29
+ self.websocket = None
30
+ self.is_connected = False
31
 
32
+ async def connect(self):
33
+ """连接到WebSocket服务器"""
34
+ # 构建WebSocket URL
35
+ ws_url = self.server_url.replace('http', 'ws').replace('https', 'wss')
36
+ uri = f"{ws_url}/ws/client/{self.space_id}?token={urllib.parse.quote(self.user_token)}"
37
+
38
+ logger.info(f"正在连接到服务器: {uri}")
39
+
40
+ try:
41
+ async with websockets.connect(uri, ping_interval=20, ping_timeout=60) as websocket:
42
+ self.websocket = websocket
43
+ self.is_connected = True
44
+ logger.info("✅ 已连接到WebSocket服务器")
45
+
46
+ # 发送客户端注册信息
47
+ await self.register_client()
48
+
49
+ # 启动消息监听循环
50
+ await self.message_loop()
51
+
52
+ except Exception as e:
53
+ logger.error(f" 连接失败: {e}")
54
+ self.is_connected = False
55
 
56
+ async def register_client(self):
57
+ """注册客户端到服务器"""
58
+ registration = {
59
+ "type": "register",
60
+ "space_id": self.space_id,
61
+ "client_info": {
62
+ "environment": "inferless",
63
+ "python_version": sys.version,
64
+ "working_directory": os.getcwd()
65
+ }
66
+ }
67
+ await self.websocket.send(json.dumps(registration))
68
+ logger.info("📝 已发送客户端注册信息")
69
 
70
+ async def message_loop(self):
71
+ """主消息处理循环"""
72
+ while self.is_connected:
73
+ try:
74
+ message = await self.websocket.recv()
75
+ data = json.loads(message)
76
+ await self.handle_message(data)
77
+
78
+ except websockets.exceptions.ConnectionClosed:
79
+ logger.error("🔌 WebSocket连接已关闭")
80
+ self.is_connected = False
81
+ break
82
+ except Exception as e:
83
+ logger.error(f"❌ 处理消息时出错: {e}")
84
 
85
+ async def handle_message(self, data):
86
+ """处理收到的消息"""
87
+ message_type = data.get("type")
 
 
 
 
88
 
89
+ if message_type == "command":
90
+ await self.execute_command(data)
91
+ elif message_type == "upload_files":
92
+ await self.upload_files(data)
93
+ elif message_type == "ping":
94
+ await self.send_pong()
95
+ else:
96
+ logger.warning(f"⚠️ 未知消息类型: {message_type}")
 
97
 
98
+ async def execute_command(self, data):
99
+ """执行远程命令"""
100
+ command = data.get("command", "")
101
+ logger.info(f"🚀 执行命令: {command}")
102
+
103
  try:
 
 
 
 
 
104
  # 执行命令
105
+ process = subprocess.run(
106
  command,
107
+ shell=True,
108
+ capture_output=True,
109
+ text=True,
110
+ timeout=300 # 5分钟超时
111
  )
112
 
113
+ # 发送执行结果
114
+ result = {
115
+ "type": "command_result",
116
+ "command": command,
117
+ "returncode": process.returncode,
118
+ "stdout": process.stdout,
119
+ "stderr": process.stderr,
120
+ "success": process.returncode == 0
121
+ }
 
 
 
 
 
 
 
122
 
123
+ await self.websocket.send(json.dumps(result))
 
124
 
125
  if process.returncode == 0:
126
+ logger.info(" 命令执行成功")
 
 
 
 
 
 
 
 
 
127
  else:
128
+ logger.error(f" 命令执行失败,返回码: {process.returncode}")
 
 
 
 
 
129
 
130
+ except subprocess.TimeoutExpired:
131
+ error_result = {
132
+ "type": "command_result",
133
+ "command": command,
134
+ "error": "命令执行超时",
135
+ "success": False
136
+ }
137
+ await self.websocket.send(json.dumps(error_result))
138
+ logger.error("⏰ 命令执行超时")
139
+
140
  except Exception as e:
141
+ error_result = {
142
+ "type": "command_result",
143
+ "command": command,
144
+ "error": str(e),
145
+ "success": False
146
+ }
147
+ await self.websocket.send(json.dumps(error_result))
148
+ logger.error(f"❌ 命令执行异常: {e}")
149
 
150
+ async def upload_files(self, data):
151
+ """上传指定目录中的所有文件"""
152
+ upload_dir = data.get("directory", "output")
 
 
 
153
 
154
+ logger.info(f"📁 开始上传目录: {upload_dir}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
 
156
+ if not os.path.exists(upload_dir):
157
+ error_msg = f"目录不存在: {upload_dir}"
158
+ logger.error(error_msg)
159
+ await self.send_upload_result(False, error_msg)
160
+ return
161
 
 
 
162
  try:
163
+ # 扫描目录中的所有文件
164
+ files_to_upload = []
165
+ for root, dirs, files in os.walk(upload_dir):
166
+ for file in files:
167
+ file_path = os.path.join(root, file)
168
+ if os.path.isfile(file_path):
169
+ files_to_upload.append(file_path)
170
 
171
+ if not files_to_upload:
172
+ msg = f"目录 {upload_dir} 中没有找到文件"
173
+ logger.info(msg)
174
+ await self.send_upload_result(True, msg)
175
+ return
176
 
177
+ logger.info(f"📦 找到 {len(files_to_upload)} 个文件,开始上传...")
178
+
179
+ success_count = 0
180
+ failed_count = 0
181
+
182
+ for file_path in files_to_upload:
183
+ try:
184
+ await self.upload_single_file(file_path)
185
+ success_count += 1
186
+ logger.info(f"✅ 上传成功: {os.path.basename(file_path)}")
187
+ except Exception as e:
188
+ failed_count += 1
189
+ logger.error(f"❌ 上传失败: {file_path} - {e}")
190
 
191
+ # 稍作延迟避免服务器压力
192
+ await asyncio.sleep(0.1)
193
+
194
+ result_msg = f"上传完成! 成功: {success_count}, 失败: {failed_count}"
195
+ logger.info(f"📊 {result_msg}")
196
+ await self.send_upload_result(True, result_msg)
197
+
198
  except Exception as e:
199
+ error_msg = f"上传过程中出错: {e}"
200
+ logger.error(error_msg)
201
+ await self.send_upload_result(False, error_msg)
202
+
203
+ async def upload_single_file(self, file_path):
204
+ """上传单个文件"""
205
+ filename = os.path.basename(file_path)
206
+
207
+ with open(file_path, 'rb') as f:
208
+ file_content = f.read()
209
+ file_b64 = base64.b64encode(file_content).decode('utf-8')
210
+
211
+ file_message = {
212
+ "type": "file_upload",
213
+ "filename": filename,
214
+ "content": file_b64,
215
+ "file_size": len(file_content)
216
+ }
217
+
218
+ await self.websocket.send(json.dumps(file_message))
219
+
220
+ async def send_upload_result(self, success, message):
221
+ """发送上传结果"""
222
+ result = {
223
+ "type": "upload_result",
224
+ "success": success,
225
+ "message": message
226
+ }
227
+ await self.websocket.send(json.dumps(result))
228
 
229
+ async def send_pong(self):
230
+ """响应ping消息"""
231
+ pong = {"type": "pong"}
232
+ await self.websocket.send(json.dumps(pong))
233
 
234
  async def main():
235
+ if len(sys.argv) < 4:
236
+ print("使用方法: python remote_client.py <server_url> <user_token> <space_id>")
237
+ print("示例: python remote_client.py https://gkbtyo-rqvays-5001.preview.cloudstudio.work abc123 space456")
238
  sys.exit(1)
239
 
240
+ server_url = sys.argv[1]
241
  user_token = sys.argv[2]
242
+ space_id = sys.argv[3]
243
 
244
+ client = RemoteClient(server_url, user_token, space_id)
 
245
 
246
+ while True:
247
+ try:
248
+ await client.connect()
249
+ except KeyboardInterrupt:
250
+ logger.info("👋 收到中断信号,正在退出...")
251
+ break
252
+ except Exception as e:
253
+ logger.error(f" 连接异常: {e}")
254
+ logger.info("🔄 5秒后重新尝试连接...")
255
+ await asyncio.sleep(5)
 
256
 
257
  if __name__ == "__main__":
258
  asyncio.run(main())