|
import streamlit as st |
|
import requests |
|
import re |
|
from bs4 import BeautifulSoup |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
from langchain.docstore.document import Document |
|
import chromadb |
|
from sentence_transformers import SentenceTransformer |
|
import google.generativeai as genai |
|
import uuid |
|
|
|
|
|
st.set_page_config(layout="wide") |
|
|
|
|
|
genai.configure(api_key="AIzaSyAxUd2tS-qj9C7frYuHRsv92tziXHgIvLo") |
|
|
|
|
|
CHROMA_PATH = "chroma_db" |
|
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH) |
|
|
|
|
|
if 'scraped' not in st.session_state: |
|
st.session_state.scraped = False |
|
if 'collection_name' not in st.session_state: |
|
st.session_state.collection_name = "default_collection" |
|
if 'chat_history' not in st.session_state: |
|
st.session_state.chat_history = [] |
|
|
|
|
|
embedding_model = SentenceTransformer("all-MiniLM-L6-v2") |
|
|
|
def clean_text(text): |
|
text = re.sub(r'http\S+', '', text) |
|
text = re.sub(r'\s+', ' ', text).strip() |
|
return text |
|
|
|
def split_content_into_chunks(content): |
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, length_function=len) |
|
documents = [Document(page_content=content)] |
|
return text_splitter.split_documents(documents) |
|
|
|
def add_chunks_to_db(chunks, collection_name): |
|
|
|
collection = chroma_client.get_or_create_collection(name=collection_name) |
|
|
|
documents = [chunk.page_content for chunk in chunks] |
|
ids = [f"ID{i}" for i in range(len(chunks))] |
|
embeddings = embedding_model.encode(documents, convert_to_list=True) |
|
collection.upsert(documents=documents, ids=ids, embeddings=embeddings) |
|
|
|
def scrape_text(url): |
|
try: |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
|
|
collection_name = st.session_state.collection_name |
|
|
|
text = clean_text(soup.get_text()) |
|
chunks = split_content_into_chunks(text) |
|
add_chunks_to_db(chunks, collection_name) |
|
|
|
|
|
st.session_state.scraped = True |
|
|
|
return "Scraping and processing complete. You can now ask questions!" |
|
except requests.exceptions.RequestException as e: |
|
return f"Error scraping {url}: {e}" |
|
|
|
def ask_question(query, collection_name): |
|
|
|
collection = chroma_client.get_or_create_collection(name=collection_name) |
|
|
|
query_embedding = embedding_model.encode(query, convert_to_list=True) |
|
results = collection.query(query_embeddings=[query_embedding], n_results=2) |
|
top_chunks = results.get("documents", [[]])[0] |
|
|
|
system_prompt = f""" |
|
You are a helpful assistant. You answer questions based on the provided context. |
|
Only answer based on the knowledge I'm providing you. Don't use your internal |
|
knowledge and don't make things up. |
|
If you don't know the answer based on the provided context, just say: "I don't have enough information to answer that question based on the scraped content." |
|
|
|
Context information: |
|
{str(top_chunks)} |
|
""" |
|
|
|
full_prompt = system_prompt + "\nUser Query: " + query |
|
model = genai.GenerativeModel('gemini-2.0-flash') |
|
response = model.generate_content(full_prompt) |
|
return response.text |
|
|
|
|
|
col1, main_col = st.columns([1, 3]) |
|
|
|
|
|
with col1: |
|
st.header("Database Management") |
|
|
|
|
|
try: |
|
|
|
collection_names = chroma_client.list_collections() |
|
|
|
if collection_names: |
|
st.write("Available data collections:") |
|
selected_collection = st.selectbox("Select a collection to query:", collection_names) |
|
|
|
if selected_collection and st.button("Load Selected Collection"): |
|
st.session_state.collection_name = selected_collection |
|
st.session_state.scraped = True |
|
st.success(f"Loaded collection: {selected_collection}") |
|
st.rerun() |
|
except Exception as e: |
|
st.error(f"Error: {str(e)}") |
|
|
|
|
|
if st.button("Clear Chat History"): |
|
st.session_state.chat_history = [] |
|
st.rerun() |
|
|
|
|
|
st.header("Step 1: Scrape a Website") |
|
|
|
url = st.text_input("Enter the URL to scrape:") |
|
|
|
if url: |
|
if st.button("Scrape & Process"): |
|
with st.spinner("Scraping and processing content..."): |
|
result = scrape_text(url) |
|
st.success(result) |
|
|
|
|
|
with main_col: |
|
st.title("Web Scraper & Q&A Chatbot") |
|
|
|
|
|
chat_container = st.container() |
|
|
|
|
|
st.markdown(""" |
|
<style> |
|
.chat-container { |
|
height: 500px; |
|
overflow-y: auto; |
|
border: 1px solid #ddd; |
|
border-radius: 5px; |
|
padding: 15px; |
|
margin-bottom: 10px; |
|
background-color: #f9f9f9; |
|
} |
|
.stChatInputContainer { |
|
position: sticky; |
|
bottom: 0; |
|
background-color: white; |
|
padding-top: 10px; |
|
z-index: 100; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
if st.session_state.scraped: |
|
st.subheader("Step 2: Ask Questions About the Scraped Content") |
|
|
|
|
|
st.markdown('<div class="chat-container">', unsafe_allow_html=True) |
|
|
|
|
|
for message in st.session_state.chat_history: |
|
with chat_container.chat_message(message["role"]): |
|
st.write(message["content"]) |
|
|
|
st.markdown('</div>', unsafe_allow_html=True) |
|
|
|
|
|
user_query = st.chat_input("Ask your question here") |
|
|
|
if user_query: |
|
|
|
st.session_state.chat_history.append({"role": "user", "content": user_query}) |
|
|
|
|
|
with st.spinner("Searching database..."): |
|
answer = ask_question(user_query, st.session_state.collection_name) |
|
|
|
|
|
st.session_state.chat_history.append({"role": "assistant", "content": answer}) |
|
|
|
|
|
st.rerun() |
|
else: |
|
st.info("Please scrape a website or load a collection to start chatting.") |