Spaces:
Sleeping
Sleeping
import os | |
import time | |
import threading | |
import streamlit as st | |
from twilio.rest import Client | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer | |
import faiss | |
import numpy as np | |
import docx | |
from groq import Groq | |
import requests | |
from io import StringIO | |
from pdfminer.high_level import extract_text_to_fp | |
from pdfminer.layout import LAParams | |
import datetime | |
# Timezone-aware UTC instant captured once when the module is loaded.
APP_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)

# Switch the PyTorch JIT off through its environment flag.
# NOTE(review): this is set after the torch-backed imports above have already
# run -- confirm the flag is still honoured when set this late.
os.environ["PYTORCH_JIT"] = "0"
# --- PDF Extraction ---
def extract_text_from_pdf(pdf_path):
    """Return the plain text that pdfminer extracts from a PDF file.

    Args:
        pdf_path: Path of the PDF file; it is opened in binary mode.

    Returns:
        Everything pdfminer wrote to the in-memory text buffer, as one string.
    """
    with open(pdf_path, 'rb') as pdf_file, StringIO() as text_buffer:
        extract_text_to_fp(
            pdf_file,
            text_buffer,
            laparams=LAParams(),
            output_type='text',
            codec=None,
        )
        # Read the buffer before the ``with`` block closes it.
        return text_buffer.getvalue()
def clean_extracted_text(text):
    """Normalise whitespace in extracted text.

    Each line has its runs of whitespace collapsed to single spaces and its
    leading/trailing whitespace removed; lines that end up empty are dropped.

    Args:
        text: Raw multi-line text.

    Returns:
        The surviving lines joined with ``"\\n"`` (no trailing newline).
    """
    # ``split()`` with no argument both trims the ends and collapses inner runs.
    collapsed = (' '.join(raw.split()) for raw in text.splitlines())
    return '\n'.join(row for row in collapsed if row)
# --- DOCX Extraction ---
def extract_text_from_docx(docx_path):
    """Return the paragraph text of a Word document, or ``""`` on failure.

    Extraction is best-effort: any error while opening or reading the file is
    reported on stdout and an empty string is returned, so one unreadable
    document does not abort the caller.

    Args:
        docx_path: Path of the document to read.

    Returns:
        The document's paragraphs joined with ``"\\n"``, or ``""`` if the file
        could not be read.
    """
    try:
        document = docx.Document(docx_path)
        return '\n'.join(para.text for para in document.paragraphs)
    except Exception as exc:
        # ``Exception`` rather than a bare ``except`` so that KeyboardInterrupt
        # and SystemExit still propagate; the message says why a file was skipped.
        print(f"Could not extract text from {docx_path}: {exc}")
        return ""
# --- Chunking ---
def chunk_text(text, tokenizer, chunk_size=128, chunk_overlap=32, max_tokens=512):
    """Split ``text`` into overlapping windows of tokens.

    Args:
        text: The text to split.
        tokenizer: Object providing ``tokenize(str)`` and
            ``convert_tokens_to_string(list)``.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        chunk_overlap: Number of tokens shared by consecutive chunks; must be
            smaller than ``chunk_size``.
        max_tokens: Accepted for backward compatibility; not used.

    Returns:
        A list of chunk strings in document order. Empty when ``text``
        produces no tokens.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``chunk_overlap`` is
            not smaller than ``chunk_size``. Either would keep the window
            from advancing and the loop would never end.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    stride = chunk_size - chunk_overlap
    if stride <= 0:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    tokens = tokenizer.tokenize(text)
    chunks = []
    for start in range(0, len(tokens), stride):
        window = tokens[start:start + chunk_size]
        chunks.append(tokenizer.convert_tokens_to_string(window))
        # Stop once a window reaches the end, so no trailing chunk made only of
        # already-covered overlap tokens is emitted.
        if start + chunk_size >= len(tokens):
            break
    return chunks
def retrieve_chunks(question, index, embed_model, text_chunks, k=3):
    """Return the text chunks nearest to ``question`` in the vector index.

    Args:
        question: The query string.
        index: Vector index exposing ``search(queries, k)`` that returns
            ``(distances, ids)``.
        embed_model: Model exposing ``encode(str)`` that returns one vector.
        text_chunks: Chunk strings, positionally aligned with the index rows.
        k: Number of neighbours to request.

    Returns:
        Up to ``k`` chunks, in the order the index returned them.
    """
    # FAISS expects a 2-D float32 array of query vectors.
    query = np.asarray([embed_model.encode(question)], dtype="float32")
    _, neighbor_ids = index.search(query, k)
    # An index holding fewer than k vectors pads the result with -1. Python
    # would resolve that to the *last* chunk, so such ids are filtered out.
    return [text_chunks[i] for i in neighbor_ids[0] if 0 <= i < len(text_chunks)]
# --- Groq Answer Generator ---
def generate_answer_with_groq(question, context, timeout=30):
    """Ask the Groq chat-completions endpoint for a support-style answer.

    Args:
        question: The customer's message.
        context: Retrieved product/policy text to ground the answer.
        timeout: Seconds to wait for the HTTP call before giving up.

    Returns:
        The assistant's reply text with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the ``GROQ_API_KEY`` environment variable is unset
            or empty.
        requests.RequestException: On network failure, timeout, or a non-2xx
            response (via ``raise_for_status``).
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        # Fail fast instead of sending "Bearer None" and getting an auth error.
        raise RuntimeError("GROQ_API_KEY environment variable is not set")

    url = "https://api.groq.com/openai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = (
        f"Customer asked: '{question}'\n\n"
        f"Here is the relevant product or policy info to help:\n{context}\n\n"
        "Respond in a friendly and helpful tone as a toy shop support agent."
    )
    payload = {
        "model": "llama3-8b-8192",
        "messages": [
            {
                "role": "system",
                "content": (
                    "You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. "
                    "Your goal is to politely answer customer questions, help them choose the right toys, "
                    "provide order or delivery information, explain return policies, and guide them through purchases."
                ),
            },
            {"role": "user", "content": prompt},
        ],
        "temperature": 0.5,
        "max_tokens": 300,
    }
    # Without a timeout a stalled connection would block the calling thread
    # indefinitely.
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    response.raise_for_status()
    return response.json()['choices'][0]['message']['content'].strip()
