Pix-Agent / app /api /pdf_websocket.py
ManTea's picture
QA version persionality
c8b8c9b
import logging
from typing import Dict, List, Optional, Any
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from pydantic import BaseModel
import json
import time
# Cấu hình logging
logger = logging.getLogger(__name__)
# Models cho Swagger documentation
class ConnectionStatus(BaseModel):
user_id: str
active: bool
connection_count: int
last_activity: Optional[float] = None
class UserConnection(BaseModel):
user_id: str
connection_count: int
class AllConnectionsStatus(BaseModel):
total_users: int
total_connections: int
users: List[UserConnection]
# Khởi tạo router
router = APIRouter(
prefix="/ws",
tags=["WebSockets"],
)
class ConnectionManager:
"""Quản lý các kết nối WebSocket"""
def __init__(self):
# Lưu trữ các kết nối theo user_id
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: str):
"""Kết nối một WebSocket mới"""
await websocket.accept()
if user_id not in self.active_connections:
self.active_connections[user_id] = []
self.active_connections[user_id].append(websocket)
logger.info(f"New WebSocket connection for user {user_id}. Total connections: {len(self.active_connections[user_id])}")
def disconnect(self, websocket: WebSocket, user_id: str):
"""Ngắt kết nối WebSocket"""
if user_id in self.active_connections:
if websocket in self.active_connections[user_id]:
self.active_connections[user_id].remove(websocket)
# Xóa user_id khỏi dict nếu không còn kết nối nào
if not self.active_connections[user_id]:
del self.active_connections[user_id]
logger.info(f"WebSocket disconnected for user {user_id}")
async def send_message(self, message: Dict[str, Any], user_id: str):
"""Gửi tin nhắn tới tất cả kết nối của một user"""
if user_id in self.active_connections:
disconnected_websockets = []
for websocket in self.active_connections[user_id]:
try:
await websocket.send_text(json.dumps(message))
except Exception as e:
logger.error(f"Error sending message to WebSocket: {str(e)}")
disconnected_websockets.append(websocket)
# Xóa các kết nối bị ngắt
for websocket in disconnected_websockets:
self.disconnect(websocket, user_id)
def get_connection_status(self, user_id: str = None) -> Dict[str, Any]:
"""Lấy thông tin về trạng thái kết nối WebSocket"""
if user_id:
# Trả về thông tin kết nối cho user cụ thể
if user_id in self.active_connections:
return {
"user_id": user_id,
"active": True,
"connection_count": len(self.active_connections[user_id]),
"last_activity": time.time()
}
else:
return {
"user_id": user_id,
"active": False,
"connection_count": 0,
"last_activity": None
}
else:
# Trả về thông tin tất cả kết nối
result = {
"total_users": len(self.active_connections),
"total_connections": sum(len(connections) for connections in self.active_connections.values()),
"users": []
}
for uid, connections in self.active_connections.items():
result["users"].append({
"user_id": uid,
"connection_count": len(connections)
})
return result
# Tạo instance của ConnectionManager
manager = ConnectionManager()
# Test route for manual WebSocket sending
@router.get("/ws/test/{user_id}")
async def test_websocket_send(user_id: str):
"""
Test route to manually send a WebSocket message to a user
This is useful for debugging WebSocket connections
"""
logger.info(f"Attempting to send test message to user: {user_id}")
# Check if user has a connection
status = manager.get_connection_status(user_id)
if not status["active"]:
logger.warning(f"No active WebSocket connection for user: {user_id}")
return {"success": False, "message": f"No active WebSocket connection for user: {user_id}"}
# Send test message
await manager.send_message({
"type": "test_message",
"message": "This is a test WebSocket message",
"timestamp": int(time.time())
}, user_id)
logger.info(f"Test message sent to user: {user_id}")
return {"success": True, "message": f"Test message sent to user: {user_id}"}
@router.websocket("/ws/pdf/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
"""Endpoint WebSocket để cập nhật tiến trình xử lý PDF"""
logger.info(f"WebSocket connection request received for user: {user_id}")
try:
await manager.connect(websocket, user_id)
logger.info(f"WebSocket connection accepted for user: {user_id}")
# Send a test message to confirm connection
await manager.send_message({
"type": "connection_established",
"message": "WebSocket connection established successfully",
"user_id": user_id,
"timestamp": int(time.time())
}, user_id)
try:
while True:
# Đợi tin nhắn từ client (chỉ để giữ kết nối)
data = await websocket.receive_text()
logger.debug(f"Received from client: {data}")
# Echo back to confirm receipt
if data != "heartbeat": # Don't echo heartbeats
await manager.send_message({
"type": "echo",
"message": f"Received: {data}",
"timestamp": int(time.time())
}, user_id)
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for user: {user_id}")
manager.disconnect(websocket, user_id)
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
manager.disconnect(websocket, user_id)
except Exception as e:
logger.error(f"Failed to establish WebSocket connection: {str(e)}")
# Ensure the connection is closed properly
if websocket.client_state != 4: # 4 = CLOSED
await websocket.close(code=1011, reason=f"Server error: {str(e)}")
import logging
from typing import Dict, List, Optional, Any
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from pydantic import BaseModel
import json
import time
# Cấu hình logging
logger = logging.getLogger(__name__)
# Models cho Swagger documentation
class ConnectionStatus(BaseModel):
user_id: str
active: bool
connection_count: int
last_activity: Optional[float] = None
class UserConnection(BaseModel):
user_id: str
connection_count: int
class AllConnectionsStatus(BaseModel):
total_users: int
total_connections: int
users: List[UserConnection]
# Khởi tạo router
router = APIRouter(
prefix="",
tags=["WebSockets"],
)
class ConnectionManager:
"""Quản lý các kết nối WebSocket"""
def __init__(self):
# Lưu trữ các kết nối theo user_id
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: str):
"""Kết nối một WebSocket mới"""
await websocket.accept()
if user_id not in self.active_connections:
self.active_connections[user_id] = []
self.active_connections[user_id].append(websocket)
logger.info(f"New WebSocket connection for user {user_id}. Total connections: {len(self.active_connections[user_id])}")
def disconnect(self, websocket: WebSocket, user_id: str):
"""Ngắt kết nối WebSocket"""
if user_id in self.active_connections:
if websocket in self.active_connections[user_id]:
self.active_connections[user_id].remove(websocket)
# Xóa user_id khỏi dict nếu không còn kết nối nào
if not self.active_connections[user_id]:
del self.active_connections[user_id]
logger.info(f"WebSocket disconnected for user {user_id}")
async def send_message(self, message: Dict[str, Any], user_id: str):
"""Gửi tin nhắn tới tất cả kết nối của một user"""
if user_id in self.active_connections:
disconnected_websockets = []
for websocket in self.active_connections[user_id]:
try:
await websocket.send_text(json.dumps(message))
except Exception as e:
logger.error(f"Error sending message to WebSocket: {str(e)}")
disconnected_websockets.append(websocket)
# Xóa các kết nối bị ngắt
for websocket in disconnected_websockets:
self.disconnect(websocket, user_id)
def get_connection_status(self, user_id: str = None) -> Dict[str, Any]:
"""Lấy thông tin về trạng thái kết nối WebSocket"""
if user_id:
# Trả về thông tin kết nối cho user cụ thể
if user_id in self.active_connections:
return {
"user_id": user_id,
"active": True,
"connection_count": len(self.active_connections[user_id]),
"last_activity": time.time()
}
else:
return {
"user_id": user_id,
"active": False,
"connection_count": 0,
"last_activity": None
}
else:
# Trả về thông tin tất cả kết nối
result = {
"total_users": len(self.active_connections),
"total_connections": sum(len(connections) for connections in self.active_connections.values()),
"users": []
}
for uid, connections in self.active_connections.items():
result["users"].append({
"user_id": uid,
"connection_count": len(connections)
})
return result
# Tạo instance của ConnectionManager
manager = ConnectionManager()
@router.websocket("/ws/pdf/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
"""Endpoint WebSocket để cập nhật tiến trình xử lý PDF"""
await manager.connect(websocket, user_id)
try:
while True:
# Đợi tin nhắn từ client (chỉ để giữ kết nối)
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
except Exception as e:
logger.error(f"WebSocket error: {str(e)}")
manager.disconnect(websocket, user_id)
# API endpoints để kiểm tra trạng thái WebSocket
@router.get("/ws/status", response_model=AllConnectionsStatus, responses={
200: {
"description": "Successful response",
"content": {
"application/json": {
"example": {
"total_users": 2,
"total_connections": 3,
"users": [
{"user_id": "user1", "connection_count": 2},
{"user_id": "user2", "connection_count": 1}
]
}
}
}
}
})
async def get_all_websocket_connections():
"""
Lấy thông tin về tất cả kết nối WebSocket hiện tại.
Endpoint này trả về:
- Tổng số người dùng đang kết nối
- Tổng số kết nối WebSocket
- Danh sách người dùng kèm theo số lượng kết nối của mỗi người
"""
return manager.get_connection_status()
@router.get("/ws/status/{user_id}", response_model=ConnectionStatus, responses={
200: {
"description": "Successful response for active connection",
"content": {
"application/json": {
"examples": {
"active_connection": {
"summary": "Active connection",
"value": {
"user_id": "user123",
"active": True,
"connection_count": 2,
"last_activity": 1634567890.123
}
},
"no_connection": {
"summary": "No active connection",
"value": {
"user_id": "user456",
"active": False,
"connection_count": 0,
"last_activity": None
}
}
}
}
}
}
})
async def get_user_websocket_status(user_id: str):
"""
Lấy thông tin về kết nối WebSocket của một người dùng cụ thể.
Parameters:
- **user_id**: ID của người dùng cần kiểm tra
Returns:
- Thông tin về trạng thái kết nối, bao gồm:
- active: Có đang kết nối hay không
- connection_count: Số lượng kết nối hiện tại
- last_activity: Thời gian hoạt động gần nhất
"""
return manager.get_connection_status(user_id)
# Các hàm gửi thông báo cập nhật trạng thái
async def send_pdf_upload_started(user_id: str, filename: str, document_id: str):
"""Gửi thông báo bắt đầu upload PDF"""
await manager.send_message({
"type": "pdf_upload_started",
"document_id": document_id,
"filename": filename,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_upload_progress(user_id: str, document_id: str, step: str, progress: float, message: str):
"""Gửi thông báo tiến độ upload PDF"""
await manager.send_message({
"type": "pdf_upload_progress",
"document_id": document_id,
"step": step,
"progress": progress,
"message": message,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_upload_completed(user_id: str, document_id: str, filename: str, chunks: int):
"""Gửi thông báo hoàn thành upload PDF"""
await manager.send_message({
"type": "pdf_upload_completed",
"document_id": document_id,
"filename": filename,
"chunks": chunks,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_upload_failed(user_id: str, document_id: str, filename: str, error: str):
"""Gửi thông báo lỗi upload PDF"""
await manager.send_message({
"type": "pdf_upload_failed",
"document_id": document_id,
"filename": filename,
"error": error,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_delete_started(user_id: str, namespace: str):
"""Gửi thông báo bắt đầu xóa PDF"""
await manager.send_message({
"type": "pdf_delete_started",
"namespace": namespace,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_delete_completed(user_id: str, namespace: str, deleted_count: int = 0):
"""Gửi thông báo hoàn thành xóa PDF"""
await manager.send_message({
"type": "pdf_delete_completed",
"namespace": namespace,
"deleted_count": deleted_count,
"timestamp": int(time.time())
}, user_id)
async def send_pdf_delete_failed(user_id: str, namespace: str, error: str):
"""Gửi thông báo lỗi xóa PDF"""
await manager.send_message({
"type": "pdf_delete_failed",
"namespace": namespace,
"error": error,
"timestamp": int(time.time())
}, user_id)