HF_RepoSense / app.py
naman1102's picture
abstraction
123c678
raw
history blame
53.7 kB
import gradio as gr
import regex as re
import csv
import pandas as pd
from typing import List, Dict, Tuple, Any
import logging
import os
import time
# Import core logic from other modules, as in app_old.py
from analyzer import (
combine_repo_files_for_llm,
parse_llm_json_response,
analyze_combined_file,
handle_load_repository
)
from hf_utils import download_filtered_space_files, search_top_spaces
from chatbot_page import chat_with_user, extract_keywords_from_conversation
from repo_explorer import create_repo_explorer_tab, setup_repo_explorer_events
# --- Configuration ---
# Configure root logging once at import time: INFO level, "time - level - message" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by every helper and event handler in this file.
logger = logging.getLogger(__name__)
# CSV file (relative to the working directory) holding one row per repository:
# the repo ID plus the strength / weaknesses / speciality / relevance rating columns.
CSV_FILE = "repo_ids.csv"
# System prompt describing a requirements-gathering assistant that must not
# recommend repositories itself.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the chatbot module or later code -- confirm.
CHATBOT_SYSTEM_PROMPT = (
"You are a helpful assistant whose ONLY job is to gather information about the user's ideal repository requirements. "
"DO NOT suggest any specific repositories or give repository recommendations. "
"Your role is to ask clarifying questions to understand exactly what the user is looking for. "
"Ask about their use case, preferred programming language, specific features needed, project type, etc. "
"When you feel you have gathered enough detailed information about their requirements, "
"tell the user: 'I think I have enough information about your requirements. I'll now search for relevant repositories automatically.' "
"Focus on understanding their needs, not providing solutions."
)
# First assistant message of every conversation; used to seed an empty chat
# history and to restore the chatbot when everything is reset.
CHATBOT_INITIAL_MESSAGE = "Hello! I'm here to help you find the perfect Hugging Face repository. Tell me about your project - what are you trying to build? I'll ask some questions to understand your needs and then automatically find relevant repositories for you."
# --- Helper Functions (Logic) ---
def is_repo_id_format(text: str) -> bool:
    """Return True when the input reads as a list of repository IDs.

    The text is split on newlines and commas and blank fragments are dropped.
    The input counts as repository IDs when at least half of the remaining
    entries contain a forward slash; an input with no entries is never IDs.
    """
    entries = []
    for fragment in re.split(r'[\n,]+', text):
        cleaned = fragment.strip()
        if cleaned:
            entries.append(cleaned)
    if not entries:
        return False
    # Majority rule: 50% or more of the entries must look like "owner/name".
    with_slash = len([entry for entry in entries if '/' in entry])
    return with_slash >= len(entries) * 0.5
def should_auto_extract_keywords(history: List[Dict[str, str]]) -> bool:
    """Decide whether the chat has reached the point where keywords should be extracted.

    Requires at least four messages, then checks whether the most recent
    assistant message (case-insensitively) contains one of the readiness
    phrases. Returns False when there is no assistant message at all.
    """
    # Fewer than four messages means fewer than two full exchanges.
    if not history or len(history) < 4:
        return False
    # Lower-cased content of the latest assistant message, or "" if none exists.
    latest_reply = next(
        (
            entry.get('content', '').lower()
            for entry in reversed(history)
            if entry.get('role') == 'assistant'
        ),
        "",
    )
    readiness_markers = (
        "enough information",
        "search for repositories",
        "find repositories",
        "look for repositories",
        "automatically",
        "ready to search",
    )
    for marker in readiness_markers:
        if marker in latest_reply:
            return True
    return False
def _empty_ranking_frame() -> pd.DataFrame:
    """Build the empty result frame carrying the five CSV columns."""
    return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])


