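# SMEHelpBot: a Streamlit RAG app that indexes PDF/DOCX files from a local
# "docs" folder using sentence-transformer embeddings + FAISS, then answers
# questions about them through the Groq chat completions API.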
import os
import streamlit as st
import PyPDF2
from pdfminer.high_level import extract_text
from transformers import AutoTokenizer
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from groq import Groq
import docx # to read .docx files
# --- Helper Functions ---
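# Extract raw text from a PDF with PyPDF2, falling back to pdfminer.six if
# PyPDF2 cannot parse the file.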
def extract_text_from_pdf(pdf_path):
    try:
        text = ""
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text
        return text
    except Exception as e:
        st.warning(f"PyPDF2 failed with error: {e}. Trying pdfminer.six...")
        return extract_text(pdf_path)
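# Extract text from a .docx file with python-docx (body paragraphs only).
# Legacy binary .doc files are not supported by python-docx and will fall
# into the warning path below.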
def extract_text_from_docx(docx_path):
    try:
        doc = docx.Document(docx_path)
        full_text = [para.text for para in doc.paragraphs]
        return '\n'.join(full_text)
    except Exception as e:
        st.warning(f"Failed to read DOCX {docx_path}: {e}")
        return ""
def chunk_text_with_tokenizer(text, tokenizer, chunk_size=150, chunk_overlap=30):
    tokens = tokenizer.tokenize(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.convert_tokens_to_string(chunk_tokens)
        chunks.append(chunk_text)
        start += chunk_size - chunk_overlap
    return chunks
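# Embed the question and return the k nearest chunks from the FAISS index
# (L2 distance over sentence-transformer embeddings).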
def retrieve_relevant_chunks(question, index, embeddings_model, text_chunks, k=3):
    question_embedding = embeddings_model.encode([question])[0]
    D, I = index.search(np.array([question_embedding]), k)
    relevant_chunks = [text_chunks[i] for i in I[0]]
    return relevant_chunks
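# Send the question plus retrieved context to a Groq-hosted chat model and
# return the generated answer.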
def generate_answer_with_groq(question, context):
    prompt = f"Based on the following context, answer the question: '{question}'\n\nContext:\n{context}"
    model_name = "llama-3.3-70b-versatile"  # Adjust model if needed
    try:
        groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
        response = groq_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are an AI Assistant for Small Businesses. You are an SME expert."},
                {"role": "user", "content": prompt},
            ],
        )
        return response.choices[0].message.content
    except Exception as e:
        st.error(f"Error generating answer with Groq API: {e}")
        return "I'm sorry, I couldn't generate an answer at this time."
# --- Streamlit UI & Logic ---
st.set_page_config(page_title="SMEHelpBot 🤖", layout="wide")
st.title("🤖 SMEHelpBot – Your AI Assistant for Small Businesses")
# GROQ API key check
GROQ_API_KEY = st.secrets.get("GROQ_API_KEY") or os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
    st.error("❌ Please set your GROQ_API_KEY in environment or .streamlit/secrets.toml")
    st.stop()
os.environ["GROQ_API_KEY"] = GROQ_API_KEY
# Load and process all docs at startup
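# Cached with st.cache_resource rather than st.cache_data: the FAISS index is
# a SWIG-wrapped object that cannot be pickled, which st.cache_data requires;
# cache_resource keeps the index, model, and chunks in memory as-is.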
@st.cache_resource(show_spinner=True)
def load_and_prepare_docs(folder_path="docs"):
    all_text = ""
    if not os.path.exists(folder_path):
        st.error(f"Folder '{folder_path}' does not exist!")
        return None, None, None
    # Collect all pdf and docx files
    files = [f for f in os.listdir(folder_path) if f.lower().endswith(('.pdf', '.docx', '.doc'))]
    if not files:
        st.error(f"No PDF or DOCX files found in folder '{folder_path}'.")
        return None, None, None
    for file in files:
        path = os.path.join(folder_path, file)
        if file.lower().endswith('.pdf'):
            text = extract_text_from_pdf(path)
        elif file.lower().endswith(('.docx', '.doc')):
            text = extract_text_from_docx(path)
        else:
            text = ""
        if text:
            all_text += text + "\n\n"
    if not all_text.strip():
        st.error("No text extracted from documents.")
        return None, None, None
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
    text_chunks = chunk_text_with_tokenizer(all_text, tokenizer)
    embedding_model = SentenceTransformer('all-mpnet-base-v2')
    all_embeddings = embedding_model.encode(text_chunks) if text_chunks else None
    if all_embeddings is None or len(all_embeddings) == 0:
        st.error("No text chunks found to create embeddings.")
        return None, None, None
    embedding_dim = all_embeddings[0].shape[0]
    index = faiss.IndexFlatL2(embedding_dim)
    index.add(np.array(all_embeddings))
    return index, embedding_model, text_chunks
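# Build (or reuse the cached) index, embedding model, and chunk list at startup.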
index, embedding_model, text_chunks = load_and_prepare_docs()
user_question = st.text_input("💬 Ask your question about SME documents:")
if st.button("Get Answer") and user_question:
    if index is None or embedding_model is None or text_chunks is None:
        st.error("The document knowledge base is not ready. Please check the errors above.")
    else:
        with st.spinner("Searching for relevant information and generating answer..."):
            relevant_chunks = retrieve_relevant_chunks(user_question, index, embedding_model, text_chunks)
            context = "\n\n".join(relevant_chunks)
            answer = generate_answer_with_groq(user_question, context)
            st.markdown("### Answer:")
            st.success(answer)
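# To run locally (assuming this file is saved as app.py):
#   export GROQ_API_KEY=...        # or add it to .streamlit/secrets.toml
#   streamlit run app.py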