import gradio as gr
# NOTE: `re` in this module is the third-party `regex` package, not the stdlib `re`.
import regex as re
import csv
import pandas as pd
from typing import List, Dict, Tuple, Any
import logging
import os
import time

# Import core logic from other modules, as in app_old.py
from analyzer import (
    combine_repo_files_for_llm,
    parse_llm_json_response,
    analyze_combined_file,
    handle_load_repository
)
from hf_utils import download_filtered_space_files, search_top_spaces
from chatbot_page import chat_with_user, extract_keywords_from_conversation
from repo_explorer import create_repo_explorer_tab, setup_repo_explorer_events

# --- Configuration ---
# Configure root logging once at import time; every helper below logs through `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Working CSV file (relative to the current working directory): one row per
# repository with the columns "repo id", "strength", "weaknesses",
# "speciality" and "relevance rating".
CSV_FILE = "repo_ids.csv"

# System prompt describing the requirement-gathering assistant.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the chatbot module or kept for reference - confirm.
CHATBOT_SYSTEM_PROMPT = (
    "You are a helpful assistant whose ONLY job is to gather information about the user's ideal repository requirements. "
    "DO NOT suggest any specific repositories or give repository recommendations. "
    "Your role is to ask clarifying questions to understand exactly what the user is looking for. "
    "Ask about their use case, preferred programming language, specific features needed, project type, etc. "
    "When you feel you have gathered enough detailed information about their requirements, "
    "tell the user: 'I think I have enough information about your requirements. Please click the Extract Keywords button to search for repositories.' "
    "Focus on understanding their needs, not providing solutions."
)

