import gradio as gr
import regex as re
import csv
import pandas as pd
from typing import List, Dict, Tuple, Any
import logging
import os
import time
# Import core logic from other modules, as in app_old.py
from analyzer import (
combine_repo_files_for_llm,
parse_llm_json_response,
analyze_combined_file,
handle_load_repository
)
from hf_utils import download_filtered_space_files, search_top_spaces
from chatbot_page import chat_with_user, extract_keywords_from_conversation
from repo_explorer import create_repo_explorer_tab, setup_repo_explorer_events
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CSV_FILE = "repo_ids.csv"
CHATBOT_SYSTEM_PROMPT = (
"You are a helpful assistant whose ONLY job is to gather information about the user's ideal repository requirements. "
"DO NOT suggest any specific repositories or give repository recommendations. "
"Your role is to ask clarifying questions to understand exactly what the user is looking for. "
"Ask about their use case, preferred programming language, specific features needed, project type, etc. "
"When you feel you have gathered enough detailed information about their requirements, "
"tell the user: 'I think I have enough information about your requirements. Please click the Extract Keywords button to search for repositories.' "
"Focus on understanding their needs, not providing solutions."
)
CHATBOT_INITIAL_MESSAGE = "Hello! I'm here to help you define your ideal Hugging Face repository requirements. I won't suggest specific repos - my job is to understand exactly what you're looking for. Tell me about your project: What type of application are you building? What's your use case?"
# --- Helper Functions (Logic) ---
def get_top_relevant_repos(df: pd.DataFrame, user_requirements: str, top_n: int = 3) -> pd.DataFrame:
"""
Uses LLM to select the top N most relevant repositories based on user requirements and analysis data.
"""
try:
if df.empty:
return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
# Filter out rows with no analysis data
analyzed_df = df.copy()
analyzed_df = analyzed_df[
(analyzed_df['strength'].str.strip() != '') |
(analyzed_df['weaknesses'].str.strip() != '') |
(analyzed_df['speciality'].str.strip() != '') |
(analyzed_df['relevance rating'].str.strip() != '')
]
if analyzed_df.empty:
logger.warning("No analyzed repositories found for LLM selection")
return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
# Create a prompt for the LLM
csv_data = ""
for idx, row in analyzed_df.iterrows():
csv_data += f"Repository: {row['repo id']}\n"
csv_data += f"Strengths: {row['strength']}\n"
csv_data += f"Weaknesses: {row['weaknesses']}\n"
csv_data += f"Speciality: {row['speciality']}\n"
csv_data += f"Relevance: {row['relevance rating']}\n\n"
user_context = user_requirements if user_requirements.strip() else "General repository recommendation"
prompt = f"""Based on the user's requirements and the analysis of repositories below, select the top {top_n} most relevant repositories.
User Requirements:
{user_context}
Repository Analysis Data:
{csv_data}
Please analyze all repositories and select the {top_n} most relevant ones based on:
1. How well they match the user's specific requirements
2. Their strengths and capabilities
3. Their relevance rating
4. Their speciality alignment with user needs
Return ONLY a JSON list of the repository IDs in order of relevance (most relevant first). Example format:
["repo1", "repo2", "repo3"]
Selected repositories:"""
try:
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": "You are an expert at analyzing and ranking repositories based on user requirements. Always return valid JSON."},
{"role": "user", "content": prompt}
],
max_tokens=200,
temperature=0.3
)
llm_response = response.choices[0].message.content.strip()
logger.info(f"LLM response for top repos: {llm_response}")
# Extract JSON from response
import json
import re
# Try to find JSON array in the response
json_match = re.search(r'\[.*\]', llm_response)
if json_match:
selected_repos = json.loads(json_match.group())
logger.info(f"LLM selected repositories: {selected_repos}")
# Filter dataframe to only include selected repositories in order
top_repos_list = []
for repo_id in selected_repos[:top_n]:
matching_rows = analyzed_df[analyzed_df['repo id'] == repo_id]
if not matching_rows.empty:
top_repos_list.append(matching_rows.iloc[0])
if top_repos_list:
top_repos = pd.DataFrame(top_repos_list)
logger.info(f"Successfully selected {len(top_repos)} repositories using LLM")
return top_repos
# Fallback: if LLM response parsing fails, use first N analyzed repos
logger.warning("Failed to parse LLM response, using fallback selection")
return analyzed_df.head(top_n)
except Exception as llm_error:
logger.error(f"LLM selection failed: {llm_error}")
# Fallback: return first N repositories with analysis data
return analyzed_df.head(top_n)
except Exception as e:
logger.error(f"Error in LLM-based repo selection: {e}")
return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
def write_repos_to_csv(repo_ids: List[str]) -> None:
"""Writes a list of repo IDs to the CSV file, overwriting the previous content."""
try:
with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
for repo_id in repo_ids:
writer.writerow([repo_id, "", "", "", ""])
logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
except Exception as e:
logger.error(f"Error writing to CSV: {e}")
def format_text_for_dataframe(text: str, max_length: int = 200) -> str:
"""Format text for better display in dataframe by truncating and cleaning."""
if not text or pd.isna(text):
return ""
# Clean the text
text = str(text).strip()
# Remove excessive whitespace and newlines
text = re.sub(r'\s+', ' ', text)
# Truncate if too long
if len(text) > max_length:
text = text[:max_length-3] + "..."
return text
def read_csv_to_dataframe() -> pd.DataFrame:
"""Reads the CSV file into a pandas DataFrame with full text preserved."""
try:
df = pd.read_csv(CSV_FILE, dtype=str).fillna('')
# Keep the full text intact - don't truncate here
# The truncation will be handled in the UI display layer
return df
except FileNotFoundError:
return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
except Exception as e:
logger.error(f"Error reading CSV: {e}")
return pd.DataFrame()
def format_dataframe_for_display(df: pd.DataFrame) -> pd.DataFrame:
"""Returns dataframe with full text (no truncation) for display."""
if df.empty:
return df
# Return the dataframe as-is without any text truncation
# This will show the full text content in the CSV display
return df.copy()
def analyze_and_update_single_repo(repo_id: str, user_requirements: str = "") -> Tuple[str, str, pd.DataFrame]:
"""
Downloads, analyzes a single repo, updates the CSV, and returns results.
Now includes user requirements for better relevance rating.
This function combines the logic of downloading, analyzing, and updating the CSV for one repo.
"""
try:
logger.info(f"Starting analysis for repo: {repo_id}")
download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
txt_path = combine_repo_files_for_llm()
with open(txt_path, "r", encoding="utf-8") as f:
combined_content = f.read()
llm_output = analyze_combined_file(txt_path, user_requirements)
last_start = llm_output.rfind('{')
last_end = llm_output.rfind('}')
final_json_str = llm_output[last_start:last_end+1] if last_start != -1 and last_end != -1 else "{}"
llm_json = parse_llm_json_response(final_json_str)
summary = ""
if isinstance(llm_json, dict) and "error" not in llm_json:
strengths = llm_json.get("strength", "N/A")
weaknesses = llm_json.get("weaknesses", "N/A")
relevance = llm_json.get("relevance rating", "N/A")
summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}\n\nRelevance: {relevance}"
else:
summary = f"JSON extraction: FAILED\nRaw response might not be valid JSON."
# Update CSV
df = read_csv_to_dataframe()
repo_found_in_df = False
for idx, row in df.iterrows():
if row["repo id"] == repo_id:
if isinstance(llm_json, dict):
df.at[idx, "strength"] = llm_json.get("strength", "")
df.at[idx, "weaknesses"] = llm_json.get("weaknesses", "")
df.at[idx, "speciality"] = llm_json.get("speciality", "")
df.at[idx, "relevance rating"] = llm_json.get("relevance rating", "")
repo_found_in_df = True
break
if not repo_found_in_df:
logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")
# Write CSV with better error handling and flushing
try:
df.to_csv(CSV_FILE, index=False)
# Force file system flush
os.sync() if hasattr(os, 'sync') else None
logger.info(f"Successfully updated CSV for {repo_id}")
except Exception as csv_error:
logger.error(f"Failed to write CSV for {repo_id}: {csv_error}")
# Try once more with a small delay
time.sleep(0.2)
try:
df.to_csv(CSV_FILE, index=False)
logger.info(f"Successfully updated CSV for {repo_id} on retry")
except Exception as retry_error:
logger.error(f"Failed to write CSV for {repo_id} on retry: {retry_error}")
logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
return combined_content, summary, df
except Exception as e:
logger.error(f"An error occurred during analysis of {repo_id}: {e}")
error_summary = f"Error analyzing repo: {e}"
return "", error_summary, format_dataframe_for_display(read_csv_to_dataframe())
# --- NEW: Helper for Chat History Conversion ---
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
"""
Converts Gradio's 'messages' format to the old 'tuple' format for compatibility.
This robust version correctly handles histories that start with an assistant message.
"""
tuple_history = []
# Iterate through the history to find user messages
for i, msg in enumerate(history):
if msg['role'] == 'user':
# Once a user message is found, check if the next message is from the assistant
if i + 1 < len(history) and history[i+1]['role'] == 'assistant':
user_content = msg['content']
assistant_content = history[i+1]['content']
tuple_history.append((user_content, assistant_content))
return tuple_history
# --- Gradio UI ---
def create_ui() -> gr.Blocks:
"""Creates and configures the entire Gradio interface."""
css = """
/* Modern sleek design */
.gradio-container {
font-family: 'Inter', 'system-ui', sans-serif;
background: linear-gradient(135deg, #0a0a0a 0%, #1a1a1a 100%);
min-height: 100vh;
}
.gr-form {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
padding: 24px;
margin: 16px;
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-button {
background: linear-gradient(45deg, #667eea, #764ba2);
border: none;
border-radius: 12px;
color: white;
font-weight: 600;
padding: 12px 24px;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4);
}
.gr-button:hover {
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6);
}
.gr-textbox {
border: 2px solid rgba(102, 126, 234, 0.2);
border-radius: 12px;
background: rgba(255, 255, 255, 0.9);
transition: all 0.3s ease;
}
.gr-textbox:focus {
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.gr-panel {
background: rgba(255, 255, 255, 0.95);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-tab-nav {
background: rgba(255, 255, 255, 0.95);
border-radius: 12px 12px 0 0;
backdrop-filter: blur(10px);
}
.gr-tab-nav button {
background: transparent;
border: none;
padding: 16px 24px;
font-weight: 600;
color: #666;
transition: all 0.3s ease;
}
.gr-tab-nav button.selected {
background: linear-gradient(45deg, #667eea, #764ba2);
color: white;
border-radius: 8px;
}
.chatbot {
border-radius: 16px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
}
/* Hide Gradio footer */
footer {
display: none !important;
}
/* Custom scrollbar */
::-webkit-scrollbar {
width: 8px;
}
::-webkit-scrollbar-track {
background: rgba(255, 255, 255, 0.1);
border-radius: 4px;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(45deg, #667eea, #764ba2);
border-radius: 4px;
}
/* Improved dataframe styling for full text display */
.gr-dataframe {
border-radius: 12px;
overflow: hidden;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
background: rgba(255, 255, 255, 0.98);
}
.gr-dataframe table {
width: 100%;
table-layout: fixed;
border-collapse: collapse;
}
/* Column width specifications for both dataframes */
.gr-dataframe th,
.gr-dataframe td {
padding: 12px 16px;
text-align: left;
border-bottom: 1px solid rgba(0, 0, 0, 0.1);
font-size: 0.95rem;
line-height: 1.4;
}
/* Specific column widths - applying to both dataframes */
.gr-dataframe th:nth-child(1),
.gr-dataframe td:nth-child(1) { width: 16.67% !important; min-width: 16.67% !important; max-width: 16.67% !important; }
.gr-dataframe th:nth-child(2),
.gr-dataframe td:nth-child(2) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; }
.gr-dataframe th:nth-child(3),
.gr-dataframe td:nth-child(3) { width: 25% !important; min-width: 25% !important; max-width: 25% !important; }
.gr-dataframe th:nth-child(4),
.gr-dataframe td:nth-child(4) { width: 20.83% !important; min-width: 20.83% !important; max-width: 20.83% !important; }
.gr-dataframe th:nth-child(5),
.gr-dataframe td:nth-child(5) { width: 12.5% !important; min-width: 12.5% !important; max-width: 12.5% !important; }
/* Additional specific targeting for both dataframes */
div[data-testid="dataframe"] table th:nth-child(1),
div[data-testid="dataframe"] table td:nth-child(1) { width: 16.67% !important; }
div[data-testid="dataframe"] table th:nth-child(2),
div[data-testid="dataframe"] table td:nth-child(2) { width: 25% !important; }
div[data-testid="dataframe"] table th:nth-child(3),
div[data-testid="dataframe"] table td:nth-child(3) { width: 25% !important; }
div[data-testid="dataframe"] table th:nth-child(4),
div[data-testid="dataframe"] table td:nth-child(4) { width: 20.83% !important; }
div[data-testid="dataframe"] table th:nth-child(5),
div[data-testid="dataframe"] table td:nth-child(5) { width: 12.5% !important; }
/* Make repository names clickable */
.gr-dataframe td:nth-child(1) {
cursor: pointer;
color: #667eea;
font-weight: 600;
transition: all 0.3s ease;
}
.gr-dataframe td:nth-child(1):hover {
background-color: rgba(102, 126, 234, 0.1);
color: #764ba2;
transform: scale(1.02);
}
/* Content columns - readable styling with scroll for long text */
.gr-dataframe td:nth-child(2),
.gr-dataframe td:nth-child(3),
.gr-dataframe td:nth-child(4),
.gr-dataframe td:nth-child(5) {
cursor: default;
font-size: 0.9rem;
}
.gr-dataframe tbody tr:hover {
background-color: rgba(102, 126, 234, 0.05);
}
/* JavaScript for auto-scroll to top on tab change */
"""
with gr.Blocks(
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="purple",
neutral_hue="gray",
font=["Inter", "system-ui", "sans-serif"]
),
css=css,
title="๐ HF Repo Analyzer"
) as app:
# --- State Management ---
# Using simple, separate state objects for robustness.
repo_ids_state = gr.State([])
current_repo_idx_state = gr.State(0)
user_requirements_state = gr.State("") # Store user requirements from chatbot
loaded_repo_content_state = gr.State("") # Store loaded repository content
current_repo_id_state = gr.State("") # Store current repository ID
selected_repo_id_state = gr.State("") # Store selected repository ID for modal actions
gr.Markdown(
"""
๐ HF Repo Analyzer
Discover, analyze, and evaluate Hugging Face repositories with AI-powered insights
"""
)
# Global Reset Button - visible on all tabs
with gr.Row():
with gr.Column(scale=4):
pass
with gr.Column(scale=1):
reset_all_btn = gr.Button("๐ Reset Everything", variant="stop", size="lg")
with gr.Column(scale=1):
pass
with gr.Tabs() as tabs:
# --- Input Tab ---
with gr.TabItem("๐ Input & Search", id="input_tab"):
with gr.Row(equal_height=True):
with gr.Column(scale=1):
gr.Markdown("### ๐ Repository IDs")
repo_id_input = gr.Textbox(
label="Repository IDs",
lines=8,
placeholder="microsoft/DialoGPT-medium\nopenai/whisper\nhuggingface/transformers",
info="Enter repo IDs separated by commas or new lines"
)
submit_repo_btn = gr.Button("๐ Submit Repositories", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### ๐ Keyword Search")
keyword_input = gr.Textbox(
label="Search Keywords",
lines=8,
placeholder="text generation\nimage classification\nsentiment analysis",
info="Enter keywords to find relevant repositories"
)
search_btn = gr.Button("๐ Search Repositories", variant="primary", size="lg")
status_box_input = gr.Textbox(label="๐ Status", interactive=False, lines=2)
# --- Analysis Tab ---
with gr.TabItem("๐ฌ Analysis", id="analysis_tab"):
gr.Markdown("### ๐งช Repository Analysis Engine")
# Display current user requirements
with gr.Row():
current_requirements_display = gr.Textbox(
label="๐ Current User Requirements",
interactive=False,
lines=3,
info="Requirements extracted from AI chat conversation for relevance rating"
)
with gr.Row():
analyze_all_btn = gr.Button("๐ Analyze All Repositories", variant="primary", size="lg", scale=1)
with gr.Column(scale=2):
status_box_analysis = gr.Textbox(label="๐ Analysis Status", interactive=False, lines=2)
# Progress bar for batch analysis
with gr.Row():
analysis_progress = gr.Progress()
# progress_display = gr.Textbox(
# label="๐ Batch Analysis Progress",
# interactive=False,
# lines=2,
# visible=False,
# info="Shows progress when analyzing all repositories"
# )
with gr.Row(equal_height=True):
# with gr.Column():
# content_output = gr.Textbox(
# label="๐ Repository Content",
# lines=20,
# show_copy_button=True,
# info="Raw content extracted from the repository"
# )
# with gr.Column():
# summary_output = gr.Textbox(
# label="๐ฏ AI Analysis Summary",
# lines=20,
# show_copy_button=True,
# info="Detailed analysis and insights from AI"
# )
pass
gr.Markdown("### ๐ Results Dashboard")
# Top 3 Most Relevant Repositories (initially hidden)
with gr.Column(visible=False) as top_repos_section:
gr.Markdown("### ๐ Top 3 Most Relevant Repositories")
gr.Markdown("๐ฏ **These are the highest-rated repositories based on your requirements:**")
top_repos_df = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"],
wrap=True,
interactive=False
)
gr.Markdown("๐ก **Tip:** Full text is displayed directly in the table. Click on repository names to explore or visit them!")
# Text expansion modal for showing full content (kept for backwards compatibility)
with gr.Row():
with gr.Column():
text_expansion_modal = gr.Column(visible=False)
with text_expansion_modal:
gr.Markdown("### ๐ Full Content View")
expanded_content_title = gr.Textbox(
label="Content Type",
interactive=False,
info="Full text content for the selected field"
)
expanded_content_text = gr.Textbox(
label="Full Text",
lines=10,
interactive=False,
show_copy_button=True,
info="Complete untruncated content"
)
close_text_modal_btn = gr.Button("โ Close", size="lg")
# Modal popup for repository action selection
with gr.Row():
with gr.Column():
repo_action_modal = gr.Column(visible=False)
with repo_action_modal:
gr.Markdown("### ๐ Repository Actions")
selected_repo_display = gr.Textbox(
label="Selected Repository",
interactive=False,
info="Choose what you'd like to do with this repository"
)
with gr.Row():
visit_repo_btn = gr.Button("๐ Visit Hugging Face Space", variant="primary", size="lg")
explore_repo_btn = gr.Button("๐ Open in Repo Explorer", variant="secondary", size="lg")
cancel_modal_btn = gr.Button("โ Cancel", size="lg")
gr.Markdown("### ๐ All Analysis Results")
df_output = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
column_widths=["16.67%", "25%", "25%", "20.83%", "12.5%"],
wrap=True,
interactive=False
)
# --- Chatbot Tab ---
with gr.TabItem("๐ค AI Assistant", id="chatbot_tab"):
gr.Markdown("### ๐ฌ Intelligent Repository Discovery")
chatbot = gr.Chatbot(
label="๐ค AI Assistant",
height=450,
type="messages",
avatar_images=(
"https://cdn-icons-png.flaticon.com/512/149/149071.png",
"https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
),
show_copy_button=True
)
with gr.Row():
msg_input = gr.Textbox(
label="๐ญ Your Message",
placeholder="Tell me about your ideal repository...",
lines=1,
scale=4,
info="Describe what you're looking for"
)
send_btn = gr.Button("๐ค Send", variant="primary", scale=1)
end_chat_btn = gr.Button("๐ฏ Extract Keywords", scale=1)
use_keywords_btn = gr.Button("๐ Search Now", variant="primary", scale=1)
with gr.Row():
with gr.Column():
extracted_keywords_output = gr.Textbox(
label="๐ท๏ธ Extracted Keywords",
interactive=False,
show_copy_button=True,
info="AI-generated search terms from our conversation"
)
with gr.Column():
status_box_chatbot = gr.Textbox(
label="๐ Chat Status",
interactive=False,
info="Current conversation status"
)
# --- Repo Explorer Tab ---
with gr.TabItem("๐ Repo Explorer", id="repo_explorer_tab"):
repo_components, repo_states = create_repo_explorer_tab()
# --- Footer ---
gr.Markdown(
"""
๐ Powered by Gradio
& Hugging Face
"""
)
# --- Event Handler Functions ---
def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
"""Processes submitted repo IDs, updates state, and prepares for analysis."""
if not text:
return [], 0, pd.DataFrame(), "Status: Please enter repository IDs.", gr.update(selected="input_tab")
repo_ids = list(dict.fromkeys([repo.strip() for repo in re.split(r'[\n,]+', text) if repo.strip()]))
write_repos_to_csv(repo_ids)
df = format_dataframe_for_display(read_csv_to_dataframe())
status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis."
return repo_ids, 0, df, status, gr.update(selected="analysis_tab")
def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
"""Processes submitted keywords, finds repos, updates state, and prepares for analysis."""
if not keywords:
return [], 0, pd.DataFrame(), "Status: Please enter keywords.", gr.update(selected="input_tab")
keyword_list = [k.strip() for k in re.split(r'[\n,]+', keywords) if k.strip()]
repo_ids = []
for kw in keyword_list:
repo_ids.extend(search_top_spaces(kw, limit=5))
unique_repo_ids = list(dict.fromkeys(repo_ids))
write_repos_to_csv(unique_repo_ids)
df = format_dataframe_for_display(read_csv_to_dataframe())
status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis."
return unique_repo_ids, 0, df, status, gr.update(selected="analysis_tab")
def extract_user_requirements_from_chat(history: List[Dict[str, str]]) -> str:
"""Extract user requirements from chatbot conversation."""
if not history:
return ""
user_messages = []
for msg in history:
if msg.get('role') == 'user':
user_messages.append(msg.get('content', ''))
if not user_messages:
return ""
# Combine all user messages as requirements
requirements = "\n".join([f"- {msg}" for msg in user_messages if msg.strip()])
return requirements
def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
"""Appends the user's message to the history, preparing for the bot's response."""
# Initialize chatbot with welcome message if empty
if not history:
history = [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}]
if user_message:
history.append({"role": "user", "content": user_message})
return history, ""
def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Generates and appends the bot's response using the compatible history format."""
if not history or history[-1]["role"] != "user":
return history
user_message = history[-1]["content"]
# Convert all messages *before* the last user message into tuples for the API
tuple_history_for_api = convert_messages_to_tuples(history[:-1])
response = chat_with_user(user_message, tuple_history_for_api)
history.append({"role": "assistant", "content": response})
return history
def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str, str]:
"""Ends the chat, extracts and sanitizes keywords from the conversation, and extracts user requirements."""
if not history:
return "", "Status: Chat is empty, nothing to analyze.", ""
# Convert the full, valid history for the extraction logic
tuple_history = convert_messages_to_tuples(history)
if not tuple_history:
return "", "Status: No completed conversations to analyze.", ""
# Get raw keywords string from the LLM
raw_keywords_str = extract_keywords_from_conversation(tuple_history)
# Sanitize the LLM output to extract only keyword-like parts.
# A keyword can contain letters, numbers, underscores, spaces, and hyphens.
cleaned_keywords = re.findall(r'[\w\s-]+', raw_keywords_str)
# Trim whitespace from each found keyword and filter out any empty strings
cleaned_keywords = [kw.strip() for kw in cleaned_keywords if kw.strip()]
if not cleaned_keywords:
return "", f"Status: Could not extract valid keywords. Raw LLM output: '{raw_keywords_str}'", ""
# Join them into a clean, comma-separated string for the search tool
final_keywords_str = ", ".join(cleaned_keywords)
# Extract user requirements for analysis
user_requirements = extract_user_requirements_from_chat(history)
status = "Status: Keywords extracted. User requirements saved for analysis."
return final_keywords_str, status, user_requirements
def handle_dataframe_select(evt: gr.SelectData, df_data) -> Tuple[str, Any, Any, str, str, Any, str]:
"""Handle dataframe row selection - only repo ID (column 0) shows modal since full text is now displayed directly."""
print(f"DEBUG: Selection event triggered!")
print(f"DEBUG: evt = {evt}")
print(f"DEBUG: df_data type = {type(df_data)}")
if evt is None:
return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), ""
try:
# Get the selected row and column from the event
row_idx = evt.index[0]
col_idx = evt.index[1]
print(f"DEBUG: Selected row {row_idx}, column {col_idx}")
# Handle pandas DataFrame
if isinstance(df_data, pd.DataFrame) and not df_data.empty and row_idx < len(df_data):
if col_idx == 0: # Repository name column - show action modal
repo_id = df_data.iloc[row_idx, 0]
print(f"DEBUG: Extracted repo_id = '{repo_id}'")
if repo_id and str(repo_id).strip() and str(repo_id).strip() != 'nan':
clean_repo_id = str(repo_id).strip()
logger.info(f"Showing modal for repository: {clean_repo_id}")
return clean_repo_id, gr.update(visible=True), gr.update(), "", "", gr.update(visible=False), clean_repo_id
# For content columns (1,2,3) and relevance (4), do nothing since full text is shown directly
else:
print(f"DEBUG: Clicked on column {col_idx}, full text already shown in table")
return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), ""
else:
print(f"DEBUG: df_data is not a DataFrame or row_idx {row_idx} out of range")
except Exception as e:
print(f"DEBUG: Exception occurred: {e}")
logger.error(f"Error handling dataframe selection: {e}")
return "", gr.update(visible=False), gr.update(), "", "", gr.update(visible=False), ""
def handle_analyze_all_repos(repo_ids: List[str], user_requirements: str, progress=gr.Progress()) -> Tuple[pd.DataFrame, str, pd.DataFrame, Any]:
"""Analyzes all repositories in the CSV file with progress tracking."""
if not repo_ids:
return pd.DataFrame(), "Status: No repositories to analyze. Please submit repo IDs first.", pd.DataFrame(), gr.update(visible=False)
total_repos = len(repo_ids)
try:
# Start the progress tracking
progress(0, desc="Initializing batch analysis...")
successful_analyses = 0
failed_analyses = 0
csv_update_failures = 0
for i, repo_id in enumerate(repo_ids):
# Update progress
progress_percent = (i / total_repos)
progress(progress_percent, desc=f"Analyzing {repo_id} ({i+1}/{total_repos})")
try:
logger.info(f"Batch analysis: Processing {repo_id} ({i+1}/{total_repos})")
# Analyze the repository
content, summary, df = analyze_and_update_single_repo(repo_id, user_requirements)
# Verify the CSV was actually updated by checking if the repo has analysis data
updated_df = read_csv_to_dataframe()
repo_updated = False
for idx, row in updated_df.iterrows():
if row["repo id"] == repo_id:
# Check if any analysis field is populated
if (row.get("strength", "").strip() or
row.get("weaknesses", "").strip() or
row.get("speciality", "").strip() or
row.get("relevance rating", "").strip()):
repo_updated = True
break
if repo_updated:
successful_analyses += 1
else:
# CSV update failed - try once more
logger.warning(f"CSV update failed for {repo_id}, attempting retry...")
time.sleep(0.5) # Wait a bit longer
# Force re-read and re-update
df_retry = read_csv_to_dataframe()
retry_success = False
# Re-parse the analysis if available
if summary and "JSON extraction: SUCCESS" in summary:
# Extract the analysis from summary - this is a fallback
logger.info(f"Attempting to re-update CSV for {repo_id}")
content_retry, summary_retry, df_retry = analyze_and_update_single_repo(repo_id, user_requirements)
# Check again
final_df = read_csv_to_dataframe()
for idx, row in final_df.iterrows():
if row["repo id"] == repo_id:
if (row.get("strength", "").strip() or
row.get("weaknesses", "").strip() or
row.get("speciality", "").strip() or
row.get("relevance rating", "").strip()):
retry_success = True
break
if retry_success:
successful_analyses += 1
else:
csv_update_failures += 1
# Longer delay to prevent file conflicts
time.sleep(0.3)
except Exception as e:
logger.error(f"Error analyzing {repo_id}: {e}")
failed_analyses += 1
# Still wait to prevent rapid failures
time.sleep(0.2)
# Complete the progress
progress(1.0, desc="Batch analysis completed!")
# Get final updated dataframe
updated_df = read_csv_to_dataframe()
# Filter out rows with no analysis data for consistent display with top 3
analyzed_df = updated_df.copy()
analyzed_df = analyzed_df[
(analyzed_df['strength'].str.strip() != '') |
(analyzed_df['weaknesses'].str.strip() != '') |
(analyzed_df['speciality'].str.strip() != '') |
(analyzed_df['relevance rating'].str.strip() != '')
]
# Get top 3 most relevant repositories using full data
top_repos = get_top_relevant_repos(updated_df, user_requirements, top_n=3)
# Final status with detailed breakdown
final_status = f"๐ Batch Analysis Complete!\nโ
Successful: {successful_analyses}/{total_repos}\nโ Failed: {failed_analyses}/{total_repos}"
if csv_update_failures > 0:
final_status += f"\nโ ๏ธ CSV Update Issues: {csv_update_failures}/{total_repos}"
# Add top repos info if available
if not top_repos.empty:
final_status += f"\n\n๐ Top {len(top_repos)} most relevant repositories selected!"
# Show top repos section if we have results
show_top_section = gr.update(visible=not top_repos.empty)
logger.info(f"Batch analysis completed: {successful_analyses} successful, {failed_analyses} failed, {csv_update_failures} CSV update issues")
return format_dataframe_for_display(analyzed_df), final_status, format_dataframe_for_display(top_repos), show_top_section
except Exception as e:
logger.error(f"Error in batch analysis: {e}")
error_status = f"โ Batch analysis failed: {e}"
return format_dataframe_for_display(read_csv_to_dataframe()), error_status, pd.DataFrame(), gr.update(visible=False)
def handle_visit_repo(repo_id: str) -> Tuple[Any, str]:
"""Handle visiting the Hugging Face Space for the repository."""
if repo_id and repo_id.strip():
hf_url = f"https://huggingface.co/spaces/{repo_id.strip()}"
logger.info(f"User chose to visit: {hf_url}")
return gr.update(visible=False), hf_url
return gr.update(visible=False), ""
def handle_explore_repo(selected_repo_id: str) -> Tuple[Any, Any, Any]:
"""Handle navigating to the repo explorer and populate the repo ID."""
logger.info(f"DEBUG: handle_explore_repo called with selected_repo_id: '{selected_repo_id}'")
logger.info(f"DEBUG: selected_repo_id type: {type(selected_repo_id)}")
logger.info(f"DEBUG: selected_repo_id length: {len(selected_repo_id) if selected_repo_id else 'None'}")
if selected_repo_id and selected_repo_id.strip() and selected_repo_id.strip() != 'nan':
clean_repo_id = selected_repo_id.strip()
return (
gr.update(visible=False), # close modal
gr.update(selected="repo_explorer_tab"), # switch tab
gr.update(value=clean_repo_id) # populate repo explorer input
)
else:
return (
gr.update(visible=False), # close modal
gr.update(selected="repo_explorer_tab"), # switch tab
gr.update() # don't change repo explorer input
)
def handle_cancel_modal() -> Any:
"""Handle closing the modal."""
return gr.update(visible=False)
def handle_close_text_modal() -> Any:
"""Handle closing the text expansion modal."""
return gr.update(visible=False)
def handle_reset_everything() -> Tuple[List[str], int, str, pd.DataFrame, pd.DataFrame, Any, Any, Any, List[Dict[str, str]], str, str, str]:
"""Reset everything to initial state - clear all data, CSV, and UI components."""
try:
# Clear the CSV file
if os.path.exists(CSV_FILE):
os.remove(CSV_FILE)
logger.info("CSV file deleted for reset")
# Create empty dataframe
empty_df = pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
# Reset state variables
repo_ids_reset = []
current_idx_reset = 0
user_requirements_reset = ""
# Reset status
status_reset = "Status: Everything has been reset. Ready to start fresh!"
# Reset UI components
current_requirements_reset = "No requirements extracted yet."
extracted_keywords_reset = ""
# Reset chatbot to initial message
chatbot_reset = [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}]
logger.info("Complete system reset performed")
return (
repo_ids_reset, # repo_ids_state
current_idx_reset, # current_repo_idx_state
user_requirements_reset, # user_requirements_state
empty_df, # df_output
empty_df, # top_repos_df
gr.update(visible=False), # top_repos_section
gr.update(visible=False), # repo_action_modal
gr.update(visible=False), # text_expansion_modal
chatbot_reset, # chatbot
status_reset, # status_box_analysis
current_requirements_reset, # current_requirements_display
extracted_keywords_reset # extracted_keywords_output
)
except Exception as e:
logger.error(f"Error during reset: {e}")
error_status = f"Reset failed: {e}"
return (
[], # repo_ids_state
0, # current_repo_idx_state
"", # user_requirements_state
pd.DataFrame(), # df_output
pd.DataFrame(), # top_repos_df
gr.update(visible=False), # top_repos_section
gr.update(visible=False), # repo_action_modal
gr.update(visible=False), # text_expansion_modal
[{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}], # chatbot
error_status, # status_box_analysis
"No requirements extracted yet.", # current_requirements_display
"" # extracted_keywords_output
)
# --- Component Event Wiring ---
# Initialize chatbot with welcome message on app load
app.load(
fn=lambda: [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
outputs=[chatbot]
)
# Input Tab
submit_repo_btn.click(
fn=handle_repo_id_submission,
inputs=[repo_id_input],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
search_btn.click(
fn=handle_keyword_search,
inputs=[keyword_input],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
# Analysis Tab
analyze_all_btn.click(
fn=lambda: None, # No need to show progress display since it's commented out
outputs=[]
).then(
fn=handle_analyze_all_repos,
inputs=[repo_ids_state, user_requirements_state],
outputs=[df_output, status_box_analysis, top_repos_df, top_repos_section]
)
# Chatbot Tab
msg_input.submit(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot]
)
send_btn.click(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot]
)
end_chat_btn.click(
fn=handle_end_chat,
inputs=[chatbot],
outputs=[extracted_keywords_output, status_box_chatbot, user_requirements_state]
).then(
fn=lambda req: req if req.strip() else "No specific requirements extracted from conversation.",
inputs=[user_requirements_state],
outputs=[current_requirements_display]
)
use_keywords_btn.click(
fn=handle_keyword_search,
inputs=[extracted_keywords_output],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
# Repo Explorer Tab
setup_repo_explorer_events(repo_components, repo_states)
# Modal button events
visit_repo_btn.click(
fn=handle_visit_repo,
inputs=[selected_repo_display],
outputs=[repo_action_modal, selected_repo_display],
js="(repo_id) => { if(repo_id && repo_id.trim()) { window.open('https://huggingface.co/spaces/' + repo_id.trim(), '_blank'); } }"
)
explore_repo_btn.click(
fn=handle_explore_repo,
inputs=[selected_repo_id_state],
outputs=[
repo_action_modal,
tabs,
repo_components["repo_explorer_input"]
],
js="""(repo_id) => {
console.log('DEBUG: Navigate to repo explorer for:', repo_id);
setTimeout(() => {
window.scrollTo({top: 0, behavior: 'smooth'});
}, 200);
}"""
)
cancel_modal_btn.click(
fn=handle_cancel_modal,
outputs=[repo_action_modal]
)
# Text expansion modal events
close_text_modal_btn.click(
fn=handle_close_text_modal,
outputs=[text_expansion_modal]
)
# Add dataframe selection event
df_output.select(
fn=handle_dataframe_select,
inputs=[df_output],
outputs=[selected_repo_display, repo_action_modal, tabs, expanded_content_title, expanded_content_text, text_expansion_modal, selected_repo_id_state]
)
# Add selection event for top repositories dataframe too
top_repos_df.select(
fn=handle_dataframe_select,
inputs=[top_repos_df],
outputs=[selected_repo_display, repo_action_modal, tabs, expanded_content_title, expanded_content_text, text_expansion_modal, selected_repo_id_state]
)
# Reset button event
reset_all_btn.click(
fn=handle_reset_everything,
outputs=[repo_ids_state, current_repo_idx_state, user_requirements_state, df_output, top_repos_df, top_repos_section, repo_action_modal, text_expansion_modal, chatbot, status_box_analysis, current_requirements_display, extracted_keywords_output]
)
return app
if __name__ == "__main__":
app = create_ui()
app.launch(debug=True)