Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import time | |
from datetime import datetime, timezone | |
import json | |
import PyPDF2 | |
from sentence_transformers import SentenceTransformer | |
import faiss | |
import numpy as np | |
from twilio.rest import Client | |
from groq import Groq | |
import re | |
# --- Page Configuration ---
# This is the first Streamlit command issued by the script.
st.set_page_config(page_title="RAG Customer Support Chatbot", layout="wide")

# --- Default Configurations & File Paths ---
# Empty fallbacks: used as the initial value of the manual-entry sidebar
# fields when the corresponding secret is not configured.
DEFAULT_TWILIO_ACCOUNT_SID_FALLBACK = ""
DEFAULT_TWILIO_AUTH_TOKEN_FALLBACK = ""
DEFAULT_GROQ_API_KEY_FALLBACK = ""
#DEFAULT_TWILIO_CONVERSATION_SERVICE_SID = ""
# Default bot identity: the "TWILIO_PHONE_NUMBER" secret if present, otherwise
# the hard-coded WhatsApp address below.
DEFAULT_TWILIO_BOT_WHATSAPP_IDENTITY = st.secrets.get("TWILIO_PHONE_NUMBER", "whatsapp:+14155238886")
DEFAULT_EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
DEFAULT_POLLING_INTERVAL_S = 30  # seconds; initial value of the sidebar polling-interval input
# All knowledge-base files are looked up relative to this folder.
DOCS_FOLDER = "docs/"
CUSTOMER_ORDERS_FILE = os.path.join(DOCS_FOLDER, "CustomerOrders.json")
PRODUCTS_FILE = os.path.join(DOCS_FOLDER, "Products.json")
POLICY_PDF_FILE = os.path.join(DOCS_FOLDER, "ProductReturnPolicy.pdf")
FAQ_PDF_FILE = os.path.join(DOCS_FOLDER, "FAQ.pdf")

# --- Application Secrets Configuration ---
# Each value is None when the secret is absent; the sidebar then asks for
# manual input instead.
APP_TWILIO_ACCOUNT_SID = st.secrets.get("TWILIO_ACCOUNT_SID")
APP_TWILIO_AUTH_TOKEN = st.secrets.get("TWILIO_AUTH_TOKEN")
APP_GROQ_API_KEY = st.secrets.get("GROQ_API_KEY")
#APP_TWILIO_CONVERSATION_SERVICE_SID_SECRET = st.secrets.get("TWILIO_CONVERSATION_SERVICE_SID")
APP_TWILIO_BOT_WHATSAPP_IDENTITY_SECRET = st.secrets.get("TWILIO_BOT_WHATSAPP_IDENTITY")
# --- RAG Processing Utilities --- | |
def load_json_data(file_path):
    """Read and parse a UTF-8 encoded JSON file.

    Args:
        file_path: Path of the JSON file to load.

    Returns:
        The decoded JSON value, or ``None`` when the file is missing, is not
        valid JSON, or any other error occurs. Every failure is reported in
        the UI through ``st.error``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return json.load(handle)
    except FileNotFoundError:
        st.error(f"Error: JSON file not found at {file_path}")
    except json.JSONDecodeError:
        st.error(f"Error: Could not decode JSON from {file_path}")
    except Exception as exc:
        st.error(f"An unexpected error occurred while loading {file_path}: {exc}")
    # Reached only after one of the handlers above has reported a failure.
    return None
def load_pdf_data(file_path):
    """Extract the text of every page of a PDF file.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        A list with one string per page (an empty string for pages whose
        extraction yields nothing). An empty list is returned, and the error
        reported through ``st.error``, when the file is missing or cannot be
        processed.
    """
    try:
        with open(file_path, 'rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # ``extract_text`` may return a falsy value; normalise it to "".
            return [(pdf_page.extract_text() or "") for pdf_page in pdf_reader.pages]
    except FileNotFoundError:
        st.error(f"Error: PDF file not found at {file_path}")
    except Exception as exc:
        st.error(f"An error occurred while processing PDF {file_path}: {exc}")
    return []
def chunk_text(text_pages, chunk_size=1000, chunk_overlap=200):
    """Split page texts into overlapping, fixed-size character chunks.

    The pages are joined with newlines and sliced into windows of
    ``chunk_size`` characters. Consecutive windows start
    ``chunk_size - chunk_overlap`` characters apart, so neighbouring chunks
    share ``chunk_overlap`` characters. Whitespace-only chunks are dropped.

    Args:
        text_pages: Iterable of page strings.
        chunk_size: Maximum number of characters per chunk; must be positive.
        chunk_overlap: Number of characters shared by consecutive chunks;
            must be smaller than ``chunk_size``.

    Returns:
        A list of non-blank chunk strings (empty when there is no text).

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size``. Previously such arguments made
            the window stop advancing and the function loop forever.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    full_text = "\n".join(text_pages)
    if not full_text.strip():
        return []

    stride = chunk_size - chunk_overlap
    text_length = len(full_text)
    chunks = []
    for start in range(0, text_length, stride):
        end = start + chunk_size
        chunks.append(full_text[start:end])
        # Stop once a window reaches the end of the text, so no trailing
        # chunk made purely of overlap is emitted.
        if end >= text_length:
            break
    return [chunk for chunk in chunks if chunk.strip()]
