Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import threading | |
| import streamlit as st | |
| from twilio.rest import Client | |
| from pdfminer.high_level import extract_text | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import AutoTokenizer | |
| import faiss | |
| import numpy as np | |
| import docx | |
| from groq import Groq | |
| import PyPDF2 | |
| import requests | |
| # --- Text Extraction Utilities --- | |
| def extract_text_from_pdf(pdf_path): | |
| try: | |
| text = "" | |
| with open(pdf_path, 'rb') as file: | |
| reader = PyPDF2.PdfReader(file) | |
| for page in reader.pages: | |
| page_text = page.extract_text() | |
| if page_text: | |
| text += page_text | |
| return text | |
| except: | |
| return extract_text(pdf_path) | |
| def extract_text_from_docx(docx_path): | |
| try: | |
| doc = docx.Document(docx_path) | |
| return '\n'.join(para.text for para in doc.paragraphs) | |
| except: | |
| return "" | |
| # --- Chunking & Retrieval --- | |
| def chunk_text(text, tokenizer, chunk_size=150, chunk_overlap=30): | |
| tokens = tokenizer.tokenize(text) | |
| chunks, start = [], 0 | |
| while start < len(tokens): | |
| end = min(start + chunk_size, len(tokens)) | |
| chunk_tokens = tokens[start:end] | |
| chunks.append(tokenizer.convert_tokens_to_string(chunk_tokens)) | |
| start += chunk_size - chunk_overlap | |
| return chunks | |
| def retrieve_chunks(question, index, embed_model, text_chunks, k=3): | |
| question_embedding = embed_model.encode(question) | |
| D, I = index.search(np.array([question_embedding]), k) | |
| relevant_chunks = [text_chunks[i] for i in I[0]] | |
| return relevant_chunks | |
| # --- Groq Answer Generator --- | |
| def generate_answer_with_groq(question, context): | |
| url = "https://api.groq.com/openai/v1/chat/completions" | |
| api_key = os.environ.get("GROQ_API_KEY") | |
| headers = { | |
| "Authorization": f"Bearer {api_key}", | |
| "Content-Type": "application/json", | |
| } | |
| prompt = ( | |
| f"Customer asked: '{question}'\n\n" | |
| f"Here is the relevant product or policy info to help:\n{context}\n\n" | |
| f"Respond in a friendly and helpful tone as a toy shop support agent." | |
| ) | |
| payload = { | |
| "model": "llama3-8b-8192", | |
| "messages": [ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. " | |
| "Your goal is to politely answer customer questions, help them choose the right toys, " | |
| "provide order or delivery information, explain return policies, and guide them through purchases." | |
| ) | |
| }, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| "temperature": 0.5, | |
| "max_tokens": 300, | |
| } | |
| response = requests.post(url, headers=headers, json=payload) | |
| response.raise_for_status() | |
| return response.json()['choices'][0]['message']['content'].strip() | |
| # --- Twilio Functions --- | |
| def get_whatsapp_conversation_sids(client): | |
| sids = [] | |
| conversations = client.conversations.v1.conversations.list(limit=50) | |
| for convo in conversations: | |
| try: | |
| participants = client.conversations.v1.conversations(convo.sid).participants.list() | |
| for p in participants: | |
| if (p.identity and p.identity.startswith("whatsapp:")) or ( | |
| p.messaging_binding and p.messaging_binding.get("address", "").startswith("whatsapp:") | |
| ): | |
| sids.append(convo.sid) | |
| break | |
| except: | |
| continue | |
| return sids | |
| def fetch_latest_incoming_message(client, conversation_sid): | |
| messages = client.conversations.v1.conversations(conversation_sid).messages.list(limit=10) | |
| for msg in reversed(messages): | |
| if msg.author.startswith("whatsapp:"): | |
| return { | |
| "sid": msg.sid, | |
| "body": msg.body, | |
| "author": msg.author, | |
| "timestamp": msg.date_created, | |
| } | |
| return None | |
| def send_twilio_message(client, conversation_sid, body): | |
| return client.conversations.v1.conversations(conversation_sid).messages.create( | |
| author="system", body=body | |
| ) | |
| # --- Load Knowledge Base --- | |
| def setup_knowledge_base(): | |
| folder_path = "docs" | |
| all_text = "" | |
| for file in os.listdir(folder_path): | |
| path = os.path.join(folder_path, file) | |
| if file.endswith(".pdf"): | |
| all_text += extract_text_from_pdf(path) + "\n" | |
| elif file.endswith((".docx", ".doc")): | |
| all_text += extract_text_from_docx(path) + "\n" | |
| tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased') | |
| chunks = chunk_text(all_text, tokenizer) | |
| model = SentenceTransformer('all-mpnet-base-v2') | |
| embeddings = model.encode(chunks) | |
| dim = embeddings[0].shape[0] | |
| index = faiss.IndexFlatL2(dim) | |
| index.add(np.array(embeddings).astype('float32')) | |
| return index, model, chunks | |
| # --- Monitor All Conversations --- | |
| def start_conversation_monitor(client, index, embed_model, text_chunks): | |
| last_msg_index = {} | |
| monitored_sids = set() | |
| def poll_conversation(convo_sid): | |
| last_processed_timestamp = None | |
| while True: | |
| try: | |
| latest_msg = fetch_latest_incoming_message(client, convo_sid) | |
| if latest_msg: | |
| msg_time = latest_msg["timestamp"] | |
| if last_processed_timestamp is None or msg_time > last_processed_timestamp: | |
| last_processed_timestamp = msg_time | |
| question = latest_msg["body"] | |
| sender = latest_msg["author"] | |
| print(f"\nπ₯ New message from {sender} in {convo_sid}: {question}") | |
| context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks)) | |
| answer = generate_answer_with_groq(question, context) | |
| send_twilio_message(client, convo_sid, answer) | |
| print(f"π€ Replied to {sender}: {answer}") | |
| time.sleep(3) | |
| except Exception as e: | |
| print(f"β Error in convo {convo_sid} polling:", e) | |
| time.sleep(5) | |
| def monitor_all_conversations(): | |
| while True: | |
| try: | |
| current_sids = set(get_whatsapp_conversation_sids(client)) | |
| new_sids = current_sids - monitored_sids | |
| for sid in new_sids: | |
| print(f"β‘οΈ Starting to monitor new conversation: {sid}") | |
| monitored_sids.add(sid) | |
| threading.Thread(target=poll_conversation, args=(sid,), daemon=True).start() | |
| time.sleep(15) # refresh every 15 seconds or adjust as needed | |
| except Exception as e: | |
| print("β Error in conversation monitoring loop:", e) | |
| time.sleep(15) | |
| # Start the monitoring loop in a separate thread so it runs in background | |
| threading.Thread(target=monitor_all_conversations, daemon=True).start() | |
| # --- Streamlit UI --- | |
| st.set_page_config(page_title="Quasa β A Smart WhatsApp Chatbot", layout="wide") | |
| st.title("π± Quasa β A Smart WhatsApp Chatbot") | |
| account_sid = st.secrets.get("TWILIO_SID") | |
| auth_token = st.secrets.get("TWILIO_TOKEN") | |
| GROQ_API_KEY = st.secrets.get("GROQ_API_KEY") | |
| if not all([account_sid, auth_token, GROQ_API_KEY]): | |
| st.warning("β οΈ Provide all credentials below:") | |
| account_sid = st.text_input("Twilio SID", value=account_sid or "") | |
| auth_token = st.text_input("Twilio Token", type="password", value=auth_token or "") | |
| GROQ_API_KEY = st.text_input("GROQ API Key", type="password", value=GROQ_API_KEY or "") | |
| if all([account_sid, auth_token, GROQ_API_KEY]): | |
| os.environ["GROQ_API_KEY"] = GROQ_API_KEY | |
| client = Client(account_sid, auth_token) | |
| conversation_sids = get_whatsapp_conversation_sids(client) | |
| if conversation_sids: | |
| st.success(f"β {len(conversation_sids)} WhatsApp conversation(s) found. Initializing chatbot...") | |
| index, model, chunks = setup_knowledge_base() | |
| start_conversation_monitor(client, index, model, chunks) | |
| st.success("π’ Chatbot is running in background and will reply to new messages.") | |
| else: | |
| st.error("β No WhatsApp conversations found.") |