import gradio as gr
import json
import numpy as np
from transformers import pipeline
import torch
import os
from typing import List, Dict, Any, Optional
import re
import math
from collections import defaultdict, Counter
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure device
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")

class DocumentProcessor:
    """Handles document processing and text extraction from markdown files."""

    def __init__(self, knowledge_base_dir: str = "knowledge_base"):
        self.knowledge_base_dir = Path(knowledge_base_dir)

    def load_markdown_files(self) -> List[Dict[str, Any]]:
        """Load and process all markdown files in the knowledge base directory."""
        documents = []
        file_priorities = {
            'about.md': 10,
            'research_details.md': 9,
            'publications_detailed.md': 8,
            'skills_expertise.md': 7,
            'experience_detailed.md': 8,
            'statistics.md': 9
        }
        for file_path in self.knowledge_base_dir.glob("*.md"):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                file_type = file_path.stem
                priority = file_priorities.get(file_path.name, 5)
                sections = self._split_markdown_into_sections(content)
                for section in sections:
                    if len(section['content'].strip()) > 100:
                        doc = {
                            "id": f"{file_path.name}_{section['title']}_{len(documents)}",
                            "content": section['content'],
                            "metadata": {
                                "type": file_type,
                                "priority": priority,
                                "section": section['title'],
                                "source": file_path.name
                            }
                        }
                        documents.append(doc)
                logger.info(f"✅ Loaded {file_path.name}")
            except Exception as e:
                logger.error(f"❌ Error loading {file_path.name}: {e}")
        return documents
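
    # Example: a file containing "# About\n...\n## Research\n..." yields two
    # sections titled "About" and "Research"; each keeps its header line in
    # 'content', and any text before the first header lands in "Introduction".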
    def _split_markdown_into_sections(self, content: str) -> List[Dict[str, str]]:
        """Split markdown content into sections based on headers."""
        sections = []
        lines = content.split('\n')
        current_section = {'title': 'Introduction', 'content': ''}
        for line in lines:
            if line.startswith('#'):
                if current_section['content'].strip():
                    sections.append(current_section.copy())
                title = line.lstrip('#').strip()
                current_section = {
                    'title': title,
                    'content': line + '\n'
                }
            else:
                current_section['content'] += line + '\n'
        if current_section['content'].strip():
            sections.append(current_section)
        return sections

class BM25Searcher:
    """Implements BM25 search algorithm for keyword-based document retrieval."""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.term_frequencies = {}
        self.document_frequency = defaultdict(int)
        self.document_lengths = {}
        self.average_doc_length = 0
        self.total_documents = 0

    def build_index(self, documents: List[Dict[str, Any]]):
        """Build BM25 index from documents."""
        logger.info("Building BM25 index...")
        self.term_frequencies = {}
        self.document_frequency = defaultdict(int)
        self.document_lengths = {}
        total_length = 0
        for doc in documents:
            doc_id = doc['id']
            terms = self._tokenize(doc['content'])
            term_freq = Counter(terms)
            self.term_frequencies[doc_id] = dict(term_freq)
            doc_length = len(terms)
            self.document_lengths[doc_id] = doc_length
            total_length += doc_length
            unique_terms = set(terms)
            for term in unique_terms:
                self.document_frequency[term] += 1
        self.total_documents = len(documents)
        self.average_doc_length = total_length / self.total_documents if self.total_documents > 0 else 0
        logger.info(f"✅ BM25 index built: {len(self.document_frequency)} unique terms")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Perform BM25 search."""
        query_terms = self._tokenize(query)
        if not query_terms:
            return []
        scores = {}
        for doc in documents:
            doc_id = doc['id']
            score = 0.0
            for term in query_terms:
                score += self._calculate_bm25_score(term, doc_id)
            if score > 0:
                priority_boost = 1 + (doc['metadata']['priority'] / 50)
                final_score = score * priority_boost
                scores[doc_id] = {
                    'document': doc,
                    'score': final_score,
                    'search_type': 'bm25'
                }
        sorted_results = sorted(scores.values(), key=lambda x: x['score'], reverse=True)
        return sorted_results[:top_k]

    def _tokenize(self, text: str) -> List[str]:
        """Tokenize text for BM25."""
        text = re.sub(r'[^\w\s]', ' ', text.lower())
        words = [word for word in text.split() if len(word) > 2 and not self._is_stop_word(word)]
        return words
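
    # Example: _tokenize("Deep Learning, and NLP!") -> ['deep', 'learning', 'nlp']
    # (punctuation stripped, lowercased, stop words and 1-2 character tokens dropped).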
    def _is_stop_word(self, word: str) -> bool:
        """Check if word is a stop word."""
        stop_words = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
            'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those'
        }
        return word in stop_words
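
    # BM25 scoring, matching the computation below. For a query term t and document d:
    #   idf(t) = ln((N - df(t) + 0.5) / (df(t) + 0.5))
    #   score(t, d) = idf(t) * tf(t, d) * (k1 + 1) / (tf(t, d) + k1 * (1 - b + b * |d| / avgdl))
    # where N is the corpus size, df the document frequency, tf the in-document term
    # frequency, |d| the document length, and avgdl the average document length.
    # Note: this classic Robertson idf goes negative for terms appearing in more than
    # half the documents; some variants clamp it at 0 or add 1 inside the log.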
    def _calculate_bm25_score(self, term: str, doc_id: str) -> float:
        """Calculate BM25 score for a term in a document."""
        tf = self.term_frequencies.get(doc_id, {}).get(term, 0)
        # Guard against an empty index (average_doc_length == 0) as well as absent terms
        if tf == 0 or self.average_doc_length == 0:
            return 0.0
        df = self.document_frequency.get(term, 1)
        doc_length = self.document_lengths.get(doc_id, 0)
        idf = math.log((self.total_documents - df + 0.5) / (df + 0.5))
        numerator = tf * (self.k1 + 1)
        denominator = tf + self.k1 * (1 - self.b + self.b * (doc_length / self.average_doc_length))
        return idf * (numerator / denominator)

class VectorSearcher:
    """Implements vector-based semantic search using transformer embeddings."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.embedder = None
        self.embeddings = []

    def initialize_model(self):
        """Initialize the embedding model."""
        try:
            logger.info("Loading embedding model...")
            self.embedder = pipeline(
                'feature-extraction',
                self.model_name,
                device=0 if device == "cuda" else -1
            )
            logger.info("✅ Embedding model loaded successfully")
        except Exception as e:
            logger.error(f"❌ Error loading embedding model: {e}")
            raise
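
    # Mean pooling over the token embeddings approximates a sentence vector; this
    # mirrors the mean-pooling recipe for all-MiniLM-L6-v2 (384-dim output), minus
    # the attention-mask weighting a full sentence-transformers pipeline applies.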
    def build_embeddings(self, documents: List[Dict[str, Any]]):
        """Build embeddings for all documents."""
        logger.info("Generating embeddings for knowledge base...")
        self.embeddings = []
        for doc in documents:
            try:
                content = doc["content"][:500]  # Limit to 500 characters
                embedding = self.embedder(content, return_tensors="pt")
                embedding_np = embedding[0].mean(dim=0).detach().cpu().numpy()
                self.embeddings.append(embedding_np)
            except Exception as e:
                logger.error(f"Error generating embedding for doc {doc['id']}: {e}")
                self.embeddings.append(np.zeros(384))
        logger.info(f"✅ Generated {len(self.embeddings)} embeddings")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Perform vector similarity search."""
        try:
            query_embedding = self.embedder(query[:500], return_tensors="pt")
            query_vector = query_embedding[0].mean(dim=0).detach().cpu().numpy()
            similarities = []
            for i, doc_embedding in enumerate(self.embeddings):
                if doc_embedding is not None and len(doc_embedding) > 0:
                    similarity = self._cosine_similarity(query_vector, doc_embedding)
                    priority_boost = 1 + (documents[i]['metadata']['priority'] / 100)
                    final_score = similarity * priority_boost
                    similarities.append({
                        'document': documents[i],
                        'score': float(final_score),
                        'search_type': 'vector'
                    })
            similarities.sort(key=lambda x: x['score'], reverse=True)
            return similarities[:top_k]
        except Exception as e:
            logger.error(f"Error in vector search: {e}")
            return []
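
    # Cosine similarity: cos(a, b) = (a · b) / (||a|| * ||b||), in [-1, 1] for
    # real-valued embeddings; the zero-norm guard below covers the zero-vector
    # fallback used when an embedding fails to generate.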
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors."""
        norm_product = np.linalg.norm(a) * np.linalg.norm(b)
        if norm_product == 0:
            return 0.0
        return float(np.dot(a, b) / norm_product)

class HybridSearchSystem:
    """Main hybrid search system combining BM25 and vector search."""

    def __init__(self):
        self.doc_processor = DocumentProcessor()
        self.bm25_searcher = BM25Searcher()
        self.vector_searcher = VectorSearcher()
        self.documents = []

    def initialize(self):
        """Initialize the entire search system."""
        logger.info("Initializing Hybrid Search RAGtim Bot...")
        # Load documents
        self.documents = self.doc_processor.load_markdown_files()
        # Initialize models and build indices
        self.vector_searcher.initialize_model()
        self.vector_searcher.build_embeddings(self.documents)
        self.bm25_searcher.build_index(self.documents)
        logger.info(f"✅ System initialized with {len(self.documents)} documents")

    def search(self, query: str, search_type: str = "hybrid", top_k: int = 5,
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Perform search based on specified method."""
        if search_type == "vector":
            return self.vector_searcher.search(query, self.documents, top_k)
        elif search_type == "bm25":
            return self.bm25_searcher.search(query, self.documents, top_k)
        else:  # hybrid
            return self._hybrid_search(query, top_k, vector_weight, bm25_weight)
    def _hybrid_search(self, query: str, top_k: int = 10,
                       vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Perform hybrid search combining vector and BM25 results."""
        try:
            vector_results = self.vector_searcher.search(query, self.documents, top_k * 2)
            bm25_results = self.bm25_searcher.search(query, self.documents, top_k * 2)
            # Normalize scores
            if vector_results:
                max_vector_score = max(r['score'] for r in vector_results)
                if max_vector_score > 0:
                    for result in vector_results:
                        result['normalized_score'] = result['score'] / max_vector_score
                else:
                    for result in vector_results:
                        result['normalized_score'] = 0
            if bm25_results:
                max_bm25_score = max(r['score'] for r in bm25_results)
                if max_bm25_score > 0:
                    for result in bm25_results:
                        result['normalized_score'] = result['score'] / max_bm25_score
                else:
                    for result in bm25_results:
                        result['normalized_score'] = 0
            # Combine results
            combined_scores = {}
            for result in vector_results:
                doc_id = result['document']['id']
                combined_scores[doc_id] = {
                    'document': result['document'],
                    'vector_score': result['normalized_score'],
                    'bm25_score': 0.0,
                    'search_type': 'vector'
                }
            for result in bm25_results:
                doc_id = result['document']['id']
                if doc_id in combined_scores:
                    combined_scores[doc_id]['bm25_score'] = result['normalized_score']
                    combined_scores[doc_id]['search_type'] = 'hybrid'
                else:
                    combined_scores[doc_id] = {
                        'document': result['document'],
                        'vector_score': 0.0,
                        'bm25_score': result['normalized_score'],
                        'search_type': 'bm25'
                    }
            # Calculate final hybrid scores
            final_results = []
            for doc_id, data in combined_scores.items():
                hybrid_score = (vector_weight * data['vector_score']) + (bm25_weight * data['bm25_score'])
                final_results.append({
                    'document': data['document'],
                    'score': hybrid_score,
                    'vector_score': data['vector_score'],
                    'bm25_score': data['bm25_score'],
                    'search_type': data['search_type']
                })
            final_results.sort(key=lambda x: x['score'], reverse=True)
            return final_results[:top_k]
        except Exception as e:
            logger.error(f"Error in hybrid search: {e}")
            return self.vector_searcher.search(query, self.documents, top_k)

# Initialize the search system
search_system = HybridSearchSystem()
search_system.initialize()

# API Functions
def search_api(query: str, top_k: int = 5, search_type: str = "hybrid",
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> Dict[str, Any]:
    """API endpoint for search functionality."""
    try:
        results = search_system.search(query, search_type, top_k, vector_weight, bm25_weight)
        return {
            "results": results,
            "query": query,
            "top_k": top_k,
            "search_type": search_type,
            "total_documents": len(search_system.documents),
            "search_parameters": {
                "vector_weight": vector_weight if search_type == "hybrid" else None,
                "bm25_weight": bm25_weight if search_type == "hybrid" else None,
                "bm25_k1": search_system.bm25_searcher.k1,
                "bm25_b": search_system.bm25_searcher.b
            }
        }
    except Exception as e:
        logger.error(f"Error in search API: {e}")
        return {"error": str(e), "results": []}

def get_stats_api() -> Dict[str, Any]:
    """API endpoint for system statistics."""
    try:
        doc_types = {}
        sections_by_file = {}
        for doc in search_system.documents:
            doc_type = doc["metadata"]["type"]
            source_file = doc["metadata"]["source"]
            doc_types[doc_type] = doc_types.get(doc_type, 0) + 1
            sections_by_file[source_file] = sections_by_file.get(source_file, 0) + 1
        return {
            "total_documents": len(search_system.documents),
            "document_types": doc_types,
            "sections_by_file": sections_by_file,
            "model_name": search_system.vector_searcher.model_name,
            "embedding_dimension": 384,
            "search_capabilities": [
                "Hybrid Search (Vector + BM25)",
                "Semantic Vector Search",
                "BM25 Keyword Search",
                "GPU Accelerated",
                "Transformer Embeddings"
            ],
            "bm25_parameters": {
                "k1": search_system.bm25_searcher.k1,
                "b": search_system.bm25_searcher.b,
                "unique_terms": len(search_system.bm25_searcher.document_frequency),
                "average_doc_length": search_system.bm25_searcher.average_doc_length
            },
            "backend_type": "Hugging Face Space with Hybrid Search",
            "knowledge_sources": list(sections_by_file.keys()),
            "status": "healthy"
        }
    except Exception as e:
        logger.error(f"Error in get_stats_api: {e}")
        return {
            "error": str(e),
            "status": "error",
            "total_documents": 0,
            "search_capabilities": ["Error"]
        }

def chat_interface(message: str) -> str:
    """Enhanced chat interface with better formatting."""
    if not message.strip():
        return "Please ask me something about Raktim Mondol! I use hybrid search combining semantic similarity and keyword matching for the best results."
    try:
        search_results = search_system.search(message, "hybrid", 6)
        if search_results:
            response_parts = []
            response_parts.append(f"🔍 **Found {len(search_results)} relevant results using hybrid search**\n")
            best_match = search_results[0]
            response_parts.append(f"**Primary Answer** (Score: {best_match['score']:.3f})")
            response_parts.append(f"📄 Source: {best_match['document']['metadata']['source']} - {best_match['document']['metadata']['section']}")
            response_parts.append(f"🔎 Search Type: {best_match['search_type'].upper()}")
            if 'vector_score' in best_match and 'bm25_score' in best_match:
                response_parts.append(f"📊 Vector: {best_match['vector_score']:.3f} | BM25: {best_match['bm25_score']:.3f}")
            response_parts.append(f"\n{best_match['document']['content']}\n")
            if len(search_results) > 1:
                response_parts.append("**Additional Context:**")
                for i, result in enumerate(search_results[1:3], 1):
                    section_info = f"{result['document']['metadata']['source']} - {result['document']['metadata']['section']}"
                    search_info = f"({result['search_type'].upper()}, Score: {result['score']:.3f})"
                    response_parts.append(f"{i}. {section_info} {search_info}")
                    excerpt = result['document']['content'][:200] + "..." if len(result['document']['content']) > 200 else result['document']['content']
                    response_parts.append(f"   {excerpt}\n")
            response_parts.append("\n🤖 **Powered by Hybrid Search Technology**")
            response_parts.append("• Vector Search: Semantic understanding with transformers")
            response_parts.append("• BM25 Search: Advanced keyword ranking")
            response_parts.append("• Smart Fusion: Optimal relevance through weighted combination")
            return "\n".join(response_parts)
        else:
            return "I don't have specific information about that topic in my knowledge base. Could you please ask something else about Raktim Mondol?"
    except Exception as e:
        logger.error(f"Error in chat interface: {e}")
        return "I'm sorry, I encountered an error while processing your question. Please try again."

# Create Gradio Interface with modern Gradio 5 features
with gr.Blocks(
    title="🔥 Hybrid Search RAGtim Bot",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px !important;
    }
    .chat-container {
        height: 600px;
    }
    """
) as demo:
    gr.Markdown("""
    # 🔥 Hybrid Search RAGtim Bot

    **Advanced AI-powered search system combining semantic understanding with keyword precision**

    🧠 **Semantic Vector Search** + 🔍 **BM25 Keyword Search** = ⚡ **Optimal Results**

    Built with Gradio 5, featuring modern UI components and enhanced performance
    """)

    with gr.Tabs():
        with gr.Tab("💬 Chat Interface"):
            gr.Markdown("### Ask anything about Raktim Mondol's research, skills, or experience")
            chatbot = gr.Chatbot(
                value=[],
                label="RAGtim Bot",
                height=400,
                show_copy_button=True,
                type="messages"  # tuple-style history and bubble_full_width are deprecated in Gradio 5
            )
            with gr.Row():
                msg = gr.Textbox(
                    label="Your Question",
                    placeholder="What would you like to know about Raktim's research or expertise?",
                    scale=4,
                    lines=2
                )
                submit_btn = gr.Button("Ask", variant="primary", scale=1)
            gr.Examples(
                examples=[
                    "What is Raktim's research in LLMs and RAG?",
                    "Tell me about BioFusionNet and statistical methods",
                    "What are his multimodal AI capabilities?",
                    "Describe his biostatistics expertise"
                ],
                inputs=msg
            )

            def respond(message, history):
                response = chat_interface(message)
                history.append({"role": "user", "content": message})
                history.append({"role": "assistant", "content": response})
                return history, ""

            submit_btn.click(respond, [msg, chatbot], [chatbot, msg])
            msg.submit(respond, [msg, chatbot], [chatbot, msg])
with gr.Tab("π Advanced Search API"): | |
gr.Markdown("### Direct access to the hybrid search engine") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
search_query = gr.Textbox( | |
label="Search Query", | |
placeholder="Enter your search query here..." | |
) | |
with gr.Row(): | |
search_type = gr.Radio( | |
choices=["hybrid", "vector", "bm25"], | |
value="hybrid", | |
label="Search Method" | |
) | |
top_k = gr.Slider( | |
minimum=1, maximum=20, value=5, step=1, | |
label="Number of Results" | |
) | |
with gr.Row(): | |
vector_weight = gr.Slider( | |
minimum=0.0, maximum=1.0, value=0.6, step=0.1, | |
label="Vector Weight" | |
) | |
bm25_weight = gr.Slider( | |
minimum=0.0, maximum=1.0, value=0.4, step=0.1, | |
label="BM25 Weight" | |
) | |
search_btn = gr.Button("π Search", variant="primary") | |
with gr.Column(scale=3): | |
search_results = gr.JSON( | |
label="Search Results", | |
show_label=True | |
) | |
search_btn.click( | |
search_api, | |
inputs=[search_query, top_k, search_type, vector_weight, bm25_weight], | |
outputs=search_results | |
) | |
with gr.Tab("π System Statistics"): | |
gr.Markdown("### Knowledge base and system information") | |
stats_btn = gr.Button("π Get Statistics", variant="secondary") | |
stats_output = gr.JSON( | |
label="System Statistics", | |
show_label=True | |
) | |
stats_btn.click(get_stats_api, outputs=stats_output) | |
# Auto-load stats on tab open | |
demo.load(get_stats_api, outputs=stats_output) | |

if __name__ == "__main__":
    logger.info("🚀 Launching Hybrid Search RAGtim Bot...")
    logger.info(f"📚 Loaded {len(search_system.documents)} documents")
    logger.info(f"🔍 BM25 index: {len(search_system.bm25_searcher.document_frequency)} unique terms")
    logger.info(f"🧠 Vector embeddings: {len(search_system.vector_searcher.embeddings)} documents")
    logger.info("🔥 Hybrid search ready!")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        show_api=True
    )
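
# Illustrative client usage (assumptions: the Space id below is a placeholder, and
# the api_name is Gradio's auto-generated default from the function name -- check
# the Space's "Use via API" page to confirm the real endpoint names):
#
#   from gradio_client import Client
#   client = Client("username/hybrid-search-ragtim-bot")  # hypothetical Space id
#   result = client.predict(
#       "What is Raktim's research in LLMs?",  # query
#       5, "hybrid", 0.6, 0.4,                 # top_k, search_type, weights
#       api_name="/search_api"                 # assumed auto-generated name
#   )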