"""Quasa: a Streamlit page that answers WhatsApp questions from local documents.

On every script run the page:

1. loads Twilio / Groq credentials (from ``st.secrets`` or manual input),
2. builds (once, cached) a FAISS index over the PDF / DOCX files in ``docs``,
3. reads the newest WhatsApp-authored message of the latest Twilio
   conversation, and
4. answers it through the Groq chat-completions endpoint, using the most
   similar document chunks as context, and posts the answer back.
"""
import logging
import os
import time

import docx
import faiss
import numpy as np
import PyPDF2
import requests
import streamlit as st
from groq import Groq
from pdfminer.high_level import extract_text
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from twilio.rest import Client

logger = logging.getLogger(__name__)

REFRESH_INTERVAL_SECONDS = 10
DOCS_FOLDER = "docs"

# Prefix of every error string returned by generate_answer_with_groq(); the UI
# uses it to avoid forwarding internal errors to the customer.
_GROQ_ERROR_PREFIX = "⚠️ Groq API Error:"
# HTTP statuses that are usually transient and therefore worth retrying.
_RETRYABLE_STATUS_CODES = frozenset({429, 502, 503, 504})
_WHATSAPP_PREFIX = "whatsapp:"


def _rerun_script():
    """Restart the script with whichever rerun API this Streamlit version has."""
    # `st.rerun` replaced `st.experimental_rerun`; the `or` only touches the
    # old name when the new one is missing.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


# --- Auto-refresh every 10 seconds ---
# NOTE(review): this check only runs while the script is already executing, so
# it restarts a run that started more than 10 s after the previous refresh; it
# does not by itself wake an idle page up. Confirm whether a real timer-driven
# refresh is wanted.
_now = time.time()
if "last_refresh" not in st.session_state:
    st.session_state.last_refresh = _now
elif _now - st.session_state.last_refresh > REFRESH_INTERVAL_SECONDS:
    st.session_state.last_refresh = _now
    _rerun_script()


# --- Document Loaders ---
def extract_text_from_pdf(pdf_path):
    """Return the text of a PDF, trying PyPDF2 first and pdfminer second.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        The concatenated page text, or ``""`` when both extractors fail
        (mirrors extract_text_from_docx, so one bad file cannot abort the
        knowledge-base build).
    """
    try:
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # extract_text() may yield nothing for image-only pages.
            return "".join(page.extract_text() or "" for page in reader.pages)
    except Exception:
        logger.warning(
            "PyPDF2 could not read %s; falling back to pdfminer",
            pdf_path,
            exc_info=True,
        )

    try:
        return extract_text(pdf_path)
    except Exception:
        logger.exception("pdfminer could not read %s either", pdf_path)
        return ""


def extract_text_from_docx(docx_path):
    """Return the paragraphs of a Word document joined by newlines.

    Args:
        docx_path: Path of the document.

    Returns:
        The document text, or ``""`` when the file cannot be opened.
    """
    try:
        doc = docx.Document(docx_path)
    except Exception:
        logger.warning("Could not read %s", docx_path, exc_info=True)
        return ""
    return "\n".join(para.text for para in doc.paragraphs)


def chunk_text(text, tokenizer, chunk_size=150, chunk_overlap=30):
    """Split *text* into overlapping chunks measured in tokenizer tokens.

    Args:
        text: The text to split.
        tokenizer: Object providing ``tokenize(str)`` and
            ``convert_tokens_to_string(list)``.
        chunk_size: Maximum number of tokens per chunk.
        chunk_overlap: Number of tokens shared by consecutive chunks.

    Returns:
        A list of chunk strings; empty when *text* produces no tokens.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size`` (the window would never advance).
    """
    step = chunk_size - chunk_overlap
    if chunk_size <= 0 or step <= 0:
        raise ValueError(
            "chunk_size must be positive and larger than chunk_overlap "
            f"(got chunk_size={chunk_size}, chunk_overlap={chunk_overlap})"
        )

    tokens = tokenizer.tokenize(text)
    chunks = []
    for start in range(0, len(tokens), step):
        end = start + chunk_size
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:end]))
        if end >= len(tokens):
            # The window already reached the end; another step would only
            # emit a chunk that is a suffix of this one.
            break
    return chunks