def initialize_embedding_model(model_name=DEFAULT_EMBEDDING_MODEL_NAME):
    """Load a SentenceTransformer embedding model by name.

    Args:
        model_name: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The loaded model, or ``None`` (after reporting the error through
        ``st.error``) when loading fails.
    """
    try:
        return SentenceTransformer(model_name)
    except Exception as exc:
        st.error(f"Error initializing embedding model '{model_name}': {exc}")
    return None
def create_faiss_index(_text_chunks, _embedding_model):
    """Embed text chunks and store them in a flat L2 FAISS index.

    Args:
        _text_chunks: Candidate chunks; only non-blank strings are indexed.
        _embedding_model: Object exposing ``encode(list_of_str,
            convert_to_tensor=False)``.

    Returns:
        ``(index, indexed_chunks)`` where ``indexed_chunks[i]`` is the text
        stored at index position ``i``. ``(None, [])`` is returned when there
        is nothing to index or indexing fails; the reason is shown through
        ``st.warning`` / ``st.error``.
    """
    if not _text_chunks or _embedding_model is None:
        st.warning("Cannot create FAISS index: No text chunks or embedding model available.")
        return None, []
    try:
        usable_chunks = []
        for candidate in _text_chunks:
            if candidate and isinstance(candidate, str) and candidate.strip():
                usable_chunks.append(str(candidate))
        if not usable_chunks:
            st.warning("No valid text chunks to embed for FAISS index.")
            return None, []

        vectors = _embedding_model.encode(usable_chunks, convert_to_tensor=False)
        # A 1-D result is treated as a single embedding and made into a row.
        if vectors.ndim == 1:
            vectors = vectors.reshape(1, -1)
        if vectors.shape[0] == 0:
            st.warning("No embeddings were generated for FAISS index.")
            return None, []

        faiss_index = faiss.IndexFlatL2(vectors.shape[1])
        faiss_index.add(np.array(vectors, dtype=np.float32))
        return faiss_index, usable_chunks
    except Exception as exc:
        st.error(f"Error creating FAISS index: {exc}")
        return None, []
def search_faiss_index(index, query_text, embedding_model, indexed_chunks, k=3):
    """Return the indexed chunks nearest to a query.

    Args:
        index: Object exposing ``search(vectors, k)`` that returns
            ``(distances, indices)``.
        query_text: The query string; an empty query yields no results.
        embedding_model: Object exposing ``encode(list_of_str,
            convert_to_tensor=False)``.
        indexed_chunks: Chunk texts aligned with the index positions.
        k: Number of neighbours to request.

    Returns:
        Matching chunk texts in the order the index returned them. Positions
        outside ``indexed_chunks`` (including negative ones) are skipped. An
        empty list is returned when inputs are missing or the search fails.
    """
    if index is None or embedding_model is None or not query_text:
        return []
    try:
        query_vector = embedding_model.encode([query_text], convert_to_tensor=False)
        if query_vector.ndim == 1:
            query_vector = query_vector.reshape(1, -1)
        _distances, neighbour_ids = index.search(np.array(query_vector, dtype=np.float32), k)
        chunk_count = len(indexed_chunks)
        return [
            indexed_chunks[position]
            for position in neighbour_ids[0]
            if 0 <= position < chunk_count
        ]
    except Exception as exc:
        st.error(f"Error searching FAISS index: {exc}")
        return []
def get_order_details(order_id, customer_orders_data):
    """Look up one order by its ID.

    Args:
        order_id: ID compared for equality with each record's ``"order_id"``.
        customer_orders_data: Iterable of order records (dicts).

    Returns:
        The matching record as a JSON string indented by two spaces, or a
        plain-text message when the data is missing/empty or no record
        matches.
    """
    if not customer_orders_data:
        return "Customer order data is not loaded."
    for order in customer_orders_data:
        # Skip malformed (non-dict) entries instead of failing on ``.get``,
        # the same way get_product_info treats its records.
        if isinstance(order, dict) and order.get("order_id") == order_id:
            return json.dumps(order, indent=2)
    return f"No order found with ID: {order_id}."
def get_product_info(query, products_data):
    """Find products whose ID, name or type is mentioned in a query.

    Matching is case-insensitive substring matching: a product matches when
    its ``Product_ID`` occurs in the query, or when its ``Product_Name`` or
    ``Product_Type`` contains the query or is contained in it. Records that
    are not dicts are ignored.

    Args:
        query: Free-text user query. ``None`` or a blank query matches
            nothing; without this guard an empty string would be a substring
            of every name and match every product.
        products_data: Iterable of product records (dicts).

    Returns:
        The matching records as a JSON string indented by two spaces, or a
        plain-text message when the data is missing/empty or nothing matches.
    """
    if not products_data:
        st.warning("Product data is not loaded or is empty in get_product_info.")
        return "Product data is not loaded."

    no_match_message = f"No product information found matching your query: '{query}'."
    if query is None or not str(query).strip():
        return no_match_message

    query_lower = str(query).lower()
    found_products = []
    for product in products_data:
        if not isinstance(product, dict):
            continue
        product_id = str(product.get("Product_ID", "")).lower()
        product_name = str(product.get("Product_Name", "")).lower()
        product_type = str(product.get("Product_Type", "")).lower()
        # Empty fields never match: each test first requires a non-empty value.
        if (
            (product_id and product_id in query_lower)
            or (product_name and (query_lower in product_name or product_name in query_lower))
            or (product_type and (query_lower in product_type or product_type in query_lower))
        ):
            found_products.append(product)

    if found_products:
        return json.dumps(found_products, indent=2)
    return no_match_message
