"""Tool-using chat agent built on LangGraph and an OpenAI chat model."""

import json
from typing import TypedDict, Sequence, Annotated

from rich import print as rich_print
from rich.panel import Panel
from rich.console import Console
from rich.pretty import Pretty
from rich.markdown import Markdown
from rich.json import JSON
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from tqdm import tqdm


def _render_content(content):
    """Pick a rich renderable for one message body.

    Strings that parse as JSON are shown as JSON, other strings as Markdown,
    and non-string values are pretty-printed instead of crashing on `.strip()`.
    """
    if not isinstance(content, str):
        return Pretty(content)
    try:
        parsed_json = json.loads(content)
    except json.JSONDecodeError:
        return Markdown(content.strip())
    return JSON.from_data(parsed_json)


def print_conversation(messages):
    """Print each message as a titled panel on the console.

    Args:
        messages: Iterable of dicts read via ``.get("role")`` and
            ``.get("content")``. A missing role is shown as "Unknown" and a
            missing content as an empty string.
    """
    console = Console(width=200, soft_wrap=True)

    for msg in messages:
        role = msg.get("role", "unknown").capitalize()
        content = msg.get("content", "")

        panel = Panel(
            _render_content(content),
            title=f"[bold blue]{role}[/]",
            # User turns get a green border; every other role is magenta.
            border_style="green" if role == "User" else "magenta",
            expand=True,
        )
        console.print(panel)


def generate_final_answer(qa: dict[str, object]) -> str:
    """Ask gpt-4o to reduce a query/response/metadata record to a bare answer.

    Args:
        qa: JSON-serialisable mapping; it is passed to ``json.dumps`` and
            embedded in the human message sent to the model.

    Returns:
        The ``content`` of the model's reply.
    """
    final_answer_llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Each sentence ends with a space so the implicit concatenation does not
    # glue two sentences together.
    system_prompt = (
        "You will receive a JSON string containing a user's query, a response, and metadata. "
        "Extract and return only the final answer to the query as a plain string. "
        "Do not return anything else. "
        "Avoid any labels, prefixes, or explanation. "
        "Return only the exact value that satisfies the query, suitable for string comparison. "
        "If the query is not answerable due to a missing file in the input and is reflected "
        "in the response, answer with 'File not found'."
    )

    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=f'Generate the final answer for the following query:\n\n{json.dumps(qa)}'),
    ]

    response = final_answer_llm.invoke(messages)
    return response.content


class AgentState(TypedDict):
    """Graph state: the running message list, merged with ``add_messages``."""

    messages: Annotated[Sequence[BaseMessage], add_messages]


