Spaces:
Sleeping
Sleeping
import json | |
from rich import print as rich_print | |
from rich.panel import Panel | |
from rich.console import Console | |
from rich.pretty import Pretty | |
from rich.markdown import Markdown | |
from rich.json import JSON | |
from typing import TypedDict, Sequence, Annotated | |
from langchain_core.messages import BaseMessage | |
from langgraph.graph.message import add_messages | |
from langgraph.graph import StateGraph, START, END | |
from langchain_openai import ChatOpenAI | |
from langgraph.prebuilt import ToolNode, tools_condition | |
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage | |
from tqdm import tqdm | |
def print_conversation(messages): | |
console = Console(width=200, soft_wrap=True) | |
for msg in messages: | |
role = msg.get("role", "unknown").capitalize() | |
content = msg.get("content", "") | |
try: | |
parsed_json = json.loads(content) | |
rendered_content = JSON.from_data(parsed_json) | |
except (json.JSONDecodeError, TypeError): | |
rendered_content = Markdown(content.strip()) | |
panel = Panel( | |
rendered_content, | |
title=f"[bold blue]{role}[/]", | |
border_style="green" if role == "User" else "magenta", | |
expand=True | |
) | |
console.print(panel) | |
def generate_final_answer(qa: dict[str, str]) -> str: | |
"""Invokes gpt-4o-mini to extract generate a final answer based on the content query, response, and metadata""" | |
final_answer_llm = ChatOpenAI(model="gpt-4o", temperature=0) | |
system_prompt = ( | |
"You will receive a JSON string containing a user's query, a response, and metadata. " | |
"Extract and return only the final answer to the query as a plain string. " | |
"Do not return anything else. " | |
"Avoid any labels, prefixes, or explanation. " | |
"Return only the exact value that satisfies the query, suitable for string comparison." | |
"If the query is not answerable due to a missing file in the input and is reflected in the response, answer with 'File not found'. " | |
) | |
system_message = SystemMessage(content=system_prompt) | |
messages = [ | |
system_message, | |
HumanMessage(content=f'Generate the final answer for the following query:\n\n{json.dumps(qa)}') | |
] | |
response = final_answer_llm.invoke(messages) | |
return response.content | |
class AgentState(TypedDict): | |
messages: Annotated[Sequence[BaseMessage], add_messages] | |
class BasicOpenAIAgentWorkflow: | |
"""Basic custom class from an agent prompted for tool-use pattern""" | |
def __init__(self, tools: list, model='gpt-4o', backstory:str="", streaming=False): | |
self.name = "Basic OpenAI Agent Workflow" | |
self.tools = tools | |
self.llm = ChatOpenAI(model=model, temperature=0, streaming=streaming) | |
self.graph = None | |
self.history = [] | |
self.history_messages = [] # Store messages in LangChain format | |
self.backstory = backstory if backstory else "You are a helpful assistant that can use tools to answer questions. Your name is Gaia." | |
role_message = {'role': 'system', 'content': self.backstory} | |
self.history.append(role_message) | |
def _call_llm(self, state: AgentState): | |
"""invokes the assigned llm""" | |
return {'messages': [self.llm.invoke(state['messages'])]} | |
def _convert_history_to_messages(self): | |
"""Convert self.history to LangChain-compatible messages""" | |
converted = [] | |
for msg in self.history: | |
content = msg['content'] | |
if not isinstance(content, str): | |
raise ValueError(f"Expected string content, got: {type(content)} — {content}") | |
if msg['role'] == 'user': | |
converted.append(HumanMessage(content=content)) | |
elif msg['role'] == 'assistant': | |
converted.append(AIMessage(content=content)) | |
elif msg['role'] == 'system': | |
converted.append(SystemMessage(content=content)) | |
else: | |
raise ValueError(f"Unknown role in message: {msg}") | |
self.history_messages = converted | |
def create_basic_tool_use_agent_state_graph(self, custom_tools_nm="tools"): | |
"""Binds tools, creates and compiles graph""" | |
self.llm = self.llm.bind_tools(self.tools) | |
# Graph Init | |
graph = StateGraph(AgentState) | |
# Nodes | |
graph.add_node('agent', self._call_llm) | |
tools_node = ToolNode(self.tools) | |
graph.add_node(custom_tools_nm, tools_node) | |
# Edges | |
graph.add_edge(START, "agent") | |
graph.add_conditional_edges('agent', tools_condition, {'tools': custom_tools_nm, END: END}) | |
self.graph = graph.compile() | |
def chat(self, query, verbose=2, only_final_answer=False): | |
"""Simple agent call""" | |
if isinstance(query, dict): | |
query = query["messages"] | |
user_message = {'role': 'user', 'content': query} | |
self.history.append(user_message) | |
# Ensure history has at least 1 message | |
if not self.history: | |
raise ValueError("History is empty. Cannot proceed.") | |
self._convert_history_to_messages() | |
if not self.history_messages: | |
raise ValueError("Converted message history is empty. Something went wrong.") | |
response = self.graph.invoke({'messages': self.history_messages}) # invoke with all the history to keep context (dummy mem) | |
response = response['messages'][-1].content | |
if only_final_answer: | |
final_answer_content = { | |
'query': query, | |
'response': response, | |
'metadata': {} | |
} | |
response = generate_final_answer(final_answer_content) | |
assistant_message = {'role': 'assistant', 'content': response} | |
self.history.append(assistant_message) | |
if verbose==2: | |
print_conversation(self.history) | |
elif verbose==1: | |
print_conversation([assistant_message]) | |
return response | |
def invoke(self, input_str: str): | |
"""Invoke the compiled graph with the input data""" | |
_ = self.chat(input_str) # prints response in terminal | |
self._convert_history_to_messages() | |
return {'messages': self.history_messages} | |
def chat_batch(self, queries=None, only_final_answer=False): | |
"""Send several simple agent calls to the llm using the compiled graph""" | |
if queries is None: | |
queries = [] | |
for i, query in tqdm(enumerate(queries, start=1)): | |
if i == len(queries): | |
self.chat(query, verbose=2, only_final_answer=only_final_answer) | |
else: | |
self.chat(query, verbose=0, only_final_answer=only_final_answer) |