def _rows_with_analysis(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows where at least one analysis column is not blank."""
    has_analysis = pd.Series(False, index=df.index)
    for column in ("strength", "weaknesses", "speciality", "relevance rating"):
        has_analysis |= df[column].str.strip() != ''
    return df[has_analysis]


def _build_ranking_prompt(analyzed_df: pd.DataFrame, user_requirements: str, top_n: int) -> str:
    """Render the ranking prompt sent to the LLM for the analysed repositories."""
    # Join once instead of growing a string row by row.
    csv_data = "".join(
        f"Repository: {row['repo id']}\n"
        f"Strengths: {row['strength']}\n"
        f"Weaknesses: {row['weaknesses']}\n"
        f"Speciality: {row['speciality']}\n"
        f"Relevance: {row['relevance rating']}\n\n"
        for _, row in analyzed_df.iterrows()
    )
    requirements = user_requirements or ""
    user_context = requirements if requirements.strip() else "General repository recommendation"
    return f"""Based on the user's requirements and the analysis of repositories below, select the top {top_n} most relevant repositories.
User Requirements:
{user_context}
Repository Analysis Data:
{csv_data}
Please analyze all repositories and select the {top_n} most relevant ones based on:
1. How well they match the user's specific requirements
2. Their strengths and capabilities
3. Their relevance rating
4. Their speciality alignment with user needs
Return ONLY a JSON list of the repository IDs in order of relevance (most relevant first). Example format:
["repo1", "repo2", "repo3"]
Selected repositories:"""


def _parse_repo_id_list(llm_response: str) -> List[str]:
    """Extract the JSON array of repository IDs from an LLM reply.

    Returns an empty list when no array is present, the array is not valid
    JSON, or the parsed value is not a list. Non-string items are ignored.
    """
    import json

    # DOTALL lets the match span line breaks, so a pretty-printed
    # (multi-line) JSON array is recognised as well.
    match = re.search(r'\[.*\]', llm_response, re.DOTALL)
    if not match:
        return []
    try:
        parsed = json.loads(match.group())
    except ValueError:
        return []
    if not isinstance(parsed, list):
        return []
    return [item.strip() for item in parsed if isinstance(item, str)]


def get_top_relevant_repos(df: pd.DataFrame, user_requirements: str, top_n: int = 3) -> pd.DataFrame:
    """
    Uses LLM to select the top N most relevant repositories based on user requirements and analysis data.

    Args:
        df: Frame with the columns "repo id", "strength", "weaknesses",
            "speciality" and "relevance rating".
        user_requirements: Free-text requirements; blank or None falls back to
            a generic recommendation context.
        top_n: Maximum number of repositories to return.

    Returns:
        The selected rows in the order chosen by the LLM. If the LLM call or
        the parsing of its answer fails, the first ``top_n`` analysed rows are
        returned instead. An empty five-column frame is returned when there is
        nothing to rank or an unexpected error occurs.
    """
    try:
        if df.empty:
            return _empty_ranking_frame()
        # Filter out rows with no analysis data
        analyzed_df = _rows_with_analysis(df)
        if analyzed_df.empty:
            logger.warning("No analyzed repositories found for LLM selection")
            return _empty_ranking_frame()
        prompt = _build_ranking_prompt(analyzed_df, user_requirements, top_n)
        try:
            # Imported lazily so the module still loads without the client installed.
            from openai import OpenAI
            client = OpenAI(api_key=os.getenv("modal_api"))
            client.base_url = os.getenv("base_url")
            response = client.chat.completions.create(
                model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
                messages=[
                    {"role": "system", "content": "You are an expert at analyzing and ranking repositories based on user requirements. Always return valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=200,
                temperature=0.3
            )
            llm_response = response.choices[0].message.content.strip()
            logger.info(f"LLM response for top repos: {llm_response}")
            selected_repos = _parse_repo_id_list(llm_response)
            if selected_repos:
                logger.info(f"LLM selected repositories: {selected_repos}")
            # Collect matching rows in LLM order, skipping unknown or repeated
            # IDs so one bad entry does not shrink the result.
            top_repos_list = []
            seen_ids = set()
            for repo_id in selected_repos:
                if len(top_repos_list) >= top_n:
                    break
                if repo_id in seen_ids:
                    continue
                seen_ids.add(repo_id)
                matching_rows = analyzed_df[analyzed_df['repo id'] == repo_id]
                if not matching_rows.empty:
                    top_repos_list.append(matching_rows.iloc[0])
            if top_repos_list:
                top_repos = pd.DataFrame(top_repos_list)
                logger.info(f"Successfully selected {len(top_repos)} repositories using LLM")
                return top_repos
            # Fallback: if LLM response parsing fails, use first N analyzed repos
            logger.warning("Failed to parse LLM response, using fallback selection")
            return analyzed_df.head(top_n)
        except Exception as llm_error:
            logger.error(f"LLM selection failed: {llm_error}")
            # Fallback: return first N repositories with analysis data
            return analyzed_df.head(top_n)
    except Exception as e:
        logger.error(f"Error in LLM-based repo selection: {e}")
        return _empty_ranking_frame()
def write_repos_to_csv(repo_ids: List[str]) -> None:
    """Overwrite the CSV file with a header row and one blank-analysis row per repo ID.

    Any failure is logged and swallowed; nothing is raised to the caller.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as handle:
            out = csv.writer(handle)
            out.writerow(header)
            # The four analysis columns start out empty for every repository.
            out.writerows([repo_id, "", "", "", ""] for repo_id in repo_ids)
        logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
    except Exception as e:
        logger.error(f"Error writing to CSV: {e}")
def format_text_for_dataframe(text: str, max_length: int = 200) -> str:
    """Clean a value for table display.

    Falsy or NaN input becomes an empty string. Otherwise the value is
    converted to text, stripped, every whitespace run is collapsed to a single
    space, and results longer than ``max_length`` are cut and suffixed
    with "...".
    """
    if not text or pd.isna(text):
        return ""
    collapsed = re.sub(r'\s+', ' ', str(text).strip())
    if len(collapsed) <= max_length:
        return collapsed
    # Leave room for the three-character ellipsis.
    return collapsed[:max_length-3] + "..."
def read_csv_to_dataframe() -> pd.DataFrame:
    """Read the CSV file into a DataFrame of strings with full text preserved.

    Missing values are replaced by empty strings and no truncation happens
    here. When the file is missing or cannot be read, an empty frame with the
    five expected columns is returned, so callers can always index the
    analysis columns; unexpected read errors are also logged.
    """
    try:
        return pd.read_csv(CSV_FILE, dtype=str).fillna('')
    except FileNotFoundError:
        # No CSV yet is a normal state, so it is not logged.
        pass
    except Exception as e:
        logger.error(f"Error reading CSV: {e}")
    # Same shape for every failure: callers index columns such as 'strength',
    # which a column-less frame would not provide.
    return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
def format_dataframe_for_display(df: pd.DataFrame) -> pd.DataFrame:
    """Prepare a frame for display without truncating any text.

    An empty frame is handed back as-is; a non-empty frame is returned as a
    copy so the caller's frame is not shared with the display layer.
    """
    return df if df.empty else df.copy()
def analyze_and_update_single_repo(repo_id: str, user_requirements: str = "") -> Tuple[str, str, pd.DataFrame]:
    """
    Downloads, analyzes a single repo, updates the CSV, and returns results.

    The repository files are downloaded and combined, the combined file is
    analysed with the user requirements, and the parsed result is written
    into the CSV row whose "repo id" equals ``repo_id``.

    Args:
        repo_id: Repository identifier to analyse.
        user_requirements: Requirements text passed on to the analysis.

    Returns:
        A tuple ``(combined_content, summary, dataframe)``. On any error the
        content is "", the summary describes the error, and the frame is the
        current CSV content.
    """
    try:
        logger.info(f"Starting analysis for repo: {repo_id}")
        download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
        txt_path = combine_repo_files_for_llm()
        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()
        llm_output = analyze_combined_file(txt_path, user_requirements)
        # Keep only the text from the last '{' to the last '}' of the output.
        last_start = llm_output.rfind('{')
        last_end = llm_output.rfind('}')
        final_json_str = llm_output[last_start:last_end+1] if last_start != -1 and last_end != -1 else "{}"
        llm_json = parse_llm_json_response(final_json_str)
        if isinstance(llm_json, dict) and "error" not in llm_json:
            strengths = llm_json.get("strength", "N/A")
            weaknesses = llm_json.get("weaknesses", "N/A")
            relevance = llm_json.get("relevance rating", "N/A")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}\n\nRelevance: {relevance}"
        else:
            summary = "JSON extraction: FAILED\nRaw response might not be valid JSON."
        # Update CSV
        df = read_csv_to_dataframe()
        # A frame without the "repo id" column simply has no matching row.
        matches = df.index[df["repo id"] == repo_id] if "repo id" in df.columns else []
        if len(matches) > 0:
            if isinstance(llm_json, dict):
                first_match = matches[0]  # only the first matching row is updated
                for column in ("strength", "weaknesses", "speciality", "relevance rating"):
                    df.at[first_match, column] = llm_json.get(column, "")
        else:
            logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")
        # Write the CSV, retrying once after a short pause if the first attempt fails.
        csv_written = False
        try:
            df.to_csv(CSV_FILE, index=False)
            # Force file system flush where the platform offers os.sync
            if hasattr(os, 'sync'):
                os.sync()
            csv_written = True
            logger.info(f"Successfully updated CSV for {repo_id}")
        except Exception as csv_error:
            logger.error(f"Failed to write CSV for {repo_id}: {csv_error}")
            # Try once more with a small delay
            time.sleep(0.2)
            try:
                df.to_csv(CSV_FILE, index=False)
                csv_written = True
                logger.info(f"Successfully updated CSV for {repo_id} on retry")
            except Exception as retry_error:
                logger.error(f"Failed to write CSV for {repo_id} on retry: {retry_error}")
        # Only claim success when the CSV actually reached disk.
        if csv_written:
            logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
        else:
            logger.warning(f"Analysis finished for {repo_id} but the CSV could not be written")
        return combined_content, summary, df
    except Exception as e:
        logger.error(f"An error occurred during analysis of {repo_id}: {e}")
        error_summary = f"Error analyzing repo: {e}"
        return "", error_summary, format_dataframe_for_display(read_csv_to_dataframe())
