Spaces:
Sleeping
Sleeping
import os | |
import time | |
import streamlit as st | |
from twilio.rest import Client | |
from pdfminer.high_level import extract_text | |
from sentence_transformers import SentenceTransformer | |
from transformers import AutoTokenizer | |
import faiss | |
import numpy as np | |
import docx | |
from groq import Groq | |
import PyPDF2 | |
import requests | |
# --- Auto-refresh every 10 seconds --- | |
if "last_refresh" not in st.session_state: | |
st.session_state.last_refresh = time.time() | |
elif time.time() - st.session_state.last_refresh > 10: | |
st.session_state.last_refresh = time.time() | |
st.experimental_rerun() | |
# --- Document Loaders --- | |
def extract_text_from_pdf(pdf_path): | |
try: | |
text = "" | |
with open(pdf_path, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
for page_num in range(len(pdf_reader.pages)): | |
page = pdf_reader.pages[page_num] | |
page_text = page.extract_text() | |
if page_text: | |
text += page_text | |
return text | |
except: | |
return extract_text(pdf_path) | |
def extract_text_from_docx(docx_path): | |
try: | |
doc = docx.Document(docx_path) | |
return '\n'.join(para.text for para in doc.paragraphs) | |
except: | |
return "" | |
def chunk_text(text, tokenizer, chunk_size=150, chunk_overlap=30): | |
tokens = tokenizer.tokenize(text) | |
chunks, start = [], 0 | |
while start < len(tokens): | |
end = min(start + chunk_size, len(tokens)) | |
chunk_tokens = tokens[start:end] | |
chunks.append(tokenizer.convert_tokens_to_string(chunk_tokens)) | |
start += chunk_size - chunk_overlap | |
return chunks | |
def retrieve_chunks(question, index, embed_model, text_chunks, k=3): | |
question_embedding = embed_model.encode([question])[0] | |
D, I = index.search(np.array([question_embedding]), k) | |
return [text_chunks[i] for i in I[0]] | |
# --- GROQ Answer Generation --- | |
def generate_answer_with_groq(question, context, retries=3, delay=2): | |
url = "https://api.groq.com/openai/v1/chat/completions" | |
api_key = os.environ["GROQ_API_KEY"] | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json", | |
} | |
prompt = ( | |
f"Customer asked: '{question}'\n\n" | |
f"Here is the relevant product or policy info to help:\n{context}\n\n" | |
f"Respond in a friendly and helpful tone as a toy shop support agent." | |
) | |
payload = { | |
"model": "llama3-8b-8192", | |
"messages": [ | |
{ | |
"role": "system", | |
"content": ( | |
"You are ToyBot, a friendly and helpful WhatsApp assistant for an online toy shop. " | |
"Your goal is to politely answer customer questions, help them choose the right toys, " | |
"provide order or delivery information, explain return policies, and guide them through purchases. " | |
"Always sound warm, helpful, and trustworthy like a professional customer support agent." | |
) | |
}, | |
{"role": "user", "content": prompt}, | |
], | |
"temperature": 0.5, | |
"max_tokens": 300, | |
} | |
for attempt in range(retries): | |
try: | |
response = requests.post(url, headers=headers, json=payload) | |
result = response.json() | |
return result['choices'][0]['message']['content'].strip() | |
except Exception as e: | |
if "503" in str(e) and attempt < retries - 1: | |
time.sleep(delay) | |
continue | |
else: | |
return f"β οΈ Groq API Error: {str(e)}" | |
# --- Twilio Chat Handlers --- | |
def fetch_latest_conversation_sid(account_sid, auth_token): | |
try: | |
client = Client(account_sid, auth_token) | |
conversations = client.conversations.v1.conversations.list(limit=1) | |
if conversations: | |
return conversations[0].sid | |
except Exception as e: | |
st.error(f"β οΈ Could not fetch conversation SID: {e}") | |
return None | |
def fetch_latest_incoming_message(account_sid, auth_token, conversation_sid): | |
client = Client(account_sid, auth_token) | |
messages = client.conversations.v1.conversations(conversation_sid).messages.list(limit=10) | |
for msg in reversed(messages): | |
if msg.author.startswith("whatsapp:"): | |
return msg.body, msg.author, msg.index | |
return None, None, None | |
def send_twilio_message(account_sid, auth_token, conversation_sid, body): | |
try: | |
client = Client(account_sid, auth_token) | |
# Get participants to find the bot's WhatsApp identity | |
participants = client.conversations.v1.conversations(conversation_sid).participants.list() | |
# Pick the first participant whose identity starts with 'whatsapp:' | |
bot_identity = None | |
for p in participants: | |
if p.identity.startswith("whatsapp:"): | |
bot_identity = p.identity | |
break | |
if not bot_identity: | |
return "β οΈ Bot identity with whatsapp: prefix not found in participants." | |
message = client.conversations.v1.conversations(conversation_sid).messages.create( | |
author=bot_identity, | |
body=body | |
) | |
return message.sid | |
except Exception as e: | |
return str(e) | |
# --- Streamlit UI --- | |
st.set_page_config(page_title="Quasa β A Smart WhatsApp Chatbot", layout="wide") | |
st.markdown(""" | |
<style> | |
.big-font { font-size: 28px !important; font-weight: bold; } | |
.small-font { font-size: 16px; color: #555; } | |
.stButton > button { | |
background-color: #0066CC; color: white; | |
padding: 0.5em 1em; border-radius: 8px; font-size: 18px; | |
} | |
.stTextInput > div > input { font-size: 16px; } | |
</style> | |
""", unsafe_allow_html=True) | |
st.markdown('<div class="big-font">π± Quasa β A Smart WhatsApp Chatbot</div>', unsafe_allow_html=True) | |
st.markdown('<div class="small-font">Talk to your documents using WhatsApp. Powered by Groq, Twilio, and RAG.</div>', unsafe_allow_html=True) | |
# Load secrets or fallback | |
account_sid = st.secrets.get("TWILIO_SID") | |
auth_token = st.secrets.get("TWILIO_TOKEN") | |
GROQ_API_KEY = st.secrets.get("GROQ_API_KEY") | |
if not all([account_sid, auth_token, GROQ_API_KEY]): | |
st.warning("β οΈ Some secrets are missing. Please provide them manually:") | |
account_sid = st.text_input("Twilio SID", value=account_sid or "") | |
auth_token = st.text_input("Twilio Auth Token", type="password", value=auth_token or "") | |
GROQ_API_KEY = st.text_input("GROQ API Key", type="password", value=GROQ_API_KEY or "") | |
if all([account_sid, auth_token, GROQ_API_KEY]): | |
os.environ["GROQ_API_KEY"] = GROQ_API_KEY | |
conversation_sid = fetch_latest_conversation_sid(account_sid, auth_token) | |
if conversation_sid: | |
def setup_knowledge_base(): | |
folder_path = "docs" | |
all_text = "" | |
for file in os.listdir(folder_path): | |
if file.endswith(".pdf"): | |
all_text += extract_text_from_pdf(os.path.join(folder_path, file)) + "\n" | |
elif file.endswith((".docx", ".doc")): | |
all_text += extract_text_from_docx(os.path.join(folder_path, file)) + "\n" | |
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased') | |
chunks = chunk_text(all_text, tokenizer) | |
model = SentenceTransformer('all-mpnet-base-v2') | |
embeddings = model.encode(chunks) | |
dim = embeddings[0].shape[0] | |
index = faiss.IndexFlatL2(dim) | |
index.add(np.array(embeddings)) | |
return index, model, chunks | |
index, embedding_model, text_chunks = setup_knowledge_base() | |
st.success(f"β Knowledge base ready. Monitoring WhatsApp messages for conversation: `{conversation_sid}`") | |
if "last_processed_index" not in st.session_state: | |
st.session_state.last_processed_index = -1 | |
with st.spinner("Checking for new WhatsApp messages..."): | |
question, sender, msg_index = fetch_latest_incoming_message(account_sid, auth_token, conversation_sid) | |
if question and msg_index != st.session_state.last_processed_index: | |
st.session_state.last_processed_index = msg_index | |
st.info(f"π₯ New question from **{sender}**:\n\n> {question}") | |
relevant_chunks = retrieve_chunks(question, index, embedding_model, text_chunks) | |
context = "\n\n".join(relevant_chunks) | |
answer = generate_answer_with_groq(question, context) | |
send_twilio_message(account_sid, auth_token, conversation_sid, answer) | |
st.success("π€ Answer sent back to user on WhatsApp!") | |
st.markdown(f"### β¨ Answer:\n\n{answer}") | |
else: | |
st.warning("No new messages found.") | |
else: | |
st.warning("β No active conversation found.") | |
else: | |
st.warning("β Please provide all required credentials.") | |