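"""ToyShop WhatsApp chatbot.

Streamlit app that builds a small RAG knowledge base from the files in docs/
(PDF, DOCX, JSON and CSV), indexes it with sentence-transformers + FAISS, and
answers incoming Twilio WhatsApp messages using the Groq chat completions API.
"""
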
import os
import time
import threading
import streamlit as st
from twilio.rest import Client
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
import numpy as np
import docx
from groq import Groq
import requests
from io import StringIO
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from twilio.base.exceptions import TwilioRestException
import pdfplumber
import datetime
import csv
import json
import re

APP_START_TIME = datetime.datetime.now(datetime.timezone.utc)  # only answer messages received after startup
os.environ["PYTORCH_JIT"] = "0"

# Twilio Setup
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
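# Both credentials are expected to come from environment variables (e.g. secrets of the
# Space this app runs in); Client() cannot be constructed if either one is missing.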

# ---------------- PDF & DOCX & JSON Extraction ----------------
def _extract_tables_from_page(page):
    """Return the page's tables with None cells replaced by empty strings."""
    tables = page.extract_tables()
    formatted_tables = []
    for table in tables:
        formatted_table = [[cell if cell is not None else "" for cell in row] for row in table]
        formatted_tables.append(formatted_table)
    return formatted_tables

def extract_text_from_pdf(pdf_path):
    """Extract plain text and tables with pdfplumber, falling back to pdfminer for text."""
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception:
        # pdfplumber failed; discard any partial output and retry with pdfminer.
        text_output = StringIO()
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text')
    return text_output.getvalue(), all_tables

def _format_tables_internal(tables):
    """Render each extracted table as CSV text."""
    formatted = []
    for table in tables:
        with StringIO() as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows(table)
            formatted.append(csvfile.getvalue())
    return "\n\n".join(formatted)

def clean_extracted_text(text):
    """Collapse whitespace within lines and drop blank lines."""
    return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip())

def extract_text_from_docx(path):
    try:
        doc = docx.Document(path)
        return '\n'.join(p.text for p in doc.paragraphs)
    except Exception:
        return ""

def load_json_data(path):
    """Flatten a JSON file (a dict or a list of dicts) into key: value lines."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, dict):
            return "\n".join(f"{k}: {v}" for k, v in data.items() if not isinstance(v, (dict, list)))
        elif isinstance(data, list):
            return "\n\n".join(
                "\n".join(f"{k}: {v}" for k, v in item.items() if not isinstance(v, (dict, list)))
                for item in data if isinstance(item, dict)
            )
        else:
            return json.dumps(data, ensure_ascii=False, indent=2)
    except Exception:
        return ""

# ---------------- Chunking ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    """Split text into overlapping chunks of roughly chunk_size tokens."""
    tokens = tokenizer.tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk = tokens[start:end]
        chunks.append(tokenizer.convert_tokens_to_string(chunk))
        if end == len(tokens):  # last chunk reached; avoid emitting a duplicate tail
            break
        start += chunk_size - chunk_overlap
    return chunks

def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return the k chunks whose embeddings are closest to the question."""
    q_embedding = embed_model.encode(question)
    D, I = index.search(np.array([q_embedding]), k)
    return [text_chunks[i] for i in I[0]]

# ---------------- Groq Answer Generator ----------------
def generate_answer_with_groq(question, context):
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.environ.get("GROQ_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    prompt = f"Customer asked: '{question}'\n\nHere is the relevant information to help:\n{context}"
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {"role": "system", "content": "You are ToyBot, a friendly WhatsApp assistant..."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()  # surface API errors instead of a KeyError on the JSON below
    return response.json()['choices'][0]['message']['content'].strip()

# ---------------- Twilio Integration ----------------
def fetch_latest_incoming_message(client, conversation_sid):
    """Return the most recent message authored by a WhatsApp user, or None."""
    try:
        messages = client.conversations.v1.conversations(conversation_sid).messages.list()
        for msg in reversed(messages):
            if msg.author and msg.author.startswith("whatsapp:"):
                return {"sid": msg.sid, "body": msg.body, "author": msg.author, "timestamp": msg.date_created}
    except TwilioRestException:
        pass
    return None

def send_twilio_message(client, conversation_sid, body):
    return client.conversations.v1.conversations(conversation_sid).messages.create(author="system", body=body)

# ---------------- Knowledge Base Setup ----------------
def setup_knowledge_base():
    """Concatenate the text of every supported file in docs/ into one string."""
    folder = "docs"
    text = ""
    for f in os.listdir(folder):
        path = os.path.join(folder, f)
        if f.endswith(".pdf"):
            t, tables = extract_text_from_pdf(path)
            text += clean_extracted_text(t) + "\n" + _format_tables_internal(tables) + "\n"
        elif f.endswith(".docx"):
            text += clean_extracted_text(extract_text_from_docx(path)) + "\n"
        elif f.endswith(".json"):
            text += load_json_data(path) + "\n"
        elif f.endswith(".csv"):
            with open(path, newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    text += ' | '.join(f"{k}: {v}" for k, v in row.items()) + "\n"
    return text

# ---------------- Message Processing Loop ----------------
def process_messages_loop(conversation_sid, index, text_chunks, tokenizer, embed_model):
    """Poll the conversation and answer new WhatsApp messages that arrived after startup."""
    processed_sids = set()
    while True:
        message = fetch_latest_incoming_message(twilio_client, conversation_sid)
        if message and message['sid'] not in processed_sids and message['timestamp'] > APP_START_TIME:
            question = message['body']
            relevant = retrieve_chunks(question, index, embed_model, text_chunks)
            answer = generate_answer_with_groq(question, '\n'.join(relevant))
            send_twilio_message(twilio_client, conversation_sid, answer)
            processed_sids.add(message['sid'])
        time.sleep(5)

# ---------------- Streamlit UI ----------------
st.title("📱 ToyShop WhatsApp Chatbot")

kb_text = setup_knowledge_base()
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
chunks = chunk_text(kb_text, tokenizer)
embeddings = embed_model.encode(chunks)
index = faiss.IndexFlatL2(len(embeddings[0]))
index.add(np.array(embeddings))
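
# Note: Streamlit re-executes this script on every rerun, so the knowledge base and
# FAISS index above are rebuilt each time. If that becomes slow, the setup could be
# wrapped in a function decorated with st.cache_resource (assuming a Streamlit version
# that provides it) so the index is built only once per process.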

# Automatically fetch conversation SID
conversations = twilio_client.conversations.v1.conversations.list(limit=5)
conversation_sid = conversations[0].sid if conversations else None

if conversation_sid:
    st.success(f"Monitoring Twilio conversation SID: {conversation_sid}")
    # Start the polling thread only once per session so Streamlit reruns don't spawn duplicates.
    if not st.session_state.get("monitor_started"):
        st.session_state["monitor_started"] = True
        threading.Thread(target=process_messages_loop, args=(conversation_sid, index, chunks, tokenizer, embed_model), daemon=True).start()
else:
    st.error("No active Twilio conversation found.")