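"""
ToyShop Assistant – a Streamlit app that answers WhatsApp messages with
retrieval-augmented generation (RAG):

1. Load PDF / DOCX / JSON / plain-text documents from ./docs.
2. Chunk the text, embed the chunks with MiniLM, and index them in FAISS.
3. Poll a Twilio Conversation for new messages in a background thread.
4. Answer each message via Groq's chat-completions API, grounded in the
   retrieved chunks.
"""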
import os
import json
import time
import threading
import datetime
import csv
import docx
import streamlit as st
from io import StringIO
import numpy as np
import requests
import pdfplumber
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
from twilio.rest import Client
from twilio.base.exceptions import TwilioRestException
APP_START_TIME = datetime.datetime.now(datetime.timezone.utc)
os.environ["PYTORCH_JIT"] = "0"
DOCS_FOLDER = "./docs"
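# Required environment variables (read below): GROQ_API_KEY,
# TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_CONVERSATION_SID.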
# ---------------- PDF / DOCX / JSON LOADERS ----------------
def _extract_tables_from_page(page):
    """Return the tables on a pdfplumber page, with None cells replaced by ''."""
    tables = page.extract_tables()
    formatted_tables = []
    for table in tables:
        formatted_table = [[cell if cell else "" for cell in row] for row in table]
        formatted_tables.append(formatted_table)
    return formatted_tables
def extract_text_from_pdf(pdf_path):
    """Extract text and tables with pdfplumber; fall back to pdfminer on failure."""
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception as e:
        print(f"[pdfplumber error] Falling back to pdfminer: {e}")
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text')
    return text_output.getvalue(), all_tables
def extract_text_from_docx(docx_path):
    try:
        doc = docx.Document(docx_path)
        return "\n".join(para.text for para in doc.paragraphs)
    except Exception as e:
        print(f"[DOCX error] {e}")
        return ""
def load_json_data(json_path):
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, list):
            return "\n\n".join(
                "\n".join(f"{k}: {v}" for k, v in d.items())
                for d in data if isinstance(d, dict)
            )
        if isinstance(data, dict):
            return "\n".join(f"{k}: {v}" for k, v in data.items())
        return str(data)
    except Exception as e:
        print(f"[JSON error] {e}")
        return ""
def clean_extracted_text(text):
    # Collapse runs of whitespace within each line and drop blank lines.
    return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip())
# ---------------- CHUNKING ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    """Split text into overlapping chunks of at most chunk_size tokens."""
    tokens = tokenizer.tokenize(text)
    chunks, start = [], 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk = tokens[start:end]
        chunks.append(tokenizer.convert_tokens_to_string(chunk))
        if end == len(tokens):
            break
        start += chunk_size - chunk_overlap
    return chunks
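# With the defaults above, the window advances by
# chunk_size - chunk_overlap = 128 - 32 = 96 tokens per step, so a
# 1,000-token document yields 11 overlapping chunks (window starts at
# token 0, 96, ..., 960).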
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    q_embedding = embed_model.encode(question)
    D, I = index.search(np.array([q_embedding]), k)
    return [text_chunks[i] for i in I[0] if i < len(text_chunks)]
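# Hypothetical usage (the question string is made up for illustration):
#   retrieve_chunks("Do you have wooden train sets?", index, embed_model, chunks)
# returns the k=3 chunks whose MiniLM embeddings are nearest to the question
# embedding under exact L2 search.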
# ---------------- GROQ ANSWER GENERATOR ----------------
def generate_answer_with_groq(question, context):
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.environ.get("GROQ_API_KEY")
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant information:\n{context}\n\n"
        "Reply as a friendly toy shop assistant, include customer name from the context if possible."
    )
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {
                "role": "system",
                "content": "You are ToyBot, a helpful assistant for an online toy shop.",
            },
            {"role": "user", "content": prompt},
        ],
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()  # surface HTTP errors instead of a cryptic KeyError
    return response.json()["choices"][0]["message"]["content"]
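# Groq exposes an OpenAI-compatible chat-completions endpoint, so the payload
# and response shape above follow the OpenAI API
# (choices[0].message.content holds the reply).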
# ---------------- TWILIO MONITOR ----------------
def handle_incoming_messages(index, embed_model, tokenizer, text_chunks):
    client = Client(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"])
    conversation_sid = os.environ.get("TWILIO_CONVERSATION_SID")
    if not conversation_sid:
        print("❌ TWILIO_CONVERSATION_SID not set")
        return
    last_check_time = APP_START_TIME
    while True:
        try:
            messages = client.conversations.conversations(conversation_sid).messages.list(order="asc")
            for msg in messages:
                msg_time = msg.date_created.replace(tzinfo=datetime.timezone.utc)
                if msg_time > last_check_time and msg.author != "ToyBot":  # skip our own replies
                    print(f"📩 New message from {msg.author}: {msg.body}")
                    context = "\n".join(retrieve_chunks(msg.body, index, embed_model, text_chunks))
                    answer = generate_answer_with_groq(msg.body, context)
                    client.conversations.conversations(conversation_sid).messages.create(author="ToyBot", body=answer)
            last_check_time = datetime.datetime.now(datetime.timezone.utc)
        except TwilioRestException as e:
            print(f"[Twilio error] {e}")
        time.sleep(10)
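# Design note: this is a simple 10-second polling loop over the Twilio
# Conversations API. Twilio also supports webhooks for push delivery, which
# would avoid polling, but a daemon thread keeps this app self-contained.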
# ---------------- STREAMLIT UI ----------------
st.title("🎁 ToyShop Assistant – RAG WhatsApp Bot")
def load_all_documents(folder_path):
    full_text = ""
    all_tables = []
    for filename in os.listdir(folder_path):
        filepath = os.path.join(folder_path, filename)
        ext = filename.lower().split(".")[-1]
        if ext == "pdf":
            text, tables = extract_text_from_pdf(filepath)
            all_tables.extend(tables)
        elif ext == "docx":
            text = extract_text_from_docx(filepath)
        elif ext == "json":
            text = load_json_data(filepath)
        else:
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    text = f.read()
            except Exception:
                continue
        full_text += clean_extracted_text(text) + "\n\n"
    return full_text, all_tables
with st.spinner("Loading documents..."):
    full_text, tables = load_all_documents(DOCS_FOLDER)
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    chunks = chunk_text(full_text, tokenizer)
    if not chunks:
        st.error("No text found in the docs/ folder.")
        st.stop()  # avoid indexing an empty embedding matrix below
    embeddings = embed_model.encode(chunks)
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(np.array(embeddings))

if "listener_started" not in st.session_state:
    threading.Thread(
        target=handle_incoming_messages,
        args=(index, embed_model, tokenizer, chunks),
        daemon=True,
    ).start()
    st.session_state.listener_started = True
    st.success("✅ WhatsApp listener started.")

st.success(f"📚 Loaded {len(chunks)} text chunks from docs/ folder")