import os
import re
import logging

import requests
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS as LangchainFAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.llms import Together
from langchain.chains import RetrievalQA
import gradio as gr
# Set Together.ai API key | |
os.environ["TOGETHER_API_KEY"] = os.getenv("TOGETHER_API_KEY", "a36246d65d8290f43667350b364c5b6bb8562eb50a4b947eec5bd7e79f2dffc6") | |
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Step 1: Load and chunk webpage
def fetch_webpage_text(url):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # "mw-content-text" is MediaWiki's article-body container; fall back to
        # <body> for non-wiki pages
        content_div = soup.find("div", {"id": "mw-content-text"}) or soup.body
        return content_div.get_text(separator="\n", strip=True)
    except Exception as e:
        logger.error(f"Error fetching content from {url}: {e}")
        return ""
def clean_text(text):
    text = re.sub(r'\[\s*\d+\s*\]', '', text)        # numeric citation markers, e.g. [ 12 ]
    text = re.sub(r'\[\s*[a-zA-Z]+\s*\]', '', text)  # footnote markers, e.g. [ a ]
    text = re.sub(r'\n{2,}', '\n', text)             # collapse blank-line runs
    text = re.sub(r'[ \t]+', ' ', text)              # collapse repeated spaces/tabs
    return text.strip()
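
# Quick illustration of clean_text (the sample string is made up for this demo,
# not taken from the Wikipedia page):
_sample = "LLaMA [ 1 ] is a family of models.[ a ]\n\n\nReleased   by Meta."
assert clean_text(_sample) == "LLaMA is a family of models.\nReleased by Meta."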
def chunk_text(text, chunk_size=500, overlap=50):
    cleaned = clean_text(text)
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    return splitter.split_text(cleaned)

def load_and_chunk_webpage(url):
    text = fetch_webpage_text(url)
    return chunk_text(text)
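
# Splitter behaviour in brief: RecursiveCharacterTextSplitter tries paragraph
# ("\n\n"), line ("\n"), then word (" ") boundaries before falling back to hard
# character cuts, and the overlap lets adjacent chunks share context. A small
# local demo (made-up text, no network needed):
_demo = chunk_text("All work and no play. " * 60, chunk_size=200, overlap=20)
logger.info("Splitter demo: %d chunks, longest %d chars", len(_demo), max(len(c) for c in _demo))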
# Step 2: Embed chunks using SentenceTransformer
def embed_chunks(chunks):
    # Note: the retriever below re-embeds the chunks itself via HuggingFaceEmbeddings;
    # this direct encoder is kept for inspection and ad-hoc similarity checks
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(chunks, normalize_embeddings=True)
    return embeddings, model
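
# With normalize_embeddings=True each row of the returned matrix is a unit
# vector, so a dot product between rows equals cosine similarity. A quick
# self-check (left commented out because it loads the model a second time):
# _v, _ = embed_chunks(["hello world"])
# assert abs(float((_v[0] ** 2).sum()) - 1.0) < 1e-5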
# Step 3: Build FAISS index using LangChain wrapper
def build_retriever(chunks):
    embedding_func = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    db = LangchainFAISS.from_texts(chunks, embedding_func)
    # k=3: each query retrieves the three most similar chunks for the LLM to read
    return db.as_retriever(search_type="similarity", search_kwargs={"k": 3}), db
# Step 4: Initialize LLM and RAG chain
def initialize_llm():
    return Together(
        model="meta-llama/Llama-3-8b-chat-hf",
        temperature=0.7,
        max_tokens=512
    )
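
# Any chat model hosted on Together can be swapped in above (for example
# "mistralai/Mixtral-8x7B-Instruct-v0.1"); temperature controls sampling
# randomness and max_tokens caps the reply length.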
# Initialize all components
wiki_url = "https://en.wikipedia.org/wiki/LLaMA"
chunks = load_and_chunk_webpage(wiki_url)
embeddings, embed_model = embed_chunks(chunks)
retriever, db = build_retriever(chunks)
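
# Sanity check (illustrative): db.index exposes the underlying faiss index, so
# ntotal reports how many chunk vectors were stored. similarity_search embeds a
# query with the same model and returns the k nearest chunks, bypassing the LLM:
logger.info("Indexed %d chunks (FAISS ntotal=%d)", len(chunks), db.index.ntotal)
# for doc in db.similarity_search("Who developed LLaMA?", k=3):
#     logger.info("Nearest chunk: %s", doc.page_content[:120])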
llm = initialize_llm()
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type="stuff"  # "stuff": concatenate all retrieved chunks into one prompt
)
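
# One-off smoke test (left commented so startup does not spend an API call):
# logger.info(qa_chain.invoke({"query": "Who developed LLaMA?"})["result"])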
# Chat logic
def chat_with_bot(query):
    if not query.strip():
        return "⚠️ Please enter a question."
    # invoke returns a dict; RetrievalQA puts the answer under "result"
    return qa_chain.invoke({"query": query})["result"]
# Summary logic
def summarize_content():
    # Use the first 20 chunks, truncated to 3000 characters, to stay well
    # within the model's context window
    sample_text = " ".join(chunks[:20])
    prompt = f"Summarize this text in 5 bullet points:\n\n{sample_text[:3000]}"
    summary = llm.invoke(prompt)
    # Together LLMs return a plain string; the attribute check keeps this safe
    # if a chat model (which returns a message object) is ever swapped in
    return summary.content if hasattr(summary, "content") else summary
# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## 🤖 Chat with LLaMA Webpage Content")
    with gr.Row():
        chatbot = gr.Chatbot(label="Chat History")
    with gr.Row():
        question = gr.Textbox(label="Ask your question about LLaMA", placeholder="e.g., Who developed LLaMA?")
        ask_btn = gr.Button("Submit")
        clear_btn = gr.Button("Clear Chat")
    summary_output = gr.Textbox(label="📄 Summary of the Webpage", lines=8)
    summarize_btn = gr.Button("Summarize Content")

    def user_chat_handler(q, history):
        response = chat_with_bot(q)
        history.append((q, response))
        return history, ""

    ask_btn.click(fn=user_chat_handler, inputs=[question, chatbot], outputs=[chatbot, question])
    clear_btn.click(lambda: [], None, chatbot)
    summarize_btn.click(fn=summarize_content, inputs=[], outputs=summary_output)

demo.launch()