import streamlit as st
import os
import time
from datetime import datetime, timezone
import json
import PyPDF2
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from twilio.rest import Client
from groq import Groq
import re
# --- Page Configuration ---
st.set_page_config(page_title="RAG Customer Support Chatbot", layout="wide")
# --- Default Configurations & File Paths ---
DEFAULT_TWILIO_ACCOUNT_SID_FALLBACK = ""
DEFAULT_TWILIO_AUTH_TOKEN_FALLBACK = ""
DEFAULT_GROQ_API_KEY_FALLBACK = ""
#DEFAULT_TWILIO_CONVERSATION_SERVICE_SID = ""
DEFAULT_TWILIO_BOT_WHATSAPP_IDENTITY = st.secrets.get("TWILIO_PHONE_NUMBER", "whatsapp:+14155238886")
DEFAULT_EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
DEFAULT_POLLING_INTERVAL_S = 30
DOCS_FOLDER = "docs/"
CUSTOMER_ORDERS_FILE = os.path.join(DOCS_FOLDER, "CustomerOrders.json")
PRODUCTS_FILE = os.path.join(DOCS_FOLDER, "Products.json")
POLICY_PDF_FILE = os.path.join(DOCS_FOLDER, "ProductReturnPolicy.pdf")
FAQ_PDF_FILE = os.path.join(DOCS_FOLDER, "FAQ.pdf")
# --- Application Secrets Configuration ---
APP_TWILIO_ACCOUNT_SID = st.secrets.get("TWILIO_ACCOUNT_SID")
APP_TWILIO_AUTH_TOKEN = st.secrets.get("TWILIO_AUTH_TOKEN")
APP_GROQ_API_KEY = st.secrets.get("GROQ_API_KEY")
#APP_TWILIO_CONVERSATION_SERVICE_SID_SECRET = st.secrets.get("TWILIO_CONVERSATION_SERVICE_SID")
APP_TWILIO_BOT_WHATSAPP_IDENTITY_SECRET = st.secrets.get("TWILIO_BOT_WHATSAPP_IDENTITY")
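# A minimal sketch of the expected .streamlit/secrets.toml (key names as read above; values are placeholders):
#   TWILIO_ACCOUNT_SID = "AC..."
#   TWILIO_AUTH_TOKEN = "..."
#   GROQ_API_KEY = "gsk_..."
#   TWILIO_PHONE_NUMBER = "whatsapp:+14155238886"
#   TWILIO_BOT_WHATSAPP_IDENTITY = "whatsapp:+14155238886"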
# --- RAG Processing Utilities ---
def load_json_data(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except FileNotFoundError:
st.error(f"Error: JSON file not found at {file_path}")
return None
except json.JSONDecodeError:
st.error(f"Error: Could not decode JSON from {file_path}")
return None
except Exception as e:
st.error(f"An unexpected error occurred while loading {file_path}: {e}")
return None
def load_pdf_data(file_path):
try:
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
text_pages = []
            for page in reader.pages:
                text_pages.append(page.extract_text() or "")
return text_pages
except FileNotFoundError:
st.error(f"Error: PDF file not found at {file_path}")
return []
except Exception as e:
st.error(f"An error occurred while processing PDF {file_path}: {e}")
return []
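# chunk_text splits the concatenated PDF text into fixed-size character windows with overlap:
# with the defaults (chunk_size=1000, chunk_overlap=200) each chunk starts 800 characters after
# the previous one, so text near a boundary appears in two neighbouring chunks.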
def chunk_text(text_pages, chunk_size=1000, chunk_overlap=200):
full_text = "\n".join(text_pages)
if not full_text.strip():
return []
chunks = []
start = 0
while start < len(full_text):
end = start + chunk_size
chunks.append(full_text[start:end])
if end >= len(full_text):
break
start += (chunk_size - chunk_overlap)
if start >= len(full_text):
break
return [chunk for chunk in chunks if chunk.strip()]
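# st.cache_resource keeps a single SentenceTransformer instance per model name across reruns,
# so the model is downloaded and loaded only once per Streamlit process.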
@st.cache_resource(show_spinner="Initializing embedding model...")
def initialize_embedding_model(model_name=DEFAULT_EMBEDDING_MODEL_NAME):
try:
model = SentenceTransformer(model_name)
return model
except Exception as e:
st.error(f"Error initializing embedding model '{model_name}': {e}")
return None
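# The leading underscores on _text_chunks and _embedding_model tell Streamlit's cache not to hash
# those arguments. IndexFlatL2 performs an exact (brute-force) L2 search; the index dimension is
# taken from the embeddings themselves (384 for the default all-MiniLM-L6-v2 model).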
@st.cache_resource(show_spinner="Building FAISS index for PDF documents...")
def create_faiss_index(_text_chunks, _embedding_model):
if not _text_chunks or _embedding_model is None:
st.warning("Cannot create FAISS index: No text chunks or embedding model available.")
return None, []
try:
valid_chunks = [str(chunk) for chunk in _text_chunks if chunk and isinstance(chunk, str) and chunk.strip()]
if not valid_chunks:
st.warning("No valid text chunks to embed for FAISS index.")
return None, []
embeddings = _embedding_model.encode(valid_chunks, convert_to_tensor=False)
if embeddings.ndim == 1:
embeddings = embeddings.reshape(1, -1)
if embeddings.shape[0] == 0:
st.warning("No embeddings were generated for FAISS index.")
return None, []
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.array(embeddings, dtype=np.float32))
return index, valid_chunks
except Exception as e:
st.error(f"Error creating FAISS index: {e}")
return None, []
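# Embeds the query with the same model, retrieves the k nearest chunks by L2 distance, and maps
# the returned indices back to chunk text; the bounds check also drops the -1 padding indices
# FAISS returns when fewer than k neighbours exist.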
def search_faiss_index(index, query_text, embedding_model, indexed_chunks, k=3):
if index is None or embedding_model is None or not query_text:
return []
try:
query_embedding = embedding_model.encode([query_text], convert_to_tensor=False)
if query_embedding.ndim == 1:
query_embedding = query_embedding.reshape(1, -1)
distances, indices = index.search(np.array(query_embedding, dtype=np.float32), k)
results = []
for i in range(len(indices[0])):
idx = indices[0][i]
if 0 <= idx < len(indexed_chunks):
results.append(indexed_chunks[idx])
return results
except Exception as e:
st.error(f"Error searching FAISS index: {e}")
return []
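# Structured lookups over the JSON data: get_order_details does an exact match on "order_id" and
# returns the matching order as a JSON string; get_product_info does case-insensitive substring
# matching against Product_ID, Product_Name, and Product_Type.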
def get_order_details(order_id, customer_orders_data):
if not customer_orders_data:
return "Customer order data is not loaded."
for order in customer_orders_data:
if order.get("order_id") == order_id:
return json.dumps(order, indent=2)
return f"No order found with ID: {order_id}."
def get_product_info(query, products_data):
if not products_data:
st.warning("Product data is not loaded or is empty in get_product_info.")
return "Product data is not loaded."
query_lower = query.lower()
found_products = []
for product in products_data:
if not isinstance(product, dict):
continue
product_id_lower = str(product.get("Product_ID", "")).lower()
product_name_lower = str(product.get("Product_Name", "")).lower()
product_type_lower = str(product.get("Product_Type", "")).lower()
match = False
if product_id_lower and product_id_lower in query_lower:
match = True
if not match and product_name_lower:
if query_lower in product_name_lower or product_name_lower in query_lower:
match = True
if not match and product_type_lower:
if query_lower in product_type_lower or product_type_lower in query_lower:
match = True
if match:
found_products.append(product)
if found_products:
return json.dumps(found_products, indent=2)
return f"No product information found matching your query: '{query}'."
# --- LLM Operations ---
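# generate_response_groq uses two prompt shapes: a specialised single-sentence prompt when the
# intent is ORDER_STATUS and the order fields are available, and a generic context-grounded prompt
# otherwise. The leading underscore on _groq_client excludes the client from st.cache_data hashing,
# so responses are cached per (query, context, intent, ...) combination.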
@st.cache_data(show_spinner="Generating response with LLaMA3...")
def generate_response_groq(_groq_client, query, context, model="llama3-8b-8192",
intent=None, customer_name=None, item_name=None,
shipping_address=None, delivery_date=None, order_id=None, order_status=None):
if not _groq_client:
return "GROQ client not initialized. Please check API key."
if not query:
return "Query is empty."
system_message = "You are a helpful customer support assistant."
user_prompt = ""
if intent == "ORDER_STATUS" and order_id and customer_name and order_status:
system_message = (
f"You are an exceptionally friendly and helpful customer support assistant. "
f"Your current task is to provide a single, complete, and human-like sentence as a response to {customer_name} "
f"about their order {order_id}. You MUST incorporate all relevant order details provided into this single sentence."
)
item_description = item_name if item_name else "the ordered item(s)"
# Construct the core information string that the LLM needs to build upon
core_info_parts = [
f"your order {order_id}",
f"for {item_description}",
f"has a status of '{order_status}'"
]
if order_status.lower() == "delivered":
if shipping_address:
core_info_parts.append(f"and was delivered to {shipping_address}")
else:
core_info_parts.append("and was delivered (address not specified)")
if delivery_date:
core_info_parts.append(f"on {delivery_date}")
else:
core_info_parts.append("(delivery date not specified)")
        # Build the single-sentence payload; for non-delivered orders keep it to the status clause
        # so there is no dangling "and" before the status.
        if order_status.lower() == "delivered":
            core_information_to_include = ", ".join(core_info_parts[:-1]) + f" {core_info_parts[-1]}"
        else:
            core_information_to_include = f"your order {order_id} for {item_description} has a status of '{order_status}'"
user_prompt = (
f"Customer: {customer_name}\n"
f"Order ID: {order_id}\n"
f"Item(s): {item_description}\n"
f"Status: {order_status}\n"
)
if order_status.lower() == "delivered":
user_prompt += f"Shipping Address: {shipping_address if shipping_address else 'Not specified'}\n"
user_prompt += f"Delivered On: {delivery_date if delivery_date else 'Not specified'}\n"
user_prompt += f"\nOriginal user query for context: '{query}'\n\n"
user_prompt += (
f"Your task: Generate a single, complete, and human-like sentence that starts with a greeting to {customer_name}. "
f"This sentence MUST convey the following essential information: {core_information_to_include}.\n"
f"For example, if all details are present for a delivered order: 'Hi {customer_name}, {core_information_to_include}.'\n"
f"For example, for a non-delivered order: 'Hi {customer_name}, {core_information_to_include}.'\n"
f"IMPORTANT: Do not ask questions. Do not add any extra conversational fluff. Just provide the single, informative sentence as requested. "
f"Ensure the sentence flows naturally and uses the details you've been given.\n"
f"Respond now with ONLY that single sentence."
)
# For LLM's deeper reference, though the primary instruction is above:
# user_prompt += f"\n\nFull database context for your reference if needed: {context}"
else: # Default prompt structure for other intents or if details are missing
system_message = "You are a helpful customer support assistant."
user_prompt = f"""Use the following context to answer the user's question.
If the context doesn't contain the answer, state that you don't have enough information or ask clarifying questions.
Do not make up information. Be concise and polite.
Context:
{context}
User Question: {query}
Assistant Answer:
"""
try:
chat_completion = _groq_client.chat.completions.create(
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": user_prompt}
],
model=model,
temperature=0.5, # Slightly lower temperature might help with stricter adherence
max_tokens=1024,
top_p=1
)
        response = chat_completion.choices[0].message.content.strip()
return response
except Exception as e:
st.error(f"Error calling GROQ API: {e}")
return "Sorry, I encountered an error while trying to generate a response."
def initialize_groq_client(api_key_val):
if not api_key_val:
st.warning("GROQ API Key is missing.")
return None
try:
client = Groq(api_key=api_key_val)
return client
except Exception as e:
st.error(f"Failed to initialize GROQ client: {e}")
return None
# --- Twilio Operations ---
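# The bot polls the Twilio Conversations API instead of receiving webhooks: it scans recently
# updated conversations for inbound WhatsApp messages that arrived after the bot was started and
# have not been processed yet, then replies into the same conversation as the bot identity.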
def initialize_twilio_client(acc_sid, auth_tkn):
if not acc_sid or not auth_tkn:
st.warning("Twilio Account SID or Auth Token is missing.")
return None
try:
client = Client(acc_sid, auth_tkn)
return client
except Exception as e:
st.error(f"Failed to initialize Twilio client: {e}")
return None
def get_new_whatsapp_messages(twilio_client, bot_start_time_utc, processed_message_sids, bot_whatsapp_identity_val):
if not twilio_client:
st.warning("Twilio client not initialized.")
return []
if not bot_whatsapp_identity_val:
st.warning("Twilio Bot WhatsApp Identity not provided.")
return []
new_messages_to_process = []
try:
# Get all conversations (not limited to a specific service)
conversations = twilio_client.conversations.v1.conversations.list(limit=50)
for conv in conversations:
if conv.date_updated and conv.date_updated > bot_start_time_utc:
messages = twilio_client.conversations.v1 \
.conversations(conv.sid) \
.messages \
.list(order='desc', limit=10)
for msg in messages:
if msg.sid in processed_message_sids:
continue
# Check if message is from WhatsApp and not from the bot
if msg.author and msg.author.lower() != bot_whatsapp_identity_val.lower() and \
msg.date_created and msg.date_created > bot_start_time_utc and \
msg.author.startswith('whatsapp:'):
new_messages_to_process.append({
"conversation_sid": conv.sid, "message_sid": msg.sid,
"author_identity": msg.author, "message_body": msg.body,
"timestamp_utc": msg.date_created
})
break
except Exception as e:
st.error(f"Error fetching Twilio messages: {e}")
return sorted(new_messages_to_process, key=lambda m: m['timestamp_utc'])
def send_whatsapp_message(twilio_client, conversation_sid, message_body, bot_identity_val):
if not twilio_client:
st.error("Twilio client not initialized for sending message.")
return False
if not bot_identity_val:
st.error("Bot identity not provided for sending message.")
return False
try:
twilio_client.conversations.v1 \
.conversations(conversation_sid) \
.messages \
.create(author=bot_identity_val, body=message_body)
st.success(f"Sent reply to conversation {conversation_sid}")
st.write(f"Twilio response to send: {message_body}")
print(f"[Twilio Send] Sending response: {message_body}")
return True
except Exception as e:
st.error(f"Error sending Twilio message to {conversation_sid}: {e}")
return False
# --- Main Application Logic & UI ---
st.title("🤖 RAG-Based Customer Support Chatbot")
st.markdown("Powered by Streamlit, Twilio, GROQ LLaMA3, and FAISS.")
# --- Sidebar for Configurations ---
st.sidebar.title("⚙️ Configurations")
if APP_TWILIO_ACCOUNT_SID:
st.sidebar.text_input("Twilio Account SID (from Secrets)", value="********" + APP_TWILIO_ACCOUNT_SID[-4:] if len(APP_TWILIO_ACCOUNT_SID) > 4 else "********", disabled=True)
twilio_account_sid_to_use = APP_TWILIO_ACCOUNT_SID
else:
st.sidebar.warning("Secret 'TWILIO_ACCOUNT_SID' not found.")
twilio_account_sid_to_use = st.sidebar.text_input("Twilio Account SID (Enter Manually)", value=DEFAULT_TWILIO_ACCOUNT_SID_FALLBACK, type="password")
if APP_TWILIO_AUTH_TOKEN:
st.sidebar.text_input("Twilio Auth Token (from Secrets)", value="********", disabled=True)
twilio_auth_token_to_use = APP_TWILIO_AUTH_TOKEN
else:
st.sidebar.warning("Secret 'TWILIO_AUTH_TOKEN' not found.")
twilio_auth_token_to_use = st.sidebar.text_input("Twilio Auth Token (Enter Manually)", value=DEFAULT_TWILIO_AUTH_TOKEN_FALLBACK, type="password")
if APP_GROQ_API_KEY:
st.sidebar.text_input("GROQ API Key (from Secrets)", value="gsk_********" + APP_GROQ_API_KEY[-4:] if len(APP_GROQ_API_KEY) > 8 else "********", disabled=True)
groq_api_key_to_use = APP_GROQ_API_KEY
else:
st.sidebar.warning("Secret 'GROQ_API_KEY' not found.")
groq_api_key_to_use = st.sidebar.text_input("GROQ API Key (Enter Manually)", value=DEFAULT_GROQ_API_KEY_FALLBACK, type="password")
# twilio_conversation_service_sid_to_use = st.sidebar.text_input(
# "Twilio Conversation Service SID (IS...)",
# value=APP_TWILIO_CONVERSATION_SERVICE_SID_SECRET or DEFAULT_TWILIO_CONVERSATION_SERVICE_SID,
# type="password",
# help="The SID of your Twilio Conversations Service. Can be set by 'TWILIO_CONVERSATION_SERVICE_SID' secret."
# )
twilio_bot_whatsapp_identity_to_use = st.sidebar.text_input(
"Twilio Bot WhatsApp Identity",
value=APP_TWILIO_BOT_WHATSAPP_IDENTITY_SECRET or DEFAULT_TWILIO_BOT_WHATSAPP_IDENTITY,
help="e.g., 'whatsapp:+1234567890'. Can be set by 'TWILIO_BOT_WHATSAPP_IDENTITY' secret."
)
embedding_model_name_to_use = st.sidebar.text_input(
"Embedding Model Name",
value=DEFAULT_EMBEDDING_MODEL_NAME
)
polling_interval_to_use = st.sidebar.number_input(
"Twilio Polling Interval (seconds)",
min_value=10, max_value=300,
value=DEFAULT_POLLING_INTERVAL_S,
step=5
)
# --- Initialize Session State ---
if "app_started" not in st.session_state: st.session_state.app_started = False
if "bot_started" not in st.session_state: st.session_state.bot_started = False
if "rag_pipeline_ready" not in st.session_state: st.session_state.rag_pipeline_ready = False
if "last_twilio_poll_time" not in st.session_state: st.session_state.last_twilio_poll_time = time.time()
if "bot_start_time_utc" not in st.session_state: st.session_state.bot_start_time_utc = None
if "processed_message_sids" not in st.session_state: st.session_state.processed_message_sids = set()
if "manual_chat_history" not in st.session_state: st.session_state.manual_chat_history = []
# --- Helper: Simple Intent Classifier ---
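# Keyword-and-regex intent routing: order IDs are expected to look like ORD123 (r'\b(ord\d{3,})\b')
# and product IDs like PRD123; queries matching neither the order nor product patterns fall through
# to GENERAL_POLICY_FAQ or UNKNOWN, both of which are answered from the PDF index.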
def simple_intent_classifier(query):
query_lower = query.lower()
order_keywords = ["order", "status", "track", "delivery"]
order_id_match = re.search(r'\b(ord\d{3,})\b', query_lower, re.IGNORECASE)
if any(k in query_lower for k in order_keywords):
if order_id_match:
return "ORDER_STATUS", order_id_match.group(1).upper()
return "ORDER_STATUS", None
product_keywords = ["product", "item", "buy", "price", "feature", "stock"]
product_id_match = re.search(r'\b(prd\d{3,})\b', query_lower, re.IGNORECASE)
if any(k in query_lower for k in product_keywords) or product_id_match:
return "PRODUCT_INFO", None
if any(k in query_lower for k in ["return", "policy", "refund", "exchange", "faq", "question", "how to", "support"]):
return "GENERAL_POLICY_FAQ", None
return "UNKNOWN", None
# --- Main Application Controls ---
col1, col2, col3, col4 = st.columns(4)
with col1:
if st.button("🚀 Start App", disabled=st.session_state.app_started, use_container_width=True):
if not groq_api_key_to_use:
st.error("GROQ API Key is required.")
else:
with st.spinner("Initializing RAG pipeline..."):
st.session_state.embedding_model = initialize_embedding_model(embedding_model_name_to_use)
st.session_state.customer_orders_data = load_json_data(CUSTOMER_ORDERS_FILE)
st.session_state.products_data = load_json_data(PRODUCTS_FILE)
policy_pdf_pages = load_pdf_data(POLICY_PDF_FILE)
faq_pdf_pages = load_pdf_data(FAQ_PDF_FILE)
all_pdf_text_pages = policy_pdf_pages + faq_pdf_pages
st.session_state.pdf_text_chunks_raw = chunk_text(all_pdf_text_pages)
if st.session_state.embedding_model and st.session_state.pdf_text_chunks_raw:
st.session_state.faiss_index_pdfs, st.session_state.indexed_pdf_chunks = \
create_faiss_index(st.session_state.pdf_text_chunks_raw, st.session_state.embedding_model)
else:
st.session_state.faiss_index_pdfs, st.session_state.indexed_pdf_chunks = None, []
st.warning("FAISS index for PDFs could not be created (model or chunks missing).")
st.session_state.groq_client = initialize_groq_client(groq_api_key_to_use)
if st.session_state.embedding_model and \
st.session_state.groq_client and \
st.session_state.customer_orders_data is not None and \
st.session_state.products_data is not None and \
(st.session_state.faiss_index_pdfs is not None or not all_pdf_text_pages):
st.session_state.rag_pipeline_ready = True
st.session_state.app_started = True
st.success("RAG Application Started!")
st.rerun()
else:
error_messages = []
if not st.session_state.embedding_model: error_messages.append("Embedding model failed to initialize.")
if not st.session_state.groq_client: error_messages.append("GROQ client failed to initialize.")
if st.session_state.customer_orders_data is None: error_messages.append(f"CustomerOrders.json ({CUSTOMER_ORDERS_FILE}) failed to load.")
if st.session_state.products_data is None: error_messages.append(f"Products.json ({PRODUCTS_FILE}) failed to load.")
if all_pdf_text_pages and st.session_state.faiss_index_pdfs is None: error_messages.append("PDF FAISS index failed to create.")
st.error("Failed to initialize RAG pipeline. Issues:\n- " + "\n- ".join(error_messages) + "\nCheck configurations and ensure all data files are present in 'docs/'.")
st.session_state.app_started = False
with col2:
if st.button("🛑 Stop App", disabled=not st.session_state.app_started, use_container_width=True):
keys_to_reset = ["app_started", "bot_started", "rag_pipeline_ready", "embedding_model",
"customer_orders_data", "products_data", "pdf_text_chunks_raw",
"faiss_index_pdfs", "indexed_pdf_chunks", "groq_client", "twilio_client",
"bot_start_time_utc", "processed_message_sids", "manual_chat_history"]
for key in keys_to_reset:
if key in st.session_state: del st.session_state[key]
st.session_state.app_started = False
st.session_state.bot_started = False
st.session_state.rag_pipeline_ready = False
st.session_state.processed_message_sids = set()
st.session_state.manual_chat_history = []
st.success("Application Stopped.")
st.rerun()
with col3:
if st.button("💬 Start WhatsApp Bot", disabled=not st.session_state.app_started or st.session_state.bot_started, use_container_width=True):
if not all([twilio_account_sid_to_use, twilio_auth_token_to_use, twilio_bot_whatsapp_identity_to_use]):
st.error("Twilio Account SID, Auth Token, Conversation Service SID, and Bot WhatsApp Identity are all required.")
else:
st.session_state.twilio_client = initialize_twilio_client(twilio_account_sid_to_use, twilio_auth_token_to_use)
if st.session_state.twilio_client:
st.session_state.bot_started = True
st.session_state.bot_start_time_utc = datetime.now(timezone.utc)
st.session_state.processed_message_sids = set()
st.session_state.last_twilio_poll_time = time.time() - polling_interval_to_use - 1
st.success("WhatsApp Bot Started!")
st.rerun()
else:
st.error("Failed to initialize Twilio client. WhatsApp Bot not started.")
with col4:
if st.button("🔕 Stop WhatsApp Bot", disabled=not st.session_state.bot_started, use_container_width=True):
st.session_state.bot_started = False
st.info("WhatsApp Bot Stopped.")
st.rerun()
st.divider()
# --- Manual Query Interface ---
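# Manual chat flow: classify the query, fetch context (order JSON, product JSON, or FAISS-retrieved
# PDF chunks), call the Groq LLM, and render the answer together with an expandable view of the
# retrieved context; the exchange is appended to manual_chat_history so it survives reruns.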
if st.session_state.get("app_started") and st.session_state.get("rag_pipeline_ready"):
st.subheader("💬 Manual Query")
for chat_entry in st.session_state.manual_chat_history:
with st.chat_message(chat_entry["role"]):
st.markdown(chat_entry["content"])
if "context" in chat_entry and chat_entry["context"]:
with st.expander("Retrieved Context"):
try:
if isinstance(chat_entry["context"], str) and \
(chat_entry["context"].strip().startswith('{') or chat_entry["context"].strip().startswith('[')):
st.json(json.loads(chat_entry["context"]))
elif isinstance(chat_entry["context"], list):
st.json(chat_entry["context"])
else:
st.text(str(chat_entry["context"]))
except (json.JSONDecodeError, TypeError):
st.text(str(chat_entry["context"]))
user_query_manual = st.chat_input("Ask a question:")
if user_query_manual:
st.session_state.manual_chat_history.append({"role": "user", "content": user_query_manual})
with st.chat_message("user"): st.markdown(user_query_manual)
with st.spinner("Thinking..."):
intent_result = simple_intent_classifier(user_query_manual)
intent = intent_result[0]
potential_oid_from_intent = intent_result[1]
context_for_llm, raw_context_data = "No specific context could be retrieved.", None
extracted_customer_name, extracted_item_name, extracted_shipping_address, \
extracted_delivery_date, extracted_order_id, extracted_order_status = [None] * 6
if intent == "ORDER_STATUS":
order_id_to_check = None
if potential_oid_from_intent:
order_id_to_check = potential_oid_from_intent
else:
match_manual = re.search(r'\b(ord\d{3,})\b', user_query_manual.lower(), re.IGNORECASE)
if match_manual:
order_id_to_check = match_manual.group(1).upper()
if order_id_to_check:
raw_context_data = get_order_details(order_id_to_check, st.session_state.customer_orders_data)
# context_for_llm will be used as the 'context' parameter in generate_response_groq
# For ORDER_STATUS, this raw_context_data (JSON string) is still useful for LLM's reference,
# even though specific fields are extracted for the specialized prompt.
context_for_llm = raw_context_data
if isinstance(raw_context_data, str) and not raw_context_data.startswith("No order found") and not raw_context_data.startswith("Customer order data is not loaded"):
try:
order_data_dict = json.loads(raw_context_data)
extracted_customer_name = order_data_dict.get("customer_name")
items = order_data_dict.get("items")
if items and len(items) > 0 and isinstance(items[0], dict):
extracted_item_name = items[0].get("name", "your item(s)")
else:
extracted_item_name = "your item(s)" # Fallback
extracted_shipping_address = order_data_dict.get("shipping_address")
extracted_delivery_date = order_data_dict.get("delivered_on")
extracted_order_status = order_data_dict.get("status")
extracted_order_id = order_data_dict.get("order_id") # Should be same as order_id_to_check
except json.JSONDecodeError:
st.warning(f"Could not parse order details JSON for {order_id_to_check} for personalization.")
context_for_llm = f"Error parsing order details for {order_id_to_check}. Raw data: {raw_context_data}"
elif isinstance(raw_context_data, str): # Handle "No order found" or "data not loaded"
context_for_llm = raw_context_data # LLM will state this
else:
context_for_llm = "To check an order status, please provide a valid Order ID (e.g., ORD123)."
raw_context_data = {"message": "Order ID needed or not found in query."}
elif intent == "PRODUCT_INFO":
raw_context_data = get_product_info(user_query_manual, st.session_state.products_data)
context_for_llm = raw_context_data # Product info is directly used as context
elif intent == "GENERAL_POLICY_FAQ" or intent == "UNKNOWN":
if st.session_state.faiss_index_pdfs and st.session_state.embedding_model and st.session_state.indexed_pdf_chunks:
k_val = 3 if intent == "GENERAL_POLICY_FAQ" else 2
retrieved_chunks = search_faiss_index(st.session_state.faiss_index_pdfs, user_query_manual,
st.session_state.embedding_model, st.session_state.indexed_pdf_chunks, k=k_val)
if retrieved_chunks:
context_for_llm = "Relevant information from documents:\n\n" + "\n\n---\n\n".join(retrieved_chunks)
raw_context_data = retrieved_chunks
else:
context_for_llm = "I couldn't find specific information in our policy or FAQ documents regarding your query."
raw_context_data = {"message": "No relevant PDF chunks found."}
else:
context_for_llm = "Our policy and FAQ documents are currently unavailable for search."
raw_context_data = {"message": "PDF index or embedding model not ready."}
llm_response = generate_response_groq(
_groq_client=st.session_state.groq_client,
query=user_query_manual,
context=context_for_llm,
intent=intent,
customer_name=extracted_customer_name,
item_name=extracted_item_name,
shipping_address=extracted_shipping_address,
delivery_date=extracted_delivery_date,
order_id=extracted_order_id, # This will be the specific order ID from user query
order_status=extracted_order_status
)
with st.chat_message("assistant"):
st.markdown(llm_response)
if raw_context_data:
with st.expander("Retrieved Context For Assistant"):
try:
if isinstance(raw_context_data, str) and \
(raw_context_data.strip().startswith('{') or raw_context_data.strip().startswith('[')):
st.json(json.loads(raw_context_data))
elif isinstance(raw_context_data, list):
st.json(raw_context_data)
else:
st.text(str(raw_context_data))
except (json.JSONDecodeError, TypeError):
st.text(str(raw_context_data))
st.session_state.manual_chat_history.append({"role": "assistant", "content": llm_response, "context": raw_context_data})
st.rerun()
# --- Twilio Bot Polling Logic ---
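# Every polling_interval_to_use seconds the app fetches new WhatsApp messages and runs them through
# the same intent -> context -> LLM pipeline as the manual query path, then replies via the
# Conversations API and records the message SID so the same message is never answered twice.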
if st.session_state.get("bot_started") and st.session_state.get("rag_pipeline_ready"):
current_time = time.time()
if "last_twilio_poll_time" not in st.session_state:
st.session_state.last_twilio_poll_time = current_time - polling_interval_to_use - 1
if (current_time - st.session_state.last_twilio_poll_time) > polling_interval_to_use:
st.session_state.last_twilio_poll_time = current_time
if not st.session_state.get("twilio_client") or not twilio_bot_whatsapp_identity_to_use or not st.session_state.get("bot_start_time_utc"):
st.warning("Twilio client/config missing for polling. Ensure bot is started and WhatsApp identity is set.")
else:
with st.spinner(f"Checking WhatsApp messages (last poll: {datetime.fromtimestamp(st.session_state.last_twilio_poll_time).strftime('%H:%M:%S')})..."):
new_messages = get_new_whatsapp_messages(
st.session_state.twilio_client,
st.session_state.bot_start_time_utc,
st.session_state.processed_message_sids,
twilio_bot_whatsapp_identity_to_use
)
if new_messages:
st.info(f"Found {len(new_messages)} new WhatsApp message(s) to process.")
for msg_data in new_messages:
user_query_whatsapp, conv_sid, msg_sid, author_id = \
msg_data["message_body"], msg_data["conversation_sid"], \
msg_data["message_sid"], msg_data["author_identity"]
st.write(f"Processing WhatsApp message from {author_id} in conversation {conv_sid}: '{user_query_whatsapp}' (SID: {msg_sid})")
intent_result_whatsapp = simple_intent_classifier(user_query_whatsapp)
intent_whatsapp = intent_result_whatsapp[0]
potential_oid_whatsapp = intent_result_whatsapp[1]
context_for_llm_whatsapp = "No specific context could be retrieved."
raw_context_data_whatsapp = None
wa_customer_name, wa_item_name, wa_shipping_address, \
wa_delivery_date, wa_order_id, wa_order_status = [None] * 6
if intent_whatsapp == "ORDER_STATUS":
order_id_to_check_whatsapp = None
if potential_oid_whatsapp:
order_id_to_check_whatsapp = potential_oid_whatsapp
else:
match_whatsapp = re.search(r'\b(ord\d{3,})\b', user_query_whatsapp.lower(), re.IGNORECASE)
if match_whatsapp:
order_id_to_check_whatsapp = match_whatsapp.group(1).upper()
if order_id_to_check_whatsapp:
raw_context_data_whatsapp = get_order_details(order_id_to_check_whatsapp, st.session_state.customer_orders_data)
context_for_llm_whatsapp = raw_context_data_whatsapp # Full JSON string as context
if isinstance(raw_context_data_whatsapp, str) and not raw_context_data_whatsapp.startswith("No order found") and not raw_context_data_whatsapp.startswith("Customer order data is not loaded"):
try:
order_data_dict_wa = json.loads(raw_context_data_whatsapp)
wa_customer_name = order_data_dict_wa.get("customer_name")
items_wa = order_data_dict_wa.get("items")
if items_wa and len(items_wa) > 0 and isinstance(items_wa[0], dict):
wa_item_name = items_wa[0].get("name", "your item(s)")
else:
wa_item_name = "your item(s)"
wa_shipping_address = order_data_dict_wa.get("shipping_address")
wa_delivery_date = order_data_dict_wa.get("delivered_on")
wa_order_status = order_data_dict_wa.get("status")
wa_order_id = order_data_dict_wa.get("order_id")
except json.JSONDecodeError:
st.warning(f"Could not parse order details JSON for {order_id_to_check_whatsapp} (WhatsApp) for personalization.")
context_for_llm_whatsapp = f"Error parsing order details for {order_id_to_check_whatsapp}. Raw data: {raw_context_data_whatsapp}"
elif isinstance(raw_context_data_whatsapp, str):
context_for_llm_whatsapp = raw_context_data_whatsapp
else:
context_for_llm_whatsapp = "To check an order status, please provide a valid Order ID (e.g., ORD123)."
raw_context_data_whatsapp = {"message": "Order ID needed or not found in query."}
elif intent_whatsapp == "PRODUCT_INFO":
raw_context_data_whatsapp = get_product_info(user_query_whatsapp, st.session_state.products_data)
context_for_llm_whatsapp = raw_context_data_whatsapp
elif intent_whatsapp == "GENERAL_POLICY_FAQ" or intent_whatsapp == "UNKNOWN":
if st.session_state.faiss_index_pdfs and st.session_state.embedding_model and st.session_state.indexed_pdf_chunks:
k_val_whatsapp = 3 if intent_whatsapp == "GENERAL_POLICY_FAQ" else 2
chunks_whatsapp = search_faiss_index(st.session_state.faiss_index_pdfs, user_query_whatsapp,
st.session_state.embedding_model, st.session_state.indexed_pdf_chunks, k=k_val_whatsapp)
if chunks_whatsapp:
context_for_llm_whatsapp = "Relevant information from documents:\n\n" + "\n\n---\n\n".join(chunks_whatsapp)
raw_context_data_whatsapp = chunks_whatsapp
else:
context_for_llm_whatsapp = "I couldn't find specific information in our policy or FAQ documents regarding your query."
raw_context_data_whatsapp = {"message": "No relevant PDF chunks found."}
else:
context_for_llm_whatsapp = "Our policy and FAQ documents are currently unavailable for search."
raw_context_data_whatsapp = {"message": "PDF index or embedding model not ready."}
response_whatsapp = generate_response_groq(
_groq_client=st.session_state.groq_client,
query=user_query_whatsapp,
context=context_for_llm_whatsapp,
intent=intent_whatsapp,
customer_name=wa_customer_name,
item_name=wa_item_name,
shipping_address=wa_shipping_address,
delivery_date=wa_delivery_date,
order_id=wa_order_id,
order_status=wa_order_status
).strip().replace('\n', ' ')
if send_whatsapp_message(
st.session_state.twilio_client,
conv_sid,
response_whatsapp,
twilio_bot_whatsapp_identity_to_use
):
st.session_state.processed_message_sids.add(msg_sid)
#print(f"[Twilio Send] Sending response: {message_body}")
st.success(f"Successfully responded to WhatsApp message SID {msg_sid} from {author_id}.")
else:
st.error(f"Failed to send WhatsApp response for message SID {msg_sid} from {author_id}.")
st.rerun()
# --- Footer & Status ---
st.sidebar.markdown("---")
st.sidebar.info("Ensure all keys and SIDs are correctly configured. Primary API keys (Twilio SID/Token, GROQ Key) are loaded from secrets if available.")
if st.session_state.get("app_started"):
status_color = "green" if st.session_state.get("rag_pipeline_ready") else "orange"
app_status_text = "App RUNNING" if st.session_state.get("rag_pipeline_ready") else "App Initializing/Error"
bot_status_text = "WhatsApp Bot RUNNING" if st.session_state.get("bot_started") else "WhatsApp Bot STOPPED"
st.sidebar.markdown(f"<span style='color:{status_color};'>{app_status_text}</span>. {bot_status_text}.", unsafe_allow_html=True)
else:
st.sidebar.warning("App is STOPPED.")
# TODO: The WhatsApp bot has been observed sending multiple replies via Twilio; its responses should match the manual query path.
# --- Simulated background loop using rerun ---
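# Streamlit has no true background worker here, so the script sleeps briefly and reruns itself;
# the polling block above then fires once the configured interval has elapsed.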
if st.session_state.get("bot_started") and st.session_state.get("rag_pipeline_ready"):
current_time = time.time()
last_poll = st.session_state.get("last_twilio_poll_time", 0)
interval = polling_interval_to_use # 30 by default from sidebar
if current_time - last_poll >= interval:
st.session_state.last_twilio_poll_time = current_time
st.rerun()
else:
# Wait the remaining time before rerunning
time_remaining = interval - (current_time - last_poll)
time.sleep(min(5, time_remaining)) # Avoid sleeping too long
st.rerun()