import os import time import threading import streamlit as st from twilio.rest import Client from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer import faiss import numpy as np import docx from groq import Groq import requests from io import StringIO from pdfminer.high_level import extract_text_to_fp from pdfminer.layout import LAParams from twilio.base.exceptions import TwilioRestException import pdfplumber import datetime import csv import json import re APP_START_TIME = datetime.datetime.now(datetime.timezone.utc) os.environ["PYTORCH_JIT"] = "0" # Twilio Setup TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID") TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN") twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN) # ---------------- PDF & DOCX & JSON Extraction ---------------- def _extract_tables_from_page(page): tables = page.extract_tables() formatted_tables = [] for table in tables: formatted_row = [[cell if cell is not None else "" for cell in row] for row in table] formatted_tables.append(formatted_row) return formatted_tables def extract_text_from_pdf(pdf_path): text_output = StringIO() all_tables = [] try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: all_tables.extend(_extract_tables_from_page(page)) text = page.extract_text() if text: text_output.write(text + "\n\n") except Exception as e: with open(pdf_path, 'rb') as file: extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text') return text_output.getvalue(), all_tables def _format_tables_internal(tables): formatted = [] for table in tables: with StringIO() as csvfile: writer = csv.writer(csvfile) writer.writerows(table) formatted.append(csvfile.getvalue()) return "\n\n".join(formatted) def clean_extracted_text(text): return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip()) def extract_text_from_docx(path): try: doc = docx.Document(path) return '\n'.join(p.text for p in doc.paragraphs) except: return "" def load_json_data(path): try: with open(path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): return "\n".join(f"{k}: {v}" for k, v in data.items() if not isinstance(v, (dict, list))) elif isinstance(data, list): return "\n\n".join("\n".join(f"{k}: {v}" for k, v in item.items() if not isinstance(v, (dict, list))) for item in data if isinstance(item, dict)) else: return json.dumps(data, ensure_ascii=False, indent=2) except Exception as e: return "" # ---------------- Chunking ---------------- def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32): tokens = tokenizer.tokenize(text) chunks = [] start = 0 while start < len(tokens): end = min(start + chunk_size, len(tokens)) chunk = tokens[start:end] chunks.append(tokenizer.convert_tokens_to_string(chunk)) start += chunk_size - chunk_overlap return chunks def retrieve_chunks(question, index, embed_model, text_chunks, k=3): q_embedding = embed_model.encode(question) D, I = index.search(np.array([q_embedding]), k) return [text_chunks[i] for i in I[0]] # ---------------- Groq Answer Generator ---------------- def generate_answer_with_groq(question, context): url = "https://api.groq.com/openai/v1/chat/completions" api_key = os.environ.get("GROQ_API_KEY") headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} prompt = f"Customer asked: '{question}'\n\nHere is the relevant information to help:\n{context}" payload = { "model": "llama3-8b-8192", "messages": [ {"role": "system", "content": "You are ToyBot, a friendly WhatsApp assistant..."}, {"role": "user", "content": prompt}, ], "temperature": 0.5, "max_tokens": 300, } response = requests.post(url, headers=headers, json=payload) return response.json()['choices'][0]['message']['content'].strip() # ---------------- Twilio Integration ---------------- def fetch_latest_incoming_message(client, conversation_sid): try: messages = client.conversations.v1.conversations(conversation_sid).messages.list() for msg in reversed(messages): if msg.author.startswith("whatsapp:"): return {"sid": msg.sid, "body": msg.body, "author": msg.author, "timestamp": msg.date_created} except TwilioRestException: return None def send_twilio_message(client, conversation_sid, body): return client.conversations.v1.conversations(conversation_sid).messages.create(author="system", body=body) # ---------------- Knowledge Base Setup ---------------- def setup_knowledge_base(): folder = "docs" text = "" for f in os.listdir(folder): path = os.path.join(folder, f) if f.endswith(".pdf"): t, tables = extract_text_from_pdf(path) text += clean_extracted_text(t) + "\n" + _format_tables_internal(tables) + "\n" elif f.endswith(".docx"): text += clean_extracted_text(extract_text_from_docx(path)) + "\n" elif f.endswith(".json"): text += load_json_data(path) + "\n" elif f.endswith(".csv"): with open(path, newline='', encoding='utf-8') as csvfile: reader = csv.DictReader(csvfile) for row in reader: text += ' | '.join(f"{k}: {v}" for k, v in row.items()) + "\n" return text # ---------------- Message Processing Loop ---------------- def process_messages_loop(conversation_sid, index, text_chunks, tokenizer, embed_model): processed_sids = set() while True: message = fetch_latest_incoming_message(twilio_client, conversation_sid) if message and message['sid'] not in processed_sids and message['timestamp'] > APP_START_TIME: question = message['body'] relevant = retrieve_chunks(question, index, embed_model, text_chunks) answer = generate_answer_with_groq(question, '\n'.join(relevant)) send_twilio_message(twilio_client, conversation_sid, answer) processed_sids.add(message['sid']) time.sleep(5) # ---------------- Streamlit UI ---------------- st.title("📱 ToyShop WhatsApp Chatbot") kb_text = setup_knowledge_base() tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") embed_model = SentenceTransformer("all-MiniLM-L6-v2") chunks = chunk_text(kb_text, tokenizer) embeddings = embed_model.encode(chunks) index = faiss.IndexFlatL2(len(embeddings[0])) index.add(np.array(embeddings)) # Automatically fetch conversation SID conversations = twilio_client.conversations.v1.conversations.list(limit=5) conversation_sid = conversations[0].sid if conversations else None if conversation_sid: st.success(f"Monitoring Twilio conversation SID: {conversation_sid}") threading.Thread(target=process_messages_loop, args=(conversation_sid, index, chunks, tokenizer, embed_model), daemon=True).start() else: st.error("No active Twilio conversation found.")