class BasicOpenAIAgentWorkflow:
    """Basic custom class for an agent prompted for the tool-use pattern.

    The conversation is kept in ``self.history`` as ``{'role', 'content'}``
    dicts and the full history is re-sent on every call to keep context.
    """

    def __init__(self, tools: list, model='gpt-4o', backstory: str = "", streaming=False):
        """Create the chat model and seed the history with a system message.

        Args:
            tools: Tools later bound to the model and given to the tool node.
            model: Model name passed to ``ChatOpenAI``.
            backstory: System prompt; a default assistant prompt is used when
                this is empty.
            streaming: Forwarded to ``ChatOpenAI``.
        """
        self.name = "Basic OpenAI Agent Workflow"

        self.tools = tools
        self.llm = ChatOpenAI(model=model, temperature=0, streaming=streaming)
        self.graph = None  # compiled by create_basic_tool_use_agent_state_graph

        self.history = []
        self.history_messages = []  # Store messages in LangChain format

        self.backstory = backstory if backstory else "You are a helpful assistant that can use tools to answer questions. Your name is Gaia."

        role_message = {'role': 'system', 'content': self.backstory}
        self.history.append(role_message)

    def _call_llm(self, state: AgentState):
        """Invoke the assigned llm on the state's messages (graph node)."""
        return {'messages': [self.llm.invoke(state['messages'])]}

    @staticmethod
    def _to_langchain_messages(history):
        """Convert role/content dicts to LangChain message objects.

        Raises:
            ValueError: If a content is not a string or a role is not one of
                'user', 'assistant' or 'system'.
        """
        converted = []
        for msg in history:
            content = msg['content']

            if not isinstance(content, str):
                raise ValueError(f"Expected string content, got: {type(content)} — {content}")

            if msg['role'] == 'user':
                converted.append(HumanMessage(content=content))
            elif msg['role'] == 'assistant':
                converted.append(AIMessage(content=content))
            elif msg['role'] == 'system':
                converted.append(SystemMessage(content=content))
            else:
                raise ValueError(f"Unknown role in message: {msg}")
        return converted

    def _convert_history_to_messages(self):
        """Convert self.history to LangChain-compatible messages.

        The result is stored in ``self.history_messages``.
        """
        self.history_messages = self._to_langchain_messages(self.history)

    def create_basic_tool_use_agent_state_graph(self, custom_tools_nm="tools"):
        """Bind the tools to the model, then build and compile the graph.

        Args:
            custom_tools_nm: Node name used for the tool node.
        """
        self.llm = self.llm.bind_tools(self.tools)

        # Graph Init
        graph = StateGraph(AgentState)

        # Nodes
        graph.add_node('agent', self._call_llm)
        tools_node = ToolNode(self.tools)
        graph.add_node(custom_tools_nm, tools_node)

        # Edges
        graph.add_edge(START, "agent")
        # tools_condition answers 'tools' or END; map 'tools' to our node name.
        graph.add_conditional_edges('agent', tools_condition, {'tools': custom_tools_nm, END: END})

        self.graph = graph.compile()

    def chat(self, query, verbose=2, only_final_answer=False):
        """Send one user query through the graph and record the exchange.

        Args:
            query: The user text, or a dict whose ``"messages"`` entry is the
                user text.
            verbose: 2 prints the whole history, 1 prints only the new
                assistant message, any other value prints nothing.
            only_final_answer: When true, the raw response is post-processed
                by ``generate_final_answer`` before being stored and returned.

        Returns:
            The assistant response that was stored in the history.

        Raises:
            KeyError: If ``query`` is a dict without a ``"messages"`` key.
            ValueError: If the query (or any stored content) is not a string.
        """
        if isinstance(query, dict):
            query = query["messages"]

        user_message = {'role': 'user', 'content': query}

        # Convert a candidate history first: an invalid query raises here,
        # before self.history is touched, so a bad call cannot poison later ones.
        pending_messages = self._to_langchain_messages([*self.history, user_message])

        if self.graph is None:
            # Build with the default tool node name instead of failing on None.
            self.create_basic_tool_use_agent_state_graph()

        # invoke with all the history to keep context (dummy mem)
        result = self.graph.invoke({'messages': pending_messages})
        response = result['messages'][-1].content

        if only_final_answer:
            final_answer_content = {
                'query': query,
                'response': response,
                'metadata': {}
            }
            response = generate_final_answer(final_answer_content)

        # Commit only after every fallible step succeeded, so a failed call
        # never leaves a user turn without its assistant reply.
        assistant_message = {'role': 'assistant', 'content': response}
        self.history_messages = pending_messages
        self.history.append(user_message)
        self.history.append(assistant_message)

        if verbose == 2:
            print_conversation(self.history)
        elif verbose == 1:
            print_conversation([assistant_message])

        return response

    def invoke(self, input_str: str):
        """Run one chat turn and return the history as LangChain messages.

        Returns:
            ``{'messages': [...]}`` holding the converted history, including
            the new assistant reply.
        """
        _ = self.chat(input_str)  # prints response in terminal
        self._convert_history_to_messages()
        return {'messages': self.history_messages}

    def chat_batch(self, queries=None, only_final_answer=False):
        """Send several queries in sequence through ``chat``.

        Only the last query prints the conversation; earlier ones are silent.

        Args:
            queries: Iterable of queries; ``None`` means no queries.
            only_final_answer: Forwarded to ``chat`` for every query.

        Returns:
            List with the response of each query, in order.
        """
        # Materialise so generators work and tqdm can show the total.
        queries = [] if queries is None else list(queries)
        last_index = len(queries)

        responses = []
        for i, query in enumerate(tqdm(queries), start=1):
            verbose = 2 if i == last_index else 0
            responses.append(
                self.chat(query, verbose=verbose, only_final_answer=only_final_answer)
            )
        return responses