import os
import time
import threading
import streamlit as st
from twilio.rest import Client
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
import faiss
import numpy as np
import docx
import requests
from io import StringIO
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
from twilio.base.exceptions import TwilioRestException
import pdfplumber
import datetime
import csv
import json
import re
# Record startup time (UTC) so the bot only reacts to conversations created after launch.
APP_START_TIME = datetime.datetime.now(datetime.timezone.utc)
print(f"APP_START_TIME: {APP_START_TIME}")

# Disable the PyTorch JIT (commonly set to avoid PyTorch/Streamlit file-watcher conflicts).
os.environ["PYTORCH_JIT"] = "0"
# ---------------- PDF & DOCX & JSON Extraction ----------------
def _extract_tables_from_page(page):
    tables = page.extract_tables()
    formatted_tables = []
    for table in tables:
        formatted_table = []
        for row in table:
            formatted_row = [cell if cell is not None else "" for cell in row]
            formatted_table.append(formatted_row)
        formatted_tables.append(formatted_table)
    return formatted_tables
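# Each extracted table is a list of rows with None cells normalized to "", e.g.
# [["Toy", "Price"], ["Robot", "19.99"]] (illustrative values), so the downstream
# CSV formatting in _format_tables_internal sees strings rather than None.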
def extract_text_from_pdf(pdf_path):
    text_output = StringIO()
    all_tables = []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                all_tables.extend(_extract_tables_from_page(page))
                text = page.extract_text()
                if text:
                    text_output.write(text + "\n\n")
    except Exception as e:
        print(f"pdfplumber error: {e}")
        with open(pdf_path, 'rb') as file:
            extract_text_to_fp(file, text_output, laparams=LAParams(), output_type='text')
    return text_output.getvalue(), all_tables
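# Note: extraction is two-stage. pdfplumber runs first because it also yields
# tables; if it raises, the pdfminer.six fallback appends plain text only, and
# only the tables collected before the failure are kept.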
def _format_tables_internal(tables):
    formatted_tables_str = []
    for table in tables:
        with StringIO() as csvfile:
            writer = csv.writer(csvfile)
            writer.writerows(table)
            formatted_tables_str.append(csvfile.getvalue())
    return "\n\n".join(formatted_tables_str)
def clean_extracted_text(text):
    return '\n'.join(' '.join(line.strip().split()) for line in text.splitlines() if line.strip())
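# Example: "  Opening   Hours \n\n  9am - 5pm  " becomes "Opening Hours\n9am - 5pm":
# runs of whitespace collapse to single spaces and blank lines are dropped.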
def extract_text_from_docx(docx_path):
    try:
        doc = docx.Document(docx_path)
        return '\n'.join(para.text for para in doc.paragraphs)
    except Exception:
        return ""
def load_json_data(json_path):
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, dict):
            # Flatten top-level key/value pairs, skipping nested structures
            return "\n".join(f"{key}: {value}" for key, value in data.items() if not isinstance(value, (dict, list)))
        elif isinstance(data, list):
            # Flatten a list of dictionaries, one block per item
            all_items = []
            for item in data:
                if isinstance(item, dict):
                    all_items.append("\n".join(f"{key}: {value}" for key, value in item.items() if not isinstance(value, (dict, list))))
            return "\n\n".join(all_items)
        else:
            return json.dumps(data, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"JSON read error: {e}")
        return ""
# ---------------- Chunking ----------------
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32):
    tokens = tokenizer.tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk = tokens[start:end]
        chunks.append(tokenizer.convert_tokens_to_string(chunk))
        if end == len(tokens):
            break
        start += chunk_size - chunk_overlap
    return chunks
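# With the defaults (chunk_size=128, chunk_overlap=32) the window advances by
# 128 - 32 = 96 tokens per step, so a 300-token text (illustrative) yields chunks
# at token offsets 0, 96, and 192, each sharing 32 tokens with its predecessor,
# with the final chunk simply running to the end of the text.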
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    q_embedding = embed_model.encode(question)
    # Cast to float32 to match the dtype the index was built with
    D, I = index.search(np.array([q_embedding]).astype('float32'), k)
    return [text_chunks[i] for i in I[0]]
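# Note: IndexFlatL2 performs exact brute-force L2 search; I[0] holds the row
# indices of the k nearest chunk embeddings, which map one-to-one onto
# text_chunks because the index was filled in the same order.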
# ---------------- Groq Answer Generator ----------------
def generate_answer_with_groq(question, context):
    url = "https://api.groq.com/openai/v1/chat/completions"
    api_key = os.environ.get("GROQ_API_KEY")
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant information to help:\n{context}\n\n"
        f"Respond in a friendly and helpful tone as a toy shop support agent, "
        f"addressing the customer by their name if it's available in the context."
    )
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are ToyBot, a friendly WhatsApp assistant for an online toy shop. "
                    "Help customers with toys, delivery, and returns in a helpful tone. "
                    "When responding, try to find the customer's name in the provided context "
                    "and address them directly. If the context contains order details and status, "
                    "include that information in your response."
                )
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    # Timeout keeps the polling thread from hanging indefinitely on network issues
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content'].strip()
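# Minimal usage sketch (GROQ_API_KEY must be set; the question and context values
# are illustrative, and client/convo_sid come from the monitor code below):
#
#   reply = generate_answer_with_groq(
#       "Where is my order?",
#       "name: Ali\norder 123: status shipped",
#   )
#   send_twilio_message(client, convo_sid, reply)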
# ---------------- Twilio Integration ----------------
def fetch_latest_incoming_message(client, conversation_sid):
    try:
        print(f"fetch_latest_incoming_message Twilio SID : {conversation_sid}")
        messages = client.conversations.v1.conversations(conversation_sid).messages.list()
        for msg in reversed(messages):
            if msg.author.startswith("whatsapp:"):
                return {
                    "sid": msg.sid,
                    "body": msg.body,
                    "author": msg.author,
                    "timestamp": msg.date_created,
                }
    except TwilioRestException as e:
        print(f"❌ Twilio error for SID {conversation_sid}: {e}")
    return None
def send_twilio_message(client, conversation_sid, body):
    return client.conversations.v1.conversations(conversation_sid).messages.create(
        author="system", body=body
    )
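# Replies are created with author="system", so they fail the "whatsapp:" prefix
# check in fetch_latest_incoming_message and the bot never answers itself.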
# ---------------- Knowledge Base Setup ----------------
def setup_knowledge_base():
    folder_path = "docs"
    all_text = ""
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if filename.endswith(".pdf"):
            text, tables = extract_text_from_pdf(file_path)
            all_text += clean_extracted_text(text) + "\n"
            all_text += _format_tables_internal(tables) + "\n"
        elif filename.endswith(".docx"):
            text = extract_text_from_docx(file_path)
            all_text += clean_extracted_text(text) + "\n"
        elif filename.endswith(".json"):
            text = load_json_data(file_path)
            all_text += text + "\n"
        elif filename.endswith(".csv"):
            try:
                with open(file_path, newline='', encoding='utf-8') as csvfile:
                    reader = csv.DictReader(csvfile)
                    for row in reader:
                        line = ' | '.join(f"{k}: {v}" for k, v in row.items())
                        all_text += line + "\n"
            except Exception as e:
                print(f"CSV read error: {e}")
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    chunks = chunk_text(all_text, tokenizer)
    model = SentenceTransformer('all-mpnet-base-v2')
    embeddings = model.encode(chunks, show_progress_bar=False)
    dim = embeddings[0].shape[0]
    index = faiss.IndexFlatL2(dim)
    index.add(np.array(embeddings).astype('float32'))
    return index, model, chunks
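# Note: chunk boundaries come from the bert-base-uncased tokenizer while the
# embeddings come from all-mpnet-base-v2, so the two tokenizations differ; the
# 128-token chunks should still fit within mpnet's input window, which truncates
# longer sequences rather than failing.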
# ---------------- Monitor Twilio Conversations ----------------
def start_conversation_monitor(client, index, embed_model, text_chunks):
    processed_convos = set()
    last_processed_timestamp = {}

    def poll_convo(convo_sid):
        print(f"🧡 Started polling for SID: {convo_sid}")
        while True:
            try:
                latest_msg = fetch_latest_incoming_message(client, convo_sid)
                if latest_msg:
                    msg_time = latest_msg["timestamp"]
                    prev_time = last_processed_timestamp.get(convo_sid)
                    if prev_time is None or msg_time > prev_time:
                        last_processed_timestamp[convo_sid] = msg_time
                        question = latest_msg["body"]
                        sender = latest_msg["author"]
                        print(f"📩 New message from {sender}: {question}")
                        context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks))
                        answer = generate_answer_with_groq(question, context)
                        send_twilio_message(client, convo_sid, answer)
            except Exception as e:
                print(f"⚠️ Error in poll_convo: {e}")
            time.sleep(120)

    # Get all conversations and keep those created after APP_START_TIME, newest first
    conversations = client.conversations.v1.conversations.list(limit=20)
    sorted_convos = sorted(
        [c for c in conversations if c.date_created > APP_START_TIME],
        key=lambda c: c.date_created,
        reverse=True
    )
    print("Conversations (date_created) sorted in descending order:")
    for c in sorted_convos:
        print(c.date_created)
    if not sorted_convos:
        print("❗ No new Twilio conversations found after app startup.")
        return
    latest_convo = sorted_convos[0]
    if latest_convo.sid not in processed_convos:
        processed_convos.add(latest_convo.sid)
        print(f"✅ Monitoring latest conversation SID: {latest_convo.sid}, Created: {latest_convo.date_created}")
        threading.Thread(target=poll_convo, args=(latest_convo.sid,), daemon=True).start()
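# Design note: only the single newest conversation created after APP_START_TIME is
# polled, every 120 seconds in a daemon thread; conversations started later are
# not picked up unless this function is called again (e.g. on app restart).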
# ---------------- Main Entry ----------------
if __name__ == "__main__":
    st.title("🤖 ToyBot WhatsApp Assistant")
    st.write("Initializing knowledge base...")
    index, model, chunks = setup_knowledge_base()
    st.success("Knowledge base loaded.")
    st.write("Waiting for WhatsApp messages...")
    account_sid = os.environ.get("TWILIO_ACCOUNT_SID")
    auth_token = os.environ.get("TWILIO_AUTH_TOKEN")
    if not account_sid or not auth_token:
        st.error("❌ Twilio credentials not set.")
    else:
        client = Client(account_sid, auth_token)
        start_conversation_monitor(client, index, model, chunks)
        st.info("✅ Bot is now monitoring the latest Twilio conversation.")