# First assistant message shown in the chatbot (used on app load, on reset and
# when the first user message arrives with an empty history).
CHATBOT_INITIAL_MESSAGE = "Hello! I'm here to help you define your ideal Hugging Face repository requirements. I won't suggest specific repos - my job is to understand exactly what you're looking for. Tell me about your project: What type of application are you building? What's your use case?"
# --- Helper Functions (Logic) --- def get_top_relevant_repos(df: pd.DataFrame, user_requirements: str, top_n: int = 3) -> pd.DataFrame: """ Uses LLM to select the top N most relevant repositories based on user requirements and analysis data. """ try: if df.empty: return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"]) # Filter out rows with no analysis data analyzed_df = df.copy() analyzed_df = analyzed_df[ (analyzed_df['strength'].str.strip() != '') | (analyzed_df['weaknesses'].str.strip() != '') | (analyzed_df['speciality'].str.strip() != '') | (analyzed_df['relevance rating'].str.strip() != '') ] if analyzed_df.empty: logger.warning("No analyzed repositories found for LLM selection") return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"]) # Create a prompt for the LLM csv_data = "" for idx, row in analyzed_df.iterrows(): csv_data += f"Repository: {row['repo id']}\n" csv_data += f"Strengths: {row['strength']}\n" csv_data += f"Weaknesses: {row['weaknesses']}\n" csv_data += f"Speciality: {row['speciality']}\n" csv_data += f"Relevance: {row['relevance rating']}\n\n" user_context = user_requirements if user_requirements.strip() else "General repository recommendation" prompt = f"""Based on the user's requirements and the analysis of repositories below, select the top {top_n} most relevant repositories. User Requirements: {user_context} Repository Analysis Data: {csv_data} Please analyze all repositories and select the {top_n} most relevant ones based on: 1. How well they match the user's specific requirements 2. Their strengths and capabilities 3. Their relevance rating 4. Their speciality alignment with user needs Return ONLY a JSON list of the repository IDs in order of relevance (most relevant first). 
Example format: ["repo1", "repo2", "repo3"] Selected repositories:""" try: from openai import OpenAI client = OpenAI(api_key=os.getenv("modal_api")) client.base_url = os.getenv("base_url") response = client.chat.completions.create( model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", messages=[ {"role": "system", "content": "You are an expert at analyzing and ranking repositories based on user requirements. Always return valid JSON."}, {"role": "user", "content": prompt} ], max_tokens=200, temperature=0.3 ) llm_response = response.choices[0].message.content.strip() logger.info(f"LLM response for top repos: {llm_response}") # Extract JSON from response import json import re # Try to find JSON array in the response json_match = re.search(r'\[.*\]', llm_response) if json_match: selected_repos = json.loads(json_match.group()) logger.info(f"LLM selected repositories: {selected_repos}") # Filter dataframe to only include selected repositories in order top_repos_list = [] for repo_id in selected_repos[:top_n]: matching_rows = analyzed_df[analyzed_df['repo id'] == repo_id] if not matching_rows.empty: top_repos_list.append(matching_rows.iloc[0]) if top_repos_list: top_repos = pd.DataFrame(top_repos_list) logger.info(f"Successfully selected {len(top_repos)} repositories using LLM") return top_repos # Fallback: if LLM response parsing fails, use first N analyzed repos logger.warning("Failed to parse LLM response, using fallback selection") return analyzed_df.head(top_n) except Exception as llm_error: logger.error(f"LLM selection failed: {llm_error}") # Fallback: return first N repositories with analysis data return analyzed_df.head(top_n) except Exception as e: logger.error(f"Error in LLM-based repo selection: {e}") return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"]) def write_repos_to_csv(repo_ids: List[str]) -> None: """Writes a list of repo IDs to the CSV file, overwriting the previous content.""" try: with open(CSV_FILE, 
mode="w", newline='', encoding="utf-8") as csvfile: writer = csv.writer(csvfile) writer.writerow(["repo id", "strength", "weaknesses", "speciality", "relevance rating"]) for repo_id in repo_ids: writer.writerow([repo_id, "", "", "", ""]) logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}") except Exception as e: logger.error(f"Error writing to CSV: {e}") def format_text_for_dataframe(text: str, max_length: int = 200) -> str: """Format text for better display in dataframe by truncating and cleaning.""" if not text or pd.isna(text): return "" # Clean the text text = str(text).strip() # Remove excessive whitespace and newlines text = re.sub(r'\s+', ' ', text) # Truncate if too long if len(text) > max_length: text = text[:max_length-3] + "..." return text def read_csv_to_dataframe() -> pd.DataFrame: """Reads the CSV file into a pandas DataFrame with full text preserved.""" try: df = pd.read_csv(CSV_FILE, dtype=str).fillna('') # Keep the full text intact - don't truncate here # The truncation will be handled in the UI display layer return df except FileNotFoundError: return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"]) except Exception as e: logger.error(f"Error reading CSV: {e}") return pd.DataFrame() def format_dataframe_for_display(df: pd.DataFrame) -> pd.DataFrame: """Returns dataframe with full text (no truncation) for display.""" if df.empty: return df # Return the dataframe as-is without any text truncation # This will show the full text content in the CSV display return df.copy() def analyze_and_update_single_repo(repo_id: str, user_requirements: str = "") -> Tuple[str, str, pd.DataFrame]: """ Downloads, analyzes a single repo, updates the CSV, and returns results. Now includes user requirements for better relevance rating. This function combines the logic of downloading, analyzing, and updating the CSV for one repo. 
""" try: logger.info(f"Starting analysis for repo: {repo_id}") download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt']) txt_path = combine_repo_files_for_llm() with open(txt_path, "r", encoding="utf-8") as f: combined_content = f.read() llm_output = analyze_combined_file(txt_path, user_requirements) last_start = llm_output.rfind('{') last_end = llm_output.rfind('}') final_json_str = llm_output[last_start:last_end+1] if last_start != -1 and last_end != -1 else "{}" llm_json = parse_llm_json_response(final_json_str) summary = "" if isinstance(llm_json, dict) and "error" not in llm_json: strengths = llm_json.get("strength", "N/A") weaknesses = llm_json.get("weaknesses", "N/A") relevance = llm_json.get("relevance rating", "N/A") summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}\n\nRelevance: {relevance}" else: summary = f"JSON extraction: FAILED\nRaw response might not be valid JSON." # Update CSV df = read_csv_to_dataframe() repo_found_in_df = False for idx, row in df.iterrows(): if row["repo id"] == repo_id: if isinstance(llm_json, dict): df.at[idx, "strength"] = llm_json.get("strength", "") df.at[idx, "weaknesses"] = llm_json.get("weaknesses", "") df.at[idx, "speciality"] = llm_json.get("speciality", "") df.at[idx, "relevance rating"] = llm_json.get("relevance rating", "") repo_found_in_df = True break if not repo_found_in_df: logger.warning(f"Repo ID {repo_id} not found in CSV for updating.") # Write CSV with better error handling and flushing try: df.to_csv(CSV_FILE, index=False) # Force file system flush os.sync() if hasattr(os, 'sync') else None logger.info(f"Successfully updated CSV for {repo_id}") except Exception as csv_error: logger.error(f"Failed to write CSV for {repo_id}: {csv_error}") # Try once more with a small delay time.sleep(0.2) try: df.to_csv(CSV_FILE, index=False) logger.info(f"Successfully updated CSV for {repo_id} on retry") except Exception as retry_error: 
logger.error(f"Failed to write CSV for {repo_id} on retry: {retry_error}") logger.info(f"Successfully analyzed and updated CSV for {repo_id}") return combined_content, summary, df except Exception as e: logger.error(f"An error occurred during analysis of {repo_id}: {e}") error_summary = f"Error analyzing repo: {e}" return "", error_summary, format_dataframe_for_display(read_csv_to_dataframe()) # --- NEW: Helper for Chat History Conversion --- def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]: """ Converts Gradio's 'messages' format to the old 'tuple' format for compatibility. This robust version correctly handles histories that start with an assistant message. """ tuple_history = [] # Iterate through the history to find user messages for i, msg in enumerate(history): if msg['role'] == 'user': # Once a user message is found, check if the next message is from the assistant if i + 1 < len(history) and history[i+1]['role'] == 'assistant': user_content = msg['content'] assistant_content = history[i+1]['content'] tuple_history.append((user_content, assistant_content)) return tuple_history # --- Gradio UI --- def create_ui() -> gr.Blocks: """Creates and configures the entire Gradio interface.""" css = """ /* Modern sleek design */ .gradio-container { font-family: 'Inter', 'system-ui', sans-serif; background: linear-gradient(135deg, #0a0a0a 0%, #1a1a1a 100%); min-height: 100vh; } .gr-form { background: rgba(255, 255, 255, 0.95); backdrop-filter: blur(10px); border-radius: 16px; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1); padding: 24px; margin: 16px; border: 1px solid rgba(255, 255, 255, 0.2); } .gr-button { background: linear-gradient(45deg, #667eea, #764ba2); border: none; border-radius: 12px; color: white; font-weight: 600; padding: 12px 24px; transition: all 0.3s ease; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4); } .gr-button:hover { transform: translateY(-2px); box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6); } .gr-textbox { 
border: 2px solid rgba(102, 126, 234, 0.2); border-radius: 12px; background: rgba(255, 255, 255, 0.9); transition: all 0.3s ease; } .gr-textbox:focus { border-color: #667eea; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1); } .gr-panel { background: rgba(255, 255, 255, 0.95); border-radius: 16px; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1); border: 1px solid rgba(255, 255, 255, 0.2); } .gr-tab-nav { background: rgba(255, 255, 255, 0.95); border-radius: 12px 12px 0 0; backdrop-filter: blur(10px); } .gr-tab-nav button { background: transparent; border: none; padding: 16px 24px; font-weight: 600; color: #666; transition: all 0.3s ease; } .gr-tab-nav button.selected { background: linear-gradient(45deg, #667eea, #764ba2); color: white; border-radius: 8px; } .chatbot { border-radius: 16px; box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); } /* Hide Gradio footer */ footer { display: none !important; } /* Custom scrollbar */ ::-webkit-scrollbar { width: 8px; } ::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.1); border-radius: 4px; } ::-webkit-scrollbar-thumb { background: linear-gradient(45deg, #667eea, #764ba2); border-radius: 4px; } /* Improved dataframe styling for full text display */ .gr-dataframe { border-radius: 12px; overflow: hidden; box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1); background: rgba(255, 255, 255, 0.98); } .gr-dataframe table { width: 100%; table-layout: fixed; border-collapse: collapse; } /* Column width specifications for both dataframes */ .gr-dataframe th, .gr-dataframe td { padding: 12px 16px; text-align: left; border-bottom: 1px solid rgba(0, 0, 0, 0.1); font-size: 0.95rem; line-height: 1.4; } /* Specific column widths - applying to both dataframes */ .gr-dataframe th:nth-child(1), .gr-dataframe td:nth-child(1) { width: 16.67% !important; min-width: 16.67% !important; max-width: 16.67% !important; } .gr-dataframe th:nth-child(2), .gr-dataframe td:nth-child(2) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; } 
.gr-dataframe th:nth-child(3), .gr-dataframe td:nth-child(3) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; } .gr-dataframe th:nth-child(4), .gr-dataframe td:nth-child(4) { width: 20.83% !important; min-width: 20.83% !important; max-width: 20.83% !important; } .gr-dataframe th:nth-child(5), .gr-dataframe td:nth-child(5) { width: 12.5% !important; min-width: 12.5% !important; max-width: 12.5% !important; } /* Additional specific targeting for both dataframes */ div[data-testid="dataframe"] table th:nth-child(1), div[data-testid="dataframe"] table td:nth-child(1) { width: 16.67% !important; } div[data-testid="dataframe"] table th:nth-child(2), div[data-testid="dataframe"] table td:nth-child(2) { width: 25% !important; } div[data-testid="dataframe"] table th:nth-child(3), div[data-testid="dataframe"] table td:nth-child(3) { width: 25% !important; } div[data-testid="dataframe"] table th:nth-child(4), div[data-testid="dataframe"] table td:nth-child(4) { width: 20.83% !important; } div[data-testid="dataframe"] table th:nth-child(5), div[data-testid="dataframe"] table td:nth-child(5) { width: 12.5% !important; } /* Make repository names clickable */ .gr-dataframe td:nth-child(1) { cursor: pointer; color: #667eea; font-weight: 600; transition: all 0.3s ease; } .gr-dataframe td:nth-child(1):hover { background-color: rgba(102, 126, 234, 0.1); color: #764ba2; transform: scale(1.02); } /* Content columns - readable styling with scroll for long text */ .gr-dataframe td:nth-child(2), .gr-dataframe td:nth-child(3), .gr-dataframe td:nth-child(4), .gr-dataframe td:nth-child(5) { cursor: default; font-size: 0.9rem; } .gr-dataframe tbody tr:hover { background-color: rgba(102, 126, 234, 0.05); } /* JavaScript for auto-scroll to top on tab change */ """ with gr.Blocks( theme=gr.themes.Soft( primary_hue="blue", secondary_hue="purple", neutral_hue="gray", font=["Inter", "system-ui", "sans-serif"] ), css=css, title="๐Ÿš€ HF Repo Analyzer" ) as app: # --- 
State Management --- # Using simple, separate state objects for robustness. repo_ids_state = gr.State([]) current_repo_idx_state = gr.State(0) user_requirements_state = gr.State("") # Store user requirements from chatbot loaded_repo_content_state = gr.State("") # Store loaded repository content current_repo_id_state = gr.State("") # Store current repository ID selected_repo_id_state = gr.State("") # Store selected repository ID for modal actions gr.Markdown( """

๐Ÿš€ HF Repo Analyzer

Discover, analyze, and evaluate Hugging Face repositories with AI-powered insights

""" ) # Global Reset Button - visible on all tabs with gr.Row(): with gr.Column(scale=4): pass with gr.Column(scale=1): reset_all_btn = gr.Button("๐Ÿ”„ Reset Everything", variant="stop", size="lg") with gr.Column(scale=1): pass with gr.Tabs() as tabs: # --- Input Tab --- with gr.TabItem("๐Ÿ“ Input & Search", id="input_tab"): with gr.Row(equal_height=True): with gr.Column(scale=1): gr.Markdown("### ๐Ÿ“ Repository IDs") repo_id_input = gr.Textbox( label="Repository IDs", lines=8, placeholder="microsoft/DialoGPT-medium\nopenai/whisper\nhuggingface/transformers", info="Enter repo IDs separated by commas or new lines" ) submit_repo_btn = gr.Button("๐Ÿš€ Submit Repositories", variant="primary", size="lg") with gr.Column(scale=1): gr.Markdown("### ๐Ÿ” Keyword Search") keyword_input = gr.Textbox( label="Search Keywords", lines=8, placeholder="text generation\nimage classification\nsentiment analysis", info="Enter keywords to find relevant repositories" ) search_btn = gr.Button("๐Ÿ”Ž Search Repositories", variant="primary", size="lg") status_box_input = gr.Textbox(label="๐Ÿ“Š Status", interactive=False, lines=2) # --- Analysis Tab --- with gr.TabItem("๐Ÿ”ฌ Analysis", id="analysis_tab"): gr.Markdown("### ๐Ÿงช Repository Analysis Engine") # Display current user requirements with gr.Row(): current_requirements_display = gr.Textbox( label="๐Ÿ“‹ Current User Requirements", interactive=False, lines=3, info="Requirements extracted from AI chat conversation for relevance rating" ) with gr.Row(): analyze_all_btn = gr.Button("๐Ÿš€ Analyze All Repositories", variant="primary", size="lg", scale=1) with gr.Column(scale=2): status_box_analysis = gr.Textbox(label="๐Ÿ“ˆ Analysis Status", interactive=False, lines=2) # Progress bar for batch analysis with gr.Row(): analysis_progress = gr.Progress() # progress_display = gr.Textbox( # label="๐Ÿ“Š Batch Analysis Progress", # interactive=False, # lines=2, # visible=False, # info="Shows progress when analyzing all repositories" # ) with 
gr.Row(equal_height=True): # with gr.Column(): # content_output = gr.Textbox( # label="๐Ÿ“„ Repository Content", # lines=20, # show_copy_button=True, # info="Raw content extracted from the repository" # ) # with gr.Column(): # summary_output = gr.Textbox( # label="๐ŸŽฏ AI Analysis Summary", # lines=20, # show_copy_button=True, # info="Detailed analysis and insights from AI" # ) pass gr.Markdown("### ๐Ÿ“Š Results Dashboard") # Top 3 Most Relevant Repositories (initially hidden) with gr.Column(visible=False) as top_repos_section: gr.Markdown("### ๐Ÿ† Top 3 Most Relevant Repositories") gr.Markdown("๐ŸŽฏ **These are the highest-rated repositories based on your requirements:**") top_repos_df = gr.Dataframe( headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"], column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"], wrap=True, interactive=False ) gr.Markdown("๐Ÿ’ก **Tip:** Full text is displayed directly in the table. Click on repository names to explore or visit them!") # Text expansion modal for showing full content (kept for backwards compatibility) with gr.Row(): with gr.Column(): text_expansion_modal = gr.Column(visible=False) with text_expansion_modal: gr.Markdown("### ๐Ÿ“„ Full Content View") expanded_content_title = gr.Textbox( label="Content Type", interactive=False, info="Full text content for the selected field" ) expanded_content_text = gr.Textbox( label="Full Text", lines=10, interactive=False, show_copy_button=True, info="Complete untruncated content" ) close_text_modal_btn = gr.Button("โŒ Close", size="lg") # Modal popup for repository action selection with gr.Row(): with gr.Column(): repo_action_modal = gr.Column(visible=False) with repo_action_modal: gr.Markdown("### ๐Ÿ”— Repository Actions") selected_repo_display = gr.Textbox( label="Selected Repository", interactive=False, info="Choose what you'd like to do with this repository" ) with gr.Row(): visit_repo_btn = gr.Button("๐ŸŒ Visit Hugging Face Space", variant="primary", 
size="lg") explore_repo_btn = gr.Button("๐Ÿ” Open in Repo Explorer", variant="secondary", size="lg") cancel_modal_btn = gr.Button("โŒ Cancel", size="lg") gr.Markdown("### ๐Ÿ“‹ All Analysis Results") df_output = gr.Dataframe( headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"], column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"], wrap=True, interactive=False ) # --- Chatbot Tab --- with gr.TabItem("๐Ÿค– AI Assistant", id="chatbot_tab"): gr.Markdown("### ๐Ÿ’ฌ Intelligent Repository Discovery") chatbot = gr.Chatbot( label="๐Ÿค– AI Assistant", height=450, type="messages", avatar_images=( "https://cdn-icons-png.flaticon.com/512/149/149071.png", "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png" ), show_copy_button=True ) with gr.Row(): msg_input = gr.Textbox( label="๐Ÿ’ญ Your Message", placeholder="Tell me about your ideal repository...", lines=1, scale=4, info="Describe what you're looking for" ) send_btn = gr.Button("๐Ÿ“ค Send", variant="primary", scale=1) end_chat_btn = gr.Button("๐ŸŽฏ Extract Keywords", scale=1) use_keywords_btn = gr.Button("๐Ÿ”Ž Search Now", variant="primary", scale=1) with gr.Row(): with gr.Column(): extracted_keywords_output = gr.Textbox( label="๐Ÿท๏ธ Extracted Keywords", interactive=False, show_copy_button=True, info="AI-generated search terms from our conversation" ) with gr.Column(): status_box_chatbot = gr.Textbox( label="๐Ÿ“Š Chat Status", interactive=False, info="Current conversation status" ) # --- Repo Explorer Tab --- with gr.TabItem("๐Ÿ” Repo Explorer", id="repo_explorer_tab"): repo_components, repo_states = create_repo_explorer_tab() # --- Footer --- gr.Markdown( """

๐Ÿš€ Powered by Gradio & Hugging Face

""" ) # --- Event Handler Functions --- def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]: """Processes submitted repo IDs, updates state, and prepares for analysis.""" if not text: return [], 0, pd.DataFrame(), "Status: Please enter repository IDs.", gr.update(selected="input_tab") repo_ids = list(dict.fromkeys([repo.strip() for repo in re.split(r'[\n,]+', text) if repo.strip()])) write_repos_to_csv(repo_ids) df = format_dataframe_for_display(read_csv_to_dataframe()) status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis." return repo_ids, 0, df, status, gr.update(selected="analysis_tab") def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]: """Processes submitted keywords, finds repos, updates state, and prepares for analysis.""" if not keywords: return [], 0, pd.DataFrame(), "Status: Please enter keywords.", gr.update(selected="input_tab") keyword_list = [k.strip() for k in re.split(r'[\n,]+', keywords) if k.strip()] repo_ids = [] for kw in keyword_list: repo_ids.extend(search_top_spaces(kw, limit=5)) unique_repo_ids = list(dict.fromkeys(repo_ids)) write_repos_to_csv(unique_repo_ids) df = format_dataframe_for_display(read_csv_to_dataframe()) status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis." 
return unique_repo_ids, 0, df, status, gr.update(selected="analysis_tab") def extract_user_requirements_from_chat(history: List[Dict[str, str]]) -> str: """Extract user requirements from chatbot conversation.""" if not history: return "" user_messages = [] for msg in history: if msg.get('role') == 'user': user_messages.append(msg.get('content', '')) if not user_messages: return "" # Combine all user messages as requirements requirements = "\n".join([f"- {msg}" for msg in user_messages if msg.strip()]) return requirements def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]: """Appends the user's message to the history, preparing for the bot's response.""" # Initialize chatbot with welcome message if empty if not history: history = [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}] if user_message: history.append({"role": "user", "content": user_message}) return history, "" def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]: """Generates and appends the bot's response using the compatible history format.""" if not history or history[-1]["role"] != "user": return history user_message = history[-1]["content"] # Convert all messages *before* the last user message into tuples for the API tuple_history_for_api = convert_messages_to_tuples(history[:-1]) response = chat_with_user(user_message, tuple_history_for_api) history.append({"role": "assistant", "content": response}) return history def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str, str]: """Ends the chat, extracts and sanitizes keywords from the conversation, and extracts user requirements.""" if not history: return "", "Status: Chat is empty, nothing to analyze.", "" # Convert the full, valid history for the extraction logic tuple_history = convert_messages_to_tuples(history) if not tuple_history: return "", "Status: No completed conversations to analyze.", "" # Get raw keywords string from the LLM 
raw_keywords_str = extract_keywords_from_conversation(tuple_history) # Sanitize the LLM output to extract only keyword-like parts. # A keyword can contain letters, numbers, underscores, spaces, and hyphens. cleaned_keywords = re.findall(r'[\w\s-]+', raw_keywords_str) # Trim whitespace from each found keyword and filter out any empty strings cleaned_keywords = [kw.strip() for kw in cleaned_keywords if kw.strip()] if not cleaned_keywords: return "", f"Status: Could not extract valid keywords. Raw LLM output: '{raw_keywords_str}'", "" # Join them into a clean, comma-separated string for the search tool final_keywords_str = ", ".join(cleaned_keywords) # Extract user requirements for analysis user_requirements = extract_user_requirements_from_chat(history) status = "Status: Keywords extracted. User requirements saved for analysis." return final_keywords_str, status, user_requirements def handle_dataframe_select(evt: gr.SelectData, df_data) -> Tuple[str, Any, Any, str, str, Any, str]: """Handle dataframe row selection - only repo ID (column 0) shows modal since full text is now displayed directly.""" print(f"DEBUG: Selection event triggered!") print(f"DEBUG: evt = {evt}") print(f"DEBUG: df_data type = {type(df_data)}") if evt is None: return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), "" try: # Get the selected row and column from the event row_idx = evt.index[0] col_idx = evt.index[1] print(f"DEBUG: Selected row {row_idx}, column {col_idx}") # Handle pandas DataFrame if isinstance(df_data, pd.DataFrame) and not df_data.empty and row_idx < len(df_data): if col_idx == 0: # Repository name column - show action modal repo_id = df_data.iloc[row_idx, 0] print(f"DEBUG: Extracted repo_id = '{repo_id}'") if repo_id and str(repo_id).strip() and str(repo_id).strip() != 'nan': clean_repo_id = str(repo_id).strip() logger.info(f"Showing modal for repository: {clean_repo_id}") return clean_repo_id, gr.update(visible=True), gr.update(), "", "", 
gr.update(visible=False), clean_repo_id # For content columns (1,2,3) and relevance (4), do nothing since full text is shown directly else: print(f"DEBUG: Clicked on column {col_idx}, full text already shown in table") return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), "" else: print(f"DEBUG: df_data is not a DataFrame or row_idx {row_idx} out of range") except Exception as e: print(f"DEBUG: Exception occurred: {e}") logger.error(f"Error handling dataframe selection: {e}") return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), "" def handle_analyze_all_repos(repo_ids: List[str], user_requirements: str, progress=gr.Progress()) -> Tuple[pd.DataFrame, str, pd.DataFrame, Any]: """Analyzes all repositories in the CSV file with progress tracking.""" if not repo_ids: return pd.DataFrame(), "Status: No repositories to analyze. Please submit repo IDs first.", pd.DataFrame(), gr.update(visible=False) total_repos = len(repo_ids) try: # Start the progress tracking progress(0, desc="Initializing batch analysis...") successful_analyses = 0 failed_analyses = 0 csv_update_failures = 0 for i, repo_id in enumerate(repo_ids): # Update progress progress_percent = (i / total_repos) progress(progress_percent, desc=f"Analyzing {repo_id} ({i+1}/{total_repos})") try: logger.info(f"Batch analysis: Processing {repo_id} ({i+1}/{total_repos})") # Analyze the repository content, summary, df = analyze_and_update_single_repo(repo_id, user_requirements) # Verify the CSV was actually updated by checking if the repo has analysis data updated_df = read_csv_to_dataframe() repo_updated = False for idx, row in updated_df.iterrows(): if row["repo id"] == repo_id: # Check if any analysis field is populated if (row.get("strength", "").strip() or row.get("weaknesses", "").strip() or row.get("speciality", "").strip() or row.get("relevance rating", "").strip()): repo_updated = True break if repo_updated: successful_analyses += 1 else: # CSV update 
failed - try once more logger.warning(f"CSV update failed for {repo_id}, attempting retry...") time.sleep(0.5) # Wait a bit longer # Force re-read and re-update df_retry = read_csv_to_dataframe() retry_success = False # Re-parse the analysis if available if summary and "JSON extraction: SUCCESS" in summary: # Extract the analysis from summary - this is a fallback logger.info(f"Attempting to re-update CSV for {repo_id}") content_retry, summary_retry, df_retry = analyze_and_update_single_repo(repo_id, user_requirements) # Check again final_df = read_csv_to_dataframe() for idx, row in final_df.iterrows(): if row["repo id"] == repo_id: if (row.get("strength", "").strip() or row.get("weaknesses", "").strip() or row.get("speciality", "").strip() or row.get("relevance rating", "").strip()): retry_success = True break if retry_success: successful_analyses += 1 else: csv_update_failures += 1 # Longer delay to prevent file conflicts time.sleep(0.3) except Exception as e: logger.error(f"Error analyzing {repo_id}: {e}") failed_analyses += 1 # Still wait to prevent rapid failures time.sleep(0.2) # Complete the progress progress(1.0, desc="Batch analysis completed!") # Get final updated dataframe updated_df = read_csv_to_dataframe() # Filter out rows with no analysis data for consistent display with top 3 analyzed_df = updated_df.copy() analyzed_df = analyzed_df[ (analyzed_df['strength'].str.strip() != '') | (analyzed_df['weaknesses'].str.strip() != '') | (analyzed_df['speciality'].str.strip() != '') | (analyzed_df['relevance rating'].str.strip() != '') ] # Get top 3 most relevant repositories using full data top_repos = get_top_relevant_repos(updated_df, user_requirements, top_n=3) # Final status with detailed breakdown final_status = f"๐ŸŽ‰ Batch Analysis Complete!\nโœ… Successful: {successful_analyses}/{total_repos}\nโŒ Failed: {failed_analyses}/{total_repos}" if csv_update_failures > 0: final_status += f"\nโš ๏ธ CSV Update Issues: {csv_update_failures}/{total_repos}" # Add 
def handle_visit_repo(repo_id: str) -> Tuple[Any, str]:
    """Close the repo action modal and return the Space URL for ``repo_id``.

    Args:
        repo_id: Repository (Space) identifier; surrounding whitespace is
            ignored.

    Returns:
        A ``(modal_update, url)`` pair. ``modal_update`` always hides the
        modal. ``url`` is ``https://huggingface.co/spaces/<repo_id>``, or
        ``""`` when ``repo_id`` is empty or blank.
    """
    cleaned_id = repo_id.strip() if repo_id else ""
    if not cleaned_id:
        return gr.update(visible=False), ""

    hf_url = f"https://huggingface.co/spaces/{cleaned_id}"
    logger.info(f"User chose to visit: {hf_url}")
    return gr.update(visible=False), hf_url


def handle_explore_repo(selected_repo_id: str) -> Tuple[Any, Any, Any]:
    """Close the modal, switch to the repo explorer tab and pre-fill the repo ID.

    Args:
        selected_repo_id: ID of the selected repository. Empty, blank,
            non-string values and the literal ``"nan"`` are all treated as
            "nothing selected".

    Returns:
        Three Gradio updates, in order: hide the modal, select the
        ``"repo_explorer_tab"`` tab, and either set the explorer input to the
        stripped repo ID or leave that input unchanged when nothing usable
        was selected.
    """
    # Diagnostic trace only: kept at DEBUG level, and %r is safe for any
    # value (the previous len()-based trace raised on non-string input).
    logger.debug(
        "handle_explore_repo called with selected_repo_id=%r (type=%s)",
        selected_repo_id,
        type(selected_repo_id).__name__,
    )

    # Guard against non-string values (e.g. a float NaN) before calling
    # .strip(); those are handled like an empty selection.
    clean_repo_id = selected_repo_id.strip() if isinstance(selected_repo_id, str) else ""
    has_selection = bool(clean_repo_id) and clean_repo_id != 'nan'

    return (
        gr.update(visible=False),  # close modal
        gr.update(selected="repo_explorer_tab"),  # switch tab
        # populate repo explorer input, or leave it untouched
        gr.update(value=clean_repo_id) if has_selection else gr.update(),
    )


def handle_cancel_modal() -> Any:
    """Return a Gradio update that hides the modal."""
    return gr.update(visible=False)


def handle_close_text_modal() -> Any:
    """Return a Gradio update that hides the text expansion modal."""
    return gr.update(visible=False)


def handle_reset_everything() -> Tuple[List[str], int, str, pd.DataFrame, pd.DataFrame, Any, Any, Any, List[Dict[str, str]], str, str, str]:
    """Delete the CSV file and return the initial value for every reset output.

    The file at ``CSV_FILE`` is removed when present. The returned values are
    the same cleared state whether or not the removal succeeds; only the
    status text differs (it reports the error when the file could not be
    deleted).

    Returns:
        A 12-tuple, in this order: repo ID list, current repo index, user
        requirements text, results dataframe, top-repos dataframe, three
        Gradio updates hiding the top-repos section, the repo action modal
        and the text expansion modal, the chatbot history reset to the
        initial assistant message, the analysis status text, the
        requirements display text, and the extracted keywords text.
    """
    status_text = "Status: Everything has been reset. Ready to start fresh!"
    reset_ok = True

    try:
        # Remove directly rather than exists()+remove(): the file may vanish
        # between the check and the delete.
        os.remove(CSV_FILE)
    except FileNotFoundError:
        # Nothing to delete; the reset still succeeds.
        pass
    except OSError as e:
        logger.error(f"Error during reset: {e}")
        status_text = f"Reset failed: {e}"
        reset_ok = False
    else:
        logger.info("CSV file deleted for reset")

    # Always hand back a frame with the expected columns, including on the
    # failure path, so the tables keep their headers.
    empty_df = pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])

    if reset_ok:
        logger.info("Complete system reset performed")

    return (
        [],  # repo_ids_state
        0,  # current_repo_idx_state
        "",  # user_requirements_state
        empty_df,  # df_output
        empty_df,  # top_repos_df
        gr.update(visible=False),  # top_repos_section
        gr.update(visible=False),  # repo_action_modal
        gr.update(visible=False),  # text_expansion_modal
        [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],  # chatbot
        status_text,  # status_box_analysis
        "No requirements extracted yet.",  # current_requirements_display
        "",  # extracted_keywords_output
    )
