Spaces:
Paused
Paused
from datetime import datetime | |
from enum import Enum | |
from typing import Any, Optional | |
from pydantic import BaseModel, field_validator | |
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk | |
from core.workflow.entities.node_entities import NodeRunMetadataKey | |
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState | |
from core.workflow.nodes import NodeType | |
from core.workflow.nodes.base import BaseNodeData | |
class QueueEvent(str, Enum): | |
""" | |
QueueEvent enum | |
""" | |
LLM_CHUNK = "llm_chunk" | |
TEXT_CHUNK = "text_chunk" | |
AGENT_MESSAGE = "agent_message" | |
MESSAGE_REPLACE = "message_replace" | |
MESSAGE_END = "message_end" | |
ADVANCED_CHAT_MESSAGE_END = "advanced_chat_message_end" | |
WORKFLOW_STARTED = "workflow_started" | |
WORKFLOW_SUCCEEDED = "workflow_succeeded" | |
WORKFLOW_FAILED = "workflow_failed" | |
ITERATION_START = "iteration_start" | |
ITERATION_NEXT = "iteration_next" | |
ITERATION_COMPLETED = "iteration_completed" | |
NODE_STARTED = "node_started" | |
NODE_SUCCEEDED = "node_succeeded" | |
NODE_FAILED = "node_failed" | |
RETRIEVER_RESOURCES = "retriever_resources" | |
ANNOTATION_REPLY = "annotation_reply" | |
AGENT_THOUGHT = "agent_thought" | |
MESSAGE_FILE = "message_file" | |
PARALLEL_BRANCH_RUN_STARTED = "parallel_branch_run_started" | |
PARALLEL_BRANCH_RUN_SUCCEEDED = "parallel_branch_run_succeeded" | |
PARALLEL_BRANCH_RUN_FAILED = "parallel_branch_run_failed" | |
ERROR = "error" | |
PING = "ping" | |
STOP = "stop" | |
class AppQueueEvent(BaseModel): | |
""" | |
QueueEvent abstract entity | |
""" | |
event: QueueEvent | |
class QueueLLMChunkEvent(AppQueueEvent): | |
""" | |
QueueLLMChunkEvent entity | |
Only for basic mode apps | |
""" | |
event: QueueEvent = QueueEvent.LLM_CHUNK | |
chunk: LLMResultChunk | |
class QueueIterationStartEvent(AppQueueEvent): | |
""" | |
QueueIterationStartEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ITERATION_START | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
start_at: datetime | |
node_run_index: int | |
inputs: Optional[dict[str, Any]] = None | |
predecessor_node_id: Optional[str] = None | |
metadata: Optional[dict[str, Any]] = None | |
class QueueIterationNextEvent(AppQueueEvent): | |
""" | |
QueueIterationNextEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ITERATION_NEXT | |
index: int | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
parallel_mode_run_id: Optional[str] = None | |
"""iteratoin run in parallel mode run id""" | |
node_run_index: int | |
output: Optional[Any] = None # output for the current iteration | |
def set_output(cls, v): | |
""" | |
Set output | |
""" | |
if v is None: | |
return None | |
if isinstance(v, int | float | str | bool | dict | list): | |
return v | |
raise ValueError("output must be a valid type") | |
class QueueIterationCompletedEvent(AppQueueEvent): | |
""" | |
QueueIterationCompletedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ITERATION_COMPLETED | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
start_at: datetime | |
node_run_index: int | |
inputs: Optional[dict[str, Any]] = None | |
outputs: Optional[dict[str, Any]] = None | |
metadata: Optional[dict[str, Any]] = None | |
steps: int = 0 | |
error: Optional[str] = None | |
class QueueTextChunkEvent(AppQueueEvent): | |
""" | |
QueueTextChunkEvent entity | |
""" | |
event: QueueEvent = QueueEvent.TEXT_CHUNK | |
text: str | |
from_variable_selector: Optional[list[str]] = None | |
"""from variable selector""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
class QueueAgentMessageEvent(AppQueueEvent): | |
""" | |
QueueMessageEvent entity | |
""" | |
event: QueueEvent = QueueEvent.AGENT_MESSAGE | |
chunk: LLMResultChunk | |
class QueueMessageReplaceEvent(AppQueueEvent): | |
""" | |
QueueMessageReplaceEvent entity | |
""" | |
event: QueueEvent = QueueEvent.MESSAGE_REPLACE | |
text: str | |
class QueueRetrieverResourcesEvent(AppQueueEvent): | |
""" | |
QueueRetrieverResourcesEvent entity | |
""" | |
event: QueueEvent = QueueEvent.RETRIEVER_RESOURCES | |
retriever_resources: list[dict] | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
class QueueAnnotationReplyEvent(AppQueueEvent): | |
""" | |
QueueAnnotationReplyEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ANNOTATION_REPLY | |
message_annotation_id: str | |
class QueueMessageEndEvent(AppQueueEvent): | |
""" | |
QueueMessageEndEvent entity | |
""" | |
event: QueueEvent = QueueEvent.MESSAGE_END | |
llm_result: Optional[LLMResult] = None | |
class QueueAdvancedChatMessageEndEvent(AppQueueEvent): | |
""" | |
QueueAdvancedChatMessageEndEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ADVANCED_CHAT_MESSAGE_END | |
class QueueWorkflowStartedEvent(AppQueueEvent): | |
""" | |
QueueWorkflowStartedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.WORKFLOW_STARTED | |
graph_runtime_state: GraphRuntimeState | |
class QueueWorkflowSucceededEvent(AppQueueEvent): | |
""" | |
QueueWorkflowSucceededEvent entity | |
""" | |
event: QueueEvent = QueueEvent.WORKFLOW_SUCCEEDED | |
outputs: Optional[dict[str, Any]] = None | |
class QueueWorkflowFailedEvent(AppQueueEvent): | |
""" | |
QueueWorkflowFailedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.WORKFLOW_FAILED | |
error: str | |
class QueueNodeStartedEvent(AppQueueEvent): | |
""" | |
QueueNodeStartedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.NODE_STARTED | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
node_run_index: int = 1 | |
predecessor_node_id: Optional[str] = None | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
start_at: datetime | |
parallel_mode_run_id: Optional[str] = None | |
"""iteratoin run in parallel mode run id""" | |
class QueueNodeSucceededEvent(AppQueueEvent): | |
""" | |
QueueNodeSucceededEvent entity | |
""" | |
event: QueueEvent = QueueEvent.NODE_SUCCEEDED | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
start_at: datetime | |
inputs: Optional[dict[str, Any]] = None | |
process_data: Optional[dict[str, Any]] = None | |
outputs: Optional[dict[str, Any]] = None | |
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None | |
error: Optional[str] = None | |
class QueueNodeInIterationFailedEvent(AppQueueEvent): | |
""" | |
QueueNodeInIterationFailedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.NODE_FAILED | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
start_at: datetime | |
inputs: Optional[dict[str, Any]] = None | |
process_data: Optional[dict[str, Any]] = None | |
outputs: Optional[dict[str, Any]] = None | |
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None | |
error: str | |
class QueueNodeFailedEvent(AppQueueEvent): | |
""" | |
QueueNodeFailedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.NODE_FAILED | |
node_execution_id: str | |
node_id: str | |
node_type: NodeType | |
node_data: BaseNodeData | |
parallel_id: Optional[str] = None | |
"""parallel id if node is in parallel""" | |
parallel_start_node_id: Optional[str] = None | |
"""parallel start node id if node is in parallel""" | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
start_at: datetime | |
inputs: Optional[dict[str, Any]] = None | |
process_data: Optional[dict[str, Any]] = None | |
outputs: Optional[dict[str, Any]] = None | |
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None | |
error: str | |
class QueueAgentThoughtEvent(AppQueueEvent): | |
""" | |
QueueAgentThoughtEvent entity | |
""" | |
event: QueueEvent = QueueEvent.AGENT_THOUGHT | |
agent_thought_id: str | |
class QueueMessageFileEvent(AppQueueEvent): | |
""" | |
QueueAgentThoughtEvent entity | |
""" | |
event: QueueEvent = QueueEvent.MESSAGE_FILE | |
message_file_id: str | |
class QueueErrorEvent(AppQueueEvent): | |
""" | |
QueueErrorEvent entity | |
""" | |
event: QueueEvent = QueueEvent.ERROR | |
error: Any = None | |
class QueuePingEvent(AppQueueEvent): | |
""" | |
QueuePingEvent entity | |
""" | |
event: QueueEvent = QueueEvent.PING | |
class QueueStopEvent(AppQueueEvent): | |
""" | |
QueueStopEvent entity | |
""" | |
class StopBy(Enum): | |
""" | |
Stop by enum | |
""" | |
USER_MANUAL = "user-manual" | |
ANNOTATION_REPLY = "annotation-reply" | |
OUTPUT_MODERATION = "output-moderation" | |
INPUT_MODERATION = "input-moderation" | |
event: QueueEvent = QueueEvent.STOP | |
stopped_by: StopBy | |
def get_stop_reason(self) -> str: | |
""" | |
To stop reason | |
""" | |
reason_mapping = { | |
QueueStopEvent.StopBy.USER_MANUAL: "Stopped by user.", | |
QueueStopEvent.StopBy.ANNOTATION_REPLY: "Stopped by annotation reply.", | |
QueueStopEvent.StopBy.OUTPUT_MODERATION: "Stopped by output moderation.", | |
QueueStopEvent.StopBy.INPUT_MODERATION: "Stopped by input moderation.", | |
} | |
return reason_mapping.get(self.stopped_by, "Stopped by unknown reason.") | |
class QueueMessage(BaseModel): | |
""" | |
QueueMessage abstract entity | |
""" | |
task_id: str | |
app_mode: str | |
event: AppQueueEvent | |
class MessageQueueMessage(QueueMessage): | |
""" | |
MessageQueueMessage entity | |
""" | |
message_id: str | |
conversation_id: str | |
class WorkflowQueueMessage(QueueMessage): | |
""" | |
WorkflowQueueMessage entity | |
""" | |
pass | |
class QueueParallelBranchRunStartedEvent(AppQueueEvent): | |
""" | |
QueueParallelBranchRunStartedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_STARTED | |
parallel_id: str | |
parallel_start_node_id: str | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
class QueueParallelBranchRunSucceededEvent(AppQueueEvent): | |
""" | |
QueueParallelBranchRunSucceededEvent entity | |
""" | |
event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED | |
parallel_id: str | |
parallel_start_node_id: str | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
class QueueParallelBranchRunFailedEvent(AppQueueEvent): | |
""" | |
QueueParallelBranchRunFailedEvent entity | |
""" | |
event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_FAILED | |
parallel_id: str | |
parallel_start_node_id: str | |
parent_parallel_id: Optional[str] = None | |
"""parent parallel id if node is in parallel""" | |
parent_parallel_start_node_id: Optional[str] = None | |
"""parent parallel start node id if node is in parallel""" | |
in_iteration_id: Optional[str] = None | |
"""iteration id if node is in iteration""" | |
error: str | |