# --- Twilio Functions ---
def fetch_latest_incoming_message(client, conversation_sid):
    """Return the newest WhatsApp-authored message of a conversation.

    Args:
        client: Twilio REST client.
        conversation_sid: SID of the conversation to inspect.

    Returns:
        A dict with ``sid``, ``body``, ``author`` and ``timestamp`` for the
        most recent message whose author starts with ``"whatsapp:"``, or
        ``None`` if none of the 10 newest messages qualifies.
    """
    # Ask for newest-first explicitly. The default order is ascending, so a
    # plain ``limit=10`` yields the 10 *oldest* messages and later messages in
    # a longer conversation would never be seen.
    recent = client.conversations.v1.conversations(conversation_sid).messages.list(
        order="desc", limit=10
    )
    for msg in recent:
        # ``author`` may be missing; treat that as "not from WhatsApp".
        if (msg.author or "").startswith("whatsapp:"):
            return {
                "sid": msg.sid,
                "body": msg.body,
                "author": msg.author,
                "timestamp": msg.date_created,
            }
    return None
def send_twilio_message(client, conversation_sid, body):
    """Post ``body`` into a conversation with the author set to ``"system"``.

    Args:
        client: Twilio REST client.
        conversation_sid: SID of the target conversation.
        body: Message text to send.

    Returns:
        Whatever the client's ``messages.create`` call returns.
    """
    conversation = client.conversations.v1.conversations(conversation_sid)
    outgoing = {"author": "system", "body": body}
    return conversation.messages.create(**outgoing)
# --- Load Knowledge Base ---
def setup_knowledge_base(folder_path="docs"):
    """Build the retrieval index from the documents in ``folder_path``.

    PDF and Word files are read, concatenated, split into token chunks and
    embedded; the embeddings are stored in a flat L2 FAISS index.

    Args:
        folder_path: Directory containing the source documents. Defaults to
            ``"docs"``, the location the application has always used.

    Returns:
        A tuple ``(index, model, chunks)``: the FAISS index, the sentence
        embedding model, and the chunk strings aligned with the index rows.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
        ValueError: If no text could be extracted from any document.
    """
    parts = []
    # Sorted so the chunk order does not depend on directory listing order.
    for file in sorted(os.listdir(folder_path)):
        path = os.path.join(folder_path, file)
        suffix_source = file.lower()  # match ".PDF" / ".Docx" as well
        if suffix_source.endswith(".pdf"):
            parts.append(clean_extracted_text(extract_text_from_pdf(path)))
        elif suffix_source.endswith((".docx", ".doc")):
            parts.append(extract_text_from_docx(path))
    all_text = "".join(part + "\n" for part in parts)

    if not all_text.strip():
        # Without this check the failure surfaces later as an IndexError on
        # the empty embedding array.
        raise ValueError(f"No text could be extracted from documents in {folder_path!r}")

    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    chunks = chunk_text(all_text, tokenizer)

    model = SentenceTransformer('all-mpnet-base-v2')
    # ``truncation``/``max_length`` are not documented ``encode`` parameters and
    # are rejected by releases whose ``encode`` takes no extra keyword
    # arguments. Input length is limited by the model's own ``max_seq_length``.
    embeddings = np.asarray(model.encode(chunks, show_progress_bar=False), dtype='float32')

    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return index, model, chunks