# --- LLM Operations --- | |
def generate_response_groq(_groq_client, query, context, model="llama3-8b-8192",
                           intent=None, customer_name=None, item_name=None,
                           shipping_address=None, delivery_date=None, order_id=None, order_status=None):
    """Ask the Groq chat-completions API to answer a customer query.

    Two prompt styles are used:

    * Order status: when ``intent`` is ``"ORDER_STATUS"`` and ``order_id``,
      ``customer_name`` and ``order_status`` are all provided, the model is
      told to produce one personalised sentence built from the order fields.
      ``context`` is not part of this prompt.
    * Generic: otherwise the model answers ``query`` from ``context``.

    Args:
        _groq_client: Client exposing ``chat.completions.create``.
        query: The user's question.
        context: Retrieved context for the generic prompt.
        model: Model name sent to the API.
        intent: Classified intent of the query.
        customer_name, item_name, shipping_address, delivery_date, order_id,
            order_status: Order fields used by the order-status prompt.

    Returns:
        The stripped model reply, or a fixed message when the client or query
        is missing or the API call fails (the failure is also reported
        through ``st.error``).
    """
    if not _groq_client:
        return "GROQ client not initialized. Please check API key."
    if not query:
        return "Query is empty."

    if intent == "ORDER_STATUS" and order_id and customer_name and order_status:
        system_message = (
            "You are an exceptionally friendly and helpful customer support assistant. "
            f"Your current task is to provide a single, complete, and human-like sentence as a response to {customer_name} "
            f"about their order {order_id}. You MUST incorporate all relevant order details provided into this single sentence."
        )
        item_description = item_name or "the ordered item(s)"
        is_delivered = order_status.lower() == "delivered"

        # The essential facts the single sentence must carry.
        if is_delivered:
            if shipping_address:
                destination_part = f"and was delivered to {shipping_address}"
            else:
                destination_part = "and was delivered (address not specified)"
            date_part = f"on {delivery_date}" if delivery_date else "(delivery date not specified)"
            core_information_to_include = (
                f"your order {order_id}, for {item_description}, "
                f"has a status of '{order_status}', {destination_part} {date_part}"
            )
        else:
            core_information_to_include = (
                f"your order {order_id} for {item_description} has a status of '{order_status}'"
            )

        prompt_parts = [
            f"Customer: {customer_name}\n",
            f"Order ID: {order_id}\n",
            f"Item(s): {item_description}\n",
            f"Status: {order_status}\n",
        ]
        if is_delivered:
            prompt_parts.append(f"Shipping Address: {shipping_address if shipping_address else 'Not specified'}\n")
            prompt_parts.append(f"Delivered On: {delivery_date if delivery_date else 'Not specified'}\n")
        prompt_parts.append(f"\nOriginal user query for context: '{query}'\n\n")
        prompt_parts.append(
            f"Your task: Generate a single, complete, and human-like sentence that starts with a greeting to {customer_name}. "
            f"This sentence MUST convey the following essential information: {core_information_to_include}.\n"
            f"For example, if all details are present for a delivered order: 'Hi {customer_name}, {core_information_to_include}.'\n"
            f"For example, for a non-delivered order: 'Hi {customer_name}, {core_information_to_include}.'\n"
            "IMPORTANT: Do not ask questions. Do not add any extra conversational fluff. Just provide the single, informative sentence as requested. "
            "Ensure the sentence flows naturally and uses the details you've been given.\n"
            "Respond now with ONLY that single sentence."
        )
        user_prompt = "".join(prompt_parts)
    else:
        # Default prompt for other intents, or when order details are missing.
        system_message = "You are a helpful customer support assistant."
        user_prompt = f"""Use the following context to answer the user's question.
If the context doesn't contain the answer, state that you don't have enough information or ask clarifying questions.
Do not make up information. Be concise and polite.
Context:
{context}
User Question: {query}
Assistant Answer:
"""

    request_messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_prompt},
    ]
    try:
        completion = _groq_client.chat.completions.create(
            messages=request_messages,
            model=model,
            temperature=0.5,
            max_tokens=1024,
            top_p=1,
        )
        return completion.choices[0].message.content.strip()
    except Exception as exc:
        st.error(f"Error calling GROQ API: {exc}")
        return "Sorry, I encountered an error while trying to generate a response."
def initialize_groq_client(api_key_val):
    """Create a Groq API client.

    Args:
        api_key_val: The Groq API key.

    Returns:
        A ``Groq`` client, or ``None`` when the key is missing (a warning is
        shown) or client construction fails (an error is shown).
    """
    if not api_key_val:
        st.warning("GROQ API Key is missing.")
        return None
    try:
        return Groq(api_key=api_key_val)
    except Exception as exc:
        st.error(f"Failed to initialize GROQ client: {exc}")
    return None