# --- NEW: Helper for Chat History Conversion ---
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """
    Turn a 'messages'-style history into (user, assistant) content pairs.

    A pair is produced for every user message that is immediately followed by
    an assistant message; everything else (a leading assistant greeting, a
    trailing unanswered user message, consecutive user messages) is skipped.
    """
    pairs = []
    final_position = len(history) - 1
    for position, entry in enumerate(history):
        # Only user messages that have a successor can start a pair.
        if entry['role'] != 'user' or position == final_position:
            continue
        reply = history[position + 1]
        if reply['role'] == 'assistant':
            pairs.append((entry['content'], reply['content']))
    return pairs
# --- Gradio UI ---
def create_ui() -> gr.Blocks:
"""Creates and configures the entire Gradio interface."""
css = """
/* Modern sleek design */
.gradio-container {
font-family: 'Inter', 'system-ui', sans-serif;
background: linear-gradient(135deg, #0a0a0a 0%, #1a1a1a 100%);
min-height: 100vh;
}
.gr-form {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
padding: 24px;
margin: 16px;
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-button {
background: linear-gradient(45deg, #667eea, #764ba2);
border: none;
border-radius: 12px;
color: white;
font-weight: 600;
padding: 12px 24px;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4);
}
.gr-button:hover {
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6);
}
.gr-textbox {
border: 2px solid rgba(102, 126, 234, 0.2);
border-radius: 12px;
background: rgba(255, 255, 255, 0.9);
transition: all 0.3s ease;
}
.gr-textbox:focus {
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.gr-panel {
background: rgba(255, 255, 255, 0.95);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-tab-nav {
background: rgba(255, 255, 255, 0.95);
border-radius: 12px 12px 0 0;
backdrop-filter: blur(10px);
}
.gr-tab-nav button {
background: transparent;
border: none;
padding: 16px 24px;
font-weight: 600;
color: #666;
transition: all 0.3s ease;
}
.gr-tab-nav button.selected {
background: linear-gradient(45deg, #667eea, #764ba2);
color: white;
border-radius: 8px;
}
.chatbot {
border-radius: 16px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
}
/* Hide Gradio footer */
footer {
display: none !important;
}
/* Custom scrollbar */
::-webkit-scrollbar {
width: 8px;
}
::-webkit-scrollbar-track {
background: rgba(255, 255, 255, 0.1);
border-radius: 4px;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(45deg, #667eea, #764ba2);
border-radius: 4px;
}
/* Improved dataframe styling for full text display */
.gr-dataframe {
border-radius: 12px;
overflow: hidden;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
background: rgba(255, 255, 255, 0.98);
}
.gr-dataframe table {
width: 100%;
table-layout: fixed;
border-collapse: collapse;
}
/* Column width specifications for both dataframes */
.gr-dataframe th,
.gr-dataframe td {
padding: 12px 16px;
text-align: left;
border-bottom: 1px solid rgba(0, 0, 0, 0.1);
font-size: 0.95rem;
line-height: 1.4;
}
/* Specific column widths - applying to both dataframes */
.gr-dataframe th:nth-child(1),
.gr-dataframe td:nth-child(1) { width: 16.67% !important; min-width: 16.67% !important; max-width: 16.67% !important; }
.gr-dataframe th:nth-child(2),
.gr-dataframe td:nth-child(2) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; }
.gr-dataframe th:nth-child(3),
.gr-dataframe td:nth-child(3) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; }
.gr-dataframe th:nth-child(4),
.gr-dataframe td:nth-child(4) { width: 20.83% !important; min-width: 20.83% !important; max-width: 20.83% !important; }
.gr-dataframe th:nth-child(5),
.gr-dataframe td:nth-child(5) { width: 12.5% !important; min-width: 12.5% !important; max-width: 12.5% !important; }
/* Additional specific targeting for both dataframes */
div[data-testid="dataframe"] table th:nth-child(1),
div[data-testid="dataframe"] table td:nth-child(1) { width: 16.67% !important; }
div[data-testid="dataframe"] table th:nth-child(2),
div[data-testid="dataframe"] table td:nth-child(2) { width: 25% !important; }
div[data-testid="dataframe"] table th:nth-child(3),
div[data-testid="dataframe"] table td:nth-child(3) { width: 25% !important; }
div[data-testid="dataframe"] table th:nth-child(4),
div[data-testid="dataframe"] table td:nth-child(4) { width: 20.83% !important; }
div[data-testid="dataframe"] table th:nth-child(5),
div[data-testid="dataframe"] table td:nth-child(5) { width: 12.5% !important; }
/* Make repository names clickable */
.gr-dataframe td:nth-child(1) {
cursor: pointer;
color: #667eea;
font-weight: 600;
transition: all 0.3s ease;
}
.gr-dataframe td:nth-child(1):hover {
background-color: rgba(102, 126, 234, 0.1);
color: #764ba2;
transform: scale(1.02);
}
/* Content columns - readable styling with scroll for long text */
.gr-dataframe td:nth-child(2),
.gr-dataframe td:nth-child(3),
.gr-dataframe td:nth-child(4),
.gr-dataframe td:nth-child(5) {
cursor: default;
font-size: 0.9rem;
}
.gr-dataframe tbody tr:hover {
background-color: rgba(102, 126, 234, 0.05);
}
/* JavaScript for auto-scroll to top on tab change */
<script>
document.addEventListener('DOMContentLoaded', function() {
// Function to scroll to top
function scrollToTop() {
window.scrollTo({
top: 0,
behavior: 'smooth'
});
}
// Observer for tab changes
const observer = new MutationObserver(function(mutations) {
mutations.forEach(function(mutation) {
if (mutation.type === 'attributes' && mutation.attributeName === 'class') {
const target = mutation.target;
if (target.classList && target.classList.contains('selected')) {
// Tab was selected, scroll to top
setTimeout(scrollToTop, 100);
}
}
});
});
// Observe tab navigation buttons
const tabButtons = document.querySelectorAll('.gr-tab-nav button');
tabButtons.forEach(button => {
observer.observe(button, { attributes: true });
// Also add click listener for immediate scroll
button.addEventListener('click', function() {
setTimeout(scrollToTop, 150);
});
});
// Enhanced listener for programmatic tab changes (button-triggered navigation)
let lastSelectedTab = null;
const checkInterval = setInterval(function() {
const currentSelectedTab = document.querySelector('.gr-tab-nav button.selected');
if (currentSelectedTab && currentSelectedTab !== lastSelectedTab) {
lastSelectedTab = currentSelectedTab;
setTimeout(scrollToTop, 100);
}
}, 100);
// Additional scroll trigger for repo explorer navigation
window.addEventListener('repoExplorerNavigation', function() {
setTimeout(scrollToTop, 200);
});
// Watch for specific tab transitions to repo explorer
const repoExplorerObserver = new MutationObserver(function(mutations) {
mutations.forEach(function(mutation) {
if (mutation.type === 'attributes' && mutation.attributeName === 'class') {
const target = mutation.target;
if (target.textContent && target.textContent.includes('🔍 Repo Explorer') && target.classList.contains('selected')) {
setTimeout(scrollToTop, 150);
}
}
});
});
// Start observing for repo explorer specific changes
setTimeout(function() {
const repoExplorerTab = Array.from(document.querySelectorAll('.gr-tab-nav button')).find(btn =>
btn.textContent && btn.textContent.includes('🔍 Repo Explorer')
);
if (repoExplorerTab) {
repoExplorerObserver.observe(repoExplorerTab, { attributes: true });
}
}, 1000);
});
</script>
"""
with gr.Blocks(
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="purple",
neutral_hue="gray",
font=["Inter", "system-ui", "sans-serif"]
),
css=css,
title="🚀 HF Repo Analyzer"
) as app:
# --- State Management ---
# Using simple, separate state objects for robustness.
repo_ids_state = gr.State([])
current_repo_idx_state = gr.State(0)
user_requirements_state = gr.State("") # Store user requirements from chatbot
loaded_repo_content_state = gr.State("") # Store loaded repository content
current_repo_id_state = gr.State("") # Store current repository ID
selected_repo_id_state = gr.State("") # Store selected repository ID for modal actions
gr.Markdown(
"""
<div style="text-align: center; padding: 40px 20px; background: rgba(255, 255, 255, 0.1); border-radius: 20px; margin: 20px auto; max-width: 900px; backdrop-filter: blur(10px);">
<h1 style="font-size: 3.5rem; font-weight: 800; margin: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text;">
🚀 HF Repo Analyzer
</h1>
<p style="font-size: 1.3rem; color: rgba(255, 255, 255, 0.9); margin: 16px 0 0 0; font-weight: 400; line-height: 1.6;">
Discover, analyze, and evaluate Hugging Face repositories with AI-powered insights
</p>
<div style="height: 4px; width: 80px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 24px auto; border-radius: 2px;"></div>
</div>
"""
)
# Global Reset Button - visible on all tabs
with gr.Row():
with gr.Column(scale=4):
pass
with gr.Column(scale=1):
reset_all_btn = gr.Button("🔄 Reset Everything", variant="stop", size="lg")
with gr.Column(scale=1):
pass
with gr.Tabs() as tabs:
# --- Input Tab ---
with gr.TabItem("📝 Smart Search", id="input_tab"):
gr.Markdown("### 🔍 Intelligent Repository Discovery")
gr.Markdown("💡 **Enter repository IDs (owner/repo) or keywords - I'll automatically detect which type and process accordingly!**")
with gr.Row():
smart_input = gr.Textbox(
label="Repository IDs or Keywords",
lines=6,
placeholder="Examples:\n• Repository IDs: microsoft/DialoGPT-medium, openai/whisper\n• Keywords: text generation, image classification, sentiment analysis",
info="Smart detection: Use / for repo IDs, or enter keywords for search"
)
with gr.Row():
auto_analyze_checkbox = gr.Checkbox(
label="🚀 Auto-analyze repositories",
value=True,
info="Automatically start analysis when repositories are found"
)
status_box_input = gr.Textbox(label="📊 Status", interactive=False, lines=2)
# --- Analysis Tab ---
with gr.TabItem("🔬 Analysis & Results", id="analysis_tab"):
gr.Markdown("### 🧪 Repository Analysis Results")
# Display current user requirements
with gr.Row():
current_requirements_display = gr.Textbox(
label="📋 Active Requirements Context",
interactive=False,
lines=2,
info="Requirements from AI chat for better relevance scoring"
)
# Manual analysis trigger (hidden by default, shown only when auto-analyze is off)
with gr.Row(visible=False) as manual_analysis_row:
analyze_all_btn = gr.Button("🚀 Analyze All Repositories", variant="primary", size="lg")
status_box_analysis = gr.Textbox(label="📈 Analysis Status", interactive=False, lines=2)
# Progress bar for batch analysis
analysis_progress = gr.Progress()
gr.Markdown("### 📊 Results Dashboard")
# Top 3 Most Relevant Repositories (initially hidden)
with gr.Column(visible=False) as top_repos_section:
gr.Markdown("### 🏆 Top 3 Most Relevant Repositories")
gr.Markdown("🎯 **Click repository names to visit them directly on Hugging Face:**")
top_repos_df = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"],
wrap=True,
interactive=False
)
gr.Markdown("### 📋 All Analysis Results")
gr.Markdown("💡 **Click repository names to visit them on Hugging Face**")
df_output = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"],
wrap=True,
interactive=False
)
# --- Chatbot Tab ---
with gr.TabItem("🤖 AI Assistant", id="chatbot_tab"):
gr.Markdown("### 💬 Intelligent Repository Discovery Assistant")
gr.Markdown("🎯 **Tell me what you're building, and I'll automatically find the best repositories for you!**")
chatbot = gr.Chatbot(
label="🤖 AI Assistant",
height=500,
type="messages",
avatar_images=(
"https://cdn-icons-png.flaticon.com/512/149/149071.png",
"https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
),
show_copy_button=True
)
with gr.Row():
msg_input = gr.Textbox(
label="💭 Your Message",
placeholder="Tell me about your project...",
lines=1,
scale=5,
info="Describe what you're building and I'll find the perfect repositories"
)
send_btn = gr.Button("📤", variant="primary", scale=1)
# Status and extracted info (auto-updated, no manual buttons needed)
with gr.Row():
with gr.Column():
chat_status = gr.Textbox(
label="🎯 Chat Status",
interactive=False,
lines=2,
info="Conversation progress and auto-actions"
)
with gr.Column():
extracted_keywords_output = gr.Textbox(
label="🏷️ Auto-Extracted Keywords",
interactive=False,
show_copy_button=True,
info="Keywords automatically extracted and used for search"
)
# --- Repo Explorer Tab ---
with gr.TabItem("🔍 Repo Explorer", id="repo_explorer_tab"):
repo_components, repo_states = create_repo_explorer_tab()
# --- Footer ---
gr.Markdown(
"""
<div style="text-align: center; padding: 30px 20px; margin-top: 40px; background: rgba(255, 255, 255, 0.1); border-radius: 16px; backdrop-filter: blur(10px);">
<p style="margin: 0; color: rgba(255, 255, 255, 0.8); font-size: 0.95rem; font-weight: 500;">
🚀 Powered by <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Gradio</span>
& <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Hugging Face</span>
</p>
<div style="height: 2px; width: 60px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 16px auto; border-radius: 1px;"></div>
</div>
"""
)
# --- Event Handler Functions ---
def handle_smart_input(text: str, auto_analyze: bool) -> Tuple[List[str], int, pd.DataFrame, str, Any, str]:
    """Route free-form input to repo-ID handling or keyword search.

    Blank input returns an empty result that keeps the input tab selected.
    Otherwise the entries (split on newlines and commas) are either used
    directly as repository IDs or searched as keywords, the de-duplicated IDs
    are written to the CSV, and the analysis tab is selected. The last tuple
    element is "auto_analyze" when automatic analysis was requested, else "".
    """
    if not text.strip():
        return [], 0, pd.DataFrame(), "Status: Please enter repository IDs or keywords.", gr.update(selected="input_tab"), ""
    entries = [part.strip() for part in re.split(r'[\n,]+', text) if part.strip()]
    treat_as_ids = is_repo_id_format(text)
    if treat_as_ids:
        # dict.fromkeys de-duplicates while keeping first-seen order.
        found_ids = list(dict.fromkeys(entries))
    else:
        hits = []
        for keyword in entries:
            hits.extend(search_top_spaces(keyword, limit=5))
        found_ids = list(dict.fromkeys(hits))
    write_repos_to_csv(found_ids)
    table = format_dataframe_for_display(read_csv_to_dataframe())
    if treat_as_ids:
        status = f"✅ Found {len(found_ids)} repository IDs. "
    else:
        status = f"🔍 Found {len(found_ids)} repositories from keywords. "
    if auto_analyze:
        status += "Starting automatic analysis..."
        trigger = "auto_analyze"
    else:
        status += "Ready for manual analysis."
        trigger = ""
    return found_ids, 0, table, status, gr.update(selected="analysis_tab"), trigger
