Spaces:
Runtime error
Runtime error
| import time | |
| import types | |
| from typing import Any, Dict, List, Tuple, Union | |
| from langchain.agents import AgentExecutor | |
| from langchain.input import get_color_mapping | |
| from langchain.schema import AgentAction, AgentFinish | |
| from .translator import Translator | |
class AgentExecutorWithTranslation(AgentExecutor):
    """AgentExecutor variant that post-processes outputs with a ``Translator``."""

    # Callable applied to the prepared outputs (see ``prep_outputs``).
    translator: Translator = Translator()

    def prep_outputs(
        self,
        inputs: Dict[str, str],
        outputs: Dict[str, str],
        return_only_outputs: bool = False,
    ) -> Dict[str, str]:
        """Prepare chain outputs and pass them through the translator.

        Args:
            inputs: Inputs the chain was called with.
            outputs: Raw outputs produced by the chain.
            return_only_outputs: Forwarded unchanged to the parent
                ``prep_outputs``.

        Returns:
            The raw ``outputs`` if the parent ``prep_outputs`` raises
            ``ValueError``; otherwise the prepared outputs, run through
            ``self.translator`` when they contain an ``"input"`` key.
        """
        try:
            prepared = super().prep_outputs(inputs, outputs, return_only_outputs)
        except ValueError:
            # Best effort: when the parent rejects the outputs, hand back
            # the unprepared dict instead of propagating the error.
            return outputs

        if "input" not in prepared:
            return prepared
        return self.translator(prepared)
class Executor(AgentExecutorWithTranslation):
    """Agent executor whose run loop streams intermediate results.

    NOTE: both ``_call`` and ``__call__`` contain ``yield`` and are therefore
    generator functions. The ``Dict`` return annotations are kept only to
    mirror the signatures they override; callers must iterate the result.
    """

    def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]:
        """Run the agent loop, yielding each step as it is produced.

        Args:
            inputs: Chain inputs, forwarded to ``_take_next_step`` and, on
                early stop, to ``agent.return_stopped_response``.

        Yields:
            * For every agent action whose tool result is a generator: first
              an ``(AgentAction, str)`` announcement tuple, then every item
              that generator yields.
            * For every other action whose tool name matches a registered
              tool: an ``(AgentAction, observation)`` tuple whose ``tool``
              field carries the tool's ``tool_logo_md`` value.
            * Last, the value of ``self._return(...)`` (presumably the final
              output dict -- ``__call__`` treats dict items as final outputs).
        """
        # Construct a mapping of tool name to tool for easy lookup.
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green"]
        )
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        # Track the iterations and wall-clock time the agent has used so far.
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()

        # Agent loop: runs until the agent finishes, a tool asks to return
        # directly, or ``_should_continue`` says to stop.
        while self._should_continue(iterations, time_elapsed):
            next_step_output = self._take_next_step(
                name_to_tool_map, color_mapping, inputs, intermediate_steps
            )
            if isinstance(next_step_output, AgentFinish):
                yield self._return(next_step_output, intermediate_steps)
                return

            for i, step in enumerate(next_step_output):
                agent_action = step[0]
                observation = step[1]

                # Single pass over the tools: the logo comes from the last
                # tool with a matching name, and the number of matches drives
                # how many tuples the non-streaming branch yields.
                matching_tools = [
                    tool for tool in self.tools if tool.name == agent_action.tool
                ]
                tool_logo = (
                    matching_tools[-1].tool_logo_md if matching_tools else None
                )

                if isinstance(observation, types.GeneratorType):
                    logo = f"{tool_logo}" if tool_logo is not None else ""
                    yield (
                        AgentAction("", agent_action.tool_input, agent_action.log),
                        f"Further use other tool {logo} to answer the question.",
                    )
                    # Stream the tool's output and remember the last item so
                    # it can stand in as the recorded observation. Starting
                    # from "" keeps an empty generator from raising
                    # UnboundLocalError or reusing a stale value left over
                    # from a previously streamed tool.
                    last_chunk = ""
                    for last_chunk in observation:
                        yield last_chunk
                    next_step_output[i] = (agent_action, last_chunk)
                else:
                    for _ in matching_tools:
                        yield (
                            AgentAction(
                                tool_logo, agent_action.tool_input, agent_action.log
                            ),
                            observation,
                        )

            intermediate_steps.extend(next_step_output)
            if len(next_step_output) == 1:
                next_step_action = next_step_output[0]
                # See if tool should return directly.
                tool_return = self._get_tool_return(next_step_action)
                if tool_return is not None:
                    yield self._return(tool_return, intermediate_steps)
                    return
            iterations += 1
            time_elapsed = time.time() - start_time

        # The loop ended without a final answer: let the agent build the
        # stopped response according to the early-stopping method.
        output = self.agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        yield self._return(output, intermediate_steps)
        return

    def __call__(
        self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
    ) -> Dict[str, Any]:
        """Run the logic of this chain, streaming every item from ``_call``.

        Args:
            inputs: Dictionary of inputs, or single input if chain expects
                only one param.
            return_only_outputs: boolean for whether to return only outputs in the
                response. If True, only new keys generated by this chain will be
                returned. If False, both input keys and new keys generated by this
                chain will be returned. Defaults to False.

        Yields:
            Each item produced by ``_call``; dict items are first passed
            through ``prep_outputs``.

        Raises:
            Any exception raised while iterating ``_call`` is reported to
            ``callback_manager.on_chain_error`` and then re-raised.
        """
        inputs = self.prep_inputs(inputs)
        self.callback_manager.on_chain_start(
            {"name": self.__class__.__name__},
            inputs,
            verbose=self.verbose,
        )
        try:
            for output in self._call(inputs):
                if isinstance(output, dict):
                    output = self.prep_outputs(inputs, output, return_only_outputs)
                yield output
        except (KeyboardInterrupt, Exception) as e:
            self.callback_manager.on_chain_error(e, verbose=self.verbose)
            raise
        self.callback_manager.on_chain_end(output, verbose=self.verbose)
        # In a generator this becomes the StopIteration value; ordinary
        # ``for`` loops over the result never see it.
        return output