# --- Twilio Operations --- | |
def initialize_twilio_client(acc_sid, auth_tkn):
    """Create a Twilio REST client.

    Args:
        acc_sid: Twilio Account SID.
        auth_tkn: Twilio Auth Token.

    Returns:
        A Twilio ``Client``, or ``None`` when either credential is missing
        (a warning is shown) or client construction fails (an error is shown).
    """
    if not (acc_sid and auth_tkn):
        st.warning("Twilio Account SID or Auth Token is missing.")
        return None
    try:
        return Client(acc_sid, auth_tkn)
    except Exception as exc:
        st.error(f"Failed to initialize Twilio client: {exc}")
    return None
def get_new_whatsapp_messages(twilio_client, bot_start_time_utc, processed_message_sids, bot_whatsapp_identity_val):
    """Collect unprocessed inbound WhatsApp messages from recently updated conversations.

    Args:
        twilio_client: Initialised Twilio client.
        bot_start_time_utc: Conversations and messages not newer than this
            timestamp are ignored.
        processed_message_sids: Container of message SIDs already handled.
        bot_whatsapp_identity_val: The bot's own author identity; messages
            from it (compared case-insensitively) are skipped.

    Returns:
        A list of dicts with keys ``conversation_sid``, ``message_sid``,
        ``author_identity``, ``message_body`` and ``timestamp_utc``, sorted
        oldest first. If the Twilio calls raise, the error is shown via
        ``st.error`` and whatever was collected so far is returned.
    """
    if not twilio_client:
        st.warning("Twilio client not initialized.")
        return []
    if not bot_whatsapp_identity_val:
        st.warning("Twilio Bot WhatsApp Identity not provided.")
        return []
    new_messages_to_process = []
    try:
        # Get all conversations (not limited to a specific service)
        conversations = twilio_client.conversations.v1.conversations.list(limit=50)
        for conv in conversations:
            # Only look inside conversations touched since the bot started.
            if conv.date_updated and conv.date_updated > bot_start_time_utc:
                # Newest first, at most ten messages per conversation.
                messages = twilio_client.conversations.v1 \
                    .conversations(conv.sid) \
                    .messages \
                    .list(order='desc', limit=10)
                for msg in messages:
                    if msg.sid in processed_message_sids:
                        continue
                    # Check if message is from WhatsApp and not from the bot
                    if msg.author and msg.author.lower() != bot_whatsapp_identity_val.lower() and \
                            msg.date_created and msg.date_created > bot_start_time_utc and \
                            msg.author.startswith('whatsapp:'):
                        new_messages_to_process.append({
                            "conversation_sid": conv.sid, "message_sid": msg.sid,
                            "author_identity": msg.author, "message_body": msg.body,
                            "timestamp_utc": msg.date_created
                        })
                        # Keep only the newest qualifying message per conversation.
                        # NOTE(review): older unprocessed messages of the same
                        # conversation are skipped here and can still qualify on
                        # a later poll - confirm this is intended.
                        break
    except Exception as e:
        st.error(f"Error fetching Twilio messages: {e}")
    return sorted(new_messages_to_process, key=lambda m: m['timestamp_utc'])
def send_whatsapp_message(twilio_client, conversation_sid, message_body, bot_identity_val):
    """Post a reply into a Twilio conversation as the bot.

    Args:
        twilio_client: Initialised Twilio client.
        conversation_sid: SID of the conversation to reply in.
        message_body: Text of the reply.
        bot_identity_val: Author identity to send the message as.

    Returns:
        ``True`` when the message was created, ``False`` when a precondition
        is missing or the Twilio call raises. Outcomes are reported in the
        Streamlit UI, and a successful send is also printed to stdout.
    """
    if not twilio_client:
        st.error("Twilio client not initialized for sending message.")
        return False
    if not bot_identity_val:
        st.error("Bot identity not provided for sending message.")
        return False
    try:
        conversation = twilio_client.conversations.v1.conversations(conversation_sid)
        conversation.messages.create(author=bot_identity_val, body=message_body)
        st.success(f"Sent reply to conversation {conversation_sid}")
        st.write(f"Twilio response to send: {message_body}")
        print(f"[Twilio Send] Sending response: {message_body}")
    except Exception as exc:
        st.error(f"Error sending Twilio message to {conversation_sid}: {exc}")
        return False
    return True
# --- Main Application Logic & UI ---
st.title("🤖 RAG-Based Customer Support Chatbot")
st.markdown("Powered by Streamlit, Twilio, GROQ LLaMA3, and FAISS.")

# --- Sidebar for Configurations ---
# For each credential: if it came from secrets, show a masked, read-only
# field; otherwise warn and let the user type it into a password field.
st.sidebar.title("⚙️ Configurations")
if APP_TWILIO_ACCOUNT_SID:
    # Only the last four characters are revealed (nothing if the SID is too short).
    st.sidebar.text_input("Twilio Account SID (from Secrets)", value="********" + APP_TWILIO_ACCOUNT_SID[-4:] if len(APP_TWILIO_ACCOUNT_SID) > 4 else "********", disabled=True)
    twilio_account_sid_to_use = APP_TWILIO_ACCOUNT_SID
