Spaces:
Sleeping
Sleeping
import os | |
import json | |
import time | |
import threading | |
import datetime | |
import csv | |
import docx | |
import streamlit as st | |
from io import StringIO | |
import numpy as np | |
import requests | |
import pdfplumber | |
from pdfminer.high_level import extract_text_to_fp | |
from pdfminer.layout import LAParams | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer | |
import faiss | |
from twilio.rest import Client | |
from twilio.base.exceptions import TwilioRestException | |
# Timezone-aware (UTC) timestamp taken once, when this module is executed.
APP_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)

# NOTE(review): this assignment runs after the third-party imports at the top
# of the file. If PyTorch reads PYTORCH_JIT at import time, the value is set
# too late to have any effect - confirm, and move it above the imports if so.
os.environ.update({"PYTORCH_JIT": "0"})

# Folder that is scanned for the documents to index.
DOCS_FOLDER = "./docs"
# ---------------- PDF / DOCX / JSON LOADERS ----------------
def _extract_tables_from_page(page): | |
tables = page.extract_tables() | |
formatted_tables = [] | |
for table in tables: | |
formatted_row = [[cell if cell else "" for cell in row] for row in table] | |
formatted_tables.append(formatted_row) | |
return formatted_tables | |
def extract_text_from_pdf(pdf_path):
    """Extract the text and tables of a PDF file.

    pdfplumber is tried first, page by page. If it raises, the whole
    document is re-extracted with pdfminer into a fresh buffer, so pages
    pdfplumber had already processed are not emitted twice.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A ``(text, tables)`` tuple. ``tables`` holds the tables pdfplumber
        extracted before any failure; the pdfminer fallback adds none.

    Raises:
        Any exception raised by the pdfminer fallback (for example when the
        file cannot be opened) propagates to the caller.
    """
    all_tables = []
    page_texts = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    page_texts.append(text + "\n\n")
    except Exception as e:
        print(f"[pdfplumber error] Falling back: {e}")
        # Start from an empty buffer: pdfminer re-reads the entire file, so
        # keeping the partial pdfplumber output would duplicate those pages.
        with StringIO() as fallback_output, open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, fallback_output, laparams=LAParams(), output_type='text')
            return fallback_output.getvalue(), all_tables
    return "".join(page_texts), all_tables
def extract_text_from_docx(docx_path):
    """Return the paragraph texts of a DOCX file joined by newlines.

    Any failure while opening or reading the document is printed and an
    empty string is returned instead.
    """
    try:
        paragraphs = [paragraph.text for paragraph in docx.Document(docx_path).paragraphs]
        text = "\n".join(paragraphs)
    except Exception as exc:
        print(f"[DOCX error] {exc}")
        text = ""
    return text
def _format_json_object(mapping):
    """Render a JSON object as one ``key: value`` line per entry."""
    return "\n".join(f"{k}: {v}" for k, v in mapping.items())


