# ragtim-bot / app.py
# raktim-mondol
# refactored_code
# 4e2d884
import gradio as gr
import json
import numpy as np
from transformers import pipeline
import torch
import os
from typing import List, Dict, Any, Optional
import re
import math
from collections import defaultdict, Counter
from pathlib import Path
import logging
# Logging: INFO-level root configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Compute device: use CUDA when torch reports it as available, otherwise CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
logger.info(f"Using device: {device}")
class DocumentProcessor:
    """Load markdown files from a directory and split them into section documents."""

    # Ranking hint per file name; files not listed here get _DEFAULT_PRIORITY.
    _FILE_PRIORITIES = {
        'about.md': 10,
        'research_details.md': 9,
        'publications_detailed.md': 8,
        'skills_expertise.md': 7,
        'experience_detailed.md': 8,
        'statistics.md': 9,
    }
    _DEFAULT_PRIORITY = 5
    # A section is kept only if its stripped content is longer than this.
    _MIN_SECTION_CHARS = 100
    # Line prefixes that open or close a fenced code block.
    _FENCE_MARKERS = ('```', '~~~')

    def __init__(self, knowledge_base_dir: str = "knowledge_base"):
        """Remember the directory that holds the ``*.md`` knowledge-base files."""
        self.knowledge_base_dir = Path(knowledge_base_dir)

    def load_markdown_files(self) -> List[Dict[str, Any]]:
        """Load every ``*.md`` file in the knowledge base and split it into documents.

        Returns:
            A list of dicts with keys ``id``, ``content`` and ``metadata``
            (``type``, ``priority``, ``section``, ``source``). Sections whose
            stripped content is not longer than ``_MIN_SECTION_CHARS`` are
            skipped. A file that fails to load is logged and skipped.
        """
        documents = []
        # Sorted so document order, and the numeric suffix of each id, is
        # reproducible across runs and file systems.
        for file_path in sorted(self.knowledge_base_dir.glob("*.md")):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                file_type = file_path.stem
                priority = self._FILE_PRIORITIES.get(file_path.name, self._DEFAULT_PRIORITY)
                for section in self._split_markdown_into_sections(content):
                    if len(section['content'].strip()) <= self._MIN_SECTION_CHARS:
                        continue
                    documents.append({
                        "id": f"{file_path.name}_{section['title']}_{len(documents)}",
                        "content": section['content'],
                        "metadata": {
                            "type": file_type,
                            "priority": priority,
                            "section": section['title'],
                            "source": file_path.name,
                        },
                    })
                logger.info(f"βœ… Loaded {file_path.name}")
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole load.
                logger.error(f"❌ Error loading {file_path.name}: {e}")
        return documents

    def _split_markdown_into_sections(self, content: str) -> List[Dict[str, str]]:
        """Split markdown text into sections, one per header line.

        A line starting with ``#`` begins a new section, except inside a fenced
        code block (``` or ~~~), where such lines are ordinary content (for
        example shell or Python comments). Text before the first header goes
        into a section titled ``Introduction``.

        Args:
            content: Raw markdown text.

        Returns:
            A list of ``{'title': ..., 'content': ...}`` dicts in document
            order. Sections whose content is only whitespace are omitted.
        """
        sections = []
        title = 'Introduction'
        body_lines = []
        open_fence = None  # marker of the currently open fence, if any

        def flush():
            # Every collected line is terminated by a newline, header included.
            text = ''.join(f"{body_line}\n" for body_line in body_lines)
            if text.strip():
                sections.append({'title': title, 'content': text})

        for line in content.split('\n'):
            stripped = line.lstrip()
            if stripped.startswith(self._FENCE_MARKERS):
                marker = stripped[:3]
                if open_fence is None:
                    open_fence = marker
                elif marker == open_fence:
                    open_fence = None
                body_lines.append(line)
                continue
            if open_fence is None and line.startswith('#'):
                flush()
                title = line.lstrip('#').strip()
                body_lines = [line]
            else:
                body_lines.append(line)
        flush()
        return sections
