|
|
|
|
|
import os |
|
import chromadb |
|
from dotenv import load_dotenv |
|
import json |
|
|
|
|
|
from langchain_core.documents import Document |
|
from langchain_core.runnables import RunnablePassthrough |
|
from langchain_core.output_parsers import StrOutputParser |
|
from langchain.prompts import ChatPromptTemplate |
|
from langchain.chains.query_constructor.base import AttributeInfo |
|
from langchain.retrievers.self_query.base import SelfQueryRetriever |
|
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker |
|
from langchain.retrievers import ContextualCompressionRetriever |
|
|
|
|
|
from langchain_community.vectorstores import Chroma |
|
from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader |
|
from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
|
from langchain_experimental.text_splitter import SemanticChunker |
|
from langchain.text_splitter import ( |
|
CharacterTextSplitter, |
|
RecursiveCharacterTextSplitter |
|
) |
|
from langchain_core.tools import tool |
|
from langchain.agents import create_tool_calling_agent, AgentExecutor |
|
from langchain_core.prompts import ChatPromptTemplate |
|
|
|
|
|
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI, ChatOpenAI |
|
from langchain.embeddings.openai import OpenAIEmbeddings |
|
|
|
|
|
from llama_parse import LlamaParse |
|
from llama_index.core import Settings, SimpleDirectoryReader |
|
|
|
|
|
from langgraph.graph import StateGraph, END, START |
|
|
|
|
|
from pydantic import BaseModel |
|
|
|
|
|
from typing import Dict, List, Tuple, Any, TypedDict |
|
|
|
|
|
import numpy as np |
|
from groq import Groq |
|
from mem0 import MemoryClient |
|
import streamlit as st |
|
from datetime import datetime |
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime configuration
# ---------------------------------------------------------------------------
# Load variables from a local `.env` file (if one exists) before reading them.
# `load_dotenv` is imported at the top of this file but was never invoked, so
# `.env`-based setups silently got `None` credentials. With its default
# arguments it does not override variables already present in the process
# environment, so deployments that inject real environment variables are
# unaffected.
load_dotenv()

# Credentials for the OpenAI-compatible endpoint. `os.getenv` yields None when
# a variable is unset, so a missing value only surfaces on the first API call.
api_key = os.getenv("API_KEY")
endpoint = os.getenv("OPENAI_API_BASE")

# These two are mandatory: indexing `os.environ` raises KeyError at import
# time when either variable is missing.
llama_api_key = os.environ['GROQ_API_KEY']
MEM0_api_key = os.environ['mem0']

# Chroma-native embedding function. It is not referenced anywhere else in the
# visible part of this file (the vector store below uses `embedding_model`).
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
    api_base=endpoint,
    api_key=api_key,
    model_name='text-embedding-ada-002'
)

# LangChain embedding model handed to the Chroma vector store.
embedding_model = OpenAIEmbeddings(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model='text-embedding-ada-002'
)

# Shared chat model used by every node of the RAG workflow.
llm = ChatOpenAI(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model="gpt-4o-mini",
    streaming=False
)

# Register the models with llama_index's global settings.
Settings.llm = llm
# NOTE(review): llama_index documents the attribute as `Settings.embed_model`;
# `embedding` looks like a typo that leaves the embedding model unregistered.
# Left unchanged because switching it may require an extra integration
# package at import time - confirm before changing.
Settings.embedding = embedding_model
|
|
|
|
|
|
|
class AgentState(TypedDict):
    """Shared state dictionary passed between the nodes of the RAG workflow."""

    # Original user question; read by most nodes, never rewritten by them.
    query: str
    # Retrieval-oriented rewrite of `query`, written by `expand_query`.
    expanded_query: str
    # Retrieved chunks as {"content": ..., "metadata": ...} dictionaries,
    # written by `retrieve_context`.
    context: List[Dict[str, Any]]
    # Latest generated answer (or the fallback message once the loop limit is hit).
    response: str
    # Score written by `check_precision`; compared against 0.7 when routing.
    precision_score: float
    # Score written by `score_groundedness`; compared against 0.4 when routing.
    groundedness_score: float
    # Number of times `score_groundedness` has run for this request.
    groundedness_loop_count: int
    # Number of times `check_precision` has run for this request.
    precision_loop_count: int
    # Response-improvement suggestions written by `refine_response`.
    feedback: str
    # Query-improvement suggestions written by `refine_query`.
    query_feedback: str
    # Declared but not read or written by any code visible in this file.
    groundedness_check: bool
    # Upper bound compared against both loop counters by the routing functions.
    loop_max_iter: int