def handle_auto_analyze_toggle(auto_analyze: bool) -> Any:
    """Return a visibility update that is visible exactly when auto-analyze is off."""
    manual_controls_visible = not auto_analyze
    return gr.update(visible=manual_controls_visible)
def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
    """Add the user's message to the chat history and clear the input box.

    An empty history is first seeded with the assistant's welcome message.
    An empty message is not appended. The second return value is always ""
    (the new content of the message box).
    """
    conversation = history if history else [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}]
    if user_message:
        conversation.append({"role": "user", "content": user_message})
    return conversation, ""
def handle_bot_response(history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str, str, str, List[str], int, pd.DataFrame, Any]:
    """Produce the assistant reply and, once the chat looks ready, search automatically.

    Does nothing unless the last message is from the user. After appending the
    reply, and only when the readiness check passes and keywords can be
    extracted, the first three keywords are searched, the de-duplicated repo
    IDs are written to the CSV and the analysis tab is selected.

    Returns:
        (history, chat status, keywords, user requirements, repo IDs,
        current index, dataframe, tab update).
    """
    if not history or history[-1]["role"] != "user":
        return history, "", "", "", [], 0, pd.DataFrame(), gr.update()
    latest_user_text = history[-1]["content"]
    # The API receives every exchange before the latest user message as tuples.
    earlier_pairs = convert_messages_to_tuples(history[:-1])
    reply = chat_with_user(latest_user_text, earlier_pairs)
    history.append({"role": "assistant", "content": reply})
    if not should_auto_extract_keywords(history):
        return history, "💬 Conversation continuing...", "", "", [], 0, pd.DataFrame(), gr.update()
    raw_keywords = extract_keywords_from_conversation(convert_messages_to_tuples(history))
    # Keep only runs of word characters, whitespace and hyphens.
    keywords = [chunk.strip() for chunk in re.findall(r'[\w\s-]+', raw_keywords) if chunk.strip()]
    if not keywords:
        return history, "💬 Conversation continuing...", "", "", [], 0, pd.DataFrame(), gr.update()
    keywords_text = ", ".join(keywords)
    requirements = extract_user_requirements_from_chat(history)
    found = []
    for keyword in keywords[:3]:  # Use top 3 keywords to avoid too many results
        found.extend(search_top_spaces(keyword, limit=5))
    unique_ids = list(dict.fromkeys(found))
    write_repos_to_csv(unique_ids)
    table = format_dataframe_for_display(read_csv_to_dataframe())
    status_text = f"🎯 Auto-extracted keywords and found {len(unique_ids)} repositories. Analysis starting automatically..."
    return history, status_text, keywords_text, requirements, unique_ids, 0, table, gr.update(selected="analysis_tab")