def load_json_data(json_path):
    """Load a JSON file and flatten it into plain text.

    Args:
        json_path: Path of a UTF-8 encoded JSON file.

    Returns:
        * For an object: one ``key: value`` line per entry.
        * For an array: one block per element, separated by blank lines.
          Object elements are rendered as above; any other element is
          rendered with ``str()`` rather than being dropped.
        * For any other top-level value: ``str(value)``.
        * ``""`` if the file cannot be read or parsed (the error is printed).
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        # Best-effort loader: an unreadable file must not abort the whole load.
        print(f"[JSON error] {e}")
        return ""

    if isinstance(data, dict):
        return _format_json_object(data)
    if isinstance(data, list):
        return "\n\n".join(
            _format_json_object(item) if isinstance(item, dict) else str(item)
            for item in data
        )
    return str(data)
def clean_extracted_text(text):
    """Collapse whitespace runs to single spaces and drop blank lines.

    Each remaining line is stripped at both ends; lines are re-joined with
    a single newline.
    """
    kept_lines = []
    for raw_line in text.splitlines():
        words = raw_line.split()
        if words:
            kept_lines.append(" ".join(words))
    return "\n".join(kept_lines)
# ---------------- CHUNKING ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    """Split ``text`` into overlapping chunks measured in tokens.

    Args:
        text: The text to split.
        tokenizer: Object providing ``tokenize(text)`` and
            ``convert_tokens_to_string(tokens)``.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        chunk_overlap: Number of tokens shared by consecutive chunks; must
            satisfy ``0 <= chunk_overlap < chunk_size``.

    Returns:
        The chunks as strings, in document order. Empty when ``text``
        produces no tokens.

    Raises:
        ValueError: If the size/overlap combination is invalid. Without this
            check a non-positive stride would keep the loop from advancing.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError(
            f"chunk_overlap must be in [0, chunk_size), got {chunk_overlap}"
        )

    tokens = tokenizer.tokenize(text)
    stride = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:start + chunk_size]))
        # Stop once a chunk reaches the end, so no trailing chunk made up
        # purely of overlap tokens is emitted.
        if start + chunk_size >= len(tokens):
            break
    return chunks
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return the text chunks nearest to ``question`` in the vector index.

    Args:
        question: The query text.
        index: Vector index exposing ``search(vectors, k)`` and returning a
            ``(distances, ids)`` pair.
        embed_model: Model exposing ``encode(text)`` that returns the query
            embedding.
        text_chunks: Chunks in the same order in which their embeddings were
            added to ``index``.
        k: Number of neighbours to request.

    Returns:
        Up to ``k`` chunks, in the order the index returned them.
    """
    query = np.asarray([embed_model.encode(question)], dtype=np.float32)
    _, neighbor_ids = index.search(query, k)
    # FAISS fills missing neighbours with -1 when the index holds fewer than
    # k vectors; a negative id must be rejected or it would silently pick
    # the last chunk through Python's negative indexing.
    return [text_chunks[i] for i in neighbor_ids[0] if 0 <= i < len(text_chunks)]
# ---------------- GROQ ANSWER GENERATOR ----------------
def generate_answer_with_groq(question, context, *, model="llama3-8b-8192", timeout=30):
    """Ask the Groq chat-completions endpoint for a reply to ``question``.

    Args:
        question: The customer's message.
        context: Retrieved document text placed in the prompt.
        model: Model identifier sent in the request payload.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The content of the first choice in the response.

    Raises:
        RuntimeError: If the ``GROQ_API_KEY`` environment variable is unset.
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status.
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        # Fail early instead of sending "Bearer None" and then failing on
        # the shape of the error response.
        raise RuntimeError("GROQ_API_KEY is not set")

    url = "https://api.groq.com/openai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant information:\n{context}\n\n"
        "Reply as a friendly toy shop assistant, include customer name from the context if possible."
    )
    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": "You are ToyBot, a helpful assistant for an online toy shop.",
            },
            {"role": "user", "content": prompt},
        ],
    }
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    # Surface HTTP errors directly rather than as a KeyError on "choices".
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]
# ---------------- TWILIO MONITOR ----------------
def handle_incoming_messages(index, embed_model, tokenizer, text_chunks):
    """Poll a Twilio conversation forever and answer each new message.

    Every ten seconds the conversation's messages are listed; each message
    newer than the last one seen is answered with a Groq-generated reply
    built from the retrieved chunks.

    The high-water mark is taken from the messages' own timestamps rather
    than the local clock, so a message that arrives while earlier ones are
    being answered is picked up on the next poll. A failure on one message
    is logged and does not stop the loop or cause earlier messages to be
    answered again.

    Args:
        index: Vector index passed through to ``retrieve_chunks``.
        embed_model: Embedding model passed through to ``retrieve_chunks``.
        tokenizer: Accepted for interface compatibility; not used here.
        text_chunks: Chunks passed through to ``retrieve_chunks``.

    Returns:
        ``None`` immediately if ``TWILIO_CONVERSATION_SID`` is unset;
        otherwise the function does not return.

    Raises:
        KeyError: If ``TWILIO_ACCOUNT_SID`` or ``TWILIO_AUTH_TOKEN`` is unset.
    """
    client = Client(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"])
    conversation_sid = os.environ.get("TWILIO_CONVERSATION_SID")
    if not conversation_sid:
        # NOTE(review): the leading character looks like a mis-decoded
        # emoji - confirm against the original source.
        print("β TWILIO_CONVERSATION_SID not set")
        return

    bot_author = "ToyBot"
    poll_interval_seconds = 10
    conversation = client.conversations.conversations(conversation_sid)

    # High-water mark: newest message timestamp handled so far, plus the
    # SIDs seen at exactly that timestamp (they are compared with >= below).
    last_seen_time = APP_START_TIME
    seen_sids = set()

    while True:
        try:
            messages = conversation.messages.list(order="asc")
        except TwilioRestException as e:
            print(f"[Twilio error] {e}")
            messages = []
        except Exception as e:
            # Boundary of a daemon thread: log and keep polling.
            print(f"[Listener error] {e}")
            messages = []

        newest_time = last_seen_time
        newest_sids = set(seen_sids)
        for msg in messages:
            msg_time = msg.date_created.replace(tzinfo=datetime.timezone.utc)
            if msg_time < last_seen_time or msg.sid in seen_sids:
                continue

            # Advance the mark before replying so a failing message is not
            # retried (and earlier ones not re-answered) on every poll.
            if msg_time > newest_time:
                newest_time = msg_time
                newest_sids = {msg.sid}
            else:
                newest_sids.add(msg.sid)

            if msg.author == bot_author:
                # Never answer the bot's own replies.
                continue

            print(f"π© New message from {msg.author}: {msg.body}")
            try:
                context = "\n".join(retrieve_chunks(msg.body, index, embed_model, text_chunks))
                answer = generate_answer_with_groq(msg.body, context)
                conversation.messages.create(author=bot_author, body=answer)
            except TwilioRestException as e:
                print(f"[Twilio error] {e}")
            except Exception as e:
                print(f"[Reply error] {e}")

        last_seen_time = newest_time
        seen_sids = newest_sids
        time.sleep(poll_interval_seconds)
# ---------------- STREAMLIT UI ----------------
# Page heading of the Streamlit app.
# NOTE(review): the non-ASCII characters in this title look like mis-decoded
# emoji/punctuation - confirm the intended text against the original source.
st.title(body="π ToyShop Assistant β RAG WhatsApp Bot")
def load_all_documents(folder_path):
    """Read every file in ``folder_path`` and return its text and tables.

    Files are visited in sorted name order so the result is reproducible.
    ``.pdf``, ``.docx`` and ``.json`` files go through their dedicated
    loaders; anything else is read as UTF-8 text. Sub-directories and files
    that cannot be read as text are skipped.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        A ``(full_text, all_tables)`` tuple: the cleaned text of every
        loaded file, each followed by a blank line, and the tables
        collected from the PDF files.

    Raises:
        OSError: If ``folder_path`` cannot be listed.
    """
    cleaned_texts = []
    all_tables = []
    for filename in sorted(os.listdir(folder_path)):
        filepath = os.path.join(folder_path, filename)
        if not os.path.isfile(filepath):
            continue
        # splitext, unlike split("."), does not mistake an extensionless
        # file literally named "pdf" for a PDF document.
        ext = os.path.splitext(filename)[1].lower()
        if ext == ".pdf":
            text, tables = extract_text_from_pdf(filepath)
            all_tables.extend(tables)
        elif ext == ".docx":
            text = extract_text_from_docx(filepath)
        elif ext == ".json":
            text = load_json_data(filepath)
        else:
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    text = f.read()
            except (OSError, UnicodeDecodeError) as e:
                print(f"[Text error] Skipping {filename}: {e}")
                continue
        cleaned_texts.append(clean_extracted_text(text) + "\n\n")
    return "".join(cleaned_texts), all_tables
@st.cache_resource(show_spinner=False)
def _load_knowledge_base(folder_path):
    """Load the documents, models and vector index once per server process.

    Streamlit re-executes this script on every interaction; caching keeps
    the models from being reloaded and the index from being rebuilt each
    time. Changes to the documents are therefore only picked up after the
    cache is cleared or the app restarts.

    Returns:
        ``(full_text, tables, tokenizer, embed_model, chunks, embeddings,
        index)``; ``embeddings`` and ``index`` are ``None`` when no text
        chunks were produced.
    """
    text, doc_tables = load_all_documents(folder_path)
    tok = AutoTokenizer.from_pretrained("bert-base-uncased")
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    pieces = chunk_text(text, tok)
    if not pieces:
        # Nothing to embed: an index cannot be sized without one vector.
        return text, doc_tables, tok, model, pieces, None, None
    vectors = model.encode(pieces)
    faiss_index = faiss.IndexFlatL2(vectors.shape[1])
    faiss_index.add(np.array(vectors))
    return text, doc_tables, tok, model, pieces, vectors, faiss_index


@st.cache_resource(show_spinner=False)
def _start_listener_once():
    """Start the message-polling thread a single time per server process.

    ``st.session_state`` is scoped to one browser session, so guarding the
    thread with it alone would start one extra listener per session.
    """
    worker = threading.Thread(
        target=handle_incoming_messages,
        args=(index, embed_model, tokenizer, chunks),
        daemon=True,
    )
    worker.start()
    return worker


with st.spinner("Loading documents..."):
    (
        full_text,
        tables,
        tokenizer,
        embed_model,
        chunks,
        embeddings,
        index,
    ) = _load_knowledge_base(DOCS_FOLDER)

if index is None:
    st.error("No text could be extracted from the docs/ folder, so there is nothing to index.")
    st.stop()

if "listener_started" not in st.session_state:
    _start_listener_once()
    st.session_state.listener_started = True
    st.success("β WhatsApp listener started.")

st.success(f"π Loaded {len(chunks)} text chunks from docs/ folder")