|
|
|
def expand_query(state):
    """
    Rewrite the user's query into a richer search query.

    Reads ``state['query']`` and ``state['query_feedback']``, asks the shared
    ``llm`` for an expanded version, and stores the reply under
    ``state['expanded_query']``.

    Args:
        state (Dict): Workflow state; must contain ``query`` and ``query_feedback``.

    Returns:
        Dict: The same state object, with ``expanded_query`` filled in.
    """
    print("---------Expanding Query---------")

    system_message = '''You are an AI specializing in improving search queries to retrieve the most relevant nutrition disorder-related information.
Your task is to **refine** and **expand** the given query so that better search results are obtained, while **keeping the original intent** unchanged.

Guidelines:
- Add **specific details** where needed. Example: If a user asks about "anorexia," specify aspects like symptoms, causes, or treatment options.
- Include **related terms** to improve retrieval (e.g., “bulimia” → “bulimia nervosa vs binge eating disorder”).
- If the user provides an unclear query, suggest necessary clarifications.
- **DO NOT** answer the question. Your job is only to enhance the query.

Examples:
1. User Query: "Tell me about eating disorders."
Expanded Query: "Provide details on eating disorders, including types (e.g., anorexia nervosa, bulimia nervosa), symptoms, causes, and treatment options."

2. User Query: "What is anorexia?"
Expanded Query: "Explain anorexia nervosa, including its symptoms, causes, risk factors, and treatment options."

3. User Query: "How to treat bulimia?"
Expanded Query: "Describe treatment options for bulimia nervosa, including psychotherapy, medications, and lifestyle changes."

4. User Query: "What are the effects of malnutrition?"
Expanded Query: "Explain the effects of malnutrition on physical and mental health, including specific nutrient deficiencies and their consequences."

Now, expand the following query:'''

    prompt_template = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Expand this query: {query} using the feedback: {query_feedback}"),
    ])

    # prompt -> chat model -> plain string
    pipeline = prompt_template | llm | StrOutputParser()
    prompt_inputs = {
        "query": state['query'],
        "query_feedback": state["query_feedback"],
    }
    rewritten = pipeline.invoke(prompt_inputs)

    print("expanded_query", rewritten)
    state["expanded_query"] = rewritten
    return state
|
|
|
|
|
|
|
# Chroma collection persisted under ./nutritional_db, embedded with the
# module-level `embedding_model`.
_CHROMA_SETTINGS = {
    "collection_name": "nutritional_hypotheticals",
    "persist_directory": "./nutritional_db",
    "embedding_function": embedding_model,
}
vector_store = Chroma(**_CHROMA_SETTINGS)

# Plain similarity search returning the three closest chunks per query.
retriever = vector_store.as_retriever(
    search_type='similarity',
    search_kwargs={'k': 3},
)
|
|
|
def retrieve_context(state):
    """
    Retrieve context from the vector store using the expanded or original query.

    The expanded query is preferred. When it is missing or empty (for example
    the expansion step returned an empty string), the original ``query`` is
    used instead, so retrieval never runs on an empty search string.

    Args:
        state (Dict): Workflow state containing ``query`` and, normally,
            ``expanded_query``.

    Returns:
        Dict: The same state object, with ``context`` set to a list of
        ``{"content": ..., "metadata": ...}`` dictionaries, one per document.
    """
    print("---------retrieve_context---------")

    # Fall back to the raw user query if no usable expansion is available.
    query = state.get('expanded_query') or state['query']

    docs = retriever.invoke(query)
    print("Retrieved documents:", docs)

    # Keep only the text and metadata of each retrieved document.
    context = [
        {"content": doc.page_content, "metadata": doc.metadata}
        for doc in docs
    ]
    state['context'] = context
    print("Extracted context with metadata:", context)

    return state
