import os import time import threading import streamlit as st from twilio.rest import Client from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer import faiss import numpy as np import docx from groq import Groq import requests from io import StringIO from pdfminer.high_level import extract_text_to_fp from pdfminer.layout import LAParams from twilio.base.exceptions import TwilioRestException # Add this at the top import pdfplumber import datetime import csv APP_START_TIME = datetime.datetime.now(datetime.timezone.utc) os.environ["PYTORCH_JIT"] = "0" # --- PDF Extraction --- def _extract_tables_from_page(page): """Extracts tables from a single page of a PDF.""" tables = page.extract_tables() if not tables: return [] formatted_tables = [] for table in tables: formatted_table = [] for row in table: if row: # Filter out empty rows formatted_row = [cell if cell is not None else "" for cell in row] # Replace None with "" formatted_table.append(formatted_row) else: formatted_table.append([""]) # Append an empty row if the row is None formatted_tables.append(formatted_table) return formatted_tables def extract_text_from_pdf(pdf_path): text_output = StringIO() all_tables = [] try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: # Extract tables page_tables = _extract_tables_from_page(page) if page_tables: all_tables.extend(page_tables) # Extract text text = page.extract_text() if text: text_output.write(text + "\n\n") except Exception as e: print(f"Error extracting with pdfplumber: {e}") # Fallback to pdfminer if pdfplumber fails with open(pdf_path, 'rb') as file: extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text', codec=None) extracted_text = text_output.getvalue() return extracted_text, all_tables # Return text and list of tables def clean_extracted_text(text): lines = text.splitlines() cleaned = [] for line in lines: line = line.strip() if line: line = ' '.join(line.split()) cleaned.append(line) return '\n'.join(cleaned) def _format_tables_internal(tables): """Formats extracted tables into a string representation.""" formatted_tables_str = [] for table in tables: # Use csv writer to handle commas and quotes correctly with StringIO() as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerows(table) formatted_tables_str.append(csvfile.getvalue()) return "\n\n".join(formatted_tables_str) # --- DOCX Extraction --- def extract_text_from_docx(docx_path): try: doc = docx.Document(docx_path) return '\n'.join(para.text for para in doc.paragraphs) except Exception: return "" # --- Chunking --- def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32, max_tokens=512): tokens = tokenizer.tokenize(text) chunks = [] start = 0 while start < len(tokens): end = min(start + chunk_size, len(tokens)) chunk_tokens = tokens[start:end] chunk_text = tokenizer.convert_tokens_to_string(chunk_tokens) chunks.append(chunk_text) if end == len(tokens): break start += chunk_size - chunk_overlap return chunks def retrieve_chunks(question, index, embed_model, text_chunks, k=3): question_embedding = embed_model.encode(question) D, I = index.search(np.array([question_embedding]), k) return [text_chunks[i] for i in I[0]] # --- Groq Answer Generator --- def generate_answer_with_groq(question, context): url = "https://api.groq.com/openai/v1/chat/completions" api_key = os.environ.get("GROQ_API_KEY") headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } prompt = ( f"Customer asked: '{question}'\n\n" f"Here is the relevant product or policy info to help:\n{context}\n\n" f"Respond in a friendly and helpful tone as a toy shop support agent." ) payload = { "model": "llama3-8b-8192", "messages": [ { "role": "system", "content": ( "You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. " "Your goal is to politely answer customer questions, help them choose the right toys, " "provide order or delivery information, explain return policies, and guide them through purchases." ) }, {"role": "user", "content": prompt}, ], "temperature": 0.5, "max_tokens": 300, } response = requests.post(url, headers=headers, json=payload) response.raise_for_status() return response.json()['choices'][0]['message']['content'].strip() # --- Twilio Functions --- def fetch_latest_incoming_message(client, conversation_sid): try: messages = client.conversations.v1.conversations(conversation_sid).messages.list() for msg in reversed(messages): if msg.author.startswith("whatsapp:"): return { "sid": msg.sid, "body": msg.body, "author": msg.author, "timestamp": msg.date_created, } except TwilioRestException as e: if e.status == 404: print(f"Conversation {conversation_sid} not found, skipping...") else: print(f"Twilio error fetching messages for {conversation_sid}:", e) except Exception as e: #print(f"Unexpected error in fetch_latest_incoming_message for {conversation_sid}:", e) pass return None def send_twilio_message(client, conversation_sid, body): return client.conversations.v1.conversations(conversation_sid).messages.create( author="system", body=body ) # --- Load Knowledge Base --- def setup_knowledge_base(): folder_path = "docs" all_text = "" # Process PDFs for filename in ["FAQ.pdf", "ProductReturnPolicy.pdf"]: pdf_path = os.path.join(folder_path, filename) text, tables = extract_text_from_pdf(pdf_path) all_text += clean_extracted_text(text) + "\n" all_text += _format_tables_internal(tables) + "\n" # Process CSVs for filename in ["CustomerOrders.csv"]: csv_path = os.path.join(folder_path, filename) try: with open(csv_path, newline='', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) for row in reader: line = f"Order ID: {row.get('OrderID')} | Customer Name: {row.get('CustomerName')} | Order Date: {row.get('OrderDate')} | ProductID: {row.get('ProductID')} | Date: {row.get('OrderDate')} | Quantity: {row.get('Quantity')} | UnitPrice(USD): {row.get('UnitPrice(USD)')} | TotalPrice(USD): {row.get('TotalPrice(USD)')} | ShippingAddress: {row.get('ShippingAddress')} | OrderStatus: {row.get('OrderStatus')}" all_text += line + "\n" except Exception as e: print(f"āŒ Error reading {filename}: {e}") for filename in ["Products.csv"]: csv_path = os.path.join(folder_path, filename) try: with open(csv_path, newline='', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) for row in reader: line = f"Product ID: {row.get('ProductID')} | Toy Name: {row.get('ToyName')} | Category: {row.get('Category')} | Price(USD): {row.get('Price(USD)')} | Stock Quantity: {row.get('StockQuantity')} | Description: {row.get('Description')}" all_text += line + "\n" except Exception as e: print(f"āŒ Error reading {filename}: {e}") # Tokenization & chunking tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased') chunks = chunk_text(all_text, tokenizer) model = SentenceTransformer('all-mpnet-base-v2') embeddings = model.encode(chunks, show_progress_bar=False, truncation=True, max_length=512) dim = embeddings[0].shape[0] index = faiss.IndexFlatL2(dim) index.add(np.array(embeddings).astype('float32')) return index, model, chunks # --- Monitor Conversations --- def start_conversation_monitor(client, index, embed_model, text_chunks): processed_convos = set() last_processed_timestamp = {} def poll_conversation(convo_sid): while True: try: latest_msg = fetch_latest_incoming_message(client, convo_sid) if latest_msg: msg_time = latest_msg["timestamp"] if convo_sid not in last_processed_timestamp or msg_time > last_processed_timestamp[convo_sid]: last_processed_timestamp[convo_sid] = msg_time question = latest_msg["body"] sender = latest_msg["author"] print(f"\nšŸ“„ New message from {sender} in {convo_sid}: {question}") context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks)) answer = generate_answer_with_groq(question, context) send_twilio_message(client, convo_sid, answer) print(f"šŸ“¤ Replied to {sender}: {answer}") time.sleep(3) except Exception as e: print(f"āŒ Error in convo {convo_sid} polling:", e) time.sleep(5) def poll_new_conversations(): print("āž”ļø Monitoring for new WhatsApp conversations...") while True: try: conversations = client.conversations.v1.conversations.list(limit=20) for convo in conversations: convo_full = client.conversations.v1.conversations(convo.sid).fetch() if convo.sid not in processed_convos and convo_full.date_created > APP_START_TIME: participants = client.conversations.v1.conversations(convo.sid).participants.list() for p in participants: address = p.messaging_binding.get("address", "") if p.messaging_binding else "" if address.startswith("whatsapp:"): print(f"šŸ†• New WhatsApp convo found: {convo.sid}") processed_convos.add(convo.sid) threading.Thread(target=poll_conversation, args=(convo.sid,), daemon=True).start() except Exception as e: print("āŒ Error polling conversations:", e) time.sleep(5) # āœ… Launch conversation polling monitor threading.Thread(target=poll_new_conversations, daemon=True).start() # --- Streamlit UI --- st.set_page_config(page_title="Quasa – AI Powered WhatsApp Chatbot", layout="wide") st.title("šŸ“± Quasa – AI Powered WhatsApp Chatbot") account_sid = st.secrets.get("TWILIO_SID") auth_token = st.secrets.get("TWILIO_TOKEN") GROQ_API_KEY = st.secrets.get("GROQ_API_KEY") if not all([account_sid, auth_token, GROQ_API_KEY]): st.warning("āš ļø Provide all credentials below:") account_sid = st.text_input("Twilio SID", value=account_sid or "") auth_token = st.text_input("Twilio Token", type="password", value=auth_token or "") GROQ_API_KEY = st.text_input("GROQ API Key", type="password", value=GROQ_API_KEY or "") if all([account_sid, auth_token, GROQ_API_KEY]): os.environ["GROQ_API_KEY"] = GROQ_API_KEY index, model, chunks = setup_knowledge_base() st.success("Knowledge base loaded.") st.success("🟢 Monitoring new WhatsApp conversations...") client = Client(account_sid, auth_token) threading.Thread(target=start_conversation_monitor, args=(client, index, model, chunks), daemon=True).start() st.info("ā³ Waiting for new messages...")