class BM25Searcher:
    """Keyword retrieval over section documents using BM25 scoring."""

    # Built once at class level instead of on every _is_stop_word call.
    _STOP_WORDS = frozenset({
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those',
    })

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        """Store the BM25 parameters and start with an empty index.

        Args:
            k1: Term-frequency saturation parameter.
            b: Document-length normalisation parameter.
        """
        self.k1 = k1
        self.b = b
        self.term_frequencies = {}
        self.document_frequency = defaultdict(int)
        self.document_lengths = {}
        self.average_doc_length = 0
        self.total_documents = 0

    def build_index(self, documents: List[Dict[str, Any]]):
        """Rebuild the index from ``documents``, replacing any previous index.

        Args:
            documents: Dicts that carry at least ``id`` and ``content``.
        """
        logger.info("Building BM25 index...")
        self.term_frequencies = {}
        self.document_frequency = defaultdict(int)
        self.document_lengths = {}
        total_length = 0
        for doc in documents:
            doc_id = doc['id']
            terms = self._tokenize(doc['content'])
            term_counts = Counter(terms)
            self.term_frequencies[doc_id] = dict(term_counts)
            self.document_lengths[doc_id] = len(terms)
            total_length += len(terms)
            # Document frequency counts each term once per document.
            for term in term_counts:
                self.document_frequency[term] += 1
        self.total_documents = len(documents)
        self.average_doc_length = total_length / self.total_documents if self.total_documents > 0 else 0
        logger.info(f"βœ… BM25 index built: {len(self.document_frequency)} unique terms")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Rank ``documents`` against ``query`` with the previously built index.

        Args:
            query: Free-text query; tokenised the same way as the documents.
            documents: Candidates; each needs ``id`` and ``metadata.priority``.
            top_k: Maximum number of results to return.

        Returns:
            Up to ``top_k`` dicts with keys ``document``, ``score`` and
            ``search_type`` (always ``'bm25'``), best first. Documents that
            match no query term are omitted. An empty list is returned when
            the query has no usable terms.
        """
        query_terms = self._tokenize(query)
        if not query_terms:
            return []
        scores = {}
        for doc in documents:
            doc_id = doc['id']
            score = sum(self._calculate_bm25_score(term, doc_id) for term in query_terms)
            if score > 0:
                # Higher-priority sources get a mild multiplicative boost.
                priority_boost = 1 + (doc['metadata']['priority'] / 50)
                scores[doc_id] = {
                    'document': doc,
                    'score': score * priority_boost,
                    'search_type': 'bm25',
                }
        ranked = sorted(scores.values(), key=lambda item: item['score'], reverse=True)
        return ranked[:top_k]

    def _tokenize(self, text: str) -> List[str]:
        """Lower-case ``text``, drop punctuation, short words and stop words."""
        text = re.sub(r'[^\w\s]', ' ', text.lower())
        return [word for word in text.split() if len(word) > 2 and not self._is_stop_word(word)]

    def _is_stop_word(self, word: str) -> bool:
        """Return True if ``word`` is in the built-in English stop-word list."""
        return word in self._STOP_WORDS

    def _calculate_bm25_score(self, term: str, doc_id: str) -> float:
        """Return the BM25 contribution of ``term`` for document ``doc_id``.

        The IDF is ``log(1 + (N - df + 0.5) / (df + 0.5))``, which is never
        negative. The plain ``log((N - df + 0.5) / (df + 0.5))`` form goes
        negative once a term occurs in more than half of the documents, so a
        match would lower the score, and a one-document corpus could never
        produce a hit.

        Returns:
            0.0 when the term does not occur in the document, otherwise a
            positive score.
        """
        tf = self.term_frequencies.get(doc_id, {}).get(term, 0)
        if tf == 0:
            return 0.0
        df = self.document_frequency.get(term, 1)
        doc_length = self.document_lengths.get(doc_id, 0)
        idf = math.log(1 + (self.total_documents - df + 0.5) / (df + 0.5))
        # Guard against an index whose average length is zero.
        length_ratio = doc_length / self.average_doc_length if self.average_doc_length else 0.0
        numerator = tf * (self.k1 + 1)
        denominator = tf + self.k1 * (1 - self.b + self.b * length_ratio)
        return idf * (numerator / denominator)
class VectorSearcher:
    """Semantic retrieval using mean-pooled transformer embeddings and cosine similarity."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Store the model name; the model is loaded by ``initialize_model``."""
        self.model_name = model_name
        self.embedder = None
        self.embeddings = []

    def initialize_model(self):
        """Create the feature-extraction pipeline for ``self.model_name``.

        Raises:
            Exception: Whatever the pipeline constructor raises; it is logged
                and re-raised unchanged.
        """
        try:
            logger.info("Loading embedding model...")
            self.embedder = pipeline(
                'feature-extraction',
                self.model_name,
                device=0 if device == "cuda" else -1
            )
            logger.info("βœ… Embedding model loaded successfully")
        except Exception as e:
            logger.error(f"❌ Error loading embedding model: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    def build_embeddings(self, documents: List[Dict[str, Any]]):
        """Embed every document, keeping ``self.embeddings`` aligned with ``documents``.

        Only the first 500 characters of each document are embedded. When
        embedding a document fails, a 384-element zero vector is stored in its
        place so positions still line up; ``search`` scores such vectors 0.
        """
        logger.info("Generating embeddings for knowledge base...")
        self.embeddings = []
        for doc in documents:
            try:
                content = doc["content"][:500]  # Limit to 500 characters
                embedding = self.embedder(content, return_tensors="pt")
                # Mean over the first axis of embedding[0] gives one vector per document.
                embedding_np = embedding[0].mean(dim=0).detach().cpu().numpy()
                self.embeddings.append(embedding_np)
            except Exception as e:
                logger.error(f"Error generating embedding for doc {doc['id']}: {e}")
                self.embeddings.append(np.zeros(384))
        logger.info(f"βœ… Generated {len(self.embeddings)} embeddings")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Rank ``documents`` by cosine similarity to the query embedding.

        Args:
            query: Free-text query; only its first 500 characters are embedded.
            documents: The list passed to ``build_embeddings``, in the same
                order; entry ``i`` is paired with ``self.embeddings[i]``.
            top_k: Maximum number of results to return.

        Returns:
            Up to ``top_k`` dicts with keys ``document``, ``score`` and
            ``search_type`` (always ``'vector'``), best first. Any error is
            logged and produces an empty list.
        """
        try:
            query_embedding = self.embedder(query[:500], return_tensors="pt")
            query_vector = query_embedding[0].mean(dim=0).detach().cpu().numpy()
            similarities = []
            for i, doc_embedding in enumerate(self.embeddings):
                if doc_embedding is not None and len(doc_embedding) > 0:
                    similarity = self._cosine_similarity(query_vector, doc_embedding)
                    priority_boost = 1 + (documents[i]['metadata']['priority'] / 100)
                    final_score = similarity * priority_boost
                    similarities.append({
                        'document': documents[i],
                        'score': float(final_score),
                        'search_type': 'vector'
                    })
            similarities.sort(key=lambda x: x['score'], reverse=True)
            return similarities[:top_k]
        except Exception as e:
            logger.error(f"Error in vector search: {e}")
            return []

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Return the cosine similarity of ``a`` and ``b``.

        Returns 0.0 when either vector has zero norm. Dividing would give NaN,
        and NaN scores make the result ordering undefined. Zero vectors do
        occur: ``build_embeddings`` stores one when embedding fails.
        """
        norm_product = np.linalg.norm(a) * np.linalg.norm(b)
        if norm_product == 0:
            return 0.0
        return float(np.dot(a, b) / norm_product)
class HybridSearchSystem:
    """Facade that owns the documents and fuses BM25 and vector rankings."""

    def __init__(self):
        self.doc_processor = DocumentProcessor()
        self.bm25_searcher = BM25Searcher()
        self.vector_searcher = VectorSearcher()
        self.documents = []

    def initialize(self):
        """Load the knowledge base, then the embedding model, then both indices."""
        logger.info("Initializing Hybrid Search RAGtim Bot...")
        # Documents first: both indices are built from them.
        self.documents = self.doc_processor.load_markdown_files()
        # The model must be loaded before embeddings can be generated.
        self.vector_searcher.initialize_model()
        self.vector_searcher.build_embeddings(self.documents)
        self.bm25_searcher.build_index(self.documents)
        logger.info(f"βœ… System initialized with {len(self.documents)} documents")

    def search(self, query: str, search_type: str = "hybrid", top_k: int = 5,
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Dispatch to vector, BM25 or hybrid search.

        Any ``search_type`` other than ``"vector"`` or ``"bm25"`` selects
        hybrid search. The weights are used only by hybrid search.
        """
        if search_type == "vector":
            return self.vector_searcher.search(query, self.documents, top_k)
        if search_type == "bm25":
            return self.bm25_searcher.search(query, self.documents, top_k)
        return self._hybrid_search(query, top_k, vector_weight, bm25_weight)

    def _hybrid_search(self, query: str, top_k: int = 10,
                       vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Fuse both rankings into one weighted score per document.

        Each searcher is asked for ``top_k * 2`` candidates. Scores are
        divided by the best score of their own list (or set to 0 when that
        best score is not positive), then combined as
        ``vector_weight * vector + bm25_weight * bm25``. If anything fails,
        the error is logged and plain vector search is returned instead.
        """
        try:
            candidate_count = top_k * 2
            vector_results = self.vector_searcher.search(query, self.documents, candidate_count)
            bm25_results = self.bm25_searcher.search(query, self.documents, candidate_count)

            # Scale each non-empty list by its own maximum.
            for hits in (vector_results, bm25_results):
                if not hits:
                    continue
                peak = max(hit['score'] for hit in hits)
                for hit in hits:
                    hit['normalized_score'] = hit['score'] / peak if peak > 0 else 0

            # Start from the vector hits ...
            merged = {
                hit['document']['id']: {
                    'document': hit['document'],
                    'vector_score': hit['normalized_score'],
                    'bm25_score': 0.0,
                    'search_type': 'vector',
                }
                for hit in vector_results
            }
            # ... then fold in the BM25 hits.
            for hit in bm25_results:
                doc_id = hit['document']['id']
                if doc_id not in merged:
                    merged[doc_id] = {
                        'document': hit['document'],
                        'vector_score': 0.0,
                        'bm25_score': hit['normalized_score'],
                        'search_type': 'bm25',
                    }
                    continue
                # Found by both searchers.
                merged[doc_id]['bm25_score'] = hit['normalized_score']
                merged[doc_id]['search_type'] = 'hybrid'

            fused = [
                {
                    'document': entry['document'],
                    'score': (vector_weight * entry['vector_score']) + (bm25_weight * entry['bm25_score']),
                    'vector_score': entry['vector_score'],
                    'bm25_score': entry['bm25_score'],
                    'search_type': entry['search_type'],
                }
                for entry in merged.values()
            ]
            ranked = sorted(fused, key=lambda item: item['score'], reverse=True)
            return ranked[:top_k]
        except Exception as e:
            logger.error(f"Error in hybrid search: {e}")
            return self.vector_searcher.search(query, self.documents, top_k)
# Initialize the search system
# Module-level instance used by search_api, get_stats_api and chat_interface.
# NOTE: initialize() runs at import time; it loads the markdown files, loads the
# embedding model and builds both indices, so importing this module is not cheap
# and will raise if the embedding model cannot be loaded.
search_system = HybridSearchSystem()
search_system.initialize()
# API Functions
def search_api(query: str, top_k: int = 5, search_type: str = "hybrid",
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> Dict[str, Any]:
    """Run a search and wrap the hits with the request parameters.

    Returns a dict with ``results``, the echoed ``query``/``top_k``/
    ``search_type``, the corpus size and the search parameters. The two
    fusion weights are reported as ``None`` unless ``search_type`` is
    ``"hybrid"``. On any error the result is
    ``{"error": <message>, "results": []}``.
    """
    try:
        hits = search_system.search(query, search_type, top_k, vector_weight, bm25_weight)
        # The weights only take part in hybrid fusion.
        uses_weights = search_type == "hybrid"
        parameters = {
            "vector_weight": vector_weight if uses_weights else None,
            "bm25_weight": bm25_weight if uses_weights else None,
            "bm25_k1": search_system.bm25_searcher.k1,
            "bm25_b": search_system.bm25_searcher.b,
        }
        payload = {
            "results": hits,
            "query": query,
            "top_k": top_k,
            "search_type": search_type,
            "total_documents": len(search_system.documents),
            "search_parameters": parameters,
        }
        return payload
    except Exception as e:
        logger.error(f"Error in search API: {e}")
        return {"error": str(e), "results": []}
def get_stats_api() -> Dict[str, Any]:
    """Summarise the loaded knowledge base and the search configuration.

    Counts documents per metadata ``type`` and per ``source`` file and
    reports them together with the model name, the BM25 parameters and a
    fixed capability list. On any error a reduced dict with
    ``status == "error"`` is returned instead.
    """
    try:
        # Counters keep first-seen order, so the resulting dicts list the
        # keys in the order the documents were loaded.
        type_counts = Counter()
        file_counts = Counter()
        for doc in search_system.documents:
            meta = doc["metadata"]
            type_counts[meta["type"]] += 1
            file_counts[meta["source"]] += 1
        doc_types = dict(type_counts)
        sections_by_file = dict(file_counts)

        bm25 = search_system.bm25_searcher
        return {
            "total_documents": len(search_system.documents),
            "document_types": doc_types,
            "sections_by_file": sections_by_file,
            "model_name": search_system.vector_searcher.model_name,
            "embedding_dimension": 384,
            "search_capabilities": [
                "Hybrid Search (Vector + BM25)",
                "Semantic Vector Search",
                "BM25 Keyword Search",
                "GPU Accelerated",
                "Transformer Embeddings",
            ],
            "bm25_parameters": {
                "k1": bm25.k1,
                "b": bm25.b,
                "unique_terms": len(bm25.document_frequency),
                "average_doc_length": bm25.average_doc_length,
            },
            "backend_type": "Hugging Face Space with Hybrid Search",
            "knowledge_sources": list(sections_by_file),
            "status": "healthy",
        }
    except Exception as e:
        logger.error(f"Error in get_stats_api: {e}")
        return {
            "error": str(e),
            "status": "error",
            "total_documents": 0,
            "search_capabilities": ["Error"],
        }
def chat_interface(message: str) -> str:
    """Answer a chat message with a formatted summary of the best hybrid hits.

    A blank message gets a prompt to ask something. Otherwise up to six
    hybrid results are fetched: the best one is shown in full and the next
    two as excerpts of at most 200 characters. Any error is logged and
    reported as a generic apology.
    """
    if not message.strip():
        return "Please ask me something about Raktim Mondol! I use hybrid search combining semantic similarity and keyword matching for the best results."
    try:
        hits = search_system.search(message, "hybrid", 6)
        if not hits:
            return "I don't have specific information about that topic in my knowledge base. Could you please ask something else about Raktim Mondol?"

        top = hits[0]
        top_meta = top['document']['metadata']
        lines = [
            f"πŸ” **Found {len(hits)} relevant results using hybrid search**\n",
            f"**Primary Answer** (Score: {top['score']:.3f})",
            f"πŸ“„ Source: {top_meta['source']} - {top_meta['section']}",
            f"πŸ” Search Type: {top['search_type'].upper()}",
        ]
        # The per-method breakdown is shown only when both scores are present.
        if 'vector_score' in top and 'bm25_score' in top:
            lines.append(f"πŸ“Š Vector: {top['vector_score']:.3f} | BM25: {top['bm25_score']:.3f}")
        lines.append(f"\n{top['document']['content']}\n")

        if len(hits) > 1:
            lines.append("**Additional Context:**")
            # Second and third hits only, numbered from 1.
            for number, hit in enumerate(hits[1:3], 1):
                meta = hit['document']['metadata']
                text = hit['document']['content']
                section_info = f"{meta['source']} - {meta['section']}"
                search_info = f"({hit['search_type'].upper()}, Score: {hit['score']:.3f})"
                lines.append(f"{number}. {section_info} {search_info}")
                if len(text) > 200:
                    excerpt = text[:200] + "..."
                else:
                    excerpt = text
                lines.append(f" {excerpt}\n")

        lines.extend([
            "\nπŸ€– **Powered by Hybrid Search Technology**",
            "β€’ Vector Search: Semantic understanding with transformers",
            "β€’ BM25 Search: Advanced keyword ranking",
            "β€’ Smart Fusion: Optimal relevance through weighted combination",
        ])
        return "\n".join(lines)
    except Exception as e:
        logger.error(f"Error in chat interface: {e}")
        return "I'm sorry, I encountered an error while processing your question. Please try again."
# Create Gradio Interface with modern Gradio 5 features
# Three tabs: a chat front end over chat_interface, a raw search form over
# search_api, and a statistics view over get_stats_api.
with gr.Blocks(
    title="πŸ”₯ Hybrid Search RAGtim Bot",
    theme=gr.themes.Soft(),
    css="""
.gradio-container {
max-width: 1200px !important;
}
.chat-container {
height: 600px;
}
"""
) as demo:
    # Page header shown above the tabs.
    gr.Markdown("""
# πŸ”₯ Hybrid Search RAGtim Bot
**Advanced AI-powered search system combining semantic understanding with keyword precision**
🧠 **Semantic Vector Search** + πŸ” **BM25 Keyword Search** = ⚑ **Optimal Results**
Built with Gradio 5, featuring modern UI components and enhanced performance
""")
    with gr.Tabs():
        with gr.Tab("πŸ’¬ Chat Interface"):
            gr.Markdown("### Ask anything about Raktim Mondol's research, skills, or experience")
            # NOTE(review): tuple-style history and `bubble_full_width` are
            # reported as deprecated in Gradio 5 — confirm against the pinned
            # gradio version before changing.
            chatbot = gr.Chatbot(
                value=[],
                label="RAGtim Bot",
                height=400,
                show_copy_button=True,
                bubble_full_width=False
            )
            with gr.Row():
                msg = gr.Textbox(
                    label="Your Question",
                    placeholder="What would you like to know about Raktim's research or expertise?",
                    scale=4,
                    lines=2
                )
                submit_btn = gr.Button("Ask", variant="primary", scale=1)
            # Clicking an example fills the question box.
            gr.Examples(
                examples=[
                    "What is Raktim's research in LLMs and RAG?",
                    "Tell me about BioFusionNet and statistical methods",
                    "What are his multimodal AI capabilities?",
                    "Describe his biostatistics expertise"
                ],
                inputs=msg
            )

            def respond(message, history):
                """Append the (question, answer) pair to the history and clear the input box."""
                response = chat_interface(message)
                # The history list is extended in place and handed back to the chatbot.
                history.append((message, response))
                return history, ""

            # The button and the Enter key trigger the same handler.
            submit_btn.click(respond, [msg, chatbot], [chatbot, msg])
            msg.submit(respond, [msg, chatbot], [chatbot, msg])
        with gr.Tab("πŸ” Advanced Search API"):
            gr.Markdown("### Direct access to the hybrid search engine")
            with gr.Row():
                with gr.Column(scale=2):
                    search_query = gr.Textbox(
                        label="Search Query",
                        placeholder="Enter your search query here..."
                    )
                    with gr.Row():
                        search_type = gr.Radio(
                            choices=["hybrid", "vector", "bm25"],
                            value="hybrid",
                            label="Search Method"
                        )
                        top_k = gr.Slider(
                            minimum=1, maximum=20, value=5, step=1,
                            label="Number of Results"
                        )
                    with gr.Row():
                        # search_api reports these weights only for the "hybrid" method.
                        vector_weight = gr.Slider(
                            minimum=0.0, maximum=1.0, value=0.6, step=0.1,
                            label="Vector Weight"
                        )
                        bm25_weight = gr.Slider(
                            minimum=0.0, maximum=1.0, value=0.4, step=0.1,
                            label="BM25 Weight"
                        )
                    search_btn = gr.Button("πŸ” Search", variant="primary")
                with gr.Column(scale=3):
                    search_results = gr.JSON(
                        label="Search Results",
                        show_label=True
                    )
            # The input order matches search_api's positional parameters.
            search_btn.click(
                search_api,
                inputs=[search_query, top_k, search_type, vector_weight, bm25_weight],
                outputs=search_results
            )
        with gr.Tab("πŸ“Š System Statistics"):
            gr.Markdown("### Knowledge base and system information")
            stats_btn = gr.Button("πŸ“Š Get Statistics", variant="secondary")
            stats_output = gr.JSON(
                label="System Statistics",
                show_label=True
            )
            stats_btn.click(get_stats_api, outputs=stats_output)
            # Auto-load stats on tab open
            # NOTE(review): demo.load is registered on the whole Blocks app, so this
            # presumably fires on page load rather than on tab selection — confirm.
            demo.load(get_stats_api, outputs=stats_output)
if __name__ == "__main__":
    # Startup banner: one log line per subsystem, then serve.
    logger.info("πŸš€ Launching Hybrid Search RAGtim Bot...")
    document_count = len(search_system.documents)
    logger.info(f"πŸ“š Loaded {document_count} documents")
    term_count = len(search_system.bm25_searcher.document_frequency)
    logger.info(f"πŸ” BM25 index: {term_count} unique terms")
    embedding_count = len(search_system.vector_searcher.embeddings)
    logger.info(f"🧠 Vector embeddings: {embedding_count} documents")
    logger.info("πŸ”₯ Hybrid search ready!")

    # Listen on all interfaces at port 7860, without a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "show_api": True,
    }
    demo.launch(**launch_options)