# Quasa / app.py
# Author: masadonline — "Update app.py" (commit 9ef413c, verified; 9.38 kB)
import os
import time
import threading
import streamlit as st
from twilio.rest import Client
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
import numpy as np
import docx
from groq import Groq
import requests
from io import StringIO
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from twilio.base.exceptions import TwilioRestException
import pdfplumber
import datetime
import csv
import json
import re
# Record app launch time (timezone-aware UTC). The polling code below only
# reacts to Twilio conversations/messages created after this instant, so
# old history is never re-answered on restart.
APP_START_TIME = datetime.datetime.now(datetime.timezone.utc)
os.environ["PYTORCH_JIT"] = "0"  # NOTE(review): presumably disables TorchScript JIT to avoid issues under Streamlit — confirm
# Twilio Setup
# Credentials come from the environment. Client() does not validate them here;
# bad or missing values only surface on the first API call.
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
# ---------------- PDF & DOCX & JSON Extraction ----------------
def _extract_tables_from_page(page):
tables = page.extract_tables()
formatted_tables = []
for table in tables:
formatted_row = [[cell if cell is not None else "" for cell in row] for row in table]
formatted_tables.append(formatted_row)
return formatted_tables
def extract_text_from_pdf(pdf_path):
    """Extract plain text and tables from a PDF file.

    Tries pdfplumber first (page text + tables); if that fails for any reason,
    falls back to pdfminer's layout-based extraction (text only, no tables).

    Args:
        pdf_path: path to the PDF file.

    Returns:
        tuple[str, list]: (full text, list of tables; each table is a list of
        rows, each row a list of string cells).
    """
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception as e:
        # Bug fix: the old fallback appended pdfminer output after any partial
        # pdfplumber text, duplicating pages extracted before the failure, and
        # silently discarded the error. Start from a clean slate instead.
        print(f"pdfplumber failed on {pdf_path} ({e}); falling back to pdfminer")
        text_output = StringIO()
        all_tables = []
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text')
    return text_output.getvalue(), all_tables
def _format_tables_internal(tables):
formatted = []
for table in tables:
with StringIO() as csvfile:
writer = csv.writer(csvfile)
writer.writerows(table)
formatted.append(csvfile.getvalue())
return "\n\n".join(formatted)
def clean_extracted_text(text):
    """Collapse runs of whitespace inside each line and drop blank lines."""
    kept_lines = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped:
            kept_lines.append(" ".join(stripped.split()))
    return "\n".join(kept_lines)
def extract_text_from_docx(path):
    """Return the paragraph text of a .docx file, or "" if it cannot be read.

    Best-effort by design: a single unreadable document must not abort
    knowledge-base construction, so failures yield an empty string.
    """
    try:
        doc = docx.Document(path)
        return '\n'.join(p.text for p in doc.paragraphs)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return ""
