# Hugging Face Spaces page header captured with this source (Space status: Sleeping); not part of the program.
import nltk | |
# Download the NLTK resources requested by this script ("punkt" and
# "punkt_tab") before sent_tokenize is used further down.
for _resource in ("punkt", "punkt_tab"):
    nltk.download(_resource)
# Pages to scrape, grouped by site section. The section name is carried along
# as metadata for every chunk produced from the page.
_SITE_ROOT = "https://www.imageonline.co.in"

# Section name -> page slugs (file name without the ".html" suffix).
_SECTION_PAGES = {
    "Website Designing": (
        "website-designing-mumbai",
        "domain-hosting-services-india",
        "best-seo-company-mumbai",
        "wordpress-blog-designing-india",
        "social-media-marketing-company-mumbai",
        "website-template-customization-india",
        "regular-website-maintanence-services",  # spelling matches the live URL
        "mobile-app-designing-mumbai",
        "web-application-screen-designing",
    ),
    "Website Development": (
        "website-development-mumbai",
        "open-source-customization",
        "ecommerce-development-company-mumbai",
        "website-with-content-management-system",
        "web-application-development-india",
    ),
    "Mobile App Development": (
        "mobile-app-development-company-mumbai",
    ),
    "About Us": (
        "about-us",
        "vision",
        "team",
    ),
    "Testimonials": (
        "testimonial",
    ),
}

# Expand every slug into its absolute URL; sections keep their insertion order.
url_dict = {
    section_name: [f"{_SITE_ROOT}/{slug}.html" for slug in slugs]
    for section_name, slugs in _SECTION_PAGES.items()
}
import trafilatura | |
import requests | |
# Page fetching / main-content extraction helper built on trafilatura
def extract_clean_text(url):
    """
    Download *url* with trafilatura and return its extracted main text.

    Comments and tables are excluded from the extraction. Returns None when
    the download yields nothing, and also when any exception is raised along
    the way (the error is printed rather than propagated).
    """
    try:
        raw_page = trafilatura.fetch_url(url)
        if not raw_page:
            # Nothing was downloaded, so there is nothing to extract.
            return None
        return trafilatura.extract(raw_page, include_comments=False, include_tables=False)
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
# Fetch every configured page and keep its text together with the URL and
# section it came from, so the metadata can travel with each chunk later.
scraped_data = []
for section, urls in url_dict.items():
    for url in urls:
        print(f"π© Scraping: {url}")
        text = extract_clean_text(url)
        if not text:
            # Extraction returned nothing usable; report it and move on.
            print(f"β Failed to extract content from {url}.\n")
            continue
        print(f"β Extracted {len(text)} characters.\n")
        page_metadata = {"source": url, "section": section}
        scraped_data.append({"content": text, "metadata": page_metadata})
print(f"Total pages scraped: {len(scraped_data)}")
import tiktoken | |
from nltk.tokenize import sent_tokenize | |
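# Initialize the tiktoken "cl100k_base" encoding; it is used only to measure chunk sizes.
# NOTE(review): the Llama-3 model called through Together.ai presumably tokenizes differently, so counts are approximate — confirm.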
tokenizer = tiktoken.get_encoding("cl100k_base") | |
def chunk_text(text, max_tokens=400, *, split_sentences=None, count_tokens=None):
    """
    Split text into consecutive, non-overlapping chunks on sentence boundaries.

    Sentences are accumulated until adding the next one would push the joined
    chunk past ``max_tokens``; the chunk is then closed and the overflowing
    sentence starts the next chunk. A single sentence that is longer than
    ``max_tokens`` on its own is kept whole as its own chunk (it is never
    split), and no empty chunks are emitted.

    Args:
        text: The text to split.
        max_tokens: Maximum token count per chunk (except for an oversized
            single sentence, see above).
        split_sentences: Optional callable ``text -> list of sentences``.
            Defaults to the module-level ``sent_tokenize``.
        count_tokens: Optional callable ``str -> int`` returning the token
            count of a string. Defaults to the length of
            ``tokenizer.encode(...)`` using the module-level ``tokenizer``.

    Returns:
        A list of chunk strings, in document order.
    """
    if split_sentences is None:
        split_sentences = sent_tokenize
    if count_tokens is None:
        def count_tokens(segment):
            return len(tokenizer.encode(segment))

    chunks = []
    current_chunk = []
    for sentence in split_sentences(text):
        candidate = " ".join([*current_chunk, sentence])
        # Only close a chunk that actually holds sentences. The previous
        # implementation appended first and popped on overflow, which emitted
        # an empty-string chunk whenever one sentence alone was over the limit.
        if current_chunk and count_tokens(candidate) > max_tokens:
            chunks.append(" ".join(current_chunk).strip())
            current_chunk = [sentence]  # Overflow sentence opens the next chunk
        else:
            current_chunk.append(sentence)

    # Flush whatever is left after the last sentence.
    if current_chunk:
        chunks.append(" ".join(current_chunk).strip())
    return chunks
# Break every scraped page into chunks; each chunk keeps a reference to the
# page's metadata dict (same URL + section for all chunks of one page).
chunked_data = []
for item in scraped_data:
    chunked_data.extend(
        {"content": piece, "metadata": item["metadata"]}
        for piece in chunk_text(item["content"], max_tokens=400)
    )

# Plain list of chunk texts, in the same order as chunked_data, for embedding
texts_to_embed = [entry["content"] for entry in chunked_data]
from sentence_transformers import SentenceTransformer | |
# Load the embedding model | |
embedding_model = SentenceTransformer("BAAI/bge-base-en-v1.5") | |
def embed_chunks(text_list, model):
    """
    Encode every string in *text_list* with *model* and return the result.

    The model's ``encode`` method is called once for the whole list with
    ``convert_to_numpy=True``; whatever it returns is passed back unchanged.
    """
    vectors = model.encode(text_list, convert_to_numpy=True)
    return vectors
# Generate one embedding per chunk (same order as texts_to_embed)
embeddings = embed_chunks(texts_to_embed, embedding_model)
print(f"β Generated {len(embeddings)} embeddings")
# embeddings[0] raises IndexError when no page could be scraped/chunked, so
# only report the vector shape when at least one embedding exists.
if len(embeddings) > 0:
    print(f"πΉ Shape of first embedding: {embeddings[0].shape}")
import chromadb | |
import uuid | |
# Initialize ChromaDB client (persistent storage under ./chroma_store)
chroma_client = chromadb.PersistentClient(path="./chroma_store")
# Create or get collection
collection = chroma_client.get_or_create_collection(name="imageonline_chunks")

# Extract documents and metadatas (parallel to `embeddings`)
documents = [item["content"] for item in chunked_data]
metadatas = [item["metadata"] for item in chunked_data]

# Derive a deterministic id per chunk from its source URL and its position
# within that page. The store is persistent, so random uuid4 ids combined with
# add() would insert a fresh copy of every chunk on each start-up and the
# retriever would return duplicates; stable ids + upsert() overwrite instead.
_chunks_seen_per_source = {}
ids = []
for _meta in metadatas:
    _source = _meta["source"]
    _position = _chunks_seen_per_source.get(_source, 0)
    _chunks_seen_per_source[_source] = _position + 1
    ids.append(str(uuid.uuid5(uuid.NAMESPACE_URL, f"{_source}#chunk-{_position}")))

# Safety check (explicit raise: `assert` is stripped when Python runs with -O)
if not (len(documents) == len(embeddings) == len(metadatas)):
    raise ValueError("Data length mismatch!")

# Write to ChromaDB; skip the call entirely when there is nothing to store
if documents:
    collection.upsert(
        documents=documents,
        embeddings=embeddings.tolist(),
        metadatas=metadatas,
        ids=ids,
    )
# Smoke-test the collection with one hard-coded query
query = "web design company"
query_embedding = embedding_model.encode([query])[0]

# Ask ChromaDB for the three nearest chunks
results = collection.query(
    query_embeddings=[query_embedding.tolist()],
    n_results=3,
)

# Results are nested per query; index 0 is the single query sent above.
_hits = zip(results["documents"][0], results["metadatas"][0])
for _rank, (_hit_text, _hit_meta) in enumerate(_hits, start=1):
    print(f"\nπ Match {_rank}:")
    print(f"Content: {_hit_text[:200]}...")
    print(f"π Metadata: {_hit_meta}")
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.runnables import RunnableLambda, RunnablePassthrough | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_together import ChatTogether | |
from langchain_community.vectorstores import Chroma | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
# Expose the existing "imageonline_chunks" collection through LangChain,
# reusing the Chroma client created earlier in this script.
embedding_function = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vectorstore = Chroma(
    collection_name="imageonline_chunks",
    embedding_function=embedding_function,
    client=chroma_client,
)

# Retriever that is configured to fetch k=3 documents per query
_search_settings = {"k": 3}
retriever = vectorstore.as_retriever(search_kwargs=_search_settings)
def retrieve_and_format(query):
    """
    Retrieve documents for *query* and render them as one context string.

    Each retrieved document becomes ``[<section>] <content>`` followed on the
    next line by ``(Source: <url>)``; missing ``section``/``source`` metadata
    is rendered as an empty string. Entries are separated by a blank line.

    Args:
        query: The user's question, passed to the module-level ``retriever``.

    Returns:
        The formatted context string (empty when nothing was retrieved).
    """
    # invoke() is the Runnable entry point for retrievers;
    # get_relevant_documents() is deprecated in LangChain.
    docs = retriever.invoke(query)
    return "\n\n".join(
        f"[{doc.metadata.get('section', '')}] {doc.page_content}\n"
        f"(Source: {doc.metadata.get('source', '')})"
        for doc in docs
    )
import os

# The Together.ai credential is read from the environment instead of being
# committed in source. SECURITY: the key that was previously hard-coded here
# has been published with the file and should be revoked and rotated.
together_api_key = os.getenv("TOGETHER_API_KEY")
if not together_api_key:
    raise RuntimeError(
        "TOGETHER_API_KEY is not set; export it (or add it as a Space secret) "
        "before starting the app."
    )

# Chat model used to answer questions from the retrieved context
llm = ChatTogether(
    model="meta-llama/Llama-3-8b-chat-hf",
    temperature=0.3,
    max_tokens=1024,
    top_p=0.7,
    together_api_key=together_api_key,
)
# Prompt that restricts the model to the retrieved context
prompt = ChatPromptTemplate.from_template("""
You are an expert assistant for ImageOnline Web Solutions.
Answer the user's query based ONLY on the following context:
{context}
Query: {question}
""")

# The chain input is the raw question string: it is passed through unchanged
# as {question} and also handed to retrieve_and_format to build {context}.
_rag_inputs = {
    "context": RunnableLambda(retrieve_and_format),
    "question": RunnablePassthrough(),
}
rag_chain = _rag_inputs | prompt | llm | StrOutputParser()
import gradio as gr | |
def chat_interface(message, history):
    """
    Answer one chat message through the RAG chain and extend the history.

    Args:
        message: The text the user submitted.
        history: List of ``(user_text, bot_text)`` tuples from earlier turns,
            or None/empty for a new conversation. It is not modified.

    Returns:
        A ``(history, history)`` pair holding the updated list twice: once for
        the chatbot display and once for the stored state. Blank messages
        return the history unchanged without calling the model. Errors raised
        while generating the answer are shown as the bot reply, not raised.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(history) if history else []

    # Nothing to ask: skip the retrieval + LLM round trip entirely.
    if not message or not message.strip():
        return history, history

    user_turn = "π§ You: " + message
    try:
        bot_turn = "π€ Bot: " + rag_chain.invoke(message)
    except Exception as e:
        # UI boundary: surface the failure in the chat instead of crashing.
        bot_turn = f"π€ Bot: β οΈ Error: {str(e)}"

    history.append((user_turn, bot_turn))
    return history, history
def launch_gradio():
    """
    Build the Gradio Blocks UI for the chatbot and return it (not launched).

    The layout has a title and hint, the chat display, a hidden state holding
    the conversation, a row with the question box and send button, and a
    clear button. Pressing Enter in the box and clicking send both call
    chat_interface; the clear button empties both the display and the state.
    """
    with gr.Blocks() as app:
        gr.Markdown("# π¬ ImageOnline RAG Chatbot")
        gr.Markdown("Ask about Website Designing, App Development, SEO, Hosting, etc.")

        chat_window = gr.Chatbot()
        conversation = gr.State([])

        with gr.Row():
            question_box = gr.Textbox(
                placeholder="Ask your question here...",
                show_label=False,
                scale=8,
            )
            send_button = gr.Button("π¨ Send", scale=1)

        # Both triggers run the same handler with the same inputs/outputs.
        question_box.submit(
            chat_interface,
            inputs=[question_box, conversation],
            outputs=[chat_window, conversation],
        )
        send_button.click(
            chat_interface,
            inputs=[question_box, conversation],
            outputs=[chat_window, conversation],
        )

        with gr.Row():
            clear_button = gr.Button("π§Ή Clear Chat")
        # Reset the visible chat and the stored history together.
        clear_button.click(fn=lambda: ([], []), outputs=[chat_window, conversation])
    return app


if __name__ == "__main__":
    # Build the UI and start the Gradio server when run as a script.
    demo = launch_gradio()
    demo.launch()