# WhatsApp SME Help Bot — Flask webhook that answers questions over local
# PDF/DOCX documents using a FAISS retrieval index and the Groq chat API.
import os
import sys

import docx
import faiss
import numpy as np
import PyPDF2
from flask import Flask, request
from groq import Groq
from pdfminer.high_level import extract_text
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from twilio.twiml.messaging_response import MessagingResponse
# --- Helper functions ---
def extract_text_from_pdf(pdf_path):
    """Extract all text from a PDF file.

    Tries PyPDF2 first; if it raises for any reason, falls back to
    pdfminer.six's ``extract_text``, which copes better with some
    malformed PDFs.

    Args:
        pdf_path: Filesystem path to the PDF file.

    Returns:
        The concatenated text of every page (may be "" for image-only PDFs).
    """
    try:
        text = ""
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # Iterate pages directly instead of indexing by page number.
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text
        return text
    except Exception as e:
        # Broad catch is deliberate: any PyPDF2 failure triggers the fallback.
        print(f"PyPDF2 failed with error: {e}. Trying pdfminer.six...")
        return extract_text(pdf_path)
def extract_text_from_docx(docx_path):
    """Return the text of every paragraph in a .docx file, newline-joined.

    Logs the error and returns "" if the document cannot be opened or read.
    """
    try:
        document = docx.Document(docx_path)
        return '\n'.join(paragraph.text for paragraph in document.paragraphs)
    except Exception as e:
        print(f"Failed to read DOCX {docx_path}: {e}")
        return ""
