import os import time import threading import streamlit as st from twilio.rest import Client from sentence_transformers import SentenceTransformer from transformers import AutoTokenizer import faiss import numpy as np import docx from groq import Groq import requests from io import StringIO from pdfminer.high_level import extract_text_to_fp from pdfminer.layout import LAParams from twilio.base.exceptions import TwilioRestException import pdfplumber import datetime import csv import json import re APP_START_TIME = datetime.datetime.now(datetime.timezone.utc) os.environ["PYTORCH_JIT"] = "0" # Twilio Setup TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID") TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN") twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN) # ---------------- PDF & DOCX & JSON Extraction ---------------- def _extract_tables_from_page(page): tables = page.extract_tables() formatted_tables = [] for table in tables: formatted_row = [[cell if cell is not None else "" for cell in row] for row in table] formatted_tables.append(formatted_row) return formatted_tables def extract_text_from_pdf(pdf_path): text_output = StringIO() all_tables = [] try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: all_tables.extend(_extract_tables_from_page(page)) text = page.extract_text() if text: text_output.write(text + "\n\n") except Exception as e: with open(pdf_path, 'rb') as file: extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text') return text_output.getvalue(), all_tables def _format_tables_internal(tables): formatted = [] for table in tables: with StringIO() as csvfile: writer = csv.writer(csvfile) writer.writerows(table) formatted.append(csvfile.getvalue()) return "\n\n".join(formatted) def clean_extracted_text(text): return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip()) def extract_text_from_docx(path): try: doc = docx.Document(path) return '\n'.join(p.text for p in doc.paragraphs) except: return "" def load_json_data(path): try: with open(path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, dict): return "\n".join(f"{k}: {v}" for k, v in data.items() if not isinstance(v, (dict, list))) elif isinstance(data, list): return "\n\n".join("\n".join(f"{k}: {v}" for k, v in item.items() if isinstance(item, dict)) for item in data) else: return json.dumps(data, ensure_ascii=False, indent=2) except Exception as e: return "" # ---------------- Chunking ---------------- def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32): tokens = tokenizer.tokenize(text) chunks = [] start = 0 while start < len(tokens): end = min(start + chunk_size, len(tokens)) chunk = tokens[start:end] chunks.append(tokenizer.convert_tokens_to_string(chunk)) start += chunk_size - chunk_overlap return chunks def retrieve_chunks(question, index, embed_model, text_chunks, k=3): q_embedding = embed_model.encode(question) D, I = index.search(np.array([q_embedding]), k) return [text_chunks[i] for i in I[0]] # ---------------- Groq Answer Generator ---------------- def generate_answer_with_groq(question, context): url = "https://api.groq.com/openai/v1/chat/completions" api_key = os.getenv("GROQ_API_KEY") headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} prompt = f"Customer asked: '{question}'\n\nHere is the relevant information to help:\n{context}" payload = { "model": "llama3-8b-8192", "messages": [ {"role": "system", "content": "You are ToyBot, a friendly WhatsApp assistant..."}, {"role": "user", "content": prompt}, ], "temperature": 0.5, "max_tokens": 300, } response = requests.post(url, headers=headers, json=payload) return response.json()['choices'][0]['message']['content'].strip() # ---------------- Twilio Integration ---------------- def fetch_latest_incoming_message(client, conversation_sid): try: messages = client.conversations.v1.conversations(conversation_sid).messages.list() for msg in reversed(messages): if ( msg.author.startswith("whatsapp:") and msg.date_created and msg.date_created > APP_START_TIME ): return { "sid": msg.sid, "body": msg.body, "author": msg.author, "timestamp": msg.date_created, } except TwilioRestException: return None def send_twilio_message(client, conversation_sid, body): return client.conversations.v1.conversations(conversation_sid).messages.create(author="system", body=body) def get_latest_whatsapp_conversation_sid(client): try: conversations = client.conversations.v1.conversations.list(limit=20) filtered = [ c for c in conversations if c.date_created and c.date_created > APP_START_TIME ] for convo in sorted(filtered, key=lambda c: c.date_created, reverse=True): messages = convo.messages.list(limit=1) if messages and any(m.author.startswith("whatsapp:") and m.date_created > APP_START_TIME for m in messages): return convo.sid except Exception as e: print("Error fetching valid conversation SID:", e) return None # ---------------- Load Orders ---------------- def load_orders(): orders_path = "docs/orders.json" try: with open(orders_path, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: print(f"Error loading orders: {e}") return {} def extract_order_id(text): pattern = r"\bORD\d{3,}\b" match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(0).upper() return None def format_order_response(order_id, order_data): if not order_data: return f"Sorry, I could not find details for order ID {order_id}." details = [ f"Order ID: {order_id}", f"Customer Name: {order_data.get('customer_name', 'N/A')}", f"Address: {order_data.get('address', 'N/A')}", f"Items: {', '.join(order_data.get('items', []))}", f"Status: {order_data.get('status', 'N/A')}", ] return "\n".join(details) # ---------------- Knowledge Base Setup ---------------- def setup_knowledge_base(): folder = "docs" text = "" for f in os.listdir(folder): path = os.path.join(folder, f) if f.endswith(".pdf"): t, tables = extract_text_from_pdf(path) text += clean_extracted_text(t) + "\n" + _format_tables_internal(tables) + "\n" elif f.endswith(".docx"): text += clean_extracted_text(extract_text_from_docx(path)) + "\n" elif f.endswith(".json"): # Skip orders.json here to avoid mixing with KB text if f == "orders.json": continue text += load_json_data(path) + "\n" elif f.endswith(".csv"): with open(path, newline='', encoding='utf-8') as csvfile: reader = csv.reader(csvfile) text += "\n".join(", ".join(row) for row in reader) + "\n" return text # ---------------- App Logic ---------------- def process_messages_loop(): embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") knowledge_text = setup_knowledge_base() text_chunks = chunk_text(knowledge_text, tokenizer) embeddings = embed_model.encode(text_chunks) index = faiss.IndexFlatL2(embeddings.shape[1]) index.add(embeddings) orders = load_orders() # Load orders once at start seen_sids = set() while True: conversation_sid = get_latest_whatsapp_conversation_sid(twilio_client) if not conversation_sid: time.sleep(5) continue message = fetch_latest_incoming_message(twilio_client, conversation_sid) if message and message["sid"] not in seen_sids: seen_sids.add(message["sid"]) question = message["body"] order_id = extract_order_id(question) if order_id and order_id in orders: answer = format_order_response(order_id, orders[order_id]) else: chunks = retrieve_chunks(question, index, embed_model, text_chunks) answer = generate_answer_with_groq(question, "\n\n".join(chunks)) send_twilio_message(twilio_client, conversation_sid, answer) time.sleep(5) # ---------------- Streamlit UI ---------------- st.title("ToyShop WhatsApp Assistant (Groq + Twilio)") if st.button("Start WhatsApp Bot"): thread = threading.Thread(target=process_messages_loop, daemon=True) thread.start() st.success("WhatsApp assistant started and monitoring for new messages.")