else:
    st.sidebar.warning("Secret 'TWILIO_ACCOUNT_SID' not found.")
    twilio_account_sid_to_use = st.sidebar.text_input("Twilio Account SID (Enter Manually)", value=DEFAULT_TWILIO_ACCOUNT_SID_FALLBACK, type="password")
if APP_TWILIO_AUTH_TOKEN:
    st.sidebar.text_input("Twilio Auth Token (from Secrets)", value="********", disabled=True)
    twilio_auth_token_to_use = APP_TWILIO_AUTH_TOKEN
else:
    st.sidebar.warning("Secret 'TWILIO_AUTH_TOKEN' not found.")
    twilio_auth_token_to_use = st.sidebar.text_input("Twilio Auth Token (Enter Manually)", value=DEFAULT_TWILIO_AUTH_TOKEN_FALLBACK, type="password")
if APP_GROQ_API_KEY:
    # Only the last four characters are revealed, and only for keys longer than eight characters.
    st.sidebar.text_input("GROQ API Key (from Secrets)", value="gsk_********" + APP_GROQ_API_KEY[-4:] if len(APP_GROQ_API_KEY) > 8 else "********", disabled=True)
    groq_api_key_to_use = APP_GROQ_API_KEY
else:
    st.sidebar.warning("Secret 'GROQ_API_KEY' not found.")
    groq_api_key_to_use = st.sidebar.text_input("GROQ API Key (Enter Manually)", value=DEFAULT_GROQ_API_KEY_FALLBACK, type="password")
# twilio_conversation_service_sid_to_use = st.sidebar.text_input(
# "Twilio Conversation Service SID (IS...)",
# value=APP_TWILIO_CONVERSATION_SERVICE_SID_SECRET or DEFAULT_TWILIO_CONVERSATION_SERVICE_SID,
# type="password",
# help="The SID of your Twilio Conversations Service. Can be set by 'TWILIO_CONVERSATION_SERVICE_SID' secret."
# )
# The remaining settings are always editable; secrets/defaults only pre-fill them.
twilio_bot_whatsapp_identity_to_use = st.sidebar.text_input(
    "Twilio Bot WhatsApp Identity",
    value=APP_TWILIO_BOT_WHATSAPP_IDENTITY_SECRET or DEFAULT_TWILIO_BOT_WHATSAPP_IDENTITY,
    help="e.g., 'whatsapp:+1234567890'. Can be set by 'TWILIO_BOT_WHATSAPP_IDENTITY' secret."
)
embedding_model_name_to_use = st.sidebar.text_input(
    "Embedding Model Name",
    value=DEFAULT_EMBEDDING_MODEL_NAME
)
polling_interval_to_use = st.sidebar.number_input(
    "Twilio Polling Interval (seconds)",
    min_value=10, max_value=300,
    value=DEFAULT_POLLING_INTERVAL_S,
    step=5
)
# --- Initialize Session State ---
# Seed every key the rest of the script reads; values that already exist from
# an earlier rerun are left untouched.
_session_state_defaults = {
    "app_started": False,
    "bot_started": False,
    "rag_pipeline_ready": False,
    "last_twilio_poll_time": time.time(),
    "bot_start_time_utc": None,
    "processed_message_sids": set(),
    "manual_chat_history": [],
}
for _state_key, _default_value in _session_state_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
# --- Helper: Simple Intent Classifier --- | |
def simple_intent_classifier(query):
    """Classify a user query with keyword and ID-pattern heuristics.

    Rules are tried in this order:

    1. ``"ORDER_STATUS"`` if the query contains an order keyword; an order ID
       of the form ``ORD`` followed by at least three digits is extracted
       when present.
    2. ``"PRODUCT_INFO"`` if it contains a product keyword or a product ID
       (``PRD`` followed by at least three digits).
    3. ``"GENERAL_POLICY_FAQ"`` if it contains a policy/FAQ keyword.
    4. ``"UNKNOWN"`` otherwise.

    Args:
        query: The user's message. ``None``, non-string and blank values are
            classified as ``"UNKNOWN"`` instead of raising, since an incoming
            message may carry no text body.

    Returns:
        ``(intent, order_id)`` where ``order_id`` is the upper-cased order ID
        for ``"ORDER_STATUS"`` queries that contain one, else ``None``.
    """
    if not isinstance(query, str) or not query.strip():
        return "UNKNOWN", None

    # Matching is done on the lower-cased text, so patterns are lower-case.
    query_lower = query.lower()

    order_keywords = ("order", "status", "track", "delivery")
    if any(keyword in query_lower for keyword in order_keywords):
        order_id_match = re.search(r'\b(ord\d{3,})\b', query_lower)
        return "ORDER_STATUS", (order_id_match.group(1).upper() if order_id_match else None)

    product_keywords = ("product", "item", "buy", "price", "feature", "stock")
    product_id_match = re.search(r'\b(prd\d{3,})\b', query_lower)
    if product_id_match or any(keyword in query_lower for keyword in product_keywords):
        return "PRODUCT_INFO", None

    policy_keywords = ("return", "policy", "refund", "exchange", "faq", "question", "how to", "support")
    if any(keyword in query_lower for keyword in policy_keywords):
        return "GENERAL_POLICY_FAQ", None

    return "UNKNOWN", None