def handle_repo_click(evt: gr.SelectData, df_data) -> str:
    """Map a click on the first table column to the repository's Hugging Face Spaces URL.

    Returns "" when there is no event, the click is outside column 0, the
    table is not a non-empty DataFrame, the cell is blank or 'nan', or any
    error occurs while reading the event.
    """
    if evt is None:
        return ""
    try:
        row_idx = evt.index[0]
        col_idx = evt.index[1]
        # Only the repository-name column (column 0) is clickable.
        clicked_name_column = col_idx == 0
        usable_table = isinstance(df_data, pd.DataFrame) and not df_data.empty
        if clicked_name_column and usable_table and row_idx < len(df_data):
            cell = df_data.iloc[row_idx, 0]
            repo_name = str(cell).strip()
            if cell and repo_name and repo_name != 'nan':
                hf_url = f"https://huggingface.co/spaces/{repo_name}"
                logger.info(f"Opening repository: {hf_url}")
                return hf_url
    except Exception as e:
        logger.error(f"Error handling repository click: {e}")
    return ""
def extract_user_requirements_from_chat(history: List[Dict[str, str]]) -> str:
    """Collect every non-blank user message as a "- "-prefixed bullet list.

    Returns "" when the history is empty or contains no usable user message.
    """
    if not history:
        return ""
    user_texts = [entry.get('content', '') for entry in history if entry.get('role') == 'user']
    # One bullet per user message, in conversation order; blank ones are dropped.
    return "\n".join(f"- {text}" for text in user_texts if text.strip())
