# Quasa / app.py
# Hugging Face file-view header: masadonline, commit c9d8fa5 (verified) "Update app.py", 7.4 kB.
import os
import time
import threading
import streamlit as st
from twilio.rest import Client
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
import numpy as np
import docx
from groq import Groq
import requests
from io import StringIO
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from twilio.base.exceptions import TwilioRestException
import pdfplumber
import datetime
import csv
import json
import re
# Timezone-aware UTC timestamp taken when this module is executed. The message
# polling loop compares each message timestamp against it, so only messages
# newer than this moment are answered.
APP_START_TIME = datetime.datetime.now(datetime.timezone.utc)
# Sets the PYTORCH_JIT environment variable to "0" (presumably to switch off the
# TorchScript JIT; confirm).
# NOTE(review): the sentence_transformers/transformers imports sit above this
# line. If PyTorch reads this variable at import time, the assignment comes too
# late to have any effect; confirm and move it above the imports if so.
os.environ["PYTORCH_JIT"] = "0"
# Twilio Setup
# Credentials come from the environment; os.getenv returns None when a variable
# is unset.
TWILIO_ACCOUNT_SID = os.getenv("TWILIO_ACCOUNT_SID")
TWILIO_AUTH_TOKEN = os.getenv("TWILIO_AUTH_TOKEN")
# Module-level Twilio REST client shared by the polling loop and the
# conversation lookup further down in this file.
twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
# ---------------- PDF & DOCX & JSON Extraction ----------------
def _extract_tables_from_page(page):
    """Return every table found on ``page`` with ``None`` cells replaced by "".

    ``page`` is any object exposing ``extract_tables()``; each table it
    yields is treated as an iterable of rows, each row an iterable of cells.
    """
    cleaned_tables = []
    for raw_table in page.extract_tables():
        cleaned_rows = []
        for raw_row in raw_table:
            # Missing cells become empty strings so later CSV output is uniform.
            cleaned_rows.append(["" if cell is None else cell for cell in raw_row])
        cleaned_tables.append(cleaned_rows)
    return cleaned_tables
def extract_text_from_pdf(pdf_path):
    """Extract the text and tables of the PDF at ``pdf_path``.

    pdfplumber is tried first: each page contributes its tables and its text
    (followed by a blank line). If pdfplumber raises anything, the text is
    re-extracted from scratch with pdfminer's ``extract_text_to_fp``.

    Returns:
        A ``(text, tables)`` tuple. ``tables`` holds whatever tables
        pdfplumber produced before a failure (possibly none).

    Raises:
        Any exception raised by the pdfminer fallback (for example when the
        file cannot be opened); it is not swallowed.
    """
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception:
        # Deliberate best-effort boundary: any pdfplumber failure triggers the
        # pdfminer fallback. Start from a fresh buffer, because the fallback
        # re-extracts the whole document and pages already written by
        # pdfplumber would otherwise appear twice in the result.
        text_output = StringIO()
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text')
    return text_output.getvalue(), all_tables
def _format_tables_internal(tables):
    """Render each table as CSV text and join the pieces with a blank line."""

    def _as_csv(rows):
        # The csv module handles quoting and uses its default line terminator.
        with StringIO() as buffer:
            csv.writer(buffer).writerows(rows)
            return buffer.getvalue()

    return "\n\n".join([_as_csv(rows) for rows in tables])
def clean_extracted_text(text):
    """Collapse whitespace runs to single spaces and drop blank lines.

    Each remaining line is stripped at both ends; lines are re-joined
    with a single newline.
    """
    kept_lines = []
    for raw_line in text.splitlines():
        words = raw_line.split()
        if words:  # whitespace-only lines produce no words and are skipped
            kept_lines.append(' '.join(words))
    return '\n'.join(kept_lines)
def extract_text_from_docx(path):
    """Return the paragraph text of the .docx file at ``path``, newline-joined.

    Extraction is best-effort: if the document cannot be opened or read,
    an empty string is returned instead of raising.
    """
    try:
        doc = docx.Document(path)
        return '\n'.join(p.text for p in doc.paragraphs)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit still propagate while ordinary read failures are ignored.
        return ""