def chunk_text_with_tokenizer(text, tokenizer, chunk_size=150, chunk_overlap=30):
    """Split *text* into overlapping windows of tokens.

    Args:
        text: The raw text to chunk.
        tokenizer: Object exposing ``tokenize`` and ``convert_tokens_to_string``
            (e.g. a Hugging Face tokenizer).
        chunk_size: Maximum number of tokens per chunk.
        chunk_overlap: Number of tokens shared between consecutive chunks.

    Returns:
        List of chunk strings ([] for empty input).

    Raises:
        ValueError: If chunk_overlap >= chunk_size — the original while-loop
            advanced by (chunk_size - chunk_overlap) and would never terminate.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    tokens = tokenizer.tokenize(text)
    step = chunk_size - chunk_overlap
    chunks = []
    # Stepping by (chunk_size - chunk_overlap) reproduces the sliding window.
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        chunks.append(tokenizer.convert_tokens_to_string(window))
    return chunks
def retrieve_relevant_chunks(question, index, embeddings_model, text_chunks, k=3):
    """Return the up-to-k text chunks most similar to *question*.

    Encodes the question, searches the FAISS index, and maps the returned
    row indices back to chunk strings.

    Args:
        question: The user's query string.
        index: FAISS index built over the chunk embeddings.
        embeddings_model: SentenceTransformer-style object with ``encode``.
        text_chunks: Chunk strings in the order they were added to the index.
        k: Number of neighbours to retrieve (clamped to len(text_chunks)).

    Returns:
        List of chunk strings, most similar first ([] if no chunks).
    """
    # Asking FAISS for more neighbours than stored vectors pads the result
    # with index -1, which would silently pull text_chunks[-1]; clamp first.
    k = min(k, len(text_chunks))
    if k <= 0:
        return []
    question_embedding = embeddings_model.encode([question])[0]
    _distances, indices = index.search(np.array([question_embedding]), k)
    # Skip any residual -1 ("no result") entries defensively.
    return [text_chunks[i] for i in indices[0] if i >= 0]
def generate_answer_with_groq(question, context):
    """Ask the Groq chat API to answer *question* grounded in *context*.

    Builds a context-stuffed prompt, sends it with an SME-assistant system
    message, and returns the model's reply. On any API failure, logs the
    error and returns a fixed apology string instead of raising.
    """
    prompt = f"Based on the following context, answer the question: '{question}'\n\nContext:\n{context}"
    model_name = "llama-3.3-70b-versatile"  # Adjust if needed
    messages = [
        {"role": "system", "content": "You are an AI Assistant for Small Businesses. You are an SME expert."},
        {"role": "user", "content": prompt},
    ]
    try:
        client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
        completion = client.chat.completions.create(model=model_name, messages=messages)
        return completion.choices[0].message.content
    except Exception as e:
        print(f"Error generating answer with Groq API: {e}")
        return "I'm sorry, I couldn't generate an answer at this time."
# --- Load and prepare docs on startup ---
def load_and_prepare_docs(folder_path="docs"):
    """Load every PDF/DOCX under *folder_path* and build the retrieval stack.

    Extracts text from each document, chunks it with a BERT tokenizer,
    embeds the chunks with SentenceTransformers, and indexes them in FAISS.

    Args:
        folder_path: Directory scanned for .pdf/.docx/.doc files.

    Returns:
        Tuple of (faiss_index, embedding_model, text_chunks) on success, or
        (None, None, None) when the folder is missing, empty, or yields no text.
    """
    print("Loading documents from", folder_path)
    if not os.path.exists(folder_path):
        print(f"Folder '{folder_path}' does not exist!")
        return None, None, None
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.pdf', '.docx', '.doc'))]
    if not files:
        print(f"No PDF or DOCX files found in folder '{folder_path}'.")
        return None, None, None
    # Accumulate per-file text in a list and join once instead of repeated
    # string += (quadratic). Each piece keeps its trailing separator so the
    # assembled text is byte-identical to the original concatenation.
    pieces = []
    for file in files:
        path = os.path.join(folder_path, file)
        if file.lower().endswith('.pdf'):
            text = extract_text_from_pdf(path)
        else:
            # .docx and legacy .doc both go through python-docx.
            text = extract_text_from_docx(path)
        if text:
            pieces.append(text + "\n\n")
    all_text = "".join(pieces)
    if not all_text.strip():
        print("No text extracted from documents.")
        return None, None, None
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    text_chunks = chunk_text_with_tokenizer(all_text, tokenizer)
    embedding_model = SentenceTransformer('all-mpnet-base-v2')
    all_embeddings = embedding_model.encode(text_chunks) if text_chunks else None
    if all_embeddings is None or len(all_embeddings) == 0:
        print("No text chunks found to create embeddings.")
        return None, None, None
    # Exact L2 index sized to the embedding dimensionality.
    embedding_dim = all_embeddings[0].shape[0]
    index = faiss.IndexFlatL2(embedding_dim)
    index.add(np.array(all_embeddings))
    print("Documents loaded and FAISS index created.")
    return index, embedding_model, text_chunks
# --- Flask app and WhatsApp webhook ---
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # Optional, if you call API from other domains

# Build the retrieval stack once at startup so requests don't pay the cost.
index, embedding_model, text_chunks = load_and_prepare_docs()
@app.route("/whatsapp", methods=["POST"])
def whatsapp_reply():
    """Twilio WhatsApp webhook: answer an incoming message via RAG + Groq.

    Reads the message body from Twilio's form-encoded POST, retrieves the
    most relevant document chunks, asks Groq for an answer, and returns
    TwiML that Twilio relays back to the sender.

    Fix: the original function had no ``@app.route`` decorator, so it was
    never registered with Flask and the webhook URL returned 404.
    """
    incoming_msg = request.values.get('Body', '').strip()
    from_number = request.values.get('From', '')
    print(f"Incoming message from {from_number}: {incoming_msg}")
    resp = MessagingResponse()
    msg = resp.message()
    # Guard: nothing to answer.
    if not incoming_msg:
        msg.body("Please send a question.")
        return str(resp)
    # Guard: the knowledge base failed to load at startup.
    if index is None or embedding_model is None or text_chunks is None:
        msg.body("Sorry, the knowledge base is not ready. Please try again later.")
        return str(resp)
    # Retrieve context and generate answer
    relevant_chunks = retrieve_relevant_chunks(incoming_msg, index, embedding_model, text_chunks)
    context = "\n\n".join(relevant_chunks)
    answer = generate_answer_with_groq(incoming_msg, context)
    msg.body(answer)
    return str(resp)
if __name__ == "__main__":
    # Fail fast if the Groq key is missing; every answer request needs it.
    # sys.exit replaces the site-injected exit(), which is not guaranteed
    # to exist outside the interactive interpreter.
    if not os.getenv("GROQ_API_KEY"):
        print("Please set the GROQ_API_KEY environment variable before running.")
        sys.exit(1)
    print("Starting WhatsApp SMEHelpBot server...")
    # Bind to all interfaces; PORT is injected by most PaaS hosts.
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000)))