Duibonduil's picture
Upload 7 files
d0c79e9 verified
raw
history blame
9.38 kB
import logging
from abc import ABC
from typing import Dict, Any, Union, List, Literal, Optional
from datetime import datetime
import uuid
from aworld.models.model_response import ToolCall
from examples.debate.agent.base import DebateSpeech
from examples.debate.agent.prompts import user_assignment_prompt, user_assignment_system_prompt, affirmative_few_shots, \
negative_few_shots, \
user_debate_prompt
from examples.debate.agent.search.search_engine import SearchEngine
from examples.debate.agent.search.tavily_search_engine import TavilySearchEngine
from examples.debate.agent.stream_output_agent import StreamOutputAgent
from aworld.config import AgentConfig
from aworld.core.common import Observation, ActionModel
from aworld.output import SearchOutput, SearchItem, MessageOutput
from aworld.output.artifact import ArtifactType
def truncate_content(raw_content, char_limit):
if raw_content is None:
raw_content = ''
if len(raw_content) > char_limit:
raw_content = raw_content[:char_limit] + "... [truncated]"
return raw_content
class DebateAgent(StreamOutputAgent, ABC):
stance: Literal["affirmative", "negative"]
def __init__(self, name: str, stance: Literal["affirmative", "negative"], conf: AgentConfig, search_engine: Optional[SearchEngine] = TavilySearchEngine()):
conf.name = name
super().__init__(conf)
self.steps = 0
self.stance = stance
self.search_engine = search_engine
async def speech(self, topic: str, opinion: str,oppose_opinion: str, round: int, speech_history: list[DebateSpeech]) -> DebateSpeech:
observation = Observation(content=self.get_latest_speech(speech_history).content if self.get_latest_speech(speech_history) else "")
info = {
"topic": topic,
"round": round,
"opinion": opinion,
"oppose_opinion": oppose_opinion,
"history": speech_history
}
actions = await self.async_policy(observation, info)
return actions[0].policy_info
async def async_policy(self, observation: Observation, info: Dict[str, Any] = {}, **kwargs) -> Union[
List[ActionModel], None]:
## step 1: params
opponent_claim = observation.content
round = info["round"]
opinion = info["opinion"]
oppose_opinion = info["oppose_opinion"]
topic = info["topic"]
history: list[DebateSpeech] = info["history"]
#Event.emit("xxx")
## step2: gen keywords
keywords = await self.gen_keywords(topic, opinion, oppose_opinion, opponent_claim, history)
logging.info(f"gen keywords = {keywords}")
## step3:search_webpages
search_results = await self.search_webpages(keywords, max_results=5)
for search_result in search_results:
logging.info(f"keyword#{search_result['query']}-> result size is {len(search_result['results'])}")
search_item = {
"query": search_result.get("query", ""),
"results": [SearchItem(title=result["title"],url=result["url"], content=result['content'], raw_content=result['raw_content'], metadata={}) for result in search_result["results"]],
"origin_tool_call": ToolCall.from_dict({
"id": f"call_search",
"type": "function",
"function": {
"name": "search",
"arguments": keywords
}
})
}
search_output = SearchOutput.from_dict(search_item)
await self.workspace.create_artifact(
artifact_type=ArtifactType.WEB_PAGES,
artifact_id=str(uuid.uuid4()),
content=search_output,
metadata={
"query": search_output.query,
"user": self.name(),
"round": info["round"],
"opinion": info["opinion"],
"oppose_opinion": info["oppose_opinion"],
"topic": info["topic"],
"tags": [f"user#{self.name()}",f"Rounds#{info['round']}"]
}
)
## step4 gen result
user_response = await self.gen_statement(topic, opinion, oppose_opinion, opponent_claim, history, search_results)
logging.info(f"user_response is {user_response}")
## step3: gen speech
speech = DebateSpeech.from_dict({
"round": round,
"type": "speech",
"stance": self.stance,
"name": self.name(),
})
async def after_speech_call(message_output_response):
logging.info(f"{self.stance}#{self.name()}: after_speech_call")
speech.metadata = {}
speech.content = message_output_response
speech.finished = True
await speech.convert_to_parts(user_response, after_speech_call)
action = ActionModel(
policy_info=speech
)
return [action]
async def gen_keywords(self, topic, opinion, oppose_opinion, last_oppose_speech_content, history):
current_time = datetime.now().strftime("%Y-%m-%d-%H")
human_prompt = user_assignment_prompt.format(topic=topic,
opinion=opinion,
oppose_opinion=oppose_opinion,
last_oppose_speech_content=last_oppose_speech_content,
current_time = current_time,
limit=2
)
messages = [{'role': 'system', 'content': user_assignment_system_prompt},
{'role': 'user', 'content': human_prompt}]
output = await self.async_call_llm(messages)
response = await output.get_finished_response()
return response.split(",")
async def search_webpages(self, keywords, max_results):
return await self.search_engine.async_batch_search(queries=keywords, max_results=max_results)
async def gen_statement(self, topic, opinion, oppose_opinion, opponent_claim, history, search_results) -> MessageOutput:
search_results_content = ""
for search_result in search_results:
search_results_content += f"SearchQuery: {search_result['query']}"
search_results_content += "\n\n".join([truncate_content(s['content'], 1000) for s in search_result['results']])
unique_history = history
# if len(history) >= 2:
# for i in range(len(history)):
# # Check if the current element is the same as the next one
# if i == len(history) - 1 or history[i] != history[i+1]:
# # Add the current element to the result list
# unique_history.append(history[i])
affirmative_chat_history = ""
negative_chat_history = ""
if len(unique_history) >= 2:
if self.stance == "affirmative":
for speech in unique_history[:-1]:
if speech.stance == "affirmative":
affirmative_chat_history = affirmative_chat_history + "You: " + speech.content + "\n"
elif speech.stance == "negative":
affirmative_chat_history = affirmative_chat_history + "Your Opponent: " + speech.content + "\n"
elif self.stance == "negative":
for speech in unique_history[:-1]:
if speech.stance == "negative":
negative_chat_history = negative_chat_history + "You: " + speech.content + "\n"
elif speech.stance == "affirmative":
negative_chat_history = negative_chat_history + "Your Opponent: " + speech.content + "\n"
few_shots = ""
chat_history = ""
if self.stance == "affirmative":
chat_history = affirmative_chat_history
few_shots = affirmative_few_shots
elif self.stance == "negative":
chat_history = negative_chat_history
few_shots = negative_few_shots
human_prompt = user_debate_prompt.format(topic=topic,
opinion=opinion,
oppose_opinion=oppose_opinion,
last_oppose_speech_content=opponent_claim,
search_results_content=search_results_content,
chat_history = chat_history,
few_shots = few_shots
)
messages = [{'role': 'system', 'content': user_assignment_system_prompt},
{'role': 'user', 'content': human_prompt}]
return await self.async_call_llm(messages)
def get_latest_speech(self, history: list[DebateSpeech]):
"""
get the latest speech from history
"""
if len(history) == 0:
return None
return history[-1]
def set_workspace(self, workspace):
self.workspace = workspace