Quasa / app.py
masadonline's picture
Update app.py
d85b86d verified
raw
history blame
12.2 kB
import os
import time
import threading
import streamlit as st
from twilio.rest import Client
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
import numpy as np
import docx
from groq import Groq
import requests
from io import StringIO
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from twilio.base.exceptions import TwilioRestException # Add this at the top
import pdfplumber
import datetime
import csv
# Set the PYTORCH_JIT environment variable to "0".
# NOTE(review): presumably meant to disable the PyTorch JIT; this runs after
# the imports above, so confirm the setting is still picked up.
os.environ.update({"PYTORCH_JIT": "0"})

# Timezone-aware (UTC) moment at which this module was executed. The
# conversation monitor compares conversation creation times against it.
APP_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
# --- PDF Extraction ---
def _extract_tables_from_page(page):
"""Extracts tables from a single page of a PDF."""
tables = page.extract_tables()
if not tables:
return []
formatted_tables = []
for table in tables:
formatted_table = []
for row in table:
if row: # Filter out empty rows
formatted_row = [cell if cell is not None else "" for cell in row] # Replace None with ""
formatted_table.append(formatted_row)
else:
formatted_table.append([""]) # Append an empty row if the row is None
formatted_tables.append(formatted_table)
return formatted_tables
def extract_text_from_pdf(pdf_path):
    """Extract the text and the tables of a PDF file.

    pdfplumber is tried first, page by page. If it raises, the text is
    extracted again from scratch with pdfminer.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A ``(text, tables)`` tuple: the extracted text as one string and the
        list of tables collected by ``_extract_tables_from_page``. When the
        pdfminer fallback is used, ``tables`` holds only what pdfplumber had
        collected before it failed.

    Raises:
        Any exception raised by the pdfminer fallback (for example ``OSError``
        when the file cannot be opened) propagates to the caller.
    """
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # Extract tables (an empty list extends nothing)
                all_tables.extend(_extract_tables_from_page(page))
                # Extract text
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception as e:
        print(f"Error extracting with pdfplumber: {e}")
        # pdfplumber may have failed part-way through. Drop the pages it
        # already wrote so the full-document fallback does not duplicate them.
        text_output.close()
        text_output = StringIO()
        # Fallback to pdfminer if pdfplumber fails
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text', codec=None)
    extracted_text = text_output.getvalue()
    return extracted_text, all_tables  # Return text and list of tables
def clean_extracted_text(text):
    """Collapse whitespace inside each line and drop blank lines.

    Args:
        text: Raw text, possibly spanning several lines.

    Returns:
        The non-blank lines of ``text`` joined with ``"\\n"``, each one
        stripped and with internal runs of whitespace reduced to one space.
    """
    # split() with no arguments both trims the line and collapses inner
    # whitespace; a whitespace-only line yields an empty string.
    normalized = (' '.join(raw_line.split()) for raw_line in text.splitlines())
    return '\n'.join(piece for piece in normalized if piece)
def _format_tables_internal(tables):
"""Formats extracted tables into a string representation."""
formatted_tables_str = []
for table in tables:
# Use csv writer to handle commas and quotes correctly
with StringIO() as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerows(table)
formatted_tables_str.append(csvfile.getvalue())
return "\n\n".join(formatted_tables_str)
# --- DOCX Extraction ---
def extract_text_from_docx(docx_path):
    """Return the paragraph text of a DOCX file, or ``""`` on failure.

    Args:
        docx_path: Path of the DOCX file to read.

    Returns:
        The text of every paragraph joined with newlines. Extraction is
        best-effort: any error is reported on stdout and an empty string is
        returned instead of raising.
    """
    try:
        doc = docx.Document(docx_path)
        return '\n'.join(para.text for para in doc.paragraphs)
    except Exception as e:
        # Keep the best-effort contract, but do not hide why it failed.
        print(f"Error extracting DOCX {docx_path}: {e}")
        return ""
