File size: 7,709 Bytes
ae64487
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import copy
import inspect
import os.path
from typing import Dict, Any, List, Union

from aworld.core.context.base import Context
from aworld.logs.util import logger
from aworld.models.qwen_tokenizer import qwen_tokenizer
from aworld.models.openai_tokenizer import openai_tokenizer
from aworld.utils import import_package


def usage_process(usage: Dict[str, Union[int, Dict[str, int]]] = {}, context: Context = None):
    if not context:
        context = Context.instance()

    stacks = inspect.stack()
    index = 0
    for idx, stack in enumerate(stacks):
        index = idx + 1
        file = os.path.basename(stack.filename)
        # supported use `llm.py` utility function only
        if 'call_llm_model' in stack.function and file == 'llm.py':
            break

    if index >= len(stacks):
        logger.warning("not category usage find to count")
    else:
        instance = stacks[index].frame.f_locals.get('self')
        name = getattr(instance, "_name", "unknown")
        usage[name] = copy.copy(usage)
    # total usage
    context.add_token(usage)


def num_tokens_from_messages(messages, model="gpt-4o"):
    """Return the number of tokens used by a list of messages."""
    import_package("tiktoken")
    import tiktoken

    if model.lower() == "qwen":
        encoding = qwen_tokenizer
    elif model.lower() == "openai":
        encoding = openai_tokenizer
    else:
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            logger.warning(f"{model} model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")

    tokens_per_message = 3
    tokens_per_name = 1

    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        if isinstance(message, str):
            num_tokens += len(encoding.encode(message))
        else:
            for key, value in message.items():
                num_tokens += len(encoding.encode(str(value)))
                if key == "name":
                    num_tokens += tokens_per_name
    num_tokens += 3
    return num_tokens

def truncate_tokens_from_messages(messages: List[Dict[str, Any]], max_tokens: int, keep_both_sides: bool = False, model: str = "gpt-4o"):
    import_package("tiktoken")
    import tiktoken

    if model.lower() == "qwen":
        return qwen_tokenizer.truncate(messages, max_tokens, keep_both_sides)
    elif model.lower() == "openai":
        return openai_tokenizer.truncate(messages, max_tokens, keep_both_sides)
    
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        logger.warning(f"{model} model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")
        
    return encoding.truncate(messages, max_tokens, keep_both_sides)
    
def agent_desc_transform(agent_dict: Dict[str, Any],
                         agents: List[str] = None,
                         provider: str = 'openai',
                         strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of agent description.

    Args:
        agent_dict: Dict of descriptions of agents that are registered in the agent factory.
        agents: Description of special agents to use.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special agents are provided, `min` indicates no content returned,
                 `max` means get all agents' descriptions.
    """
    agent_as_tools = []
    if not agents and strategy == 'min':
        return agent_as_tools

    if provider and 'openai' in provider:
        for agent_name, agent_info in agent_dict.items():
            if agents and agent_name not in agents:
                logger.debug(f"{agent_name} can not supported in {agents}, you can set `tools` params to support it.")
                continue

            for action in agent_info["abilities"]:
                # Build parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)

                openai_function_schema = {
                    "name": f'{agent_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }

                agent_as_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return agent_as_tools


def tool_desc_transform(tool_dict: Dict[str, Any],
                        tools: List[str] = None,
                        black_tool_actions: Dict[str, List[str]] = {},
                        provider: str = 'openai',
                        strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of tool description.

    Args:
        tool_dict: Dict of descriptions of tools that are registered in the agent factory.
        tools: Description of special tools to use.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special tools are provided, `min` indicates no content returned,
                 `max` means get all tools' descriptions.
    """
    openai_tools = []
    if not tools and strategy == 'min':
        return openai_tools

    if black_tool_actions is None:
        black_tool_actions = {}

    if provider and 'openai' in provider:
        for tool_name, tool_info in tool_dict.items():
            if tools and tool_name not in tools:
                logger.debug(f"{tool_name} can not supported in {tools}, you can set `tools` params to support it.")
                continue

            black_actions = black_tool_actions.get(tool_name, [])
            for action in tool_info["actions"]:
                if action['name'] in black_actions:
                    continue
                # Build parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)

                openai_function_schema = {
                    "name": f'{tool_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }

                openai_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return openai_tools