Spaces:
Build error
Build error
import os | |
import gradio as gr | |
import torch | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
# Configuration
# Hugging Face model identifiers: the causal LM that writes the answers and
# the sentence-embedding model used to index and search the document chunks.
MODEL_NAME: str = "microsoft/phi-3-mini-4k-instruct"
EMBEDDING_MODEL: str = "sentence-transformers/all-MiniLM-L6-v2"

# Folder (relative to the working directory) that is scanned for PDF files.
DOCS_DIR: str = ".business_docs"
# System Initialization
def initialize_system():
    """Build the document index and load the language model.

    Scans ``DOCS_DIR`` for PDF files, splits each one into overlapping text
    chunks, embeds the chunks into a FAISS vector store, and loads the
    tokenizer and causal LM named by ``MODEL_NAME`` on CPU.

    Returns:
        A ``(vector_store, model, tokenizer)`` tuple.

    Raises:
        FileNotFoundError: ``DOCS_DIR`` is missing or is not a directory.
        ValueError: the folder contains no PDFs, or none of the PDFs yielded
            any text chunks.
        RuntimeError: the tokenizer or model could not be loaded (the
            original exception is attached as ``__cause__``).
    """
    # Validate documents folder (a plain file with that name is not enough).
    if not os.path.isdir(DOCS_DIR):
        raise FileNotFoundError(f"Missing documents folder: {DOCS_DIR}")

    # Process PDFs. The extension check is case-insensitive so "REPORT.PDF"
    # is picked up, and the list is sorted so the index is built in a
    # deterministic order regardless of the directory listing order.
    pdf_files = sorted(
        os.path.join(DOCS_DIR, name)
        for name in os.listdir(DOCS_DIR)
        if name.lower().endswith(".pdf")
    )
    if not pdf_files:
        raise ValueError(f"No PDFs found in {DOCS_DIR}")

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=50,
    )

    documents = []
    for pdf_path in pdf_files:
        try:
            loader = PyPDFLoader(pdf_path)
            documents.extend(loader.load_and_split(text_splitter))
        except Exception as e:
            # Best effort: one unreadable PDF should not stop the others
            # from being indexed, so report it and carry on.
            print(f"Error processing {pdf_path}: {str(e)}")

    # Every PDF may have failed or been empty. Building a vector store from
    # zero chunks would otherwise fail later with an opaque error, so stop
    # here with a message that names the real problem.
    if not documents:
        raise ValueError(
            f"No readable text could be extracted from the PDFs in {DOCS_DIR}"
        )

    # Create embeddings
    embeddings = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL,
        model_kwargs={"device": "cpu"},
        encode_kwargs={"normalize_embeddings": True},
    )
    vector_store = FAISS.from_documents(documents, embeddings)

    # Load the language model on CPU in full precision.
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            trust_remote_code=True,
            torch_dtype=torch.float32,
            device_map="cpu",
        )
    except Exception as e:
        # Chain the cause so the underlying traceback is preserved.
        raise RuntimeError(f"Model loading failed: {str(e)}") from e

    return vector_store, model, tokenizer
# Initialize system
# Runs at import time: builds the vector store and loads the model once, and
# publishes them as module-level globals for the request handlers to use.
# Any failure is reported and then re-raised so the process does not start
# in a half-initialized state.
try:
    vector_store, model, tokenizer = initialize_system()
    # The status glyphs were mis-decoded to "β" (UTF-8 lead byte 0xE2 read
    # as ISO-8859-7); restored to the check/cross marks the messages imply.
    print("✅ System ready with business documents")
except Exception as e:
    print(f"❌ Initialization failed: {str(e)}")
    raise
# Response Generation
def generate_response(query):
    """Answer *query* using the indexed business documents.

    Retrieves the two most similar chunks from the module-level
    ``vector_store``, embeds them in a Phi-3 chat prompt, and generates a
    reply with the module-level ``model`` and ``tokenizer``.

    Args:
        query: The user's question as plain text.

    Returns:
        The generated answer text. If anything fails, a short
        ``"Error: ..."`` string is returned instead; exceptions are never
        propagated to the caller.
    """
    try:
        # Context retrieval
        docs = vector_store.similarity_search(query, k=2)
        context = "\n".join(d.page_content for d in docs)

        # Phi-3 prompt template. Each turn is terminated with <|end|>, which
        # is the end-of-turn marker in the Phi-3 chat format ("</s>" is not).
        prompt = (
            "<|system|>\n"
            "Answer ONLY using the business documents. "
            "Respond to unknown queries with: "
            '"This information is not available in our current documentation."\n'
            f"Context: {context}<|end|>\n"
            "<|user|>\n"
            f"{query}<|end|>\n"
            "<|assistant|>\n"
        )

        # Generate response. The whole encoding is passed through so the
        # attention mask reaches generate(); it cannot be inferred reliably
        # when the pad token is the same as the EOS token.
        inputs = tokenizer(prompt, return_tensors="pt")
        outputs = model.generate(
            **inputs,
            max_new_tokens=200,
            temperature=0.1,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )

        # Decode only the newly generated tokens. Decoding the full sequence
        # with skip_special_tokens=True can strip the "<|assistant|>" marker,
        # so splitting on it would hand back the prompt along with the answer.
        prompt_length = inputs["input_ids"].shape[1]
        response = tokenizer.decode(
            outputs[0][prompt_length:], skip_special_tokens=True
        )
        return response.strip()
    except Exception as e:
        return f"Error: Please try again. ({str(e)[:50]})"
# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # NOTE(review): the leading "π" looks like a mis-decoded emoji (a UTF-8
    # lead byte 0xF0 read as ISO-8859-7). The intended glyph cannot be
    # recovered from here, so the string is left unchanged -- confirm and fix.
    gr.Markdown("# π Business Documentation Assistant")
    chatbot = gr.Chatbot(height=300)
    msg = gr.Textbox(placeholder="Ask about our services...", label="")
    clear = gr.Button("Clear History")

    def respond(message, history):
        """Handle one submitted message.

        Args:
            message: Text typed into the textbox.
            history: Current chat history as (user, assistant) pairs; may be
                ``None`` when no conversation exists yet.

        Returns:
            A ``(textbox_value, history)`` pair: an empty string to clear the
            textbox, and the history extended with the new exchange.
        """
        history = history or []
        # Skip blank submissions instead of running a full generation on them.
        if not message or not message.strip():
            return "", history
        response = generate_response(message)
        # Return a new list rather than mutating the caller's history.
        return "", history + [(message, response)]

    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    # Returning None resets the chatbot component.
    clear.click(lambda: None, None, chatbot, queue=False)

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)