# FLUX.1-Krea-dev / remote_client.py
# dangthr's picture
# Create remote_client.py
# fef6820 verified
# raw
# history blame
# 8.06 kB
import asyncio
import socketio
import subprocess
import json
import logging
import sys
import base64
import os
import threading
import time
from pathlib import Path
# Configure logging: INFO level, "<timestamp> - <level> - <message>" records.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class RemoteClient:
    """Socket.IO client that executes server-issued shell commands.

    After connecting, the client registers itself with ``space_id`` and
    ``user_token``. For every ``execute_command`` event it runs the command in
    a shell, streams the output back as ``command_result`` events and, when
    the command exits with code 0, uploads every file found in ``./output``
    through ``file_upload`` events.

    SECURITY: commands are handed to the shell exactly as received, so the
    server this client connects to must be fully trusted.
    """

    def __init__(self, space_id, user_token, server_url):
        """Store the connection parameters and bind the Socket.IO handlers.

        Args:
            space_id: Identifier sent to the server in ``register_client``.
            user_token: Token sent to the server in ``register_client``.
            server_url: Base URL of the server (``http(s)://`` or ``ws(s)://``).
        """
        self.space_id = space_id
        self.user_token = user_token
        self.server_url = server_url
        # NOTE(review): ping_interval / ping_timeout are Engine.IO *server*
        # options; the client adopts the values advertised in the server
        # handshake, and current python-socketio releases appear to reject
        # them as constructor keywords -- confirm against the installed
        # version before re-adding them.
        self.sio = socketio.AsyncClient()
        # True between the server's 'registered' event and the next disconnect.
        self.connected = False
        # Strong references to running command tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks = set()

        # Bind event handlers.
        self.sio.on('connect', self.on_connect)
        self.sio.on('disconnect', self.on_disconnect)
        self.sio.on('registered', self.on_registered)
        self.sio.on('execute_command', self.on_execute_command)
        self.sio.on('error', self.on_error)
        self.sio.on('upload_success', self.on_upload_success)

    async def on_connect(self):
        """Register this client with the server once the socket is connected."""
        logger.info("Connected to server")
        await self.sio.emit('register_client', {
            'space_id': self.space_id,
            'token': self.user_token
        })

    async def on_disconnect(self):
        """Record that the connection to the server was lost."""
        logger.info("Disconnected from server")
        self.connected = False

    async def on_registered(self, data):
        """Mark the client as registered after the server confirms it."""
        logger.info(f"Successfully registered: {data}")
        self.connected = True

    async def on_error(self, data):
        """Log an error reported by the server."""
        logger.error(f"Server error: {data}")

    async def on_upload_success(self, data):
        """Log the server's confirmation of a file upload."""
        logger.info(f"File upload successful: {data}")

    async def on_execute_command(self, data):
        """Handle an ``execute_command`` request.

        Reads ``task_id``, ``command`` and ``token`` from ``data``, reports
        the task as running, and starts the command in a background task so
        this handler returns immediately.
        """
        task_id = data.get('task_id')
        command = data.get('command')
        token = data.get('token')
        logger.info(f"Received command for task {task_id}: {command}")

        # Tell the server that execution is starting.
        await self.sio.emit('command_result', {
            'task_id': task_id,
            'status': 'running',
            'output': f'开始执行命令: {command}'
        })

        # Run the command in the background and keep a reference to the task
        # until it completes.
        task = asyncio.create_task(
            self.execute_command_async(task_id, command, token)
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def execute_command_async(self, task_id, command, token):
        """Run ``command`` in a shell and stream its output to the server.

        Each output line (stdout and stderr merged) is sent as a
        ``command_result`` event with status ``running``. A zero exit code
        triggers ``upload_output_files(token)`` followed by a ``completed``
        result; a non-zero exit code or any exception yields a ``failed``
        result.
        """
        try:
            # Make sure the output directory exists before the command runs.
            os.makedirs('./output', exist_ok=True)
            logger.info(f"Executing: {command}")

            # SECURITY: the command string comes from the server and is run
            # through the shell unmodified.
            # asyncio subprocess streams are bytes-only; requesting text mode
            # (text=True) raises ValueError, so decoding is done manually.
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
            )

            # Relay the output line by line as it is produced.
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                # errors='replace' keeps non-UTF-8 output from aborting the task.
                line_str = line.decode('utf-8', errors='replace')
                logger.info(f"Command output: {line_str.strip()}")
                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'running',
                    'output': line_str
                })

            # Wait for the process to exit so returncode is populated.
            await process.wait()

            if process.returncode == 0:
                logger.info("Command executed successfully")
                # Scan the output directory and upload its files.
                await self.upload_output_files(token)
                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'completed',
                    'output': '命令执行完成,文件已上传到网盘'
                })
            else:
                logger.error(f"Command failed with return code: {process.returncode}")
                await self.sio.emit('command_result', {
                    'task_id': task_id,
                    'status': 'failed',
                    'output': f'命令执行失败,退出码: {process.returncode}'
                })
        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            await self.sio.emit('command_result', {
                'task_id': task_id,
                'status': 'failed',
                'output': f'执行命令时出错: {str(e)}'
            })

    async def upload_output_files(self, token):
        """Upload every regular file in ``./output`` and delete it afterwards.

        Each file is read fully into memory, base64-encoded and sent as one
        ``file_upload`` event together with ``token``. Failures are logged
        and counted per file; they do not stop the remaining uploads.
        """
        output_dir = Path('./output')
        if not output_dir.exists():
            logger.warning("Output directory does not exist")
            return

        uploaded_count = 0
        failed_count = 0
        # Snapshot the listing first: files are deleted inside the loop.
        for file_path in list(output_dir.iterdir()):
            if not file_path.is_file():
                continue
            try:
                logger.info(f"Uploading file: {file_path.name}")
                file_b64 = base64.b64encode(file_path.read_bytes()).decode('utf-8')
                await self.sio.emit('file_upload', {
                    'filename': file_path.name,
                    'content': file_b64,
                    'token': token
                })
                uploaded_count += 1
                logger.info(f"Successfully uploaded: {file_path.name}")
                # Remove the file once it has been handed to the socket.
                file_path.unlink()
                # Short pause between uploads to avoid flooding the connection.
                await asyncio.sleep(0.5)
            except Exception as e:
                logger.error(f"Failed to upload {file_path.name}: {e}")
                failed_count += 1

        logger.info(f"Upload completed. Success: {uploaded_count}, Failed: {failed_count}")

    @staticmethod
    def _build_ws_url(server_url):
        """Return ``server_url`` as a ws(s):// URL ending in ``/socket.io/``."""
        ws_url = server_url.replace('https://', 'wss://').replace('http://', 'ws://')
        if not ws_url.endswith('/'):
            ws_url += '/'
        return ws_url + 'socket.io/'

    async def connect(self):
        """Connect to the server and block until the connection ends.

        Raises:
            Exception: any connection error is logged and re-raised.
        """
        try:
            ws_url = self._build_ws_url(self.server_url)
            logger.info(f"Connecting to {ws_url}")
            await self.sio.connect(ws_url, transports=['websocket'])

            # Stay alive while the socket is up. The socket's own state is
            # used rather than self.connected, which only becomes True when
            # the server's 'registered' event arrives -- after connect() has
            # already returned -- and would end this loop immediately.
            while self.sio.connected:
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            raise

    async def disconnect(self):
        """Disconnect from the server if the socket is currently connected."""
        if self.sio.connected:
            await self.sio.disconnect()
async def main():
    """Parse the command line, run a RemoteClient and always disconnect it.

    Expects exactly three arguments: ``<space_id> <user_token> <server_url>``.
    With any other argument count it prints usage help and exits with
    status 1. Errors raised while connecting are logged, not propagated.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        usage_lines = (
            "Usage: python3 remote_client.py <space_id> <user_token> <server_url>",
            "Example: python3 remote_client.py 'abc123' 'token456' 'https://gkbtyo-rqvays-5001.preview.cloudstudio.work'",
        )
        for usage_line in usage_lines:
            print(usage_line)
        sys.exit(1)

    space_id, user_token, server_url = cli_args
    logger.info(f"Starting remote client for Space: {space_id}")
    logger.info(f"Server: {server_url}")

    client = RemoteClient(space_id, user_token, server_url)
    try:
        await client.connect()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    except Exception as err:
        logger.error(f"Client error: {err}")
    finally:
        # Runs on success, error and cancellation alike.
        await client.disconnect()
        logger.info("Client stopped")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl-C after cancelling main(), whose
        # finally clause has already disconnected the client; log it and exit
        # quietly instead of ending with a traceback.
        logger.info("Received interrupt signal")