# --- Main Application Controls ---
col1, col2, col3, col4 = st.columns(4)
with col1:
    # "Start App": load models and data, build the PDF index and create the
    # GROQ client; the app only counts as started when every part succeeded.
    if st.button("🚀 Start App", disabled=st.session_state.app_started, use_container_width=True):
        if not groq_api_key_to_use:
            st.error("GROQ API Key is required.")
        else:
            with st.spinner("Initializing RAG pipeline..."):
                st.session_state.embedding_model = initialize_embedding_model(embedding_model_name_to_use)
                st.session_state.customer_orders_data = load_json_data(CUSTOMER_ORDERS_FILE)
                st.session_state.products_data = load_json_data(PRODUCTS_FILE)
                # Policy and FAQ PDFs are chunked and indexed together.
                policy_pdf_pages = load_pdf_data(POLICY_PDF_FILE)
                faq_pdf_pages = load_pdf_data(FAQ_PDF_FILE)
                all_pdf_text_pages = policy_pdf_pages + faq_pdf_pages
                st.session_state.pdf_text_chunks_raw = chunk_text(all_pdf_text_pages)
                if st.session_state.embedding_model and st.session_state.pdf_text_chunks_raw:
                    st.session_state.faiss_index_pdfs, st.session_state.indexed_pdf_chunks = \
                        create_faiss_index(st.session_state.pdf_text_chunks_raw, st.session_state.embedding_model)
                else:
                    st.session_state.faiss_index_pdfs, st.session_state.indexed_pdf_chunks = None, []
                    st.warning("FAISS index for PDFs could not be created (model or chunks missing).")
                st.session_state.groq_client = initialize_groq_client(groq_api_key_to_use)
                # Ready when model, client and both JSON files loaded, and the
                # PDF index exists - or no PDF pages were loaded at all.
                if st.session_state.embedding_model and \
                        st.session_state.groq_client and \
                        st.session_state.customer_orders_data is not None and \
                        st.session_state.products_data is not None and \
                        (st.session_state.faiss_index_pdfs is not None or not all_pdf_text_pages):
                    st.session_state.rag_pipeline_ready = True
                    st.session_state.app_started = True
                    st.success("RAG Application Started!")
                    st.rerun()
                else:
                    # Report every component that failed, not just the first.
                    error_messages = []
                    if not st.session_state.embedding_model: error_messages.append("Embedding model failed to initialize.")
                    if not st.session_state.groq_client: error_messages.append("GROQ client failed to initialize.")
                    if st.session_state.customer_orders_data is None: error_messages.append(f"CustomerOrders.json ({CUSTOMER_ORDERS_FILE}) failed to load.")
                    if st.session_state.products_data is None: error_messages.append(f"Products.json ({PRODUCTS_FILE}) failed to load.")
                    if all_pdf_text_pages and st.session_state.faiss_index_pdfs is None: error_messages.append("PDF FAISS index failed to create.")
                    st.error("Failed to initialize RAG pipeline. Issues:\n- " + "\n- ".join(error_messages) + "\nCheck configurations and ensure all data files are present in 'docs/'.")
                    st.session_state.app_started = False
with col2:
    # "Stop App": discard all loaded resources and return to the initial state.
    if st.button("🛑 Stop App", disabled=not st.session_state.app_started, use_container_width=True):
        for stale_key in (
            "app_started", "bot_started", "rag_pipeline_ready", "embedding_model",
            "customer_orders_data", "products_data", "pdf_text_chunks_raw",
            "faiss_index_pdfs", "indexed_pdf_chunks", "groq_client", "twilio_client",
            "bot_start_time_utc", "processed_message_sids", "manual_chat_history",
        ):
            if stale_key in st.session_state:
                del st.session_state[stale_key]
        # Re-create the keys the rest of this run still reads.
        st.session_state.app_started = False
        st.session_state.bot_started = False
        st.session_state.rag_pipeline_ready = False
        st.session_state.processed_message_sids = set()
        st.session_state.manual_chat_history = []
        st.success("Application Stopped.")
        st.rerun()
with col3:
    # "Start WhatsApp Bot": create the Twilio client and begin polling. Only
    # messages created after this moment are answered.
    if st.button("💬 Start WhatsApp Bot", disabled=not st.session_state.app_started or st.session_state.bot_started, use_container_width=True):
        if not all([twilio_account_sid_to_use, twilio_auth_token_to_use, twilio_bot_whatsapp_identity_to_use]):
            # The message lists exactly the three values checked above.
            st.error("Twilio Account SID, Auth Token, and Bot WhatsApp Identity are all required.")
        else:
            st.session_state.twilio_client = initialize_twilio_client(twilio_account_sid_to_use, twilio_auth_token_to_use)
            if st.session_state.twilio_client:
                st.session_state.bot_started = True
                st.session_state.bot_start_time_utc = datetime.now(timezone.utc)
                st.session_state.processed_message_sids = set()
                # Back-date the last poll so the first poll happens right away.
                st.session_state.last_twilio_poll_time = time.time() - polling_interval_to_use - 1
                st.success("WhatsApp Bot Started!")
                st.rerun()
            else:
                st.error("Failed to initialize Twilio client. WhatsApp Bot not started.")
