import asyncio import socketio import subprocess import json import logging import sys import base64 import os import threading import time from pathlib import Path # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class RemoteClient: def __init__(self, space_id, user_token, server_url): self.space_id = space_id self.user_token = user_token self.server_url = server_url self.sio = socketio.AsyncClient(ping_timeout=60, ping_interval=25) self.connected = False # 绑定事件 self.sio.on('connect', self.on_connect) self.sio.on('disconnect', self.on_disconnect) self.sio.on('registered', self.on_registered) self.sio.on('execute_command', self.on_execute_command) self.sio.on('error', self.on_error) self.sio.on('upload_success', self.on_upload_success) async def on_connect(self): logger.info("Connected to server") # 注册客户端 await self.sio.emit('register_client', { 'space_id': self.space_id, 'token': self.user_token }) async def on_disconnect(self): logger.info("Disconnected from server") self.connected = False async def on_registered(self, data): logger.info(f"Successfully registered: {data}") self.connected = True async def on_error(self, data): logger.error(f"Server error: {data}") async def on_upload_success(self, data): logger.info(f"File upload successful: {data}") async def on_execute_command(self, data): """处理执行命令请求""" task_id = data.get('task_id') command = data.get('command') token = data.get('token') logger.info(f"Received command for task {task_id}: {command}") # 发送开始执行的状态 await self.sio.emit('command_result', { 'task_id': task_id, 'status': 'running', 'output': f'开始执行命令: {command}' }) # 在后台执行命令 asyncio.create_task(self.execute_command_async(task_id, command, token)) async def execute_command_async(self, task_id, command, token): """异步执行命令""" try: # 创建 output 目录 os.makedirs('./output', exist_ok=True) logger.info(f"Executing: {command}") # 执行命令 process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, text=True ) # 实时读取输出 output_buffer = "" while True: line = await process.stdout.readline() if not line: break line_str = line.decode() if isinstance(line, bytes) else line output_buffer += line_str logger.info(f"Command output: {line_str.strip()}") # 发送实时输出 await self.sio.emit('command_result', { 'task_id': task_id, 'status': 'running', 'output': line_str }) # 等待进程完成 await process.wait() if process.returncode == 0: logger.info("Command executed successfully") # 扫描 output 目录并上传文件 await self.upload_output_files(token) await self.sio.emit('command_result', { 'task_id': task_id, 'status': 'completed', 'output': '命令执行完成,文件已上传到网盘' }) else: logger.error(f"Command failed with return code: {process.returncode}") await self.sio.emit('command_result', { 'task_id': task_id, 'status': 'failed', 'output': f'命令执行失败,退出码: {process.returncode}' }) except Exception as e: logger.error(f"Error executing command: {e}") await self.sio.emit('command_result', { 'task_id': task_id, 'status': 'failed', 'output': f'执行命令时出错: {str(e)}' }) async def upload_output_files(self, token): """上传 output 目录中的文件""" output_dir = Path('./output') if not output_dir.exists(): logger.warning("Output directory does not exist") return uploaded_count = 0 failed_count = 0 for file_path in output_dir.iterdir(): if file_path.is_file(): try: logger.info(f"Uploading file: {file_path.name}") # 读取文件内容 with open(file_path, 'rb') as f: file_content = f.read() # 编码为 base64 file_b64 = base64.b64encode(file_content).decode('utf-8') # 发送文件上传请求 await self.sio.emit('file_upload', { 'filename': file_path.name, 'content': file_b64, 'token': token }) uploaded_count += 1 logger.info(f"Successfully uploaded: {file_path.name}") # 删除已上传的文件 file_path.unlink() # 避免发送过大的数据包,稍作延迟 await asyncio.sleep(0.5) except Exception as e: logger.error(f"Failed to upload {file_path.name}: {e}") failed_count += 1 logger.info(f"Upload completed. Success: {uploaded_count}, Failed: {failed_count}") async def connect(self): """连接到服务器""" try: # 构建 WebSocket URL ws_url = self.server_url.replace('https://', 'wss://').replace('http://', 'ws://') if not ws_url.endswith('/'): ws_url += '/' ws_url += 'socket.io/' logger.info(f"Connecting to {ws_url}") await self.sio.connect(ws_url, transports=['websocket']) # 保持连接 while self.connected: await asyncio.sleep(1) except Exception as e: logger.error(f"Connection failed: {e}") raise async def disconnect(self): """断开连接""" if self.sio.connected: await self.sio.disconnect() async def main(): if len(sys.argv) != 4: print("Usage: python3 remote_client.py ") print("Example: python3 remote_client.py 'abc123' 'token456' 'https://gkbtyo-rqvays-5001.preview.cloudstudio.work'") sys.exit(1) space_id = sys.argv[1] user_token = sys.argv[2] server_url = sys.argv[3] logger.info(f"Starting remote client for Space: {space_id}") logger.info(f"Server: {server_url}") client = RemoteClient(space_id, user_token, server_url) try: await client.connect() except KeyboardInterrupt: logger.info("Received interrupt signal") except Exception as e: logger.error(f"Client error: {e}") finally: await client.disconnect() logger.info("Client stopped") if __name__ == "__main__": asyncio.run(main())