def load_json_data(path):
    """Load the JSON file at ``path`` and flatten it to plain text.

    * A top-level object becomes ``key: value`` lines; values that are
      themselves objects or arrays are left out.
    * A top-level array is rendered the same way for each object element,
      with a blank line between elements; non-object elements are ignored.
    * Any other top-level value is returned as pretty-printed JSON.

    Returns an empty string if the file cannot be read or parsed.
    """

    def _scalar_lines(mapping):
        # One "key: value" line per entry whose value is not nested.
        return "\n".join(
            f"{key}: {value}"
            for key, value in mapping.items()
            if not isinstance(value, (dict, list))
        )

    try:
        with open(path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        if isinstance(payload, dict):
            return _scalar_lines(payload)
        if isinstance(payload, list):
            records = [entry for entry in payload if isinstance(entry, dict)]
            return "\n\n".join(map(_scalar_lines, records))
        return json.dumps(payload, ensure_ascii=False, indent=2)
    except Exception:
        return ""
# ---------------- Chunking ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    """Split ``text`` into overlapping chunks measured in tokenizer tokens.

    Args:
        text: The text to split.
        tokenizer: Object providing ``tokenize(text)`` and
            ``convert_tokens_to_string(tokens)``.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        chunk_overlap: Number of tokens shared by consecutive chunks; must be
            smaller than ``chunk_size``.

    Returns:
        A list of chunk strings, in document order. Empty for empty input.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size`` (the window would never advance).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    step = chunk_size - chunk_overlap
    if step <= 0:
        # A non-positive step would keep the window in place forever.
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    tokens = tokenizer.tokenize(text)
    total = len(tokens)
    chunks = []
    for start in range(0, total, step):
        window = tokens[start:start + chunk_size]
        chunks.append(tokenizer.convert_tokens_to_string(window))
        if start + chunk_size >= total:
            # This window already reaches the last token; a further window
            # would only repeat the overlapping tail of this one.
            break
    return chunks
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return the text chunks whose embeddings are nearest to ``question``.

    Args:
        question: The query string to embed.
        index: Vector index exposing ``search(queries, k)`` and returning
            ``(distances, indices)``.
        embed_model: Model exposing ``encode(text)`` that returns one vector.
        text_chunks: Chunks in the same order as the vectors held by ``index``.
        k: Number of neighbours to request.

    Returns:
        Up to ``k`` chunks, in the order the index returned them.
    """
    # FAISS expects a 2-D float32 array of queries.
    query = np.asarray([embed_model.encode(question)], dtype="float32")
    _, neighbor_ids = index.search(query, k)
    # When the index holds fewer than k vectors FAISS pads the result with -1;
    # plain indexing would silently turn that into the last chunk.
    return [text_chunks[i] for i in neighbor_ids[0] if 0 <= i < len(text_chunks)]
# ---------------- Groq Answer Generator ----------------
def generate_answer_with_groq(question, context):
    """Ask the Groq chat-completions endpoint to answer ``question``.

    Args:
        question: The customer's message.
        context: Retrieved knowledge-base text placed in the prompt.

    Returns:
        The content of the first completion choice, stripped of surrounding
        whitespace.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-2xx HTTP status.
        KeyError / IndexError: If a successful response lacks the expected
            ``choices[0].message.content`` structure.
    """
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.environ.get("GROQ_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    prompt = f"Customer asked: '{question}'\n\nHere is the relevant information to help:\n{context}"
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {"role": "system", "content": "You are ToyBot, a friendly WhatsApp assistant..."},
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    # Without a timeout requests waits indefinitely, which would stall the
    # caller forever on a hung connection. Values are (connect, read) seconds.
    response = requests.post(url, headers=headers, json=payload, timeout=(10, 60))
    # Surface API errors (bad key, rate limit, ...) as HTTP errors instead of
    # an opaque KeyError on the missing 'choices' field.
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content'].strip()
# ---------------- Twilio Integration ----------------
def fetch_latest_incoming_message(client, conversation_sid):
    """Find the last listed message written by a WhatsApp author.

    The conversation's messages are listed and scanned from the end of the
    list backwards; the first one whose author starts with ``"whatsapp:"``
    is returned as a dict with keys ``sid``, ``body``, ``author`` and
    ``timestamp``.

    Returns ``None`` when there is no such message or when Twilio raises a
    ``TwilioRestException``.
    """
    try:
        history = client.conversations.v1.conversations(conversation_sid).messages.list()
        incoming = next(
            (entry for entry in reversed(history) if entry.author.startswith("whatsapp:")),
            None,
        )
        if incoming is None:
            return None
        return {
            "sid": incoming.sid,
            "body": incoming.body,
            "author": incoming.author,
            "timestamp": incoming.date_created,
        }
    except TwilioRestException:
        return None
def send_twilio_message(client, conversation_sid, body):
    """Post ``body`` to the conversation with author "system".

    Returns whatever the Twilio ``messages.create`` call returns.
    """
    conversation = client.conversations.v1.conversations(conversation_sid)
    return conversation.messages.create(author="system", body=body)
# ---------------- Knowledge Base Setup ----------------
def setup_knowledge_base():
    """Build the knowledge-base text from every supported file in ``docs``.

    Supported extensions (matched case-insensitively): ``.pdf`` (text plus
    tables rendered as CSV), ``.docx``, ``.json`` and ``.csv`` (one
    ``key: value | key: value`` line per row). Other files are ignored.
    Files are processed in sorted name order so the result is the same on
    every run.

    Returns:
        The concatenated text of all processed files.

    Raises:
        OSError: If the ``docs`` folder or one of its CSV files cannot be read.
    """
    folder = "docs"
    parts = []
    # os.listdir returns names in arbitrary order; sort for reproducibility.
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        lowered = name.lower()
        if lowered.endswith(".pdf"):
            pdf_text, tables = extract_text_from_pdf(path)
            parts.append(clean_extracted_text(pdf_text) + "\n" + _format_tables_internal(tables) + "\n")
        elif lowered.endswith(".docx"):
            parts.append(clean_extracted_text(extract_text_from_docx(path)) + "\n")
        elif lowered.endswith(".json"):
            parts.append(load_json_data(path) + "\n")
        elif lowered.endswith(".csv"):
            with open(path, newline='', encoding='utf-8') as csvfile:
                for row in csv.DictReader(csvfile):
                    parts.append(' | '.join(f"{k}: {v}" for k, v in row.items()) + "\n")
    # Single join instead of repeated string concatenation.
    return "".join(parts)
# ---------------- Message Processing Loop ----------------
def process_messages_loop(conversation_sid, index, text_chunks, tokenizer, embed_model):
    """Poll a Twilio conversation forever and answer new WhatsApp messages.

    Every five seconds the latest incoming message is fetched. If it has not
    been handled yet and is newer than ``APP_START_TIME``, the most relevant
    knowledge-base chunks are retrieved, an answer is generated and sent back
    to the conversation.

    Args:
        conversation_sid: SID of the conversation to monitor.
        index: Vector index passed to ``retrieve_chunks``.
        text_chunks: Chunks aligned with the vectors in ``index``.
        tokenizer: Unused here; kept so existing callers keep working.
        embed_model: Embedding model passed to ``retrieve_chunks``.

    This function never returns; run it in a background thread.
    """
    import logging  # local import keeps this block self-contained

    logger = logging.getLogger(__name__)
    processed_sids = set()
    while True:
        try:
            message = fetch_latest_incoming_message(twilio_client, conversation_sid)
            if message and message['sid'] not in processed_sids and message['timestamp'] > APP_START_TIME:
                # Record the SID before answering: each message gets exactly
                # one attempt, so a message that keeps failing is not retried
                # (and billed) every five seconds.
                processed_sids.add(message['sid'])
                question = message['body']
                relevant = retrieve_chunks(question, index, embed_model, text_chunks)
                answer = generate_answer_with_groq(question, '\n'.join(relevant))
                send_twilio_message(twilio_client, conversation_sid, answer)
        except Exception:
            # Top-level boundary of the worker: an uncaught error would end
            # the thread silently and the bot would stop answering for good.
            logger.exception("Failed to process message for conversation %s", conversation_sid)
        time.sleep(5)
# ---------------- Streamlit UI ----------------
st.title("📱 ToyShop WhatsApp Chatbot")


# Streamlit re-executes this script on every rerun. The expensive setup and the
# background worker are therefore wrapped in st.cache_resource so they are
# created once instead of once per rerun (which would reload the models and
# start an additional polling thread, i.e. duplicate replies, each time).
@st.cache_resource
def _build_retrieval_resources():
    """Load the knowledge base, the models and the FAISS index once.

    Returns ``(kb_text, tokenizer, embed_model, chunks, embeddings, index)``;
    ``embeddings`` and ``index`` are ``None`` when the knowledge base yields
    no chunks.
    """
    kb_text = setup_knowledge_base()
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    embed_model = SentenceTransformer("all-MiniLM-L6-v2")
    chunks = chunk_text(kb_text, tokenizer)
    if not chunks:
        # Nothing to index; the vector dimension cannot be derived either.
        return kb_text, tokenizer, embed_model, chunks, None, None
    embeddings = np.asarray(embed_model.encode(chunks), dtype="float32")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return kb_text, tokenizer, embed_model, chunks, embeddings, index


@st.cache_resource
def _start_message_worker(conversation_sid, _index, _chunks, _tokenizer, _embed_model):
    """Start the polling thread for ``conversation_sid`` at most once.

    Parameters prefixed with an underscore are excluded from Streamlit's
    cache key, so the cache is keyed on the conversation SID only.
    """
    worker = threading.Thread(
        target=process_messages_loop,
        args=(conversation_sid, _index, _chunks, _tokenizer, _embed_model),
        daemon=True,
    )
    worker.start()
    return worker


kb_text, tokenizer, embed_model, chunks, embeddings, index = _build_retrieval_resources()
if index is None:
    st.error("Knowledge base is empty: no text could be extracted from the docs folder.")
    st.stop()

# Automatically fetch conversation SID
conversations = twilio_client.conversations.v1.conversations.list(limit=5)
conversation_sid = conversations[0].sid if conversations else None
if conversation_sid:
    st.success(f"Monitoring Twilio conversation SID: {conversation_sid}")
    _start_message_worker(conversation_sid, index, chunks, tokenizer, embed_model)
else:
    st.error("No active Twilio conversation found.")