|
|
|
|
|
|
|
def craft_response(state: Dict) -> Dict:
    """
    Generate a response from the retrieved context, focusing on nutrition disorders.

    When ``state['feedback']`` is non-empty (it is filled in by
    ``refine_response`` before the workflow loops back here), the feedback is
    appended to the user message so the regenerated answer can act on it.
    Without this, every retry would send exactly the same prompt. On the first
    pass, when there is no feedback yet, the prompt is the plain
    query/context prompt.

    Args:
        state (Dict): Workflow state containing ``query`` and ``context``, and
            optionally ``feedback``.

    Returns:
        Dict: The same state object, with ``response`` set to the generated text.
    """
    system_message = '''You are a professional AI nutrition disorder specialist generating responses based on retrieved documents.
Your task is to use the given **context** to generate a highly accurate, informative, and user-friendly response.

Guidelines:
- **Be direct and concise** while ensuring completeness.
- **DO NOT include information that is not present in the context.**
- If multiple sources exist, synthesize them into a coherent response.
- If the context does not fully answer the query, state what additional information is needed.
- Use bullet points when explaining complex concepts.

Example:
User Query: "What are the symptoms of anorexia nervosa?"
Context:
1. Anorexia nervosa is characterized by extreme weight loss and fear of gaining weight.
2. Common symptoms include restricted eating, distorted body image, and excessive exercise.
Response:
"Anorexia nervosa is an eating disorder characterized by extreme weight loss and an intense fear of gaining weight. Common symptoms include:
- Restricted eating
- Distorted body image
- Excessive exercise
If you or someone you know is experiencing these symptoms, it is important to seek professional help."'''

    prompt_inputs = {
        "query": state['query'],
        "context": "\n".join([doc["content"] for doc in state['context']]),
    }

    # Only mention feedback when a refinement pass actually produced some.
    user_template = "Query: {query}\nContext: {context}\n\n"
    feedback = state.get('feedback') or ""
    if feedback:
        user_template += (
            "Feedback on the previous attempt "
            "(address it using only the context above):\n{feedback}\n\n"
        )
        prompt_inputs["feedback"] = feedback
    user_template += "Response:"

    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", user_template),
    ])

    chain = response_prompt | llm | StrOutputParser()
    state['response'] = chain.invoke(prompt_inputs)
    return state
|
|
|
|
|
|
|
def _parse_groundedness_score(raw_text):
    """Convert the evaluator's reply into a float clamped to [0.0, 1.0].

    A bare number ("0.9") is used directly. Otherwise the last numeric token
    in the reply is taken, which matches the "... Groundedness score: 0.7."
    shape of the examples in the evaluation prompt. Replies without any
    number, and NaN, score 0.0 so the workflow treats them as "not grounded"
    instead of crashing.
    """
    import math
    import re

    text = str(raw_text).strip()
    try:
        score = float(text)
    except ValueError:
        numbers = re.findall(r"\d*\.?\d+", text)
        score = float(numbers[-1]) if numbers else 0.0

    if math.isnan(score):
        return 0.0
    return min(1.0, max(0.0, score))


def score_groundedness(state: Dict) -> Dict:
    """
    Check whether the response is grounded in the retrieved context.

    Asks the shared ``llm`` for a score, stores it under
    ``state['groundedness_score']`` and increments
    ``state['groundedness_loop_count']``. The model's reply is parsed
    tolerantly (see ``_parse_groundedness_score``) because a chat model does
    not always return a bare float even when told to.

    Args:
        state (Dict): Workflow state containing ``response``, ``context`` and
            ``groundedness_loop_count``.

    Returns:
        Dict: The same state object, with the score and loop counter updated.
    """
    print("---------check_groundedness---------")

    system_message = '''You are an AI tasked with evaluating whether a response is grounded in the provided context and includes proper citations.

Guidelines:
1. **Groundedness Check**:
- Verify that the response accurately reflects the information in the context.
- Flag any unsupported claims or deviations from the context.

2. **Citation Check**:
- Ensure that the response includes citations to the source material (e.g., "According to [Source], ...").
- If citations are missing, suggest adding them.

3. **Scoring**:
- Assign a groundedness score between 0 and 1, where 1 means fully grounded and properly cited.

Examples:
1. Response: "Anorexia nervosa is caused by genetic factors (Source 1)."
Context: "Anorexia nervosa is influenced by genetic, environmental, and psychological factors (Source 1)."
Evaluation: "The response is grounded and properly cited. Groundedness score: 1.0."

2. Response: "Bulimia nervosa can be cured with diet alone."
Context: "Treatment for bulimia nervosa involves psychotherapy and medications (Source 2)."
Evaluation: "The response is ungrounded and lacks citations. Groundedness score: 0.2."

3. Response: "Anorexia nervosa has a high mortality rate."
Context: "Anorexia nervosa has one of the highest mortality rates among psychiatric disorders (Source 3)."
Evaluation: "The response is grounded but lacks a citation. Groundedness score: 0.7. ."

****Return only a float score (e.g., 0.9). Do not provide explanations.****

Now, evaluate the following response:
'''

    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:")
    ])

    chain = groundedness_prompt | llm | StrOutputParser()
    raw_reply = chain.invoke({
        "context": "\n".join([doc["content"] for doc in state['context']]),
        "response": state['response']
    })
    groundedness_score = _parse_groundedness_score(raw_reply)

    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score
    return state
