import os import time import threading import streamlit as st from twilio.rest import Client from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer import faiss import numpy as np import docx from groq import Groq import requests from io import StringIO from pdfminer.high_level import extract_text_to_fp from pdfminer.layout import LAParams from twilio.base.exceptions import TwilioRestException import pdfplumber import datetime import csv import json import re APP_START_TIME = datetime.datetime.now(datetime.timezone.utc) os.environ["PYTORCH_JIT"] = "0" # ---------------- PDF & DOCX & JSON Extraction ---------------- def _extract_tables_from_page(page): tables = page.extract_tables() formatted_tables = [] for table in tables: formatted_table = [] for row in table: formatted_row = [cell if cell is not None else "" for cell in row] formatted_table.append(formatted_row) formatted_tables.append(formatted_table) return formatted_tables def extract_text_from_pdf(pdf_path): text_output = StringIO() all_tables = [] try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: all_tables.extend(_extract_tables_from_page(page)) text = page.extract_text() if text: text_output.write(text + "\n\n") except Exception as e: print(f"pdfplumber error: {e}") with open(pdf_path, 'rb') as file: extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text') return text_output.getvalue(), all_tables def _format_tables_internal(tables): formatted_tables_str = [] for table in tables: with StringIO() as csvfile: writer = csv.writer(csvfile) writer.writerows(table) formatted_tables_str.append(csvfile.getvalue()) return "\n\n".join(formatted_tables_str) def clean_extracted_text(text): return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip()) def extract_text_from_docx(docx_path): try: doc = docx.Document(docx_path) return '\n'.join(para.text for para in doc.paragraphs) except: return "" def load_json_data(json_path): try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): # Flatten dictionary values (avoiding nested structures as strings) return "\n".join(f"{key}: {value}" for key, value in data.items() if not isinstance(value, (dict, list))) elif isinstance(data, list): # Flatten list of dictionaries all_items = [] for item in data: if isinstance(item, dict): all_items.append("\n".join(f"{key}: {value}" for key, value in item.items() if not isinstance(value, (dict, list)))) return "\n\n".join(all_items) else: return json.dumps(data, ensure_ascii=False, indent=2) except Exception as e: print(f"JSON read error: {e}") return "" # ---------------- Chunking ---------------- def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32): tokens = tokenizer.tokenize(text) chunks = [] start = 0 while start < len(tokens): end = min(start + chunk_size, len(tokens)) chunk = tokens[start:end] chunks.append(tokenizer.convert_tokens_to_string(chunk)) if end == len(tokens): break start += chunk_size - chunk_overlap return chunks def retrieve_chunks(question, index, embed_model, text_chunks, k=3): q_embedding = embed_model.encode(question) D, I = index.search(np.array([q_embedding]), k) return [text_chunks[i] for i in I[0]] # ---------------- Groq Answer Generator ---------------- def generate_answer_with_groq(question, context): url = "https://api.groq.com/openai/v1/chat/completions" api_key = os.environ.get("GROQ_API_KEY") headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", } prompt = ( f"Customer asked: '{question}'\n\n" f"Here is the relevant information to help:\n{context}\n\n" f"Respond in a friendly and helpful tone as a toy shop support agent." ) payload = { "model": "llama3-8b-8192", "messages": [ { "role": "system", "content": ( "You are ToyBot, a friendly WhatsApp assistant for an online toy shop. " "Help customers with toys, delivery, and returns in a helpful tone." ) }, {"role": "user", "content": prompt}, ], "temperature": 0.5, "max_tokens": 300, } response = requests.post(url, headers=headers, json=payload) response.raise_for_status() return response.json()['choices'][0]['message']['content'].strip() # ---------------- Twilio Integration ---------------- def fetch_latest_incoming_message(client, conversation_sid): try: messages = client.conversations.v1.conversations(conversation_sid).messages.list() for msg in reversed(messages): if msg.author.startswith("whatsapp:"): return { "sid": msg.sid, "body": msg.body, "author": msg.author, "timestamp": msg.date_created, } except TwilioRestException as e: print(f"Twilio error: {e}") return None def send_twilio_message(client, conversation_sid, body): return client.conversations.v1.conversations(conversation_sid).messages.create( author="system", body=body ) # ---------------- Knowledge Base Setup ---------------- def setup_knowledge_base(): folder_path = "docs" all_text = "" for filename in os.listdir(folder_path): file_path = os.path.join(folder_path, filename) if filename.endswith(".pdf"): text, tables = extract_text_from_pdf(file_path) all_text += clean_extracted_text(text) + "\n" all_text += _format_tables_internal(tables) + "\n" elif filename.endswith(".docx"): text = extract_text_from_docx(file_path) all_text += clean_extracted_text(text) + "\n" elif filename.endswith(".json"): text = load_json_data(file_path) all_text += text + "\n" elif filename.endswith(".csv"): try: with open(file_path, newline='', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) for row in reader: line = ' | '.join(f"{k}: {v}" for k, v in row.items()) all_text += line + "\n" except Exception as e: print(f"CSV read error: {e}") tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased') chunks = chunk_text(all_text, tokenizer) model = SentenceTransformer('all-mpnet-base-v2') embeddings = model.encode(chunks, show_progress_bar=False) dim = embeddings[0].shape[0] index = faiss.IndexFlatL2(dim) index.add(np.array(embeddings).astype('float32')) return index, model, chunks # ---------------- Monitor Twilio Conversations ---------------- def start_conversation_monitor(client, index, embed_model, text_chunks): processed_convos = set() last_processed_timestamp = {} def poll_convo(convo_sid): while True: latest_msg = fetch_latest_incoming_message(client, convo_sid) if latest_msg: msg_time = latest_msg["timestamp"] if convo_sid not in last_processed_timestamp or msg_time > last_processed_timestamp[convo_sid]: last_processed_timestamp[convo_sid] = msg_time question = latest_msg["body"] sender = latest_msg["author"] print(f"📩 New message from {sender}: {question}") context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks)) answer = generate_answer_with_groq(question, context) send_twilio_message(client, convo_sid, answer) time.sleep(5) for convo in client.conversations.v1.conversations.list(): if convo.sid not in processed_convos: processed_convos.add(convo.sid) threading.Thread(target=poll_convo, args=(convo.sid,), daemon=True).start() # ---------------- Main Entry ---------------- if __name__ == "__main__": st.title("🤖 ToyBot WhatsApp Assistant") st.write("Initializing knowledge base...") index, model, chunks = setup_knowledge_base() st.success("Knowledge base loaded.") st.write("Waiting for WhatsApp messages...") account_sid = os.environ.get("TWILIO_ACCOUNT_SID") auth_token = os.environ.get("TWILIO_AUTH_TOKEN") if not account_sid or not auth_token: st.error("❌ Twilio credentials not set.") else: client = Client(account_sid, auth_token) start_conversation_monitor(client, index, model, chunks) st.info("✅ Bot is now monitoring Twilio conversations.")