import os
from flask import Flask, request
from flask_cors import CORS
from twilio.twiml.messaging_response import MessagingResponse
import PyPDF2
from pdfminer.high_level import extract_text
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from groq import Groq
import docx
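
# Overview: this app implements a small retrieval-augmented generation (RAG)
# pipeline. On startup it extracts text from the PDF/DOCX files in the `docs/`
# folder, splits the text into overlapping token chunks, embeds the chunks with
# sentence-transformers, and stores them in a FAISS index. Incoming WhatsApp
# messages (delivered by Twilio as form-encoded POSTs to /whatsapp) are embedded,
# the nearest chunks are retrieved, and a Groq chat model drafts the reply.
# Likely runtime dependencies, inferred from the imports: flask, flask-cors,
# twilio, PyPDF2, pdfminer.six, transformers, sentence-transformers, faiss-cpu,
# numpy, groq, python-docx.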
# --- Helper functions ---
def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF with PyPDF2, falling back to pdfminer.six on failure."""
    try:
        text = ""
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text
        return text
    except Exception as e:
        print(f"PyPDF2 failed with error: {e}. Trying pdfminer.six...")
        return extract_text(pdf_path)
def extract_text_from_docx(docx_path):
    """Extract text from a .docx file with python-docx (legacy .doc files are not supported)."""
    try:
        doc = docx.Document(docx_path)
        full_text = []
        for para in doc.paragraphs:
            full_text.append(para.text)
        return '\n'.join(full_text)
    except Exception as e:
        print(f"Failed to read DOCX {docx_path}: {e}")
        return ""
def chunk_text_with_tokenizer(text, tokenizer, chunk_size=150, chunk_overlap=30):
    tokens = tokenizer.tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.convert_tokens_to_string(chunk_tokens)
        chunks.append(chunk_text)
        start += chunk_size - chunk_overlap
    return chunks
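
# Illustrative example of the chunking arithmetic above: with chunk_size=150 and
# chunk_overlap=30, the window advances 120 tokens per step, so a 300-token
# document yields chunks covering tokens 0-149, 120-269, and 240-299.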
def retrieve_relevant_chunks(question, index, embeddings_model, text_chunks, k=3):
    question_embedding = embeddings_model.encode([question])[0]
    D, I = index.search(np.array([question_embedding]), k)
    relevant_chunks = [text_chunks[i] for i in I[0]]
    return relevant_chunks
def generate_answer_with_groq(question, context):
    prompt = f"Based on the following context, answer the question: '{question}'\n\nContext:\n{context}"
    model_name = "llama-3.3-70b-versatile"  # Adjust if needed
    try:
        groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
        response = groq_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are an AI Assistant for Small Businesses. You are an SME expert."},
                {"role": "user", "content": prompt},
            ],
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating answer with Groq API: {e}")
        return "I'm sorry, I couldn't generate an answer at this time."
# --- Load and prepare docs on startup ---
def load_and_prepare_docs(folder_path="docs"):
    """Read PDF and Word files in `folder_path`, chunk the text, and build a FAISS index."""
    print("Loading documents from", folder_path)
    all_text = ""
    if not os.path.exists(folder_path):
        print(f"Folder '{folder_path}' does not exist!")
        return None, None, None
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.pdf', '.docx', '.doc'))]
    if not files:
        print(f"No PDF or DOCX files found in folder '{folder_path}'.")
        return None, None, None
    for file in files:
        path = os.path.join(folder_path, file)
        if file.lower().endswith('.pdf'):
            text = extract_text_from_pdf(path)
        else:
            # python-docx only reads .docx; legacy .doc files fail and yield "".
            text = extract_text_from_docx(path)
        if text:
            all_text += text + "\n\n"
    if not all_text.strip():
        print("No text extracted from documents.")
        return None, None, None
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    text_chunks = chunk_text_with_tokenizer(all_text, tokenizer)
    embedding_model = SentenceTransformer('all-mpnet-base-v2')
    all_embeddings = embedding_model.encode(text_chunks) if text_chunks else None
    if all_embeddings is None or len(all_embeddings) == 0:
        print("No text chunks found to create embeddings.")
        return None, None, None
    embedding_dim = all_embeddings[0].shape[0]
    index = faiss.IndexFlatL2(embedding_dim)
    index.add(np.array(all_embeddings))
    print("Documents loaded and FAISS index created.")
    return index, embedding_model, text_chunks
# --- Flask app and WhatsApp webhook ---
app = Flask(__name__)
CORS(app)  # Optional, if you call the API from other domains

# Load documents once at startup
index, embedding_model, text_chunks = load_and_prepare_docs()
@app.route("/whatsapp", methods=["POST"])
def whatsapp_reply():
    incoming_msg = request.values.get('Body', '').strip()
    from_number = request.values.get('From', '')
    print(f"Incoming message from {from_number}: {incoming_msg}")
    resp = MessagingResponse()
    msg = resp.message()
    if not incoming_msg:
        msg.body("Please send a question.")
        return str(resp)
    if index is None or embedding_model is None or text_chunks is None:
        msg.body("Sorry, the knowledge base is not ready. Please try again later.")
        return str(resp)
    # Retrieve context and generate answer
    relevant_chunks = retrieve_relevant_chunks(incoming_msg, index, embedding_model, text_chunks)
    context = "\n\n".join(relevant_chunks)
    answer = generate_answer_with_groq(incoming_msg, context)
    msg.body(answer)
    return str(resp)
if __name__ == "__main__":
    GROQ_API_KEY = os.getenv("GROQ_API_KEY")
    if not GROQ_API_KEY:
        print("Please set the GROQ_API_KEY environment variable before running.")
        exit(1)
    print("Starting WhatsApp SMEHelpBot server...")
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000)))
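
# Local testing sketch (assumes the server runs on port 5000 and that Twilio
# delivers form-encoded Body/From fields, as read in whatsapp_reply above):
# simulate an incoming WhatsApp message with curl and inspect the TwiML reply.
# The phone number below is a placeholder.
#
#   curl -X POST http://localhost:5000/whatsapp \
#        -d "Body=What are your opening hours?" \
#        -d "From=whatsapp:+15551234567"
#
# To receive real messages, expose the server on a public URL (e.g. via a
# tunnel) and set that URL as the WhatsApp webhook in the Twilio console.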