|
|
|
|
|
|
|
def _parse_precision_score(raw_text):
    """Convert the evaluator's reply into a float clamped to [0.0, 1.0].

    A bare number ("0.7") is used directly. Otherwise the last numeric token
    in the reply is taken (e.g. "Precision Score: 0.7"). Replies without any
    number, and NaN, score 0.0 so the workflow treats them as "not precise"
    instead of crashing.
    """
    import math
    import re

    text = str(raw_text).strip()
    try:
        score = float(text)
    except ValueError:
        numbers = re.findall(r"\d*\.?\d+", text)
        score = float(numbers[-1]) if numbers else 0.0

    if math.isnan(score):
        return 0.0
    return min(1.0, max(0.0, score))


def check_precision(state: Dict) -> Dict:
    """
    Check whether the response precisely addresses the user's query.

    Asks the shared ``llm`` for a score, stores it under
    ``state['precision_score']`` and increments
    ``state['precision_loop_count']``. The model's reply is parsed tolerantly
    (see ``_parse_precision_score``) because a chat model does not always
    return a bare float even when told to.

    Args:
        state (Dict): Workflow state containing ``query``, ``response`` and
            ``precision_loop_count``.

    Returns:
        Dict: The same state object, with the score and loop counter updated.
    """
    print("---------check_precision---------")

    system_message = '''You are an AI evaluator assessing the **precision** of the response.
Your task is to **score** how well the response addresses the user’s original nutrition disorder-related query.

Scoring Criteria:
- 1.0 → The response is fully precise, directly answering the question.
- 0.7 → The response is mostly correct but contains some generalization.
- 0.5 → The response is somewhat relevant but lacks key details.
- 0.3 → The response is vague or only partially correct.
- 0.0 → The response is incorrect or misleading.

Examples:
1. Query: "What are the symptoms of anorexia nervosa?"
Response: "The symptoms of anorexia nervosa include extreme weight loss, fear of gaining weight, and a distorted body image."
Precision Score: 1.0

2. Query: "How is bulimia nervosa treated?"
Response: "Bulimia nervosa is treated with therapy and medications."
Precision Score: 0.7

3. Query: "What causes binge eating disorder?"
Response: "Binge eating disorder is caused by a combination of genetic, psychological, and environmental factors."
Precision Score: 0.5

4. Query: "What are the effects of malnutrition?"
Response: "Malnutrition can lead to health problems."
Precision Score: 0.3

5. Query: "What is the mortality rate of anorexia nervosa?"
Response: "Anorexia nervosa is a type of eating disorder."
Precision Score: 0.0

*****Return only a float score (e.g., 0.9). Do not provide explanations.*****
Now, evaluate the following query and response:
'''

    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:")
    ])

    chain = precision_prompt | llm | StrOutputParser()
    raw_reply = chain.invoke({
        "query": state['query'],
        "response": state['response']
    })
    precision_score = _parse_precision_score(raw_reply)

    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
