# Source: FLUX.1-Krea-dev / remote_uploader.py
# Last change: "Update remote_uploader.py" by dangthr (commit 9d9567e, verified), 6.68 kB
import requests
import argparse
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileUploadHandler(FileSystemEventHandler):
    """Watchdog event handler that uploads created/modified files to a server.

    Every progress message is printed locally and also forwarded to the
    server's ``/api/remote_log`` endpoint on a best-effort basis.

    Args:
        server_url: Base URL of the server; a trailing slash is tolerated.
        api_key: Value sent in the ``X-API-Key`` header of every request.
        space_id: Identifier sent with every log entry and upload.
        upload_timeout: Timeout handed to ``requests.post`` for uploads,
            either a single number or a ``(connect, read)`` tuple in
            seconds. Without it a stalled server would block the event
            thread indefinitely.
    """

    def __init__(self, server_url, api_key, space_id, upload_timeout=(10, 300)):
        super().__init__()
        self.server_url = server_url
        self.api_key = api_key
        self.space_id = space_id
        self.upload_timeout = upload_timeout
        # Paths that were uploaded successfully; used to skip re-uploads.
        self.uploaded_files = set()

    def _endpoint(self, path):
        """Return the absolute URL for the API path *path*."""
        return f"{self.server_url.rstrip('/')}{path}"

    def _report(self, message):
        """Print *message* locally and forward it to the server log."""
        print(message)
        self.send_log(message)

    def send_log(self, message):
        """Send a log message to the server; failures are only printed."""
        headers = {
            'X-API-Key': self.api_key,
            'Content-Type': 'application/json'
        }
        data = {
            'space_id': self.space_id,
            'message': message,
            'timestamp': time.time()
        }
        try:
            response = requests.post(
                self._endpoint('/api/remote_log'),
                headers=headers,
                json=data,
                timeout=5,
            )
            if response.status_code != 200:
                print(f"发送日志失败: {response.status_code}")
        except Exception as e:
            # Logging is deliberately best-effort: it must never break the
            # upload flow, so every failure is reported locally and dropped.
            print(f"发送日志异常: {e}")

    def _upload_after_settle(self, event):
        """Upload the file behind *event* unless the event is for a directory."""
        if event.is_directory:
            return
        # Give the writer a moment to finish before the file is read.
        time.sleep(1)
        self.upload_file(event.src_path)

    def on_created(self, event):
        """Triggered when a new file is created."""
        self._upload_after_settle(event)

    def on_modified(self, event):
        """Triggered when a file is modified."""
        self._upload_after_settle(event)

    def upload_file(self, file_path):
        """Upload a single file to ``/api/remote_upload``.

        Missing files are reported and skipped. Files already uploaded
        successfully are skipped silently. A file is recorded as uploaded
        only when the server answers with status 200 and a JSON body.

        Args:
            file_path: Path of the file to upload.
        """
        if not os.path.exists(file_path):
            self._report(f"文件不存在: {file_path}")
            return

        # Avoid uploading the same file twice.
        if file_path in self.uploaded_files:
            return

        filename = os.path.basename(file_path)
        self._report(f"检测到新文件: {filename}")
        self._report("正在上传到服务器...")

        headers = {
            'X-API-Key': self.api_key
        }
        data = {
            'space_id': self.space_id
        }

        try:
            with open(file_path, 'rb') as f:
                response = requests.post(
                    self._endpoint('/api/remote_upload'),
                    headers=headers,
                    data=data,
                    files={'file': (filename, f)},
                    timeout=self.upload_timeout,
                )
        except requests.exceptions.RequestException as e:
            # Must be checked before OSError: RequestException derives from
            # IOError, so the reverse order would label network failures as
            # file read errors.
            self._report(f"请求时出错: {e}")
            return
        except OSError as e:
            self._report(f"读取文件时出错: {e}")
            return

        self._handle_upload_response(response, file_path, filename)

    def _handle_upload_response(self, response, file_path, filename):
        """Report the server's answer and record the file on success."""
        self._report(f"服务器响应 ({response.status_code}):")

        try:
            result = response.json()
        except ValueError:
            # Every JSON decode error raised by requests is a ValueError
            # subclass, regardless of the installed requests version.
            self._report(response.text)
            return

        self._report(str(result))

        if response.status_code == 200:
            self.uploaded_files.add(file_path)
            self._report(f"✅ 文件 '{filename}' 上传成功!")
            return

        # The body may be valid JSON without being an object (list, string).
        if isinstance(result, dict):
            reason = result.get('error', '未知错误')
        else:
            reason = '未知错误'
        self._report(f"❌ 上传失败: {reason}")
def monitor_directory(watch_dir, server_url, api_key, space_id):
    """Upload the files already in *watch_dir*, then watch it for new ones.

    The directory is created when it does not exist. The function blocks
    until it is interrupted with Ctrl+C; the observer is stopped and joined
    on every exit path.

    Args:
        watch_dir: Directory to watch (non-recursively).
        server_url: Base URL of the upload server.
        api_key: API key passed to the upload handler.
        space_id: Space identifier passed to the upload handler.
    """
    if not os.path.exists(watch_dir):
        print(f"创建监听目录: {watch_dir}")
        os.makedirs(watch_dir, exist_ok=True)

    handler = FileUploadHandler(server_url, api_key, space_id)

    def announce(message):
        # Every status line goes both to stdout and to the server log.
        print(message)
        handler.send_log(message)

    separator = "-" * 50

    announce(f"🔍 开始监听目录: {watch_dir}")
    announce(f"📡 服务器地址: {server_url}")
    announce(f"🔑 Space ID: {space_id}")
    announce("等待新文件...")
    announce(separator)

    # Upload the files that are already present before watching for changes.
    existing_files = [
        name for name in os.listdir(watch_dir)
        if os.path.isfile(os.path.join(watch_dir, name))
    ]
    if existing_files:
        announce(f"发现 {len(existing_files)} 个已存在的文件,开始上传...")
        for filename in existing_files:
            handler.upload_file(os.path.join(watch_dir, filename))
        announce(separator)

    # Create the observer and start watching.
    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        announce("\n停止监听...")
    finally:
        # Stop the observer thread however the loop ended, not only on
        # Ctrl+C, then wait for it to finish.
        observer.stop()
        observer.join()
if __name__ == "__main__":
    # Command-line entry point: parse the options, then watch the directory.
    cli = argparse.ArgumentParser(
        description="远程文件监听上传器 - 自动监听 output 文件夹并上传新文件",
    )
    cli.add_argument("api_key", help="您的 API 密钥")
    cli.add_argument("space_id", help="Space ID")
    cli.add_argument(
        "--server",
        default="http://127.0.0.1:5001",
        help="服务器的 URL 地址 (默认: http://127.0.0.1:5001)",
    )
    cli.add_argument(
        "--watch-dir",
        default="output",
        help="要监听的目录 (默认: output)",
    )
    options = cli.parse_args()

    # Blocks until the watcher is interrupted.
    monitor_directory(
        watch_dir=options.watch_dir,
        server_url=options.server,
        api_key=options.api_key,
        space_id=options.space_id,
    )