# --- Monitor Conversations ---
def start_conversation_monitor(client, index, embed_model, text_chunks):
    """Start background polling that answers new WhatsApp conversations.

    A daemon thread lists conversations every few seconds. For each
    conversation created after ``APP_START_TIME`` that has a WhatsApp
    participant, one further daemon thread is started which polls that
    conversation and replies to each new incoming message with a generated
    answer.

    Args:
        client: Twilio REST client.
        index: Vector index used for retrieval.
        embed_model: Embedding model used for retrieval.
        text_chunks: Chunk strings aligned with ``index``.

    Returns:
        None. The function returns as soon as the discovery thread starts.
    """
    # NOTE(review): the non-ASCII prefixes in the log messages below look
    # mis-encoded (probably emoji). They are kept verbatim -- confirm against
    # the original file.
    processed_convos = set()  # conversations that already have a poller thread
    stale_convos = set()  # conversations created before the app started
    last_processed_timestamp = {}

    def poll_conversation(convo_sid):
        """Poll one conversation forever and answer each new incoming message."""
        while True:
            try:
                latest_msg = fetch_latest_incoming_message(client, convo_sid)
                if latest_msg:
                    msg_time = latest_msg["timestamp"]
                    if convo_sid not in last_processed_timestamp or msg_time > last_processed_timestamp[convo_sid]:
                        # Recorded before replying, so a failed reply is not
                        # retried on the next poll.
                        last_processed_timestamp[convo_sid] = msg_time
                        question = latest_msg["body"]
                        sender = latest_msg["author"]
                        print(f"\nπ₯ New message from {sender} in {convo_sid}: {question}")
                        context = "\n\n".join(retrieve_chunks(question, index, embed_model, text_chunks))
                        answer = generate_answer_with_groq(question, context)
                        send_twilio_message(client, convo_sid, answer)
                        print(f"π€ Replied to {sender}: {answer}")
                time.sleep(3)
            except Exception as e:
                print(f"β Error in convo {convo_sid} polling:", e)
                time.sleep(5)

    def poll_new_conversations():
        """Discover new WhatsApp conversations and start a poller for each."""
        print("β‘οΈ Monitoring for new WhatsApp conversations...")
        while True:
            try:
                conversations = client.conversations.v1.conversations.list(limit=20)
                for convo in conversations:
                    # Skip already-classified conversations before spending an
                    # API call on them.
                    if convo.sid in processed_convos or convo.sid in stale_convos:
                        continue
                    convo_full = client.conversations.v1.conversations(convo.sid).fetch()
                    if convo_full.date_created <= APP_START_TIME:
                        # The creation time never changes, so do not re-check.
                        stale_convos.add(convo.sid)
                        continue
                    participants = client.conversations.v1.conversations(convo.sid).participants.list()
                    for p in participants:
                        address = p.messaging_binding.get("address", "") if p.messaging_binding else ""
                        if address.startswith("whatsapp:"):
                            print(f"π New WhatsApp convo found: {convo.sid}")
                            processed_convos.add(convo.sid)
                            threading.Thread(target=poll_conversation, args=(convo.sid,), daemon=True).start()
                            # One poller per conversation: without this break a
                            # second WhatsApp participant would start a second
                            # thread and every message would be answered twice.
                            break
            except Exception as e:
                print("β Error polling conversations:", e)
            time.sleep(5)

    # Launch the conversation discovery loop in the background.
    threading.Thread(target=poll_new_conversations, daemon=True).start()
# --- Streamlit UI ---
# NOTE(review): the non-ASCII characters in the UI strings below look
# mis-encoded (probably emoji and a dash). They are kept verbatim -- confirm
# against the original file.
st.set_page_config(page_title="Quasa β A Smart WhatsApp Chatbot", layout="wide")
st.title("π± Quasa β A Smart WhatsApp Chatbot")

# Credentials come from Streamlit secrets; any that are missing are requested
# through input fields pre-filled with whatever is already known.
account_sid = st.secrets.get("TWILIO_SID")
auth_token = st.secrets.get("TWILIO_TOKEN")
GROQ_API_KEY = st.secrets.get("GROQ_API_KEY")
if not all([account_sid, auth_token, GROQ_API_KEY]):
    st.warning("β οΈ Provide all credentials below:")
    account_sid = st.text_input("Twilio SID", value=account_sid or "")
    auth_token = st.text_input("Twilio Token", type="password", value=auth_token or "")
    GROQ_API_KEY = st.text_input("GROQ API Key", type="password", value=GROQ_API_KEY or "")


@st.cache_resource(show_spinner=False)
def _launch_monitor(twilio_sid, twilio_token):
    """Build the knowledge base and start the monitor once per credential pair.

    Streamlit re-executes this script on every interaction. Caching the call
    keeps a rerun from rebuilding the index and starting another monitor
    thread, which would answer every message again.

    Returns:
        The tuple ``(client, index, model, chunks)`` that was created.
    """
    twilio_client = Client(twilio_sid, twilio_token)
    kb_index, kb_model, kb_chunks = setup_knowledge_base()
    threading.Thread(
        target=start_conversation_monitor,
        args=(twilio_client, kb_index, kb_model, kb_chunks),
        daemon=True,
    ).start()
    return twilio_client, kb_index, kb_model, kb_chunks


if all([account_sid, auth_token, GROQ_API_KEY]):
    # Set on every run so the answer generator always sees the current key.
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
    st.success("π’ Monitoring new WhatsApp conversations...")
    client, index, model, chunks = _launch_monitor(account_sid, auth_token)
    st.info("β³ Waiting for new messages...")