def _repo_has_analysis(df: pd.DataFrame, repo_id: str) -> bool:
    """Return True when the first CSV row for ``repo_id`` has any non-blank analysis field."""
    for _, row in df.iterrows():
        if row["repo id"] == repo_id:
            return any(
                row.get(column, "").strip()
                for column in ("strength", "weaknesses", "speciality", "relevance rating")
            )
    return False


def handle_analyze_all_repos(repo_ids: List[str], user_requirements: str, progress=gr.Progress()) -> Tuple[pd.DataFrame, str, pd.DataFrame, Any]:
    """Analyzes all repositories in the CSV file with progress tracking.

    Each repository is analysed in turn; afterwards the CSV is re-read to
    verify that the repository's row received analysis data, with one retry
    when the analysis reported success but the row is still blank.

    Args:
        repo_ids: Repository IDs to analyse.
        user_requirements: Requirements text passed to every analysis and to
            the top-repository selection.
        progress: Progress tracker updated once per repository.

    Returns:
        (analysed rows for display, status text, top repositories for display,
        visibility update for the top-repositories section).
    """
    if not repo_ids:
        return pd.DataFrame(), "Status: No repositories to analyze. Please submit repo IDs first.", pd.DataFrame(), gr.update(visible=False)
    total_repos = len(repo_ids)
    try:
        # Start the progress tracking
        progress(0, desc="Initializing batch analysis...")
        successful_analyses = 0
        failed_analyses = 0
        csv_update_failures = 0
        for i, repo_id in enumerate(repo_ids):
            progress(i / total_repos, desc=f"Analyzing {repo_id} ({i+1}/{total_repos})")
            try:
                logger.info(f"Batch analysis: Processing {repo_id} ({i+1}/{total_repos})")
                _, summary, _ = analyze_and_update_single_repo(repo_id, user_requirements)
                # Verify the CSV was actually updated by re-reading it.
                repo_updated = _repo_has_analysis(read_csv_to_dataframe(), repo_id)
                if not repo_updated:
                    logger.warning(f"CSV update failed for {repo_id}, attempting retry...")
                    time.sleep(0.5)  # Wait a bit longer
                    # Re-running only makes sense when the analysis itself parsed.
                    if summary and "JSON extraction: SUCCESS" in summary:
                        logger.info(f"Attempting to re-update CSV for {repo_id}")
                        analyze_and_update_single_repo(repo_id, user_requirements)
                        repo_updated = _repo_has_analysis(read_csv_to_dataframe(), repo_id)
                if repo_updated:
                    successful_analyses += 1
                else:
                    csv_update_failures += 1
                # Longer delay to prevent file conflicts
                time.sleep(0.3)
            except Exception as e:
                logger.error(f"Error analyzing {repo_id}: {e}")
                failed_analyses += 1
                # Still wait to prevent rapid failures
                time.sleep(0.2)
        # Complete the progress
        progress(1.0, desc="Batch analysis completed!")
        # Get final updated dataframe
        updated_df = read_csv_to_dataframe()
        # Filter out rows with no analysis data for consistent display with top 3
        analyzed_df = updated_df.copy()
        analyzed_df = analyzed_df[
            (analyzed_df['strength'].str.strip() != '') |
            (analyzed_df['weaknesses'].str.strip() != '') |
            (analyzed_df['speciality'].str.strip() != '') |
            (analyzed_df['relevance rating'].str.strip() != '')
        ]
        # Get top 3 most relevant repositories using full data
        top_repos = get_top_relevant_repos(updated_df, user_requirements, top_n=3)
        # Final status with detailed breakdown
        final_status = f"🎉 Batch Analysis Complete!\n✅ Successful: {successful_analyses}/{total_repos}\n❌ Failed: {failed_analyses}/{total_repos}"
        if csv_update_failures > 0:
            final_status += f"\n⚠️ CSV Update Issues: {csv_update_failures}/{total_repos}"
        # Add top repos info if available
        if not top_repos.empty:
            final_status += f"\n\n🏆 Top {len(top_repos)} most relevant repositories selected!"
        # Show top repos section if we have results
        show_top_section = gr.update(visible=not top_repos.empty)
        logger.info(f"Batch analysis completed: {successful_analyses} successful, {failed_analyses} failed, {csv_update_failures} CSV update issues")
        return format_dataframe_for_display(analyzed_df), final_status, format_dataframe_for_display(top_repos), show_top_section
    except Exception as e:
        logger.error(f"Error in batch analysis: {e}")
        error_status = f"❌ Batch analysis failed: {e}"
        return format_dataframe_for_display(read_csv_to_dataframe()), error_status, pd.DataFrame(), gr.update(visible=False)