# --- Chunking ---
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32, max_tokens=512):
    """Split ``text`` into overlapping chunks measured in tokenizer tokens.

    Args:
        text: Text to split.
        tokenizer: Object providing ``tokenize(text)`` and
            ``convert_tokens_to_string(tokens)``.
        chunk_size: Maximum number of tokens per chunk.
        chunk_overlap: Number of tokens shared by consecutive chunks. Must be
            smaller than ``chunk_size``.
        max_tokens: Accepted for interface compatibility; not used here.

    Returns:
        The list of chunk strings, in document order. Empty when the
        tokenizer yields no tokens.

    Raises:
        ValueError: If ``chunk_overlap`` is not smaller than ``chunk_size``;
            the window would never advance.
    """
    stride = chunk_size - chunk_overlap
    if stride <= 0:
        raise ValueError(
            f"chunk_overlap ({chunk_overlap}) must be smaller than "
            f"chunk_size ({chunk_size})"
        )
    tokens = tokenizer.tokenize(text)
    total = len(tokens)
    chunks = []
    for start in range(0, total, stride):
        end = min(start + chunk_size, total)
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:end]))
        # Stop once a window reaches the end so the tail is not emitted again
        # as a shorter, fully-overlapping chunk.
        if end == total:
            break
    return chunks
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return the text chunks nearest to ``question`` in the vector index.

    Args:
        question: The query string.
        index: Vector index exposing ``search(queries, k)`` and returning
            ``(distances, ids)``.
        embed_model: Model exposing ``encode(question)``.
        text_chunks: Chunks in the same order as the vectors in ``index``.
        k: Number of neighbours to request.

    Returns:
        Up to ``k`` chunks, in the order the index returned them.
    """
    question_embedding = embed_model.encode(question)
    query = np.asarray([question_embedding], dtype="float32")
    _, neighbor_ids = index.search(query, k)
    # FAISS pads the result with -1 when fewer than k vectors are available.
    # Without this filter -1 would silently pick the last chunk.
    return [text_chunks[i] for i in neighbor_ids[0] if 0 <= i < len(text_chunks)]
# --- Groq Answer Generator ---
def generate_answer_with_groq(question, context, timeout=30):
    """Ask the Groq chat-completions endpoint to answer a customer question.

    Args:
        question: The customer's message.
        context: Knowledge-base text to include in the prompt.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The content of the first completion choice, stripped of surrounding
        whitespace.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            response (via ``raise_for_status``).
    """
    url = "https://api.groq.com/openai/v1/chat/completions"
    # The key is read at call time from the environment.
    api_key = os.environ.get("GROQ_API_KEY")
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant product or policy info to help:\n{context}\n\n"
        f"Respond in a friendly and helpful tone as a toy shop support agent."
    )
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. "
                    "Your goal is to politely answer customer questions, help them choose the right toys, "
                    "provide order or delivery information, explain return policies, and guide them through purchases."
                )
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    # Without a timeout a stalled connection would block the calling polling
    # thread indefinitely.
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content'].strip()
# --- Twilio Functions ---
def fetch_latest_incoming_message(client, conversation_sid):
    """Return the most recent WhatsApp-authored message of a conversation.

    Args:
        client: Twilio REST client.
        conversation_sid: SID of the conversation to inspect.

    Returns:
        A dict with ``sid``, ``body``, ``author`` and ``timestamp`` (the
        message's ``date_created``) for the last message whose author starts
        with ``"whatsapp:"``, or ``None`` when there is no such message or the
        lookup failed. Failures are reported on stdout, never raised.
    """
    try:
        messages = client.conversations.v1.conversations(conversation_sid).messages.list()
        # Walk from the end of the returned list backwards.
        for msg in reversed(messages):
            # Tolerate a missing author instead of aborting the whole scan.
            if (msg.author or "").startswith("whatsapp:"):
                return {
                    "sid": msg.sid,
                    "body": msg.body,
                    "author": msg.author,
                    "timestamp": msg.date_created,
                }
    except TwilioRestException as e:
        if e.status == 404:
            print(f"Conversation {conversation_sid} not found, skipping...")
        else:
            print(f"Twilio error fetching messages for {conversation_sid}:", e)
    except Exception as e:
        # Still best-effort (returns None), but the failure is now visible.
        print(f"Unexpected error in fetch_latest_incoming_message for {conversation_sid}:", e)
    return None
def send_twilio_message(client, conversation_sid, body):
    """Post ``body`` to a conversation with the author set to ``"system"``.

    Args:
        client: Twilio REST client.
        conversation_sid: SID of the target conversation.
        body: Message text to send.

    Returns:
        Whatever the Twilio ``messages.create`` call returns.
    """
    conversation = client.conversations.v1.conversations(conversation_sid)
    created = conversation.messages.create(author="system", body=body)
    return created
# --- Load Knowledge Base ---
def setup_knowledge_base():
    """Build the retrieval knowledge base from the files in ``docs``.

    Reads two PDFs (text plus tables) and two CSV files (orders and
    products), concatenates everything into one text, splits it into token
    chunks, embeds the chunks and stores the vectors in a FAISS L2 index.
    A source file that cannot be read is reported on stdout and skipped.

    Returns:
        A ``(index, model, chunks)`` tuple: the FAISS index, the
        SentenceTransformer used to embed the chunks, and the chunk strings
        in index order.

    Raises:
        ValueError: If no text at all could be gathered, so there is nothing
            to index.
    """
    folder_path = "docs"
    # Collect pieces and join once instead of repeated string concatenation.
    parts = []

    # Process PDFs
    for filename in ["FAQ.pdf", "ProductReturnPolicy.pdf"]:
        pdf_path = os.path.join(folder_path, filename)
        try:
            text, tables = extract_text_from_pdf(pdf_path)
        except Exception as e:
            # Same policy as the CSV sources below: report and carry on.
            print(f"❌ Error reading {filename}: {e}")
            continue
        parts.append(clean_extracted_text(text) + "\n")
        parts.append(_format_tables_internal(tables) + "\n")

    # Process CSVs
    for filename in ["CustomerOrders.csv"]:
        csv_path = os.path.join(folder_path, filename)
        try:
            with open(csv_path, newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    line = f"Order ID: {row.get('OrderID')} | Customer Name: {row.get('CustomerName')} | Order Date: {row.get('OrderDate')} | ProductID: {row.get('ProductID')} | Date: {row.get('OrderDate')} | Quantity: {row.get('Quantity')} | UnitPrice(USD): {row.get('UnitPrice(USD)')} | TotalPrice(USD): {row.get('TotalPrice(USD)')} | ShippingAddress: {row.get('ShippingAddress')} | OrderStatus: {row.get('OrderStatus')}"
                    parts.append(line + "\n")
        except Exception as e:
            print(f"❌ Error reading {filename}: {e}")

    for filename in ["Products.csv"]:
        csv_path = os.path.join(folder_path, filename)
        try:
            with open(csv_path, newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    line = f"Product ID: {row.get('ProductID')} | Toy Name: {row.get('ToyName')} | Category: {row.get('Category')} | Price(USD): {row.get('Price(USD)')} | Stock Quantity: {row.get('StockQuantity')} | Description: {row.get('Description')}"
                    parts.append(line + "\n")
        except Exception as e:
            print(f"❌ Error reading {filename}: {e}")

    all_text = "".join(parts)

    # Tokenization & chunking
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    chunks = chunk_text(all_text, tokenizer)
    if not chunks:
        # Fail with a clear message rather than an IndexError further down.
        raise ValueError(
            f"Knowledge base is empty: no text could be read from '{folder_path}'"
        )

    model = SentenceTransformer('all-mpnet-base-v2')
    # NOTE(review): `truncation` and `max_length` are passed straight to
    # `encode`; confirm the installed sentence-transformers accepts them.
    embeddings = model.encode(chunks, show_progress_bar=False, truncation=True, max_length=512)
    vectors = np.asarray(embeddings, dtype='float32')
    dim = vectors.shape[1]
    index = faiss.IndexFlatL2(dim)
    index.add(vectors)
    return index, model, chunks
# --- Monitor Conversations ---
def start_conversation_monitor(client, index, embed_model, text_chunks):
    """Start background polling that answers new WhatsApp conversations.

    A daemon thread lists conversations every few seconds. For each
    conversation created after ``APP_START_TIME`` that has a WhatsApp
    participant, one dedicated daemon thread is started that answers incoming
    messages using the knowledge base and Groq. This function returns as soon
    as the listing thread has been started.

    Args:
        client: Twilio REST client.
        index: Vector index passed on to ``retrieve_chunks``.
        embed_model: Embedding model passed on to ``retrieve_chunks``.
        text_chunks: Chunk strings passed on to ``retrieve_chunks``.
    """
    processed_convos = set()  # conversations that already have a poller
    ignored_convos = set()  # conversations created before the app started
    last_processed_timestamp = {}

    def poll_conversation(convo_sid):
        """Answer each new incoming message of one conversation, forever."""
        while True:
            try:
                latest_msg = fetch_latest_incoming_message(client, convo_sid)
                if latest_msg:
                    msg_time = latest_msg["timestamp"]
                    if convo_sid not in last_processed_timestamp or msg_time > last_processed_timestamp[convo_sid]:
                        # Recorded before replying: a message whose reply
                        # fails is not retried.
                        last_processed_timestamp[convo_sid] = msg_time
                        question = latest_msg["body"]
                        sender = latest_msg["author"]
                        print(f"\nπŸ“₯ New message from {sender} in {convo_sid}: {question}")
                        context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks))
                        answer = generate_answer_with_groq(question, context)
                        send_twilio_message(client, convo_sid, answer)
                        print(f"πŸ“€ Replied to {sender}: {answer}")
                time.sleep(3)
            except Exception as e:
                print(f"❌ Error in convo {convo_sid} polling:", e)
                time.sleep(5)

    def poll_new_conversations():
        """Discover new WhatsApp conversations and start a poller for each."""
        print("➑️ Monitoring for new WhatsApp conversations...")
        while True:
            try:
                conversations = client.conversations.v1.conversations.list(limit=20)
                for convo in conversations:
                    # Skip known conversations before spending API calls.
                    if convo.sid in processed_convos or convo.sid in ignored_convos:
                        continue
                    convo_full = client.conversations.v1.conversations(convo.sid).fetch()
                    if not convo_full.date_created > APP_START_TIME:
                        # Only conversations created after startup are handled.
                        # NOTE(review): assumes date_created does not change
                        # once set, so the result is remembered.
                        ignored_convos.add(convo.sid)
                        continue
                    participants = client.conversations.v1.conversations(convo.sid).participants.list()
                    has_whatsapp = any(
                        ((p.messaging_binding.get("address") or "") if p.messaging_binding else "").startswith("whatsapp:")
                        for p in participants
                    )
                    if has_whatsapp:
                        # Exactly one poller per conversation, however many
                        # WhatsApp participants it has, so no duplicate replies.
                        print(f"πŸ†• New WhatsApp convo found: {convo.sid}")
                        processed_convos.add(convo.sid)
                        threading.Thread(target=poll_conversation, args=(convo.sid,), daemon=True).start()
            except Exception as e:
                print("❌ Error polling conversations:", e)
            time.sleep(5)

    # Launch the conversation polling monitor
    threading.Thread(target=poll_new_conversations, daemon=True).start()
# --- Streamlit UI ---
st.set_page_config(page_title="Quasa – A Smart WhatsApp Chatbot", layout="wide")
st.title("πŸ“± Quasa – A Smart WhatsApp Chatbot")

# Credentials come from Streamlit secrets, with manual entry as a fallback.
account_sid = st.secrets.get("TWILIO_SID")
auth_token = st.secrets.get("TWILIO_TOKEN")
GROQ_API_KEY = st.secrets.get("GROQ_API_KEY")
if not all([account_sid, auth_token, GROQ_API_KEY]):
    st.warning("⚠️ Provide all credentials below:")
    account_sid = st.text_input("Twilio SID", value=account_sid or "")
    auth_token = st.text_input("Twilio Token", type="password", value=auth_token or "")
    GROQ_API_KEY = st.text_input("GROQ API Key", type="password", value=GROQ_API_KEY or "")


@st.cache_resource(show_spinner=False)
def _start_monitor_once(twilio_sid, twilio_token):
    """Create the Twilio client, build the knowledge base and start the monitor.

    Streamlit re-executes this script on every rerun. Caching this call per
    credential pair keeps the models from being reloaded and prevents an
    additional monitor thread (and duplicate replies) on each rerun.

    Returns:
        The ``(client, index, model, chunks)`` tuple that was created.
    """
    twilio_client = Client(twilio_sid, twilio_token)
    kb_index, kb_model, kb_chunks = setup_knowledge_base()
    threading.Thread(
        target=start_conversation_monitor,
        args=(twilio_client, kb_index, kb_model, kb_chunks),
        daemon=True,
    ).start()
    return twilio_client, kb_index, kb_model, kb_chunks


if all([account_sid, auth_token, GROQ_API_KEY]):
    # generate_answer_with_groq reads the key from the environment.
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
    st.success("🟒 Monitoring new WhatsApp conversations...")
    client, index, model, chunks = _start_monitor_once(account_sid, auth_token)
    st.info("⏳ Waiting for new messages...")