# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import copy
import inspect
import os.path
from typing import Dict, Any, List, Union
from aworld.core.context.base import Context
from aworld.logs.util import logger
from aworld.models.qwen_tokenizer import qwen_tokenizer
from aworld.models.openai_tokenizer import openai_tokenizer
from aworld.utils import import_package
def usage_process(usage: Dict[str, Union[int, Dict[str, int]]] = None, context: Context = None):
    """Attribute a token-usage record to its LLM caller and add it to the context total.

    Walks the call stack looking for the `call_llm_model*` utility frame in
    `llm.py`; the frame just above it is assumed to be the calling instance,
    whose `_name` attribute becomes the category key. A shallow copy of the
    usage dict is nested under that key (copied before insertion, so the nested
    entry is not self-referential), then the whole dict is accumulated into the
    context's token counters via `context.add_token`.

    Args:
        usage: Token usage counts (e.g. prompt/completion token numbers).
            NOTE: mutated in place — the categorized copy is inserted into it.
            Defaults to a fresh empty dict (was a shared mutable default `{}`,
            which accumulated entries across calls; fixed to `None` sentinel).
        context: Target context; defaults to the global `Context.instance()`.
    """
    if usage is None:
        usage = {}
    if not context:
        context = Context.instance()
    stacks = inspect.stack()
    index = 0
    for idx, stack in enumerate(stacks):
        # index points one frame above the matched llm.py frame (the caller).
        index = idx + 1
        file = os.path.basename(stack.filename)
        # supported use `llm.py` utility function only
        if 'call_llm_model' in stack.function and file == 'llm.py':
            break
    if index >= len(stacks):
        # Either no llm.py frame was found, or it was the outermost frame.
        logger.warning("no usage category found to count")
    else:
        instance = stacks[index].frame.f_locals.get('self')
        name = getattr(instance, "_name", "unknown")
        usage[name] = copy.copy(usage)
    # total usage
    context.add_token(usage)
def num_tokens_from_messages(messages, model="gpt-4o"):
    """Return the number of tokens used by a list of messages.

    Counting follows the OpenAI chat-format heuristic: a fixed per-message
    overhead, one extra token when a "name" field is present, and a trailing
    priming allowance for the assistant reply. Dict messages have every value
    encoded; plain-string messages are encoded directly.
    """
    import_package("tiktoken")
    import tiktoken

    model_key = model.lower()
    if model_key == "qwen":
        encoding = qwen_tokenizer
    elif model_key == "openai":
        encoding = openai_tokenizer
    else:
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            logger.warning(f"{model} model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")

    per_message_overhead = 3
    per_name_overhead = 1
    total = 0
    for message in messages:
        total += per_message_overhead
        if isinstance(message, str):
            total += len(encoding.encode(message))
            continue
        for key, value in message.items():
            total += len(encoding.encode(str(value)))
            if key == "name":
                total += per_name_overhead
    # Every reply is primed with the assistant role tokens.
    return total + 3
def truncate_tokens_from_messages(messages: List[Dict[str, Any]], max_tokens: int, keep_both_sides: bool = False, model: str = "gpt-4o"):
import_package("tiktoken")
import tiktoken
if model.lower() == "qwen":
return qwen_tokenizer.truncate(messages, max_tokens, keep_both_sides)
elif model.lower() == "openai":
return openai_tokenizer.truncate(messages, max_tokens, keep_both_sides)
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
logger.warning(f"{model} model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
return encoding.truncate(messages, max_tokens, keep_both_sides)
def agent_desc_transform(agent_dict: Dict[str, Any],
                         agents: List[str] = None,
                         provider: str = 'openai',
                         strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of agent description.

    Args:
        agent_dict: Dict of descriptions of agents that are registered in the agent factory.
        agents: Description of special agents to use.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special agents are provided, `min` indicates no content returned,
                  `max` means get all agents' descriptions.

    Returns:
        A list of OpenAI function-tool descriptors, one per agent ability.
    """
    results: List[Dict[str, Any]] = []
    # `min` with no explicit agent selection means: expose nothing.
    if not agents and strategy == 'min':
        return results
    # Only the OpenAI-style protocol is currently handled.
    if not (provider and 'openai' in provider):
        return results

    for agent_name, agent_info in agent_dict.items():
        if agents and agent_name not in agents:
            logger.debug(f"{agent_name} can not supported in {agents}, you can set `tools` params to support it.")
            continue
        for ability in agent_info["abilities"]:
            # Translate each ability's params into JSON-schema properties.
            schema_props: Dict[str, Any] = {}
            mandatory: List[str] = []
            for pname, pinfo in ability["params"].items():
                ptype = pinfo["type"]
                schema_props[pname] = {
                    "description": pinfo["desc"],
                    # framework uses 'str'; JSON schema expects 'string'
                    "type": "string" if ptype == "str" else ptype,
                }
                if pinfo.get("required", False):
                    mandatory.append(pname)
            results.append({
                "type": "function",
                "function": {
                    "name": f'{agent_name}__{ability["name"]}',
                    "description": ability["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": schema_props,
                        "required": mandatory,
                    },
                },
            })
    return results
def tool_desc_transform(tool_dict: Dict[str, Any],
                        tools: List[str] = None,
                        black_tool_actions: Dict[str, List[str]] = None,
                        provider: str = 'openai',
                        strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of tool description.

    FIX: `black_tool_actions` previously defaulted to a shared mutable `{}`
    (Python anti-pattern), which also made the existing `is None` guard dead
    code. It now defaults to `None` and is normalized inside — backward
    compatible for all callers.

    Args:
        tool_dict: Dict of descriptions of tools that are registered in the agent factory.
        tools: Description of special tools to use.
        black_tool_actions: Per-tool list of action names to exclude from the output.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special tools are provided, `min` indicates no content returned,
                  `max` means get all tools' descriptions.

    Returns:
        A list of OpenAI function-tool descriptors, one per non-blacklisted tool action.
    """
    openai_tools = []
    # `min` with no explicit tool selection means: expose nothing.
    if not tools and strategy == 'min':
        return openai_tools
    if black_tool_actions is None:
        black_tool_actions = {}
    if provider and 'openai' in provider:
        for tool_name, tool_info in tool_dict.items():
            if tools and tool_name not in tools:
                logger.debug(f"{tool_name} can not supported in {tools}, you can set `tools` params to support it.")
                continue
            black_actions = black_tool_actions.get(tool_name, [])
            for action in tool_info["actions"]:
                if action['name'] in black_actions:
                    continue
                # Build JSON-schema parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        # framework uses 'str'; JSON schema expects 'string'
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)
                openai_function_schema = {
                    "name": f'{tool_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }
                openai_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return openai_tools