def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return up to *k* chunks whose embeddings are nearest to the question.

    Args:
        question: The user's question.
        index: Index exposing ``search(vectors, k)`` (a FAISS index here).
        embed_model: Model exposing ``encode(list_of_str)``.
        text_chunks: Chunks in the same order they were added to *index*.
        k: Maximum number of chunks to return.

    Returns:
        A list of chunk strings, nearest first; empty when there are no chunks.
    """
    if not text_chunks or k <= 0:
        return []

    query = np.asarray(embed_model.encode([question]), dtype="float32")
    _, neighbours = index.search(query, min(k, len(text_chunks)))
    # FAISS pads missing neighbours with -1, which would otherwise silently
    # select the last chunk.
    return [text_chunks[i] for i in neighbours[0] if 0 <= i < len(text_chunks)]


# --- GROQ Answer Generation ---
def generate_answer_with_groq(
    question,
    context,
    retries=3,
    delay=2,
    *,
    timeout=30,
    model="llama3-8b-8192",
):
    """Ask the Groq chat-completions endpoint to answer a customer question.

    Args:
        question: The customer's message.
        context: Document excerpts to ground the answer in.
        retries: Maximum number of HTTP attempts.
        delay: Seconds to wait between attempts.
        timeout: Per-request timeout in seconds.
        model: Groq model identifier.
            NOTE(review): confirm this model is still offered by Groq.

    Returns:
        The answer text, or a string starting with ``"⚠️ Groq API Error:"``
        when the request fails.

    Raises:
        KeyError: If the ``GROQ_API_KEY`` environment variable is not set.
    """
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.environ["GROQ_API_KEY"]
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant product or policy info to help:\n{context}\n\n"
        f"Respond in a friendly and helpful tone as a toy shop support agent."
    )
    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. "
                    "Your goal is to politely answer customer questions, help them choose the right toys, "
                    "provide order or delivery information, explain return policies, and guide them through purchases. "
                    "Always sound warm, helpful, and trustworthy like a professional customer support agent."
                ),
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }

    for attempt in range(retries):
        try:
            response = requests.post(
                url, headers=headers, json=payload, timeout=timeout
            )
            # requests does not raise on HTTP errors by itself, so transient
            # statuses have to be detected explicitly to be retried.
            if (
                response.status_code in _RETRYABLE_STATUS_CODES
                and attempt < retries - 1
            ):
                time.sleep(delay)
                continue
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"].strip()
        except (
            requests.RequestException,
            LookupError,
            TypeError,
            ValueError,
            AttributeError,
        ) as e:
            return f"{_GROQ_ERROR_PREFIX} {str(e)}"

    # Only reachable when `retries` is zero or negative.
    return f"{_GROQ_ERROR_PREFIX} no request was attempted (retries={retries})"


# --- Twilio Chat Handlers ---
def fetch_latest_conversation_sid(account_sid, auth_token):
    """Return the SID of the first conversation Twilio lists, or ``None``.

    Errors are reported on the page with ``st.error`` instead of being raised.

    NOTE(review): this takes whatever ``list(limit=1)`` returns first; confirm
    that this really is the most recent conversation for the account.
    """
    try:
        client = Client(account_sid, auth_token)
        conversations = client.conversations.v1.conversations.list(limit=1)
        if conversations:
            return conversations[0].sid
    except Exception as e:  # UI boundary: report instead of crashing the page
        st.error(f"⚠️ Could not fetch conversation SID: {e}")
    return None


def fetch_latest_incoming_message(account_sid, auth_token, conversation_sid):
    """Return the newest message whose author starts with ``whatsapp:``.

    Only the 10 most recent messages of the conversation are inspected.

    Returns:
        ``(body, author, index)`` of that message, or ``(None, None, None)``
        when there is none or the messages could not be fetched (the error is
        then shown with ``st.error``).
    """
    try:
        client = Client(account_sid, auth_token)
        # Messages are listed oldest-first by default; ask for newest-first so
        # that `limit` keeps the most recent ones.
        messages = client.conversations.v1.conversations(
            conversation_sid
        ).messages.list(order="desc", limit=10)
    except Exception as e:  # UI boundary: report instead of crashing the page
        st.error(f"⚠️ Could not fetch messages: {e}")
        return None, None, None

    for msg in messages:
        if (msg.author or "").startswith(_WHATSAPP_PREFIX):
            return msg.body, msg.author, msg.index
    return None, None, None


def _create_bot_message(account_sid, auth_token, conversation_sid, body):
    """Post *body* to the conversation and return the created message.

    Raises:
        LookupError: If no participant identity starts with ``whatsapp:``.
        Exception: Whatever the Twilio client raises on API/network errors.
    """
    client = Client(account_sid, auth_token)
    conversation = client.conversations.v1.conversations(conversation_sid)

    # Pick the first participant whose identity starts with 'whatsapp:'.
    # `identity` can be None, hence the `or ""`.
    # NOTE(review): confirm that this participant really is the bot and not
    # the customer.
    bot_identity = next(
        (
            p.identity
            for p in conversation.participants.list()
            if (p.identity or "").startswith(_WHATSAPP_PREFIX)
        ),
        None,
    )
    if not bot_identity:
        raise LookupError(
            "⚠️ Bot identity with whatsapp: prefix not found in participants."
        )

    return conversation.messages.create(author=bot_identity, body=body)


def send_twilio_message(account_sid, auth_token, conversation_sid, body):
    """Post *body* to the conversation on behalf of the bot.

    Returns:
        The SID of the created message on success; otherwise a string
        describing the failure (nothing is raised).
    """
    try:
        message = _create_bot_message(
            account_sid, auth_token, conversation_sid, body
        )
    except LookupError as e:
        return str(e.args[0]) if e.args else str(e)
    except Exception as e:  # kept broad: callers rely on a string, not a raise
        return str(e)
    return message.sid


def _get_secret(name):
    """Return ``st.secrets[name]`` or ``None`` when it is unavailable."""
    try:
        return st.secrets.get(name)
    except Exception:
        # Reading st.secrets raises when no secrets file exists, and the
        # exception type depends on the Streamlit version; treat every
        # failure as "not configured" so the manual inputs can take over.
        return None


# --- Streamlit UI ---
st.set_page_config(page_title="Quasa – A Smart WhatsApp Chatbot", layout="wide")

# NOTE(review): the three markdown strings below are rendered with
# unsafe_allow_html=True but contain no markup; the original HTML/CSS was
# presumably lost and should be restored from version control.
st.markdown(""" """, unsafe_allow_html=True)
st.markdown(
    "\n📱 Quasa – A Smart WhatsApp Chatbot\n",
    unsafe_allow_html=True,
)
st.markdown(
    "\nTalk to your documents using WhatsApp. Powered by Groq, Twilio, and RAG.\n",
    unsafe_allow_html=True,
)

# Load secrets or fallback
account_sid = _get_secret("TWILIO_SID")
auth_token = _get_secret("TWILIO_TOKEN")
GROQ_API_KEY = _get_secret("GROQ_API_KEY")

if not all([account_sid, auth_token, GROQ_API_KEY]):
    st.warning("⚠️ Some secrets are missing. Please provide them manually:")
    account_sid = st.text_input("Twilio SID", value=account_sid or "")
    auth_token = st.text_input(
        "Twilio Auth Token", type="password", value=auth_token or ""
    )
    GROQ_API_KEY = st.text_input(
        "GROQ API Key", type="password", value=GROQ_API_KEY or ""
    )

if not all([account_sid, auth_token, GROQ_API_KEY]):
    st.warning("❗ Please provide all required credentials.")
    st.stop()

# generate_answer_with_groq() reads the key from the environment.
os.environ["GROQ_API_KEY"] = GROQ_API_KEY

conversation_sid = fetch_latest_conversation_sid(account_sid, auth_token)
if not conversation_sid:
    st.warning("❗ No active conversation found.")
    st.stop()


@st.cache_resource
def setup_knowledge_base():
    """Build the FAISS index over every PDF / Word file in the docs folder.

    Returns:
        ``(index, model, chunks)``: the FAISS index, the sentence-embedding
        model used to fill it, and the chunk strings in index order.

    Raises:
        OSError: If the docs folder cannot be listed.
        ValueError: If the documents yield no text to index.
    """
    folder_path = DOCS_FOLDER
    texts = []
    # Sorted so the chunk order does not depend on directory listing order.
    for file in sorted(os.listdir(folder_path)):
        path = os.path.join(folder_path, file)
        lowered = file.lower()
        if lowered.endswith(".pdf"):
            texts.append(extract_text_from_pdf(path))
        elif lowered.endswith((".docx", ".doc")):
            # NOTE(review): legacy .doc files are probably not readable by
            # python-docx and would then contribute an empty string.
            texts.append(extract_text_from_docx(path))
    all_text = "".join(text + "\n" for text in texts)

    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    chunks = chunk_text(all_text, tokenizer)
    if not chunks:
        raise ValueError(f"no readable text found in '{folder_path}'")

    model = SentenceTransformer('all-mpnet-base-v2')
    embeddings = np.asarray(model.encode(chunks), dtype="float32")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return index, model, chunks


try:
    index, embedding_model, text_chunks = setup_knowledge_base()
except (OSError, ValueError) as exc:
    st.error(f"⚠️ Could not build the knowledge base: {exc}")
    st.stop()

st.success(
    f"✅ Knowledge base ready. Monitoring WhatsApp messages for conversation: `{conversation_sid}`"
)

if "last_processed_index" not in st.session_state:
    st.session_state.last_processed_index = -1

with st.spinner("Checking for new WhatsApp messages..."):
    question, sender, msg_index = fetch_latest_incoming_message(
        account_sid, auth_token, conversation_sid
    )

    if question and msg_index != st.session_state.last_processed_index:
        # Mark the message first so a failure below cannot cause duplicate
        # replies on the next run.
        st.session_state.last_processed_index = msg_index
        st.info(f"📥 New question from **{sender}**:\n\n> {question}")

        relevant_chunks = retrieve_chunks(
            question, index, embedding_model, text_chunks
        )
        context = "\n\n".join(relevant_chunks)
        answer = generate_answer_with_groq(question, context)

        if answer.startswith(_GROQ_ERROR_PREFIX):
            # Internal API errors are shown to the operator only, never sent
            # to the customer.
            st.error(answer)
        else:
            try:
                sent = _create_bot_message(
                    account_sid, auth_token, conversation_sid, answer
                )
            except Exception as exc:  # UI boundary: report the failed send
                st.error(f"⚠️ Could not send the answer: {exc}")
            else:
                # The reply's author also starts with 'whatsapp:', so record
                # its index to keep the bot from answering its own message.
                sent_index = getattr(sent, "index", None)
                if sent_index is not None:
                    st.session_state.last_processed_index = sent_index
                st.success("📤 Answer sent back to user on WhatsApp!")
            st.markdown(f"### ✨ Answer:\n\n{answer}")
    else:
        st.warning("No new messages found.")