File size: 4,120 Bytes
c22f035 8f4ddfa d78024f c22f035 d78024f c22f035 7ff6802 d78024f bd118ce 4ba0755 8f4ddfa bd118ce d78024f c22f035 7ff6802 c22f035 7ff6802 c22f035 bd118ce c22f035 7ff6802 c22f035 4ba0755 c22f035 4ba0755 c22f035 7ff6802 bd118ce d78024f c22f035 bd118ce 4ba0755 c22f035 d78024f bd118ce 7ff6802 bd118ce d78024f c22f035 7ff6802 c22f035 7ff6802 8f4ddfa 7ff6802 d78024f 7ff6802 8f4ddfa 7ff6802 8f4ddfa 7ff6802 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
import os
import re

import chromadb
import google.generativeai as genai
import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
# Page configuration (kept ahead of every other Streamlit call)
st.set_page_config(layout="wide")

# Initialize Gemini API
# SECURITY: the key is read from the environment rather than being embedded
# in the source. Any key that was ever committed should be revoked and
# replaced.
_gemini_api_key = os.environ.get("GOOGLE_API_KEY")
if not _gemini_api_key:
    st.error("GOOGLE_API_KEY is not set. Export it before starting the app.")
    st.stop()
genai.configure(api_key=_gemini_api_key)

# Initialize ChromaDB (persisted on disk under CHROMA_PATH)
CHROMA_PATH = "chroma_db"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)

# Initialize session state: only fill in keys that are not present yet, so
# values survive Streamlit reruns.
if 'scraped' not in st.session_state:
    st.session_state.scraped = False
if 'collection_name' not in st.session_state:
    st.session_state.collection_name = "default_collection"
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []


# Initialize embedding model
@st.cache_resource
def _load_embedding_model():
    """Build the sentence-embedding model once and reuse it across reruns."""
    return SentenceTransformer("all-MiniLM-L6-v2")


embedding_model = _load_embedding_model()
def clean_text(text):
    """Strip URLs from *text* and collapse whitespace runs to single spaces.

    Only tokens that start with ``http://`` or ``https://`` are treated as
    URLs, so ordinary words that merely begin with "http" (for example
    "httpd") are kept.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    without_urls = re.sub(r'https?://\S+', '', text)
    return re.sub(r'\s+', ' ', without_urls).strip()
def split_content_into_chunks(content, chunk_size=1000, chunk_overlap=200):
    """Split *content* into overlapping chunks wrapped as ``Document`` objects.

    Args:
        content: The full text to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Number of characters shared between neighbouring
            chunks.

    Returns:
        The list of documents produced by ``RecursiveCharacterTextSplitter``.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return splitter.split_documents([Document(page_content=content)])
def add_chunks_to_db(chunks, collection_name):
    """Embed *chunks* and upsert them into the named Chroma collection.

    Args:
        chunks: Objects exposing a ``page_content`` string attribute.
        collection_name: Name of the collection to write to; it is created
            if it does not exist yet.

    Returns:
        None. Nothing is written when *chunks* is empty.
    """
    # Nothing to store: skip the embedding work and the upsert entirely.
    if not chunks:
        return

    documents = [chunk.page_content for chunk in chunks]
    # Convert the encoder output to plain nested lists of floats before
    # handing it to Chroma.
    embeddings = embedding_model.encode(documents).tolist()
    # NOTE(review): IDs are positional ("ID0", "ID1", ...), so a later call on
    # the same collection overwrites the entries with matching IDs and leaves
    # any higher-numbered entries from an earlier, longer call in place.
    ids = [f"ID{i}" for i in range(len(documents))]

    collection = chroma_client.get_or_create_collection(name=collection_name)
    collection.upsert(documents=documents, ids=ids, embeddings=embeddings)
def scrape_text(url, timeout=10):
    """Download *url*, extract its text, and index it for question answering.

    On success the text chunks are stored in the collection named by
    ``st.session_state.collection_name`` and ``st.session_state.scraped`` is
    set to True.

    Args:
        url: Address of the page to scrape.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A human-readable status message. Failure messages start with
        "Error".
    """
    try:
        # A timeout keeps the app from hanging on an unresponsive server.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error scraping {url}: {e}"

    soup = BeautifulSoup(response.text, 'html.parser')
    # The separator keeps text from adjacent tags from being glued together;
    # clean_text() collapses the extra whitespace afterwards.
    text = clean_text(soup.get_text(separator=' '))
    chunks = split_content_into_chunks(text)
    if not chunks:
        # Do not mark the page as scraped when there is nothing to index.
        return f"Error scraping {url}: no text content found on the page."

    add_chunks_to_db(chunks, st.session_state.collection_name)
    st.session_state.scraped = True
    return "Scraping and processing complete. You can now ask questions!"
def ask_question(query, collection_name, n_results=2):
    """Answer *query* with Gemini, using the closest stored chunks as context.

    Args:
        query: The user's question.
        collection_name: Chroma collection to search.
        n_results: Number of chunks to retrieve as context.

    Returns:
        The text of the model's response.
    """
    collection = chroma_client.get_or_create_collection(name=collection_name)
    # Convert the encoder output to a plain list of floats before handing it
    # to Chroma.
    query_embedding = embedding_model.encode(query).tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results,
    )
    # Fall back to "no chunks" when the key is missing or holds None.
    documents = results.get("documents") or [[]]
    top_chunks = documents[0]

    system_prompt = (
        "\n"
        "You are a helpful assistant. Answer only from the provided context.\n"
        "If you lack information, say: \"I don't have enough information to answer that question.\"\n"
        "Context:\n"
        f"{str(top_chunks)}\n"
    )
    model = genai.GenerativeModel('gemini-2.0-flash')
    response = model.generate_content(system_prompt + "\nUser Query: " + query)
    return response.text
# Sidebar: chat-history reset and the scraping form
with st.sidebar:
    st.header("Database Management")
    if st.button("Clear Chat History"):
        st.session_state.chat_history = []
        st.rerun()

    st.header("Step 1: Scrape a Website")
    # Strip stray whitespace so a padded or blank entry is not treated as a URL.
    url = st.text_input("Enter URL:").strip()
    if url and st.button("Scrape & Process"):
        with st.spinner("Scraping..."):
            status_message = scrape_text(url)
        # scrape_text() reports failures as messages starting with "Error";
        # show those as errors instead of as a success banner.
        if status_message.startswith("Error"):
            st.error(status_message)
        else:
            st.success(status_message)
# Main content: the chat view, available once a site has been scraped
st.title("Web Scraper & Q&A Chatbot")
if not st.session_state.scraped:
    st.info("Please scrape a website first.")
else:
    st.subheader("Step 2: Ask Questions")

    # Replay the stored conversation.
    for entry in st.session_state.chat_history:
        with st.chat_message(entry["role"]):
            st.write(entry["content"])

    user_query = st.chat_input("Ask your question here")
    if user_query:
        history = st.session_state.chat_history
        # Record the question before querying.
        history.append({"role": "user", "content": user_query})
        with st.spinner("Searching..."):
            answer = ask_question(user_query, st.session_state.collection_name)
        history.append({"role": "assistant", "content": answer})
        # Limit chat history to 6 messages
        st.session_state.chat_history = history[-6:]
        # Rerun so the new messages are drawn by the replay loop above.
        st.rerun()