|
|
|
|
|
|
|
def refine_response(state: Dict) -> Dict:
    """
    Ask the model how the current response could be improved.

    The previous response and the model's suggestions are combined into one
    text and stored under ``state['feedback']``. The response itself is left
    untouched.

    Args:
        state (Dict): Workflow state containing ``query`` and ``response``.

    Returns:
        Dict: The same state object, with ``feedback`` filled in.
    """
    print("---------refine_response---------")

    system_message = '''You are an AI response refinement assistant. Your task is to suggest **improvements** for the given response.

### Guidelines:
- Identify **gaps in the explanation** (missing key details).
- Highlight **unclear or vague parts** that need elaboration.
- Suggest **additional details** that should be included for better accuracy.
- Ensure the refined response is **precise** and **grounded** in the retrieved context.

### Examples:
1. Query: "What are the symptoms of anorexia nervosa?"
Response: "The symptoms include weight loss and fear of gaining weight."
Suggestions: "The response is missing key details about behavioral and emotional symptoms. Add details like 'distorted body image' and 'restrictive eating patterns.'"

2. Query: "How is bulimia nervosa treated?"
Response: "Bulimia nervosa is treated with therapy."
Suggestions: "The response is too vague. Specify the types of therapy (e.g., cognitive-behavioral therapy) and mention other treatments like nutritional counseling and medications."

3. Query: "What causes binge eating disorder?"
Response: "Binge eating disorder is caused by psychological factors."
Suggestions: "The response is incomplete. Add details about genetic and environmental factors, and explain how they contribute to the disorder."

Now, suggest improvements for the following response:
'''

    user_message = (
        "Query: {query}\nResponse: {response}\n\n"
        "What improvements can be made to enhance accuracy and completeness?"
    )
    critique_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", user_message),
    ])
    critique_chain = critique_prompt | llm | StrOutputParser()

    previous_response = state['response']
    suggestions = critique_chain.invoke({
        'query': state['query'],
        'response': previous_response,
    })
    feedback = f"Previous Response: {previous_response}\nSuggestions: {suggestions}"

    print("feedback: ", feedback)
    # Debug dump of the state as it was before the new feedback is stored.
    print(f"State: {state}")
    state['feedback'] = feedback
    return state
|
|
|
|
|
|
|
def refine_query(state: Dict) -> Dict:
    """
    Ask the model how the expanded query could be improved.

    The previous expanded query and the model's suggestions are combined into
    one text and stored under ``state['query_feedback']``, which
    ``expand_query`` reads on the next pass.

    Args:
        state (Dict): Workflow state containing ``query``, ``expanded_query``
            and ``groundedness_loop_count``.

    Returns:
        Dict: The same state object, with ``query_feedback`` filled in.
    """
    print("---------refine_query---------")

    system_message = '''You are an AI query refinement assistant. Your task is to suggest **improvements** for the expanded query.

### Guidelines:
- Add **specific keywords** to improve document retrieval.
- Identify **missing details** that should be included.
- Suggest **ways to narrow the scope** for better precision.

### Examples:
1. Original Query: "Tell me about eating disorders."
Expanded Query: "Provide details on eating disorders, including types, symptoms, causes, and treatment options."
Suggestions: "Add specific types of eating disorders like 'anorexia nervosa' and 'bulimia nervosa' to improve retrieval."

2. Original Query: "What is anorexia?"
Expanded Query: "Explain anorexia nervosa, including its symptoms and causes."
Suggestions: "Include details about treatment options and risk factors to make the query more comprehensive."

3. Original Query: "How to treat bulimia?"
Expanded Query: "Describe treatment options for bulimia nervosa."
Suggestions: "Specify types of treatments like 'cognitive-behavioral therapy' and 'medications' for better precision."

Now, suggest improvements for the following expanded query:
'''

    user_message = (
        "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
        "What improvements can be made for a better search?"
    )
    critique_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", user_message),
    ])
    critique_chain = critique_prompt | llm | StrOutputParser()

    previous_expansion = state['expanded_query']
    suggestions = critique_chain.invoke({
        'query': state['query'],
        'expanded_query': previous_expansion,
    })
    query_feedback = f"Previous Expanded Query: {previous_expansion}\nSuggestions: {suggestions}"

    print("query_feedback: ", query_feedback)
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")
    state['query_feedback'] = query_feedback
    return state
|
|
|
|
|
|
|
def should_continue_groundedness(state):
    """Pick the next node after groundedness scoring.

    Returns "check_precision" when the score is at least 0.4,
    "max_iterations_reached" when the score is too low and the loop counter
    exceeds ``loop_max_iter``, and "refine_response" otherwise.
    """
    print("---------should_continue_groundedness---------")
    attempts = state['groundedness_loop_count']
    print("groundedness loop count: ", attempts)

    # Good enough: move on to the precision check.
    if state['groundedness_score'] >= 0.4:
        print("Moving to precision")
        return "check_precision"

    # Too low and out of retries: give up.
    if attempts > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Groundedness Score Threshold Not met. Refining Response-----------")
    return "refine_response"
