Duibonduil commited on
Commit
d5421b3
·
verified ·
1 Parent(s): d70d6cb

Upload 2 files

Browse files
aworld/core/event/base.py ADDED
@@ -0,0 +1,185 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # coding: utf-8
2
+ # Copyright (c) 2025 inclusionAI.
3
+ import abc
4
+ import time
5
+ import uuid
6
+ from dataclasses import dataclass, field
7
+ from typing import Any, Dict, Generic, TypeVar, List, Optional
8
+
9
+ from pydantic import BaseModel
10
+
11
+ from aworld.config.conf import ConfigDict
12
+ from aworld.core.common import Config, Observation, ActionModel, TaskItem
13
+ from aworld.core.context.base import Context
14
+
15
+
16
+ class Constants:
17
+ AGENT = "agent"
18
+ TOOL = "tool"
19
+ TASK = "task"
20
+ OUTPUT = "output"
21
+ TOOL_CALLBACK = "tool_callback"
22
+
23
+
24
+ class TopicType:
25
+ START = "__start"
26
+ FINISHED = "__finished"
27
+ OUTPUT = "__output"
28
+ ERROR = "__error"
29
+ RERUN = "__rerun"
30
+ HUMAN_CONFIRM = "__human_confirm"
31
+ # for dynamic subscribe
32
+ SUBSCRIBE_TOOL = "__subscribe_tool"
33
+ SUBSCRIBE_AGENT = "__subscribe_agent"
34
+
35
+
36
+ DataType = TypeVar('DataType')
37
+
38
+
39
+ @dataclass
40
+ class Message(Generic[DataType]):
41
+ """The message structure for event transmission.
42
+
43
+ Each message has a unique ID, and the actual data is carried through the `payload` attribute,
44
+ peer to peer(p2p) message transmission is achieved by setting the `receiver`, and topic based
45
+ message transmission is achieved by setting the `topic`.
46
+
47
+ Specific message recognition and processing can be achieved through the type of `payload`
48
+ or by extending `Message`.
49
+ """
50
+ session_id: str
51
+ payload: Optional[DataType]
52
+ # Current caller
53
+ sender: str
54
+ # event type
55
+ category: str
56
+ # Next caller
57
+ receiver: str = None
58
+ # The previous caller
59
+ caller: str = None
60
+ id: str = uuid.uuid4().hex
61
+ priority: int = 0
62
+ # Topic of message
63
+ topic: str = None
64
+ headers: Dict[str, Any] = field(default_factory=dict)
65
+ timestamp: int = time.time()
66
+
67
+ def __post_init__(self):
68
+ context = self.headers.get("context")
69
+ if not context:
70
+ self.headers['context'] = Context.instance()
71
+
72
+ def __lt__(self, other: object) -> bool:
73
+ if not isinstance(other, Message):
74
+ raise RuntimeError
75
+ return self.priority < other.priority
76
+
77
+ def key(self) -> str:
78
+ category = self.category if self.category else ''
79
+ if self.topic:
80
+ return f'{category}_{self.topic}'
81
+ else:
82
+ return f'{category}_{self.sender if self.sender else ""}'
83
+
84
+ def is_error(self):
85
+ return self.topic == TopicType.ERROR
86
+
87
+ @property
88
+ def task_id(self):
89
+ return self.context.task_id
90
+
91
+ @property
92
+ def context(self) -> Context:
93
+ return self.headers.get('context')
94
+
95
+ @context.setter
96
+ def context(self, context: Context):
97
+ self.headers['context'] = context
98
+
99
+
100
+ @dataclass
101
+ class TaskEvent(Message[TaskItem]):
102
+ """Task message is oriented towards applications, can interact with third-party entities independently."""
103
+ category: str = 'task'
104
+
105
+
106
+ @dataclass
107
+ class AgentMessage(Message[Observation]):
108
+ """Agent event is oriented towards applications, can interact with third-party entities independently.
109
+
110
+ For example, `agent` event can interact with other agents through the A2A protocol.
111
+ """
112
+ category: str = 'agent'
113
+
114
+
115
+ @dataclass
116
+ class ToolMessage(Message[List[ActionModel]]):
117
+ """Tool event is oriented towards applications, can interact with third-party entities independently.
118
+
119
+ For example, `tool` event can interact with other tools through the MCP protocol.
120
+ """
121
+ category: str = 'tool'
122
+
123
+
124
+ class Messageable(object):
125
+ """Top-level API for data reception, transmission and transformation."""
126
+ __metaclass__ = abc.ABCMeta
127
+
128
+ def __init__(self, conf: Config = None, **kwargs):
129
+ self.conf = conf
130
+ if isinstance(conf, Dict):
131
+ self.conf = ConfigDict(conf)
132
+ elif isinstance(conf, BaseModel):
133
+ # To add flexibility
134
+ self.conf = ConfigDict(conf.model_dump())
135
+
136
+ @abc.abstractmethod
137
+ async def send(self, message: Message, **kwargs):
138
+ """Send a message to the receiver.
139
+
140
+ Args:
141
+ message: Message structure that carries the data that needs to be processed.
142
+ """
143
+
144
+ @abc.abstractmethod
145
+ async def receive(self, message: Message, **kwargs):
146
+ """Receive a message from the sender.
147
+
148
+ Mainly used for request-driven (call), event-driven is generally handled using `Eventbus`.
149
+
150
+ Args:
151
+ message: Message structure that carries the data that needs to be processed.
152
+ """
153
+
154
+ async def transform(self, message: Message, **kwargs):
155
+ """Transforms a message into a standardized format from the sender.
156
+
157
+ Args:
158
+ message: Message structure that carries the data that needs to be processed.
159
+ """
160
+
161
+
162
+ class Recordable(Messageable):
163
+ """Top-level API for recording data."""
164
+
165
+ async def send(self, message: Message, **kwargs):
166
+ return await self.write(message, **kwargs)
167
+
168
+ async def receive(self, message: Message, **kwargs):
169
+ return await self.read(message, **kwargs)
170
+
171
+ @abc.abstractmethod
172
+ async def read(self, message: Message, **kwargs):
173
+ """Read a message from the store.
174
+
175
+ Args:
176
+ message: Message structure that carries the data that needs to be read.
177
+ """
178
+
179
+ @abc.abstractmethod
180
+ async def write(self, message: Message, **kwargs):
181
+ """Write a message to the store.
182
+
183
+ Args:
184
+ message: Message structure that carries the data that needs to be write.
185
+ """
aworld/core/event/event_bus.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # coding: utf-8
2
+ # Copyright (c) 2025 inclusionAI.
3
+ import abc
4
+ from asyncio import Queue, PriorityQueue
5
+ from inspect import isfunction
6
+ from typing import Callable, Any, Dict, List
7
+
8
+ from aworld.core.singleton import InheritanceSingleton
9
+
10
+ from aworld.core.common import Config
11
+ from aworld.core.event.base import Message, Messageable
12
+ from aworld.logs.util import logger
13
+
14
+
15
+ class Eventbus(Messageable, InheritanceSingleton):
16
+ __metaclass__ = abc.ABCMeta
17
+
18
+ def __init__(self, conf: Config = None, **kwargs):
19
+ super().__init__(conf, **kwargs)
20
+ # {event_type: {topic: [handler1, handler2]}}
21
+ self._subscribers: Dict[str, Dict[str, List[Callable[..., Any]]]] = {}
22
+
23
+ # {event_type, handler}
24
+ self._transformer: Dict[str, Callable[..., Any]] = {}
25
+
26
+ async def send(self, message: Message, **kwargs):
27
+ return await self.publish(message, **kwargs)
28
+
29
+ async def receive(self, message: Message, **kwargs):
30
+ return await self.consume(message)
31
+
32
+ async def publish(self, messages: Message, **kwargs):
33
+ """Publish a message, equals `send`."""
34
+
35
+ async def consume(self, message: Message, **kwargs):
36
+ """Consume the message queue."""
37
+
38
+ async def subscribe(self, event_type: str, topic: str, handler: Callable[..., Any], **kwargs):
39
+ """Subscribe the handler to the event type and the topic.
40
+
41
+ NOTE: The handler list is executed sequentially in the topic, the output
42
+ of the previous one is the input of the next one.
43
+
44
+ Args:
45
+ event_type: Type of events, fixed ones(task, agent, tool, error).
46
+ topic: Classify messages through the topic.
47
+ handler: Function of handle the event type and topic message.
48
+ kwargs:
49
+ - transformer: Whether it is a transformer handler.
50
+ - order: Handler order in the topic.
51
+ """
52
+
53
+ async def unsubscribe(self, event_type: str, topic: str, handler: Callable[..., Any], **kwargs):
54
+ """Unsubscribe the handler to the event type and the topic.
55
+
56
+ Args:
57
+ event_type: Type of events, fixed ones(task, agent, tool, error).
58
+ topic: Classify messages through the topic.
59
+ handler: Function of handle the event type and topic message.
60
+ kwargs:
61
+ - transformer: Whether it is a transformer handler.
62
+ """
63
+
64
+ def get_handlers(self, event_type: str) -> Dict[str, List[Callable[..., Any]]]:
65
+ return self._subscribers.get(event_type, {})
66
+
67
+ def get_topic_handlers(self, event_type: str, topic: str) -> List[Callable[..., Any]]:
68
+ return self._subscribers.get(event_type, {}).get(topic, [])
69
+
70
+ def get_transform_handlers(self, key: str) -> Callable[..., Any]:
71
+ return self._transformer.get(key, None)
72
+
73
+ def close(self):
74
+ pass
75
+
76
+
77
+ class InMemoryEventbus(Eventbus):
78
+ def __init__(self, conf: Config = None, **kwargs):
79
+ super().__init__(conf, **kwargs)
80
+
81
+ # use asyncio Queue as default
82
+ # use asyncio Queue as default, isolation based on session_id
83
+ # self._message_queue: Queue = Queue()
84
+ self._message_queue: Dict[str, Queue] = {}
85
+
86
+ def wait_consume_size(self, id: str) -> int:
87
+ return self._message_queue.get(id, Queue()).qsize()
88
+
89
+ async def publish(self, message: Message, **kwargs):
90
+ queue = self._message_queue.get(message.task_id)
91
+ if not queue:
92
+ queue = PriorityQueue()
93
+ self._message_queue[message.task_id] = queue
94
+ await queue.put(message)
95
+
96
+ async def consume(self, message: Message, **kwargs):
97
+ return await self._message_queue.get(message.task_id, PriorityQueue()).get()
98
+
99
+ async def consume_nowait(self, message: Message):
100
+ return self._message_queue.get(message.task_id, PriorityQueue()).get_nowait()
101
+
102
+ async def done(self, id: str):
103
+ while not self._message_queue.get(id, PriorityQueue()).empty():
104
+ self._message_queue.get(id, PriorityQueue()).get_nowait()
105
+ self._message_queue.get(id, PriorityQueue()).task_done()
106
+
107
+ async def subscribe(self, event_type: str, topic: str, handler: Callable[..., Any], **kwargs):
108
+ if kwargs.get("transformer"):
109
+ if event_type in self._transformer:
110
+ logger.warning(f"{event_type} transform already subscribe.")
111
+ return
112
+
113
+ if isfunction(handler):
114
+ self._transformer[event_type] = handler
115
+ else:
116
+ logger.warning(f"{event_type} {topic} subscribe fail, handler {handler} is not a function.")
117
+ return
118
+
119
+ order = kwargs.get('order', 99999)
120
+
121
+ handlers = self._subscribers.get(event_type)
122
+ if not handlers:
123
+ self._subscribers[event_type] = {}
124
+ topic_handlers = self._subscribers[event_type].get(topic)
125
+ if not topic_handlers:
126
+ self._subscribers[event_type][topic] = []
127
+
128
+ if order >= len(self._subscribers[event_type][topic]):
129
+ self._subscribers[event_type][topic].append(handler)
130
+ else:
131
+ self._subscribers[event_type][topic].insert(order, handler)
132
+ logger.info(f"subscribe {event_type} {topic} {handler} success.")
133
+ logger.info(f"{self._subscribers}")
134
+
135
+ async def unsubscribe(self, event_type: str, topic: str, handler: Callable[..., Any], **kwargs):
136
+ if kwargs.get("transformer"):
137
+ if event_type not in self._transformer:
138
+ logger.warning(f"{event_type} transform not subscribe.")
139
+ return
140
+
141
+ self._transformer.pop(event_type, None)
142
+ return
143
+
144
+ if event_type not in self._subscribers:
145
+ logger.warning(f"{event_type} handler not register.")
146
+ return
147
+
148
+ handlers = self._subscribers[event_type]
149
+ topic_handlers: List = handlers.get(topic, [])
150
+ topic_handlers.remove(handler)