with col4:
    # "Stop WhatsApp Bot": only the polling flag is cleared; the Twilio client
    # and the set of processed message SIDs stay in session state.
    if st.button("🔕 Stop WhatsApp Bot", disabled=not st.session_state.bot_started, use_container_width=True):
        st.session_state.bot_started = False
        st.info("WhatsApp Bot Stopped.")
        st.rerun()
st.divider()
# --- Shared Query Handling ---
def _render_retrieved_context(raw_context):
    """Show retrieved context inside the current Streamlit container.

    Strings that look like JSON (starting with ``{`` or ``[``) and lists are
    rendered with ``st.json``; anything else, or anything that fails to
    parse, is shown as plain text.
    """
    try:
        if isinstance(raw_context, str) and raw_context.strip().startswith(('{', '[')):
            st.json(json.loads(raw_context))
        elif isinstance(raw_context, list):
            st.json(raw_context)
        else:
            st.text(str(raw_context))
    except (json.JSONDecodeError, TypeError):
        st.text(str(raw_context))


def _answer_query(user_query, channel_suffix=""):
    """Run intent classification, context retrieval and LLM generation for one query.

    Used by both the manual chat and the WhatsApp bot so that the two
    channels always build their answers the same way.

    Args:
        user_query: The user's message text.
        channel_suffix: Text inserted into the warning shown when order JSON
            cannot be parsed (e.g. ``" (WhatsApp)"``).

    Returns:
        ``(llm_response, raw_context_data)``. ``raw_context_data`` is the
        retrieved context in displayable form: a JSON/plain string, a list of
        chunks, a ``{"message": ...}`` dict, or ``None``.
    """
    # The classifier already extracts the order ID with the ORD-digits
    # pattern, so no second regex search is needed here.
    intent, order_id_to_check = simple_intent_classifier(user_query)
    context_for_llm = "No specific context could be retrieved."
    raw_context_data = None
    # Order fields for the personalised order-status prompt; all None unless
    # an order record is found and parsed.
    order_details = dict.fromkeys(
        ("customer_name", "item_name", "shipping_address", "delivery_date", "order_id", "order_status")
    )

    if intent == "ORDER_STATUS":
        if order_id_to_check:
            raw_context_data = get_order_details(order_id_to_check, st.session_state.customer_orders_data)
            # "Not found" / "not loaded" messages are passed on as context so
            # the LLM can relay them.
            context_for_llm = raw_context_data
            lookup_succeeded = isinstance(raw_context_data, str) and not raw_context_data.startswith(
                ("No order found", "Customer order data is not loaded")
            )
            if lookup_succeeded:
                try:
                    order = json.loads(raw_context_data)
                    items = order.get("items")
                    # Only the first item's name is used in the reply.
                    if items and len(items) > 0 and isinstance(items[0], dict):
                        first_item_name = items[0].get("name", "your item(s)")
                    else:
                        first_item_name = "your item(s)"
                    order_details.update(
                        customer_name=order.get("customer_name"),
                        item_name=first_item_name,
                        shipping_address=order.get("shipping_address"),
                        delivery_date=order.get("delivered_on"),
                        order_status=order.get("status"),
                        order_id=order.get("order_id"),
                    )
                except json.JSONDecodeError:
                    st.warning(f"Could not parse order details JSON for {order_id_to_check}{channel_suffix} for personalization.")
                    context_for_llm = f"Error parsing order details for {order_id_to_check}. Raw data: {raw_context_data}"
        else:
            context_for_llm = "To check an order status, please provide a valid Order ID (e.g., ORD123)."
            raw_context_data = {"message": "Order ID needed or not found in query."}
    elif intent == "PRODUCT_INFO":
        raw_context_data = get_product_info(user_query, st.session_state.products_data)
        context_for_llm = raw_context_data
    elif intent in ("GENERAL_POLICY_FAQ", "UNKNOWN"):
        if st.session_state.faiss_index_pdfs and st.session_state.embedding_model and st.session_state.indexed_pdf_chunks:
            # Fewer chunks are retrieved when the intent is unknown.
            top_k = 3 if intent == "GENERAL_POLICY_FAQ" else 2
            retrieved_chunks = search_faiss_index(
                st.session_state.faiss_index_pdfs, user_query,
                st.session_state.embedding_model, st.session_state.indexed_pdf_chunks, k=top_k)
            if retrieved_chunks:
                context_for_llm = "Relevant information from documents:\n\n" + "\n\n---\n\n".join(retrieved_chunks)
                raw_context_data = retrieved_chunks
            else:
                context_for_llm = "I couldn't find specific information in our policy or FAQ documents regarding your query."
                raw_context_data = {"message": "No relevant PDF chunks found."}
        else:
            context_for_llm = "Our policy and FAQ documents are currently unavailable for search."
            raw_context_data = {"message": "PDF index or embedding model not ready."}

    llm_response = generate_response_groq(
        _groq_client=st.session_state.groq_client,
        query=user_query,
        context=context_for_llm,
        intent=intent,
        **order_details
    )
    return llm_response, raw_context_data