|
|
|
|
|
def should_continue_precision(state: Dict) -> str:
    """Pick the next step after precision scoring.

    Returns "pass" when the score is at least 0.7, "max_iterations_reached"
    when the score is too low and the loop counter exceeds ``loop_max_iter``,
    and "refine_query" otherwise.
    """
    print("---------should_continue_precision---------")
    attempts = state['precision_loop_count']
    print("precision loop count: ", attempts)

    # Precise enough: the workflow can finish.
    if state['precision_score'] >= 0.7:
        return "pass"

    # Too low and out of retries: give up.
    if attempts > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Precision Score Threshold Not met. Refining Query-----------")
    return "refine_query"
|
|
|
|
|
|
|
def max_iterations_reached(state: Dict) -> Dict:
    """Replace the response with a fixed fallback message once the retry limit is hit.

    Args:
        state (Dict): Workflow state; its ``response`` entry is overwritten.

    Returns:
        Dict: The same state object.
    """
    print("---------max_iterations_reached---------")
    state['response'] = (
        "I'm unable to refine the response further. "
        "Please provide more context or clarify your question."
    )
    return state
|
|
|
|
|
|
|
def create_workflow() -> StateGraph:
    """Build the (uncompiled) state graph for the AI nutrition agent.

    Flow: expand_query -> retrieve_context -> craft_response ->
    score_groundedness, then conditional routing into the response-refinement
    loop, the precision check (with its query-refinement loop), or the
    max-iterations fallback.

    Returns:
        StateGraph: The configured graph; the caller compiles it.
    """
    graph = StateGraph(AgentState)

    # Node name -> handler; registered in this order.
    node_handlers = [
        ("expand_query", expand_query),
        ("retrieve_context", retrieve_context),
        ("craft_response", craft_response),
        ("score_groundedness", score_groundedness),
        ("refine_response", refine_response),
        ("check_precision", check_precision),
        ("refine_query", refine_query),
        ("max_iterations_reached", max_iterations_reached),
    ]
    for node_name, handler in node_handlers:
        graph.add_node(node_name, handler)

    # Straight-line part of the pipeline.
    main_path = [
        (START, "expand_query"),
        ("expand_query", "retrieve_context"),
        ("retrieve_context", "craft_response"),
        ("craft_response", "score_groundedness"),
    ]
    for source, target in main_path:
        graph.add_edge(source, target)

    # Groundedness gate: continue, retry the response, or give up.
    groundedness_routes = {
        "check_precision": "check_precision",
        "refine_response": "refine_response",
        "max_iterations_reached": "max_iterations_reached",
    }
    graph.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,
        groundedness_routes,
    )
    graph.add_edge("refine_response", "craft_response")

    # Precision gate: finish, retry from query expansion, or give up.
    precision_routes = {
        "pass": END,
        "refine_query": "refine_query",
        "max_iterations_reached": "max_iterations_reached",
    }
    graph.add_conditional_edges(
        "check_precision",
        should_continue_precision,
        precision_routes,
    )
    graph.add_edge("refine_query", "expand_query")

    graph.add_edge("max_iterations_reached", END)

    return graph
|
|
|
|
|
|
|
|
|
# Compile the workflow once at import time; every tool call reuses it.
WORKFLOW_APP = create_workflow().compile()