def load_json_data(path):
    """Flatten a JSON file into "key: value" text for the knowledge base.

    Behavior by top-level type:
      - dict: one "k: v" line per scalar value (nested dicts/lists skipped)
      - list: each dict element becomes a block of "k: v" lines, blocks
        separated by blank lines; non-dict elements are rendered as JSON
      - anything else: pretty-printed JSON

    Returns "" on any read/parse error (best-effort, like the other loaders).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception:
        return ""
    if isinstance(data, dict):
        return "\n".join(f"{k}: {v}" for k, v in data.items() if not isinstance(v, (dict, list)))
    if isinstance(data, list):
        # Bug fix: the old code called item.items() on every element (the
        # isinstance guard was misplaced inside the genexp), so one non-dict
        # element raised AttributeError and the whole file was dropped to "".
        blocks = []
        for item in data:
            if isinstance(item, dict):
                blocks.append("\n".join(f"{k}: {v}" for k, v in item.items()))
            else:
                blocks.append(json.dumps(item, ensure_ascii=False))
        return "\n\n".join(blocks)
    return json.dumps(data, ensure_ascii=False, indent=2)
# ---------------- Chunking ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    """Split `text` into overlapping windows of tokens.

    Args:
        text: the raw text to chunk.
        tokenizer: object with .tokenize() and .convert_tokens_to_string().
        chunk_size: tokens per chunk.
        chunk_overlap: tokens shared between consecutive chunks.

    Returns:
        list[str]: the chunk strings ([] for empty text).

    Raises:
        ValueError: if chunk_overlap >= chunk_size (the old code looped
        forever in that case because the step was <= 0).
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    tokens = tokenizer.tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunks.append(tokenizer.convert_tokens_to_string(tokens[start:end]))
        if end == len(tokens):
            # Bug fix: the old loop kept stepping after the final window and
            # emitted a redundant tail chunk already covered by this one.
            break
        start += chunk_size - chunk_overlap
    return chunks
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Embed the question and return the k nearest chunks from the FAISS index."""
    query_vector = embed_model.encode(question)
    _, neighbor_ids = index.search(np.array([query_vector]), k)
    return [text_chunks[chunk_id] for chunk_id in neighbor_ids[0]]
# ---------------- Groq Answer Generator ----------------
def generate_answer_with_groq(question, context):
    """Ask Groq's OpenAI-compatible chat API to answer using retrieved context.

    Args:
        question: the customer's message text.
        context: retrieved knowledge-base chunks to ground the answer.

    Returns:
        str: the stripped assistant reply.

    Raises:
        requests.Timeout: after 30s (the old call had no timeout and could
        hang the polling thread forever).
        requests.HTTPError: on non-2xx responses (previously an auth or
        rate-limit error surfaced only as an opaque KeyError on 'choices').
    """
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.getenv("GROQ_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    prompt = f"Customer asked: '{question}'\n\nHere is the relevant information to help:\n{context}"
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {"role": "system", "content": "You are ToyBot, a friendly WhatsApp assistant..."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content'].strip()
# ---------------- Twilio Integration ----------------
def fetch_latest_incoming_message(client, conversation_sid):
    """Return the newest WhatsApp message created after app start, or None.

    Scans the conversation's messages newest-first and returns a small dict
    (sid/body/author/timestamp) for the first message authored by a
    "whatsapp:" address with a creation time after APP_START_TIME.
    Returns None when nothing qualifies or on a Twilio API error.
    """
    try:
        messages = client.conversations.v1.conversations(conversation_sid).messages.list()
        for msg in reversed(messages):
            if (
                # Bug fix: author can be None (e.g. system-authored messages),
                # which crashed .startswith() with an uncaught AttributeError.
                msg.author and
                msg.author.startswith("whatsapp:") and
                msg.date_created and
                msg.date_created > APP_START_TIME
            ):
                return {
                    "sid": msg.sid,
                    "body": msg.body,
                    "author": msg.author,
                    "timestamp": msg.date_created,
                }
    except TwilioRestException:
        return None
    return None
def send_twilio_message(client, conversation_sid, body):
    """Post `body` into the conversation as the "system" author; returns the created message."""
    conversation = client.conversations.v1.conversations(conversation_sid)
    return conversation.messages.create(author="system", body=body)
def get_latest_whatsapp_conversation_sid(client):
    """Find the SID of the newest conversation with post-startup WhatsApp traffic.

    Lists up to 20 conversations, keeps those created after APP_START_TIME,
    and returns the SID of the most recent one whose latest message is from a
    "whatsapp:" author created after startup. Returns None when none qualify
    or on any API error (logged, not raised).
    """
    try:
        conversations = client.conversations.v1.conversations.list(limit=20)
        filtered = [
            c for c in conversations
            if c.date_created and c.date_created > APP_START_TIME
        ]
        for convo in sorted(filtered, key=lambda c: c.date_created, reverse=True):
            messages = convo.messages.list(limit=1)
            # Bug fix: m.author and m.date_created can be None, which crashed
            # .startswith() / the datetime comparison inside any().
            if messages and any(
                m.author and m.author.startswith("whatsapp:") and
                m.date_created and m.date_created > APP_START_TIME
                for m in messages
            ):
                return convo.sid
    except Exception as e:
        print("Error fetching valid conversation SID:", e)
    return None
# ---------------- Load Orders ----------------
def load_orders():
    """Load the order database from docs/orders.json; return {} on any failure."""
    try:
        with open("docs/orders.json", "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as e:
        print(f"Error loading orders: {e}")
        return {}
def extract_order_id(text):
    """Return the first order id like ORD123 (case-insensitive), uppercased, else None."""
    match = re.search(r"\bORD\d{3,}\b", text, re.IGNORECASE)
    return match.group(0).upper() if match else None
def format_order_response(order_id, order_data):
    """Render an order record as a multi-line summary for the WhatsApp reply."""
    if not order_data:
        return f"Sorry, I could not find details for order ID {order_id}."
    lines = [f"Order ID: {order_id}"]
    lines.append(f"Customer Name: {order_data.get('customer_name', 'N/A')}")
    lines.append(f"Address: {order_data.get('address', 'N/A')}")
    lines.append(f"Items: {', '.join(order_data.get('items', []))}")
    lines.append(f"Status: {order_data.get('status', 'N/A')}")
    return "\n".join(lines)
# ---------------- Knowledge Base Setup ----------------
def setup_knowledge_base():
    """Concatenate every supported file under docs/ into one knowledge-base string.

    Supported types: .pdf (text + CSV-rendered tables), .docx, .json
    (except orders.json, which is the order DB rather than KB text), .csv.

    Robustness fixes over the original:
      - extensions are matched case-insensitively (".PDF" was skipped before)
      - non-regular files (subdirectories) are skipped instead of crashing
      - a missing docs/ folder yields "" instead of an uncaught OSError

    Returns:
        str: the combined knowledge-base text ("" when nothing is found).
    """
    folder = "docs"
    if not os.path.isdir(folder):
        return ""
    text = ""
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        lower = name.lower()
        if lower.endswith(".pdf"):
            t, tables = extract_text_from_pdf(path)
            text += clean_extracted_text(t) + "\n" + _format_tables_internal(tables) + "\n"
        elif lower.endswith(".docx"):
            text += clean_extracted_text(extract_text_from_docx(path)) + "\n"
        elif lower.endswith(".json"):
            # Skip orders.json here to avoid mixing with KB text
            if lower == "orders.json":
                continue
            text += load_json_data(path) + "\n"
        elif lower.endswith(".csv"):
            with open(path, newline='', encoding='utf-8') as csvfile:
                reader = csv.reader(csvfile)
                text += "\n".join(", ".join(row) for row in reader) + "\n"
    return text
# ---------------- App Logic ----------------
def process_messages_loop():
    """Background worker: build the RAG index once, then poll Twilio forever.

    Startup (runs once, on the worker thread):
      - load the MiniLM sentence embedder and a BERT tokenizer
      - flatten docs/ into text, chunk it, embed the chunks, and index the
        embeddings in a FAISS L2 index
      - load the order database from docs/orders.json

    Polling loop (every ~5s):
      - locate the most recent WhatsApp conversation, fetch its latest
        incoming message, and answer it exactly once (dedup via seen_sids)
      - questions containing a known order ID are answered straight from the
        orders dict; everything else goes through retrieval + Groq
    """
    embed_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    knowledge_text = setup_knowledge_base()
    text_chunks = chunk_text(knowledge_text, tokenizer)
    embeddings = embed_model.encode(text_chunks)
    # Index dimensionality comes from the embedding width; L2 distance search.
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    orders = load_orders()  # Load orders once at start
    # NOTE(review): seen_sids grows without bound for a long-lived process.
    seen_sids = set()
    while True:
        conversation_sid = get_latest_whatsapp_conversation_sid(twilio_client)
        if not conversation_sid:
            time.sleep(5)
            continue
        message = fetch_latest_incoming_message(twilio_client, conversation_sid)
        if message and message["sid"] not in seen_sids:
            seen_sids.add(message["sid"])
            question = message["body"]
            order_id = extract_order_id(question)
            # Prefer an exact order lookup; fall back to RAG + Groq otherwise.
            if order_id and order_id in orders:
                answer = format_order_response(order_id, orders[order_id])
            else:
                chunks = retrieve_chunks(question, index, embed_model, text_chunks)
                answer = generate_answer_with_groq(question, "\n\n".join(chunks))
            send_twilio_message(twilio_client, conversation_sid, answer)
        time.sleep(5)
# ---------------- Streamlit UI ----------------
st.title("ToyShop WhatsApp Assistant (Groq + Twilio)")
if st.button("Start WhatsApp Bot"):
    # Run the poller on a daemon thread so the Streamlit script loop stays
    # responsive; daemon=True lets the process exit without joining it.
    # NOTE(review): every button click spawns another poller thread —
    # consider guarding with st.session_state to prevent duplicates.
    thread = threading.Thread(target=process_messages_loop, daemon=True)
    thread.start()
    st.success("WhatsApp assistant started and monitoring for new messages.")