def handle_reset_everything() -> Tuple[List[str], int, str, pd.DataFrame, pd.DataFrame, Any, List[Dict[str, str]], str, str, str]:
    """Reset everything to initial state - clear all data, CSV, and UI components.

    Deletes the CSV file named by ``CSV_FILE`` (if it exists) and returns the
    initial value for every component wired to the reset button.

    Returns:
        A 10-tuple whose slots are, in order: repo_ids_state,
        current_repo_idx_state, user_requirements_state, df_output,
        top_repos_df, top_repos_section, chatbot, status_box_input,
        current_requirements_display, extracted_keywords_output.

    The function does not propagate errors from the CSV deletion: on failure
    the same reset values are returned and the status slot carries
    ``"Reset failed: <error>"`` instead of the success message.
    """

    def _empty_results() -> pd.DataFrame:
        # Build a fresh frame per output so the two tables never share one
        # object, and so the headers stay present on the error path as well.
        return pd.DataFrame(
            columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
        )

    try:
        # EAFP: delete directly rather than exists() + remove(); the file can
        # disappear (or appear) between the check and the delete.
        try:
            os.remove(CSV_FILE)
        except FileNotFoundError:
            pass  # Nothing to delete - the CSV is already absent.
        else:
            logger.info("CSV file deleted for reset")
        status = "Status: Everything has been reset. Ready to start fresh!"
        logger.info("Complete system reset performed")
    except Exception as e:
        # UI boundary: surface the failure in the status box instead of raising.
        logger.error(f"Error during reset: {e}")
        status = f"Reset failed: {e}"

    # Single source of truth for the reset values; only the status differs
    # between the success and the failure outcome.
    return (
        [],                                 # repo_ids_state
        0,                                  # current_repo_idx_state
        "",                                 # user_requirements_state
        _empty_results(),                   # df_output
        _empty_results(),                   # top_repos_df
        gr.update(visible=False),           # top_repos_section
        [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],  # chatbot
        status,                             # status_box_input
        "No requirements extracted yet.",   # current_requirements_display
        "",                                 # extracted_keywords_output
    )