# NOTE(review): `@tool` presumably exposes the docstring below to the agent as
# the tool description, so its wording is kept as it was - confirm before
# editing it.
@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent with conversation history for context-aware responses.

    Args:
        query (str): The current user query.

    Returns:
        Dict[str, Any]: The updated state with the generated response and conversation history.
    """
    # Fresh state for one run of the workflow; scores and counters start at
    # zero and each refinement loop is bounded by `loop_max_iter`.
    initial_state = dict(
        query=query,
        expanded_query="",
        context=[],
        response="",
        precision_score=0.0,
        groundedness_score=0.0,
        groundedness_loop_count=0,
        precision_loop_count=0,
        feedback="",
        query_feedback="",
        loop_max_iter=2,
    )

    # The final workflow state is returned as-is.
    return WORKFLOW_APP.invoke(initial_state)
|
|
|
|
|
|
|
# Groq client used only for Llama Guard moderation calls.
llama_guard_client = Groq(api_key=llama_api_key)


def filter_input_with_llama_guard(user_input, model="meta-llama/llama-guard-4-12b"):
    """
    Send the user's input to a Llama Guard model and return its verdict text.

    Parameters:
    - user_input: The text typed by the user.
    - model: Llama Guard model identifier passed to Groq
      (default "meta-llama/llama-guard-4-12b").

    Returns:
    - The model's reply with surrounding whitespace stripped, or None when the
      call fails for any reason (the error is printed).
    """
    moderation_messages = [{"role": "user", "content": user_input}]
    try:
        completion = llama_guard_client.chat.completions.create(
            messages=moderation_messages,
            model=model,
        )
        verdict = completion.choices[0].message.content
        return verdict.strip()
    except Exception as e:
        # Best effort: report the failure and let the caller treat None as
        # "no verdict available".
        print(f"Error with Llama Guard: {e}")
        return None
|
|
|
|
|
|
|
|
|
class NutritionBot:
    """Customer-facing agent that combines mem0 memory with the `agentic_rag` tool."""

    def __init__(self):
        """
        Initialize the NutritionBot class, setting up memory, the LLM client, tools, and the agent executor.
        """
        # Long-term memory of past interactions, keyed by user id.
        self.memory = MemoryClient(api_key=MEM0_api_key)

        # Chat model driving the tool-calling agent.
        self.client = ChatOpenAI(
            model_name="gpt-4o-mini",
            api_key=api_key,
            base_url=endpoint,
            temperature=0,
            streaming=False
        )

        # The only tool the agent can call is the RAG workflow.
        tools = [agentic_rag]

        system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.
Guidelines for Interaction:
Maintain a polite, professional, and reassuring tone.
Show genuine empathy for customer concerns and health challenges.
Reference past interactions to provide personalized and consistent advice.
Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.
Ensure consistent and accurate information across conversations.
If any detail is unclear or missing, proactively ask for clarification.
Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.
Keep track of ongoing issues and follow-ups to ensure continuity in support.
Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences.

"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}")
        ])

        agent = create_tool_calling_agent(self.client, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store customer interaction in memory for future reference.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Additional metadata for the interaction.
                The caller's dictionary is not modified.
        """
        # Work on a copy so adding the timestamp never mutates the caller's dict.
        stamped_metadata = dict(metadata) if metadata else {}
        stamped_metadata["timestamp"] = datetime.now().isoformat()

        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response}
        ]

        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=stamped_metadata
        )

    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Retrieve past interactions relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.

        Returns:
            List[Dict]: A list of relevant past interactions (possibly empty).
        """
        results = self.memory.search(
            query=query,
            user_id=user_id,
            limit=5
        )
        # NOTE(review): depending on the mem0 client version / output format,
        # the payload may be a bare list or wrapped as {"results": [...]} -
        # confirm against the installed client. Both shapes are normalised to
        # a list so callers can iterate safely.
        if isinstance(results, dict):
            results = results.get("results", [])
        return list(results or [])

    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Process a customer's query and provide a response, taking into account past interactions.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.

        Returns:
            str: Chatbot's response.
        """
        relevant_history = self.get_relevant_history(user_id, query)

        # Each stored memory is listed once. Previously the same text was
        # repeated under both a "Customer:" and a "Support:" label, which
        # misattributed it and doubled the prompt size.
        context_lines = ["Previous relevant interactions:"]
        for entry in relevant_history:
            memory_text = entry.get("memory") if isinstance(entry, dict) else None
            if memory_text:
                context_lines.append(f"- {memory_text}")
        context = "\n".join(context_lines) + "\n"

        print("Context: ", context)

        prompt = (
            "\n"
            "Context:\n"
            f"{context}\n"
            "\n"
            f"Current customer query: {query}\n"
            "\n"
            "Provide a helpful response that takes into account any relevant past interactions.\n"
        )

        response = self.agent_executor.invoke({"input": prompt})

        # Persist the exchange so future queries can reference it.
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=response["output"],
            metadata={"type": "support_query"}
        )

        return response['output']
