import gradio as gr
import os
import logging
from typing import List, Dict, Tuple
import numpy as np
from analyzer import combine_repo_files_for_llm, handle_load_repository
from hf_utils import download_filtered_space_files

# Setup logger
logger = logging.getLogger(__name__)


class SimpleVectorStore:
    """Simple in-memory vector store for repository chunks."""

    def __init__(self):
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []
        self.model = None

    def _get_embedding_model(self):
        """Lazy load the embedding model."""
        if self.model is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.model = SentenceTransformer('all-MiniLM-L6-v2')  # Lightweight, fast model
                logger.info("Loaded SentenceTransformer model for vectorization")
            except ImportError:
                logger.error("sentence-transformers not installed. Install with: pip install sentence-transformers")
                raise ImportError("sentence-transformers package is required for vectorization")
        return self.model

    def add_chunks(self, chunks: List[str], metadata: List[Dict] = None):
        """Add text chunks and create embeddings."""
        try:
            model = self._get_embedding_model()
            embeddings = model.encode(chunks, convert_to_tensor=False)
            self.chunks.extend(chunks)
            self.embeddings.extend(embeddings)
            self.chunk_metadata.extend(metadata or [{} for _ in chunks])
            logger.info(f"Added {len(chunks)} chunks to vector store")
        except Exception as e:
            logger.error(f"Error adding chunks to vector store: {e}")

    def search(self, query: str, top_k: int = 3) -> List[Tuple[str, float, Dict]]:
        """Search for similar chunks using cosine similarity."""
        if not self.chunks or not self.embeddings:
            return []
        try:
            model = self._get_embedding_model()
            query_embedding = model.encode([query], convert_to_tensor=False)[0]

            # Calculate cosine similarities
            similarities = []
            for i, chunk_embedding in enumerate(self.embeddings):
                similarity = np.dot(query_embedding, chunk_embedding) / (
                    np.linalg.norm(query_embedding) * np.linalg.norm(chunk_embedding)
                )
                similarities.append((self.chunks[i], similarity, self.chunk_metadata[i]))

            # Sort by similarity and return top_k
            similarities.sort(key=lambda x: x[1], reverse=True)
            return similarities[:top_k]
        except Exception as e:
            logger.error(f"Error searching vector store: {e}")
            return []

    def clear(self):
        """Clear all stored data."""
        self.chunks = []
        self.embeddings = []
        self.chunk_metadata = []

    def get_stats(self) -> Dict:
        """Get statistics about the vector store."""
        return {
            'total_chunks': len(self.chunks),
            'total_embeddings': len(self.embeddings),
            'model_loaded': self.model is not None
        }


# Global vector store instance
vector_store = SimpleVectorStore()
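
# Minimal usage sketch for SimpleVectorStore (illustrative only; assumes
# sentence-transformers is installed, and the chunk texts are hypothetical):
#
#     store = SimpleVectorStore()
#     store.add_chunks(
#         ["def add(a, b): return a + b", "README: a tiny math helper library"],
#         metadata=[{"chunk_index": 0}, {"chunk_index": 1}],
#     )
#     for chunk, score, meta in store.search("how do I add two numbers?", top_k=1):
#         print(f"{score:.3f}", meta, chunk[:40])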
def vectorize_repository_content(repo_content: str, repo_id: str, chunk_size: int = 500) -> bool:
    """
    Vectorize repository content by splitting into chunks and creating embeddings.

    Args:
        repo_content: The combined repository content
        repo_id: Repository identifier
        chunk_size: Number of lines per chunk

    Returns:
        bool: True if vectorization was successful
    """
    try:
        # Clear previous data
        vector_store.clear()

        lines = repo_content.split('\n')
        chunks = []
        metadata = []

        # Split into chunks with overlap for better context
        overlap = 50  # lines of overlap between chunks
        for i in range(0, len(lines), chunk_size - overlap):
            chunk_lines = lines[i:i + chunk_size]
            chunk_text = '\n'.join(chunk_lines)
            if chunk_text.strip():  # Only add non-empty chunks
                chunks.append(chunk_text)
                metadata.append({
                    'repo_id': repo_id,
                    'chunk_index': len(chunks) - 1,
                    'start_line': i,
                    'end_line': min(i + chunk_size, len(lines))
                })

        # Add chunks to vector store
        vector_store.add_chunks(chunks, metadata)

        logger.info(f"Successfully vectorized {len(chunks)} chunks for repository {repo_id}")
        return True
    except Exception as e:
        logger.error(f"Error vectorizing repository content: {e}")
        return False
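
# Chunking arithmetic, for reference: with chunk_size=500 and overlap=50 the
# loop above steps by 450 lines, so a 1000-line input yields chunks covering
# lines 0-500, 450-950, and 900-1000. Each boundary region appears in two
# chunks, which preserves context that would otherwise be cut at a chunk edge.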
def create_repo_explorer_tab() -> Tuple[Dict[str, gr.components.Component], Dict[str, gr.State]]:
    """
    Creates the Repo Explorer tab content and returns the component references and state variables.
    """

    # State variables for repo explorer
    states = {
        "repo_context_summary": gr.State(""),
        "current_repo_id": gr.State("")
    }

    gr.Markdown("### šµļø Deep Dive into a Specific Repository")

    with gr.Row():
        with gr.Column(scale=2):
            repo_explorer_input = gr.Textbox(
                label="š Repository ID",
                placeholder="microsoft/DialoGPT-medium",
                info="Enter a Hugging Face repository ID to explore"
            )
        with gr.Column(scale=1):
            load_repo_btn = gr.Button("š Load Repository", variant="primary", size="lg")

    with gr.Row():
        visit_hf_link = gr.HTML(
            value="",
            label="š Repository Link",
            visible=False
        )

    with gr.Row():
        repo_status_display = gr.Textbox(
            label="š Repository Status",
            interactive=False,
            lines=4,
            info="Current repository loading status and vectorization info"
        )

    with gr.Row():
        with gr.Column(scale=2):
            repo_chatbot = gr.Chatbot(
                label="š¤ Repository Assistant",
                height=400,
                type="messages",
                avatar_images=(
                    "https://cdn-icons-png.flaticon.com/512/149/149071.png",
                    "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
                ),
                show_copy_button=True,
                value=[]  # Start empty - welcome message will appear only after repo is loaded
            )

            with gr.Row():
                repo_msg_input = gr.Textbox(
                    label="š¬ Ask about this repository",
                    placeholder="What does this repository do? How do I use it?",
                    lines=1,
                    scale=4,
                    info="Ask anything about the loaded repository"
                )
                repo_send_btn = gr.Button("š¤ Send", variant="primary", scale=1)

        # with gr.Column(scale=1):
        #     # Repository content preview
        #     repo_content_display = gr.Textbox(
        #         label="š Repository Content Preview",
        #         lines=20,
        #         show_copy_button=True,
        #         interactive=False,
        #         info="Overview of the loaded repository structure and content"
        #     )

    # Component references
    components = {
        "repo_explorer_input": repo_explorer_input,
        "load_repo_btn": load_repo_btn,
        "visit_hf_link": visit_hf_link,
        "repo_status_display": repo_status_display,
        "repo_chatbot": repo_chatbot,
        "repo_msg_input": repo_msg_input,
        "repo_send_btn": repo_send_btn,
        # "repo_content_display": repo_content_display
    }

    return components, states


def handle_repo_user_message(user_message: str, history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> Tuple[List[Dict[str, str]], str]:
    """Handle user messages in the repo-specific chatbot."""
    if not repo_context_summary.strip():
        return history, ""

    # Initialize with repository-specific welcome message if empty
    if not history:
        welcome_msg = (
            f"Hello! I'm your assistant for the '{repo_id}' repository. I have analyzed all the files "
            "and created a comprehensive understanding of this repository. I'm ready to answer any "
            "questions about its functionality, usage, architecture, and more. What would you like to know?"
        )
        history = [{"role": "assistant", "content": welcome_msg}]

    if user_message:
        history.append({"role": "user", "content": user_message})
    return history, ""
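
# For reference, Gradio's type="messages" Chatbot stores history as a list of
# role/content dicts, which is the shape the handlers above and below expect:
#
#     history = [
#         {"role": "assistant", "content": "Hello! I'm your assistant..."},
#         {"role": "user", "content": "What does this repository do?"},
#     ]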
def handle_repo_bot_response(history: List[Dict[str, str]], repo_context_summary: str, repo_id: str) -> List[Dict[str, str]]:
    """Generate bot response for repo-specific questions using comprehensive context and vector search."""
    if not history or history[-1]["role"] != "user" or not repo_context_summary.strip():
        return history

    user_message = history[-1]["content"]

    # Use vector search to find relevant chunks
    relevant_chunks = vector_store.search(user_message, top_k=3)

    # Build enhanced context using vector search results
    vector_context = ""
    if relevant_chunks:
        vector_context = "\n\n=== MOST RELEVANT CODE SECTIONS ===\n"
        for i, (chunk, similarity, metadata) in enumerate(relevant_chunks):
            start_line = metadata.get('start_line', 'unknown')
            end_line = metadata.get('end_line', 'unknown')
            vector_context += f"\n--- Relevant Section {i+1} (similarity: {similarity:.3f}, lines {start_line}-{end_line}) ---\n{chunk}\n"

    # Create a specialized prompt using both comprehensive context and vector search results
    repo_system_prompt = f"""You are an expert assistant for the Hugging Face repository '{repo_id}'. You have comprehensive knowledge about this repository based on detailed analysis of all its files and components.

Use the following comprehensive analysis to answer user questions accurately and helpfully:

{repo_context_summary}
{vector_context}

Instructions:
- Answer questions clearly and conversationally about this specific repository
- Reference specific components, functions, or features when relevant
- Provide practical guidance on installation, usage, and implementation
- If asked about code details, refer to the analysis above and the relevant code sections
- Use the most relevant code sections to provide specific examples and implementation details
- Be helpful and informative while staying focused on this repository
- If something isn't covered in the analysis, acknowledge the limitation

Answer the user's question based on your comprehensive knowledge of this repository."""

    try:
        from openai import OpenAI
        client = OpenAI(api_key=os.getenv("modal_api"), base_url=os.getenv("base_url"))
        response = client.chat.completions.create(
            model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
            messages=[
                {"role": "system", "content": repo_system_prompt},
                {"role": "user", "content": user_message}
            ],
            max_tokens=1024,
            temperature=0.7
        )
        bot_response = response.choices[0].message.content
        history.append({"role": "assistant", "content": bot_response})
    except Exception as e:
        logger.error(f"Error generating repo bot response: {e}")
        error_response = f"I apologize, but I encountered an error while processing your question: {e}"
        history.append({"role": "assistant", "content": error_response})

    return history


def get_huggingface_url(repo_id: str) -> str:
    """Generate the Hugging Face Spaces URL for a repository."""
    if not repo_id.strip():
        return ""
    return f"https://huggingface.co/spaces/{repo_id}"


def generate_repo_link_html(repo_id: str) -> str:
    """Generate HTML with clickable link for the repository."""
    if not repo_id or not repo_id.strip():
        return ""

    clean_repo_id = str(repo_id).strip()
    hf_url = f"https://huggingface.co/spaces/{clean_repo_id}"

    # Clickable link that opens the Space in a new tab
    html_link = f'''<a href="{hf_url}" target="_blank" rel="noopener noreferrer">š Visit {clean_repo_id} on Hugging Face</a>'''
    return html_link


def handle_load_repository_with_vectorization(repo_id: str) -> Tuple[str, str, gr.HTML]:
    """Load repository and create both context summary and vector embeddings."""
    if not repo_id.strip():
        return "Status: Please enter a repository ID.", "", gr.update(value="", visible=False)

    try:
        logger.info(f"Loading repository with vectorization: {repo_id}")

        # Download and process the repository (existing logic)
        try:
            download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
            combined_text_path = combine_repo_files_for_llm()
        except Exception as e:
            logger.error(f"Error downloading repository {repo_id}: {e}")
            error_status = f"ā Error downloading repository: {e}"
            return error_status, "", gr.update(value="", visible=False)

        # Read the combined content
        with open(combined_text_path, "r", encoding="utf-8") as f:
            repo_content = f.read()

        # Create vectorized representation
        vectorization_success = vectorize_repository_content(repo_content, repo_id)

        # Get the original context summary
        from analyzer import create_repo_context_summary
        context_summary = create_repo_context_summary(repo_content, repo_id)

        # Update status message
        if vectorization_success:
            status = (
                f"ā Repository '{repo_id}' loaded successfully!\n"
                "š Files processed and ready for exploration.\n"
                "š Vector embeddings created for semantic search.\n"
                "š¬ You can now ask questions about this repository."
            )
        else:
            status = (
                f"ā Repository '{repo_id}' loaded successfully!\n"
                "š Files processed and ready for exploration.\n"
                "ā ļø Vectorization failed - using text-only analysis.\n"
                "š¬ You can now ask questions about this repository."
            )

        # Generate the HTML link for the repository
        repo_link_html = generate_repo_link_html(repo_id)

        logger.info(f"Repository {repo_id} loaded and processed successfully")
        return status, context_summary, gr.update(value=repo_link_html, visible=True)

    except Exception as e:
        logger.error(f"Error loading repository {repo_id}: {e}")
        error_status = f"ā Error loading repository: {e}"
        return error_status, "", gr.update(value="", visible=False)
def initialize_repo_chatbot(repo_status: str, repo_id: str, repo_context_summary: str) -> List[Dict[str, str]]:
    """Initialize the repository chatbot with a welcome message after successful repo loading."""
    # Only initialize if repository was loaded successfully
    if repo_context_summary.strip() and "successfully" in repo_status.lower():
        # Check if vectorization was successful
        vectorization_status = (
            "š **Enhanced with vector search** for finding relevant code sections"
            if "Vector embeddings created" in repo_status
            else "š **Text-based analysis** (vector search unavailable)"
        )

        welcome_msg = (
            f"š Welcome! I've successfully analyzed the **{repo_id}** repository.\n\n"
            "š§  **I now have comprehensive knowledge of:**\n"
            "⢠All files and code structure\n"
            "⢠Key features and capabilities\n"
            "⢠Installation and usage instructions\n"
            "⢠Architecture and implementation details\n"
            "⢠Dependencies and requirements\n\n"
            f"{vectorization_status}\n\n"
            "š¬ **Ask me anything about this repository!**\n"
            "For example:\n"
            "⢠\"What does this repository do?\"\n"
            "⢠\"How do I install and use it?\"\n"
            "⢠\"What are the main components?\"\n"
            "⢠\"Show me usage examples\"\n\n"
            "What would you like to know? š¤"
        )
        return [{"role": "assistant", "content": welcome_msg}]
    else:
        # Keep chatbot empty if loading failed
        return []


def setup_repo_explorer_events(components: Dict[str, gr.components.Component], states: Dict[str, gr.State]):
    """Setup event handlers for the repo explorer components."""

    # Load repository event with vectorization
    components["load_repo_btn"].click(
        fn=handle_load_repository_with_vectorization,
        inputs=[components["repo_explorer_input"]],
        outputs=[components["repo_status_display"], states["repo_context_summary"], components["visit_hf_link"]]
    ).then(
        fn=lambda repo_id: repo_id,
        inputs=[components["repo_explorer_input"]],
        outputs=[states["current_repo_id"]]
    ).then(
        fn=initialize_repo_chatbot,
        inputs=[components["repo_status_display"], states["current_repo_id"], states["repo_context_summary"]],
        outputs=[components["repo_chatbot"]]
    )

    # Chat message submission events
    components["repo_msg_input"].submit(
        fn=handle_repo_user_message,
        inputs=[components["repo_msg_input"], components["repo_chatbot"], states["repo_context_summary"], states["current_repo_id"]],
        outputs=[components["repo_chatbot"], components["repo_msg_input"]]
    ).then(
        fn=handle_repo_bot_response,
        inputs=[components["repo_chatbot"], states["repo_context_summary"], states["current_repo_id"]],
        outputs=[components["repo_chatbot"]]
    )

    components["repo_send_btn"].click(
        fn=handle_repo_user_message,
        inputs=[components["repo_msg_input"], components["repo_chatbot"], states["repo_context_summary"], states["current_repo_id"]],
        outputs=[components["repo_chatbot"], components["repo_msg_input"]]
    ).then(
        fn=handle_repo_bot_response,
        inputs=[components["repo_chatbot"], states["repo_context_summary"], states["current_repo_id"]],
        outputs=[components["repo_chatbot"]]
    )
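

# Minimal wiring sketch (illustrative; this module is normally imported into a
# larger app. The tab builder creates Gradio components, so it must run inside
# a gr.Blocks context, and the chat handler expects the `modal_api` and
# `base_url` environment variables to be set):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with gr.Blocks(title="Repo Explorer") as demo:
        components, states = create_repo_explorer_tab()
        setup_repo_explorer_events(components, states)
    demo.launch()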