File size: 8,058 Bytes
fef6820
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import asyncio
import socketio
import subprocess
import json
import logging
import sys
import base64
import os
import threading
import time
from pathlib import Path

# Configure logging: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class RemoteClient:
    """Socket.IO client that registers with a server and runs the commands it sends.

    On connect the client emits ``register_client`` with its space id and
    token. For every ``execute_command`` event it runs the received command in
    a shell, streams each output line back as a ``command_result`` event and,
    when the command exits with status 0, uploads and then deletes every
    regular file found directly inside ``./output``.

    SECURITY: the command string received from the server is executed verbatim
    through the shell. Only connect this client to a server you fully trust.
    """

    def __init__(self, space_id, user_token, server_url):
        """Store the connection settings and bind the Socket.IO event handlers.

        Args:
            space_id: Value sent as ``space_id`` in the registration message.
            user_token: Value sent as ``token`` in the registration message.
            server_url: Base URL of the server (``http://`` or ``https://``).
        """
        self.space_id = space_id
        self.user_token = user_token
        self.server_url = server_url
        # NOTE(review): ping interval/timeout are normally dictated by the
        # Engine.IO server; confirm the installed python-socketio client
        # accepts these keyword arguments.
        self.sio = socketio.AsyncClient(ping_timeout=60, ping_interval=25)
        # True only between the 'registered' and 'disconnect' events.
        self.connected = False
        # Strong references to in-flight command tasks. The event loop keeps
        # only weak references to tasks, so a task nobody references may be
        # garbage collected before it finishes.
        self._background_tasks = set()

        # Bind event handlers
        self.sio.on('connect', self.on_connect)
        self.sio.on('disconnect', self.on_disconnect)
        self.sio.on('registered', self.on_registered)
        self.sio.on('execute_command', self.on_execute_command)
        self.sio.on('error', self.on_error)
        self.sio.on('upload_success', self.on_upload_success)

    @staticmethod
    def _build_ws_url(server_url):
        """Return the ``ws(s)://.../socket.io/`` URL derived from *server_url*."""
        ws_url = server_url.replace('https://', 'wss://').replace('http://', 'ws://')
        if not ws_url.endswith('/'):
            ws_url += '/'
        return ws_url + 'socket.io/'

    @staticmethod
    def _decode_output(line):
        """Return *line* as ``str``, decoding bytes as UTF-8.

        Undecodable bytes are replaced with U+FFFD so that one stray byte in a
        command's output cannot abort the whole command.
        """
        if isinstance(line, (bytes, bytearray)):
            return line.decode('utf-8', errors='replace')
        return line

    async def on_connect(self):
        """Register this client with the server once the socket is connected."""
        logger.info("Connected to server")
        # Register the client
        await self.sio.emit('register_client', {
            'space_id': self.space_id,
            'token': self.user_token
        })

    async def on_disconnect(self):
        """Mark the client as no longer registered."""
        logger.info("Disconnected from server")
        self.connected = False

    async def on_registered(self, data):
        """Mark the client as registered after the server confirms it."""
        logger.info(f"Successfully registered: {data}")
        self.connected = True

    async def on_error(self, data):
        """Log an error reported by the server."""
        logger.error(f"Server error: {data}")

    async def on_upload_success(self, data):
        """Log the server's confirmation of a file upload."""
        logger.info(f"File upload successful: {data}")

    async def on_execute_command(self, data):
        """Handle an ``execute_command`` request.

        Reads ``task_id``, ``command`` and ``token`` from *data*, reports a
        ``running`` status and schedules the command to run in the background
        so the event handler returns immediately.
        """
        task_id = data.get('task_id')
        command = data.get('command')
        token = data.get('token')

        logger.info(f"Received command for task {task_id}: {command}")

        # Tell the server that execution is starting
        await self.sio.emit('command_result', {
            'task_id': task_id,
            'status': 'running',
            'output': f'开始执行命令: {command}'
        })

        # Run the command in the background; keep a reference until it is done
        # so the task cannot be garbage collected mid-execution.
        task = asyncio.create_task(self.execute_command_async(task_id, command, token))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def execute_command_async(self, task_id, command, token):
        """Run *command* in a shell and stream its output to the server.

        Each output line (stderr is merged into stdout) is emitted as a
        ``command_result`` event with status ``running``. When the process
        exits with code 0 the files in ``./output`` are uploaded and a
        ``completed`` status is sent; otherwise a ``failed`` status is sent.
        Any exception raised along the way is logged and reported as
        ``failed``.

        Args:
            task_id: Identifier echoed back in every ``command_result`` event.
            command: Shell command line to execute (trusted server input).
            token: Token forwarded with each uploaded file.
        """
        try:
            # Make sure the output directory exists
            os.makedirs('./output', exist_ok=True)

            logger.info(f"Executing: {command}")

            # asyncio subprocess streams are bytes-only: passing text=True
            # makes create_subprocess_shell raise ValueError, so the output is
            # decoded manually below.
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT
            )

            # Stream the output line by line
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                line_str = self._decode_output(line)
                logger.info(f"Command output: {line_str.strip()}")

                # Forward the line to the server in real time
                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'running',
                    'output': line_str
                })

            # Wait for the process to finish
            await process.wait()

            if process.returncode == 0:
                logger.info("Command executed successfully")

                # Scan the output directory and upload its files
                await self.upload_output_files(token)

                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'completed',
                    'output': '命令执行完成,文件已上传到网盘'
                })
            else:
                logger.error(f"Command failed with return code: {process.returncode}")
                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'failed',
                    'output': f'命令执行失败,退出码: {process.returncode}'
                })

        except Exception as e:
            # logger.exception keeps the traceback for diagnosis
            logger.exception(f"Error executing command: {e}")
            await self.sio.emit('command_result', {
                'task_id': task_id,
                'status': 'failed',
                'output': f'执行命令时出错: {str(e)}'
            })

    async def upload_output_files(self, token):
        """Upload every regular file directly inside ``./output``.

        Each file is read fully, base64-encoded and sent in a ``file_upload``
        event together with *token*; the local file is deleted right after the
        event has been emitted. A failure on one file is logged and counted,
        and the remaining files are still processed.
        """
        output_dir = Path('./output')
        if not output_dir.exists():
            logger.warning("Output directory does not exist")
            return

        uploaded_count = 0
        failed_count = 0

        for file_path in output_dir.iterdir():
            if not file_path.is_file():
                continue
            try:
                logger.info(f"Uploading file: {file_path.name}")

                # Read the file and encode it as base64 text
                file_b64 = base64.b64encode(file_path.read_bytes()).decode('utf-8')

                # Send the upload request
                await self.sio.emit('file_upload', {
                    'filename': file_path.name,
                    'content': file_b64,
                    'token': token
                })

                uploaded_count += 1
                logger.info(f"Successfully uploaded: {file_path.name}")

                # Remove the file once it has been sent
                file_path.unlink()

                # Short pause between files to avoid flooding the socket
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Failed to upload {file_path.name}: {e}")
                failed_count += 1

        logger.info(f"Upload completed. Success: {uploaded_count}, Failed: {failed_count}")

    async def connect(self):
        """Connect to the server and block until the connection ends.

        Raises:
            Exception: Any error raised while connecting or waiting is logged
                and re-raised.
        """
        try:
            # Build the WebSocket URL
            ws_url = self._build_ws_url(self.server_url)

            logger.info(f"Connecting to {ws_url}")
            await self.sio.connect(ws_url, transports=['websocket'])

            # Block for the lifetime of the connection. Polling self.connected
            # here would exit immediately, because that flag is only set once
            # the asynchronous 'registered' event has arrived.
            await self.sio.wait()

        except Exception as e:
            logger.error(f"Connection failed: {e}")
            raise

    async def disconnect(self):
        """Disconnect from the server if the socket is currently connected."""
        if self.sio.connected:
            await self.sio.disconnect()

async def main():
    """Parse the command line, run the remote client and always disconnect.

    Expects exactly three positional arguments: space id, user token and
    server URL. With any other argument count the usage text is printed and
    the process exits with status 1.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        print("Usage: python3 remote_client.py <space_id> <user_token> <server_url>")
        print("Example: python3 remote_client.py 'abc123' 'token456' 'https://gkbtyo-rqvays-5001.preview.cloudstudio.work'")
        sys.exit(1)

    space_id, user_token, server_url = cli_args

    logger.info(f"Starting remote client for Space: {space_id}")
    logger.info(f"Server: {server_url}")

    client = RemoteClient(space_id, user_token, server_url)

    try:
        await client.connect()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    except Exception as err:
        logger.error(f"Client error: {err}")
    finally:
        # Close the socket whichever way the session ended.
        await client.disconnect()
        logger.info("Client stopped")

# Script entry point: run the async main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())