|
|
|
|
|
|
|
def _guard_verdict_allows(verdict, allowed_categories=("S6", "S7")):
    """Decide whether a Llama Guard verdict lets the input through.

    Llama Guard replies "safe", or "unsafe" followed by a line of
    comma-separated category codes (e.g. "unsafe\\nS6"). The input is allowed
    when the verdict is "safe", or when it is "unsafe" and every flagged
    category is in ``allowed_categories``. A bare category code that is in
    the allow-list is also accepted, to stay compatible with the previous
    exact-match check. A missing verdict (None / empty) is rejected.
    """
    if not verdict:
        return False

    lines = [line.strip() for line in str(verdict).splitlines() if line.strip()]
    if not lines:
        return False

    allowed = set(allowed_categories)
    head = lines[0].lower()
    if head == "safe":
        return True
    if head == "unsafe":
        flagged = {
            code.strip()
            for line in lines[1:]
            for code in line.split(",")
            if code.strip()
        }
        # "unsafe" without any category is treated as blocked.
        return bool(flagged) and flagged <= allowed
    return len(lines) == 1 and lines[0] in allowed


def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.

    Shows a login form until a user name is entered, then a chat view. Each
    user message is screened with Llama Guard before it is passed to the
    `NutritionBot` held in the Streamlit session state.
    """
    st.title("Welcome to the SK Nutrition Disorder Specialist")
    st.write("You can ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write(" >>> Example: What are the symptoms of Vitamin deficiency?")
    st.write("Type 'exit' to end the conversation.")

    # Session state survives Streamlit reruns; initialise it once.
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    if st.session_state.user_id is None:
        # --- Login view ---
        with st.form("login_form", clear_on_submit=True):
            user_id = st.text_input("Please enter your name to begin:")
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                })
                st.session_state.login_submitted = True

        # Rerun once after a successful login so the chat view is rendered.
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
    else:
        # --- Chat view ---
        for message in st.session_state.chat_history:
            with st.chat_message(message["role"]):
                st.write(message["content"])

        user_query = st.chat_input("Type your question here (or 'exit' to end)...")

        if user_query:
            if user_query.lower() == "exit":
                st.session_state.chat_history.append({"role": "user", "content": "exit"})
                with st.chat_message("user"):
                    st.write("exit")
                goodbye_msg = "Goodbye! Feel free to return if you have more questions about nutrition disorders."
                st.session_state.chat_history.append({"role": "assistant", "content": goodbye_msg})
                with st.chat_message("assistant"):
                    st.write(goodbye_msg)
                # Log the user out and return to the login view.
                st.session_state.user_id = None
                st.rerun()
                return

            st.session_state.chat_history.append({"role": "user", "content": user_query})
            with st.chat_message("user"):
                st.write(user_query)

            # Screen the input before it reaches the agent.
            filtered_result = filter_input_with_llama_guard(user_query)

            with st.chat_message("assistant"):
                # The old check `filtered_result in ["safe", "S6", "S7"]`
                # could never match a real "unsafe\nS6" verdict, so the
                # intended S6/S7 exemption never applied.
                # NOTE(review): S6/S7 are presumably "Specialized Advice" and
                # "Privacy" in the Llama Guard taxonomy - confirm against the
                # model card.
                if _guard_verdict_allows(filtered_result):
                    try:
                        # Create the bot lazily, once per session.
                        if 'chatbot' not in st.session_state:
                            st.session_state.chatbot = NutritionBot()

                        st.write("Please wait...")

                        response = st.session_state.chatbot.handle_customer_query(
                            st.session_state.user_id,
                            user_query
                        )

                        st.write(response)
                        st.session_state.chat_history.append({"role": "assistant", "content": response})
                    except Exception as e:
                        # Top-level UI boundary: show the failure instead of crashing the app.
                        error_msg = f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
                        st.write(error_msg)
                        st.session_state.chat_history.append({"role": "assistant", "content": error_msg})
                else:
                    inappropriate_msg = "I apologize, but I cannot process that input as it may be inappropriate. Please try again."
                    st.write(inappropriate_msg)
                    st.session_state.chat_history.append({"role": "assistant", "content": inappropriate_msg})


if __name__ == "__main__":
    nutrition_disorder_streamlit()
|
|