inputs=[selected_repo_display], outputs=[repo_action_modal, selected_repo_display], js="(repo_id) => { if(repo_id && repo_id.trim()) { window.open('https://huggingface.co/spaces/' + repo_id.trim(), '_blank'); } }" ) explore_repo_btn.click( fn=handle_explore_repo, inputs=[selected_repo_id_state], outputs=[ repo_action_modal, tabs, repo_components["repo_explorer_input"] ], js="""(repo_id) => { console.log('DEBUG: Navigate to repo explorer for:', repo_id); setTimeout(() => { window.scrollTo({top: 0, behavior: 'smooth'}); }, 200); }""" ) cancel_modal_btn.click( fn=handle_cancel_modal, outputs=[repo_action_modal] ) # Text expansion modal events close_text_modal_btn.click( fn=handle_close_text_modal, outputs=[text_expansion_modal] ) # Add dataframe selection event df_output.select( fn=handle_dataframe_select, inputs=[df_output], outputs=[selected_repo_display, repo_action_modal, tabs, expanded_content_title, expanded_content_text, text_expansion_modal, selected_repo_id_state] ) # Add selection event for top repositories dataframe too top_repos_df.select( fn=handle_dataframe_select, inputs=[top_repos_df], outputs=[selected_repo_display, repo_action_modal, tabs, expanded_content_title, expanded_content_text, text_expansion_modal, selected_repo_id_state] ) # Reset button event reset_all_btn.click( fn=handle_reset_everything, outputs=[repo_ids_state, current_repo_idx_state, user_requirements_state, df_output, top_repos_df, top_repos_section, repo_action_modal, text_expansion_modal, chatbot, status_box_analysis, current_requirements_display, extracted_keywords_output] ) return app if __name__ == "__main__": app = create_ui() app.launch(debug=True)