# --- Component Event Wiring ---
# Everything below connects UI components to their handlers. The order of each
# `outputs` list must match the order of the values the handler returns.
# Initialize chatbot with welcome message on app load
app.load(
fn=lambda: [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
outputs=[chatbot]
)
# Smart Input with Auto-processing
# NOTE(review): status_box_input is listed twice in `outputs` (4th and 6th slot).
# The follow-up step reads status_box_input back as `trigger` and compares it to
# "auto_analyze", so the last slot presumably carries that trigger string through
# the status box - confirm against handle_smart_input.
smart_input.submit(
fn=handle_smart_input,
inputs=[smart_input, auto_analyze_checkbox],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_input, tabs, status_box_input]
).then(
# If auto_analyze is enabled and we got repos, start analysis automatically
# NOTE(review): in the fallback branch this step writes an empty DataFrame to
# df_output and "Ready for analysis." to status_box_input, replacing whatever the
# previous step put there - confirm that this is intended.
fn=lambda repo_ids, user_reqs, trigger: handle_analyze_all_repos(repo_ids, user_reqs) if trigger == "auto_analyze" and repo_ids else (pd.DataFrame(), "Ready for analysis.", pd.DataFrame(), gr.update(visible=False)),
inputs=[repo_ids_state, user_requirements_state, status_box_input],
outputs=[df_output, status_box_input, top_repos_df, top_repos_section]
)
# Auto-analyze checkbox toggle
# The handler's single return value is applied to manual_analysis_row.
auto_analyze_checkbox.change(
fn=handle_auto_analyze_toggle,
inputs=[auto_analyze_checkbox],
outputs=[manual_analysis_row]
)
# Manual analysis button (when auto-analyze is disabled)
# Same handler as the automatic path, but its status message goes to
# status_box_analysis rather than status_box_input.
analyze_all_btn.click(
fn=handle_analyze_all_repos,
inputs=[repo_ids_state, user_requirements_state],
outputs=[df_output, status_box_analysis, top_repos_df, top_repos_section]
)
# Chatbot with Auto-extraction and Auto-search
# Four chained steps: record the user message, produce the bot response (which
# also fills the keyword/requirements/repo state), refresh the requirements
# display, then analyze the repos if any were found.
# The send_btn.click chain further down is an exact copy of this one; keep the
# two in sync when editing either.
msg_input.submit(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot, chat_status, extracted_keywords_output, user_requirements_state, repo_ids_state, current_repo_idx_state, df_output, tabs]
).then(
# Update requirements display when they change
# Blank or whitespace-only requirements are replaced by a placeholder sentence.
fn=lambda req: req if req.strip() else "No specific requirements extracted from conversation.",
inputs=[user_requirements_state],
outputs=[current_requirements_display]
).then(
# If we got repos from chatbot, auto-analyze them
# NOTE(review): when repo_ids is empty this step still writes an empty DataFrame
# to df_output and "" to chat_status, overwriting what handle_bot_response set in
# the earlier step - confirm that this is intended.
fn=lambda repo_ids, user_reqs: handle_analyze_all_repos(repo_ids, user_reqs) if repo_ids else (pd.DataFrame(), "", pd.DataFrame(), gr.update(visible=False)),
inputs=[repo_ids_state, user_requirements_state],
outputs=[df_output, chat_status, top_repos_df, top_repos_section]
)
# Send button: identical four-step chain to msg_input.submit above.
send_btn.click(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot, chat_status, extracted_keywords_output, user_requirements_state, repo_ids_state, current_repo_idx_state, df_output, tabs]
).then(
# Update requirements display when they change
fn=lambda req: req if req.strip() else "No specific requirements extracted from conversation.",
inputs=[user_requirements_state],
outputs=[current_requirements_display]
).then(
# If we got repos from chatbot, auto-analyze them
fn=lambda repo_ids, user_reqs: handle_analyze_all_repos(repo_ids, user_reqs) if repo_ids else (pd.DataFrame(), "", pd.DataFrame(), gr.update(visible=False)),
inputs=[repo_ids_state, user_requirements_state],
outputs=[df_output, chat_status, top_repos_df, top_repos_section]
)
# Repo Explorer Tab
setup_repo_explorer_events(repo_components, repo_states)
# Direct Repository Clicks - Open HF Space
# Both tables share handle_repo_click and report into status_box_input.
# NOTE(review): the js callback names its argument `url`, but the only declared
# input is the dataframe component itself - presumably the callback receives the
# table value rather than a URL string; verify that the link actually opens.
df_output.select(
fn=handle_repo_click,
inputs=[df_output],
outputs=[status_box_input],
js="(url) => { if(url && url.trim()) { window.open(url, '_blank'); } }"
)
top_repos_df.select(
fn=handle_repo_click,
inputs=[top_repos_df],
outputs=[status_box_input],
js="(url) => { if(url && url.trim()) { window.open(url, '_blank'); } }"
)
# Reset button event
# No inputs; the ten outputs follow the order of the 10-tuple returned by
# handle_reset_everything.
reset_all_btn.click(
fn=handle_reset_everything,
outputs=[repo_ids_state, current_repo_idx_state, user_requirements_state, df_output, top_repos_df, top_repos_section, chatbot, status_box_input, current_requirements_display, extracted_keywords_output]
)
return app
# Script entry point: build the UI via create_ui() and launch it with debug=True.
# Runs only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
app = create_ui()
app.launch(debug=True)