# --- Manual Query Interface ---
if st.session_state.get("app_started") and st.session_state.get("rag_pipeline_ready"):
    st.subheader("💬 Manual Query")
    # Replay the conversation so far.
    for chat_entry in st.session_state.manual_chat_history:
        with st.chat_message(chat_entry["role"]):
            st.markdown(chat_entry["content"])
            if "context" in chat_entry and chat_entry["context"]:
                with st.expander("Retrieved Context"):
                    _render_retrieved_context(chat_entry["context"])
    user_query_manual = st.chat_input("Ask a question:")
    if user_query_manual:
        st.session_state.manual_chat_history.append({"role": "user", "content": user_query_manual})
        with st.chat_message("user"):
            st.markdown(user_query_manual)
        with st.spinner("Thinking..."):
            llm_response, raw_context_data = _answer_query(user_query_manual)
        with st.chat_message("assistant"):
            st.markdown(llm_response)
            if raw_context_data:
                with st.expander("Retrieved Context For Assistant"):
                    _render_retrieved_context(raw_context_data)
        st.session_state.manual_chat_history.append({"role": "assistant", "content": llm_response, "context": raw_context_data})
        st.rerun()

# --- Twilio Bot Polling Logic ---
if st.session_state.get("bot_started") and st.session_state.get("rag_pipeline_ready"):
    current_time = time.time()
    if "last_twilio_poll_time" not in st.session_state:
        st.session_state.last_twilio_poll_time = current_time - polling_interval_to_use - 1
    if (current_time - st.session_state.last_twilio_poll_time) > polling_interval_to_use:
        st.session_state.last_twilio_poll_time = current_time
        if not st.session_state.get("twilio_client") or not twilio_bot_whatsapp_identity_to_use or not st.session_state.get("bot_start_time_utc"):
            st.warning("Twilio client/config missing for polling. Ensure bot is started and WhatsApp identity is set.")
        else:
            with st.spinner(f"Checking WhatsApp messages (last poll: {datetime.fromtimestamp(st.session_state.last_twilio_poll_time).strftime('%H:%M:%S')})..."):
                new_messages = get_new_whatsapp_messages(
                    st.session_state.twilio_client,
                    st.session_state.bot_start_time_utc,
                    st.session_state.processed_message_sids,
                    twilio_bot_whatsapp_identity_to_use
                )
            if new_messages:
                st.info(f"Found {len(new_messages)} new WhatsApp message(s) to process.")
                for msg_data in new_messages:
                    conv_sid = msg_data["conversation_sid"]
                    msg_sid = msg_data["message_sid"]
                    author_id = msg_data["author_identity"]
                    # A message may arrive without a text body; treat it as an
                    # empty query instead of passing None down the pipeline.
                    user_query_whatsapp = msg_data["message_body"] or ""
                    st.write(f"Processing WhatsApp message from {author_id} in conversation {conv_sid}: '{user_query_whatsapp}' (SID: {msg_sid})")
                    llm_reply, _unused_context = _answer_query(user_query_whatsapp, channel_suffix=" (WhatsApp)")
                    # WhatsApp replies are flattened to a single line.
                    response_whatsapp = llm_reply.strip().replace('\n', ' ')
                    if send_whatsapp_message(
                        st.session_state.twilio_client,
                        conv_sid,
                        response_whatsapp,
                        twilio_bot_whatsapp_identity_to_use
                    ):
                        # Mark as processed only after a successful send, so a
                        # failed reply is retried on the next poll.
                        st.session_state.processed_message_sids.add(msg_sid)
                        st.success(f"Successfully responded to WhatsApp message SID {msg_sid} from {author_id}.")
                    else:
                        st.error(f"Failed to send WhatsApp response for message SID {msg_sid} from {author_id}.")
                st.rerun()
# --- Footer & Status ---
st.sidebar.markdown("---")
st.sidebar.info("Ensure all keys and SIDs are correctly configured. Primary API keys (Twilio SID/Token, GROQ Key) are loaded from secrets if available.")
if st.session_state.get("app_started"):
    # Green when the pipeline is ready, orange while it is not.
    if st.session_state.get("rag_pipeline_ready"):
        status_color, app_status_text = "green", "App RUNNING"
    else:
        status_color, app_status_text = "orange", "App Initializing/Error"
    if st.session_state.get("bot_started"):
        bot_status_text = "WhatsApp Bot RUNNING"
    else:
        bot_status_text = "WhatsApp Bot STOPPED"
    st.sidebar.markdown(f"<span style='color:{status_color};'>{app_status_text}</span>. {bot_status_text}.", unsafe_allow_html=True)
else:
    st.sidebar.warning("App is STOPPED.")
# TODO: The chatbot sends multiple WhatsApp messages through Twilio; its replies should be the same as the manual query responses.
# --- Simulated background loop using rerun ---
# While the bot is running, keep the script re-executing so the polling block
# above gets a chance to run. The poll timestamp is deliberately NOT updated
# here: only the polling block records a poll. Resetting it here would make
# the next run believe a poll had just happened and skip it for a full interval.
if st.session_state.get("bot_started") and st.session_state.get("rag_pipeline_ready"):
    seconds_since_poll = time.time() - st.session_state.get("last_twilio_poll_time", 0)
    seconds_until_poll = polling_interval_to_use - seconds_since_poll
    if seconds_until_poll > 0:
        # Sleep in short slices (at most 5 s) so the UI is refreshed regularly.
        time.sleep(min(5, seconds_until_poll))
    st.rerun()