HF_RepoSense / app.py
naman1102's picture
Update app.py
39207e4
raw
history blame
47 kB
import gradio as gr
import regex as re
import csv
import pandas as pd
from typing import List, Dict, Tuple, Any
import logging
import os
import time
# Import core logic from other modules, as in app_old.py
from analyzer import combine_repo_files_for_llm, analyze_combined_file, parse_llm_json_response
from hf_utils import download_space_repo, search_top_spaces
from chatbot_page import chat_with_user, extract_keywords_from_conversation
from repo_explorer import create_repo_explorer_tab, setup_repo_explorer_events
# --- Configuration ---
# Root logging setup for the app: INFO level, "<timestamp> - <level> - <message>" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# CSV file (path relative to the working directory) with one row per repo ID plus
# its analysis columns: strength, weaknesses, speciality, relevance rating.
CSV_FILE = "repo_ids.csv"
# System prompt describing the requirements-gathering assistant.
# NOTE(review): this constant is not referenced anywhere else in this file as shown —
# presumably the chatbot module carries its own prompt; confirm before editing.
CHATBOT_SYSTEM_PROMPT = (
"You are a helpful assistant. Your goal is to help the user describe their ideal open-source repo. "
"Ask questions to clarify what they want, their use case, preferred language, features, etc. "
"When the user clicks 'End Chat', analyze the conversation and return about 5 keywords for repo search. "
"Return only the keywords as a comma-separated list."
)
# Assistant greeting used to seed the chatbot history on app load and whenever a chat starts empty.
CHATBOT_INITIAL_MESSAGE = "Hello! Please tell me about your ideal Hugging Face repo. What use case, preferred language, or features are you looking for?"
# --- Helper Functions (Logic) ---
def get_top_relevant_repos(df: pd.DataFrame, user_requirements: str, top_n: int = 3) -> pd.DataFrame:
    """
    Uses an LLM to select the top N most relevant repositories.

    Args:
        df: Analysis table with the string columns "repo id", "strength",
            "weaknesses", "speciality" and "relevance rating".
        user_requirements: Free-text requirements; blank or None falls back to a
            generic recommendation request.
        top_n: Maximum number of repositories to return.

    Returns:
        A DataFrame with at most ``top_n`` rows, ordered most relevant first.
        If the LLM call or the parsing of its answer fails, the first ``top_n``
        analyzed rows are returned instead. If there is nothing to rank, or an
        unexpected error occurs, an empty DataFrame with the standard columns
        is returned. This function never raises.
    """
    result_columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        if df.empty:
            return pd.DataFrame(columns=result_columns)

        # Filter out rows with no analysis data
        has_analysis = (
            (df['strength'].str.strip() != '') |
            (df['weaknesses'].str.strip() != '') |
            (df['speciality'].str.strip() != '') |
            (df['relevance rating'].str.strip() != '')
        )
        analyzed_df = df[has_analysis].copy()
        if analyzed_df.empty:
            logger.warning("No analyzed repositories found for LLM selection")
            return pd.DataFrame(columns=result_columns)

        # Create a prompt for the LLM (joined once instead of repeated +=)
        csv_data = "".join(
            f"Repository: {row['repo id']}\n"
            f"Strengths: {row['strength']}\n"
            f"Weaknesses: {row['weaknesses']}\n"
            f"Speciality: {row['speciality']}\n"
            f"Relevance: {row['relevance rating']}\n\n"
            for _, row in analyzed_df.iterrows()
        )
        requirements_text = (user_requirements or "").strip()
        user_context = user_requirements if requirements_text else "General repository recommendation"
        prompt = f"""Based on the user's requirements and the analysis of repositories below, select the top {top_n} most relevant repositories.
User Requirements:
{user_context}
Repository Analysis Data:
{csv_data}
Please analyze all repositories and select the {top_n} most relevant ones based on:
1. How well they match the user's specific requirements
2. Their strengths and capabilities
3. Their relevance rating
4. Their speciality alignment with user needs
Return ONLY a JSON list of the repository IDs in order of relevance (most relevant first). Example format:
["repo1", "repo2", "repo3"]
Selected repositories:"""

        try:
            # Local imports: the client is only needed here, and the stdlib `re`
            # deliberately shadows the module-level `regex` alias inside this function.
            import json
            import re
            from openai import OpenAI

            # Endpoint and credentials come from the "modal_api" / "base_url" env vars.
            client = OpenAI(api_key=os.getenv("modal_api"))
            client.base_url = os.getenv("base_url")
            response = client.chat.completions.create(
                model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
                messages=[
                    {"role": "system", "content": "You are an expert at analyzing and ranking repositories based on user requirements. Always return valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=200,
                temperature=0.3
            )
            # `content` may be None for an empty completion.
            llm_response = (response.choices[0].message.content or "").strip()
            logger.info(f"LLM response for top repos: {llm_response}")

            # Extract the JSON array from the response. DOTALL lets the match span
            # line breaks, since models often pretty-print the list over several lines.
            json_match = re.search(r'\[.*\]', llm_response, re.DOTALL)
            if json_match:
                selected_repos = json.loads(json_match.group())
                logger.info(f"LLM selected repositories: {selected_repos}")

                # Keep the LLM's order; skip non-string, duplicate or unknown IDs
                # and stop once top_n valid repositories have been collected.
                top_repos_list = []
                seen_ids = set()
                for repo_id in selected_repos:
                    if len(top_repos_list) >= top_n:
                        break
                    if not isinstance(repo_id, str):
                        continue
                    repo_id = repo_id.strip()
                    if repo_id in seen_ids:
                        continue
                    seen_ids.add(repo_id)
                    matching_rows = analyzed_df[analyzed_df['repo id'] == repo_id]
                    if not matching_rows.empty:
                        top_repos_list.append(matching_rows.iloc[0])

                if top_repos_list:
                    top_repos = pd.DataFrame(top_repos_list)
                    logger.info(f"Successfully selected {len(top_repos)} repositories using LLM")
                    return top_repos

            # Fallback: if LLM response parsing fails, use first N analyzed repos
            logger.warning("Failed to parse LLM response, using fallback selection")
            return analyzed_df.head(top_n)
        except Exception as llm_error:
            logger.error(f"LLM selection failed: {llm_error}")
            # Fallback: return first N repositories with analysis data
            return analyzed_df.head(top_n)
    except Exception as e:
        logger.error(f"Error in LLM-based repo selection: {e}")
        return pd.DataFrame(columns=result_columns)
def write_repos_to_csv(repo_ids: List[str]) -> None:
    """Overwrite CSV_FILE with the header row and one blank-analysis row per repo ID.

    Any failure is logged and swallowed; nothing is raised to the caller.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as handle:
            out = csv.writer(handle)
            out.writerow(header)
            # Analysis columns start empty and are filled in by the analysis step.
            out.writerows([repo_id, "", "", "", ""] for repo_id in repo_ids)
            logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
    except Exception as e:
        logger.error(f"Error writing to CSV: {e}")
def format_text_for_dataframe(text: str, max_length: int = 200) -> str:
    """Clean and truncate text for display in a dataframe cell.

    Args:
        text: Value to format; falsy values and NaN yield "".
        max_length: Maximum length of the returned string.

    Returns:
        The text with surrounding whitespace removed and every run of whitespace
        collapsed to a single space. Text longer than ``max_length`` is cut and
        ends in "..." so that the result is exactly ``max_length`` long. When
        ``max_length`` is below 3 there is no room for the ellipsis, so the text
        is hard-cut instead (and a non-positive limit yields "").
    """
    if not text or pd.isna(text):
        return ""
    # Clean the text, then collapse excessive whitespace and newlines
    text = re.sub(r'\s+', ' ', str(text).strip())
    if len(text) <= max_length:
        return text
    if max_length < 3:
        # A negative slice bound here would keep almost the whole string.
        return text[:max(max_length, 0)]
    return text[:max_length - 3] + "..."
def read_csv_to_dataframe() -> pd.DataFrame:
    """Read CSV_FILE into a DataFrame with text columns shortened for display.

    Returns:
        All columns as strings with missing values replaced by "". The
        "repo id", "strength", "weaknesses" and "speciality" columns are passed
        through format_text_for_dataframe. If the file is missing, unreadable
        or lacks an expected column, an empty DataFrame with the standard
        columns is returned, so callers always see the same schema.
    """
    columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    # Display length limit per text column. "relevance rating" is kept as is
    # since it should be short.
    display_limits = {"repo id": 50, "strength": 180, "weaknesses": 180, "speciality": 150}
    try:
        df = pd.read_csv(CSV_FILE, dtype=str).fillna('')
        if not df.empty:
            for column, limit in display_limits.items():
                df[column] = df[column].apply(
                    lambda value, limit=limit: format_text_for_dataframe(value, limit)
                )
        return df
    except FileNotFoundError:
        return pd.DataFrame(columns=columns)
    except Exception as e:
        logger.error(f"Error reading CSV: {e}")
        # Same schema as the missing-file case rather than a column-less frame.
        return pd.DataFrame(columns=columns)
def analyze_and_update_single_repo(repo_id: str, user_requirements: str = "") -> Tuple[str, str, pd.DataFrame]:
    """
    Downloads and analyzes a single repo, updates its row in CSV_FILE, and returns results.

    Args:
        repo_id: Repository ID to download and analyze.
        user_requirements: Optional requirements text forwarded to the analyzer
            for the relevance rating.

    Returns:
        ``(combined_content, summary, df)``: the combined repository text sent to
        the LLM, a human-readable summary of the JSON extraction, and the results
        table. The table is the display-formatted CSV content when the write
        succeeded, otherwise the in-memory table. On any error the tuple is
        ``("", "Error analyzing repo: ...", read_csv_to_dataframe())``.
    """
    analysis_fields = ("strength", "weaknesses", "speciality", "relevance rating")
    try:
        logger.info(f"Starting analysis for repo: {repo_id}")
        download_space_repo(repo_id, local_dir="repo_files")
        txt_path = combine_repo_files_for_llm()
        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()

        llm_output = analyze_combined_file(txt_path, user_requirements)
        # Keep only the span from the last '{' to the last '}' of the LLM output.
        last_start = llm_output.rfind('{')
        last_end = llm_output.rfind('}')
        final_json_str = llm_output[last_start:last_end+1] if last_start != -1 and last_end != -1 else "{}"
        llm_json = parse_llm_json_response(final_json_str)

        if isinstance(llm_json, dict) and "error" not in llm_json:
            strengths = llm_json.get("strength", "N/A")
            weaknesses = llm_json.get("weaknesses", "N/A")
            relevance = llm_json.get("relevance rating", "N/A")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}\n\nRelevance: {relevance}"
        else:
            summary = f"JSON extraction: FAILED\nRaw response might not be valid JSON."

        # Update CSV. Read the file verbatim rather than through
        # read_csv_to_dataframe(): that helper shortens text for display, and
        # writing its output back would permanently truncate stored analyses and
        # make repo IDs longer than its display limit impossible to match.
        try:
            df = pd.read_csv(CSV_FILE, dtype=str).fillna('')
        except FileNotFoundError:
            df = pd.DataFrame(columns=["repo id", *analysis_fields])

        matching_rows = df.index[df["repo id"] == repo_id]
        if len(matching_rows) > 0:
            if isinstance(llm_json, dict):
                row_label = matching_rows[0]  # only the first matching row is updated
                for field in analysis_fields:
                    value = llm_json.get(field, "")
                    # The table is all-string; coerce numbers, lists, etc. returned by the LLM.
                    if value is None:
                        value = ""
                    elif not isinstance(value, str):
                        value = str(value)
                    df.at[row_label, field] = value
        else:
            logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")

        # Write CSV with one retry after a short delay
        csv_written = False
        try:
            df.to_csv(CSV_FILE, index=False)
            # Force file system flush where the platform supports it
            if hasattr(os, 'sync'):
                os.sync()
            csv_written = True
            logger.info(f"Successfully updated CSV for {repo_id}")
        except Exception as csv_error:
            logger.error(f"Failed to write CSV for {repo_id}: {csv_error}")
            time.sleep(0.2)
            try:
                df.to_csv(CSV_FILE, index=False)
                csv_written = True
                logger.info(f"Successfully updated CSV for {repo_id} on retry")
            except Exception as retry_error:
                logger.error(f"Failed to write CSV for {repo_id} on retry: {retry_error}")

        if csv_written:
            logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
            return combined_content, summary, read_csv_to_dataframe()

        # Do not claim success when both write attempts failed.
        logger.warning(f"Analyzed {repo_id} but could not persist the result to {CSV_FILE}")
        return combined_content, summary, df
    except Exception as e:
        logger.error(f"An error occurred during analysis of {repo_id}: {e}")
        error_summary = f"Error analyzing repo: {e}"
        return "", error_summary, read_csv_to_dataframe()
# --- NEW: Helper for Chat History Conversion ---
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """
    Turn a Gradio 'messages' history into a list of (user, assistant) tuples.

    Only a user message that is immediately followed by an assistant message
    produces a tuple; a leading assistant greeting, unanswered user messages
    and any other entries are dropped.
    """
    pairs = []
    last_position = len(history) - 1
    for position, entry in enumerate(history):
        if entry['role'] != 'user':
            continue
        if position == last_position:
            # A trailing user message has no reply to pair with.
            continue
        reply = history[position + 1]
        if reply['role'] != 'assistant':
            continue
        pairs.append((entry['content'], reply['content']))
    return pairs
# --- Gradio UI ---
def create_ui() -> gr.Blocks:
"""
Creates and configures the entire Gradio interface.

Builds the custom CSS, the per-session state objects, and four tabs
(Input & Search, Analysis, AI Assistant, Repo Explorer); the event handlers
and their wiring follow later in this function, which returns the Blocks app.
"""
# Custom stylesheet handed to gr.Blocks(css=...) below.
css = """
/* Modern sleek design */
.gradio-container {
font-family: 'Inter', 'system-ui', sans-serif;
background: linear-gradient(135deg, #0a0a0a 0%, #1a1a1a 100%);
min-height: 100vh;
}
.gr-form {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(10px);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
padding: 24px;
margin: 16px;
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-button {
background: linear-gradient(45deg, #667eea, #764ba2);
border: none;
border-radius: 12px;
color: white;
font-weight: 600;
padding: 12px 24px;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4);
}
.gr-button:hover {
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6);
}
.gr-textbox {
border: 2px solid rgba(102, 126, 234, 0.2);
border-radius: 12px;
background: rgba(255, 255, 255, 0.9);
transition: all 0.3s ease;
}
.gr-textbox:focus {
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.gr-panel {
background: rgba(255, 255, 255, 0.95);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
border: 1px solid rgba(255, 255, 255, 0.2);
}
.gr-tab-nav {
background: rgba(255, 255, 255, 0.95);
border-radius: 12px 12px 0 0;
backdrop-filter: blur(10px);
}
.gr-tab-nav button {
background: transparent;
border: none;
padding: 16px 24px;
font-weight: 600;
color: #666;
transition: all 0.3s ease;
}
.gr-tab-nav button.selected {
background: linear-gradient(45deg, #667eea, #764ba2);
color: white;
border-radius: 8px;
}
.chatbot {
border-radius: 16px;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
}
/* Hide Gradio footer */
footer {
display: none !important;
}
/* Custom scrollbar */
::-webkit-scrollbar {
width: 8px;
}
::-webkit-scrollbar-track {
background: rgba(255, 255, 255, 0.1);
border-radius: 4px;
}
::-webkit-scrollbar-thumb {
background: linear-gradient(45deg, #667eea, #764ba2);
border-radius: 4px;
}
/* Improved dataframe styling */
.gr-dataframe {
max-height: 400px;
overflow-y: auto;
}
.gr-dataframe table {
table-layout: fixed;
width: 100%;
}
.gr-dataframe th,
.gr-dataframe td {
padding: 8px 12px;
vertical-align: top;
word-wrap: break-word;
overflow-wrap: break-word;
max-height: 100px;
overflow-y: auto;
}
.gr-dataframe th:nth-child(1),
.gr-dataframe td:nth-child(1) { width: 15%; }
.gr-dataframe th:nth-child(2),
.gr-dataframe td:nth-child(2) { width: 25%; }
.gr-dataframe th:nth-child(3),
.gr-dataframe td:nth-child(3) { width: 25%; }
.gr-dataframe th:nth-child(4),
.gr-dataframe td:nth-child(4) { width: 20%; }
.gr-dataframe th:nth-child(5),
.gr-dataframe td:nth-child(5) { width: 15%; }
/* Make repository names clickable */
.gr-dataframe td:nth-child(1) {
cursor: pointer;
color: #667eea;
font-weight: 600;
transition: all 0.3s ease;
}
.gr-dataframe td:nth-child(1):hover {
background-color: rgba(102, 126, 234, 0.1);
color: #764ba2;
transform: scale(1.02);
}
/* Remove hover effect from other cells */
.gr-dataframe td:nth-child(n+2) {
cursor: default;
}
.gr-dataframe tbody tr:hover {
background-color: rgba(102, 126, 234, 0.05);
}
"""
# Top-level Blocks container: Soft theme plus the stylesheet defined above.
with gr.Blocks(
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="purple",
neutral_hue="gray",
font=["Inter", "system-ui", "sans-serif"]
),
css=css,
title="🚀 HF Repo Analyzer"
) as app:
# --- State Management ---
# Using simple, separate state objects for robustness.
# repo_ids_state: list of repo IDs queued for analysis.
# current_repo_idx_state: index of the next repo for "Analyze Next".
repo_ids_state = gr.State([])
current_repo_idx_state = gr.State(0)
user_requirements_state = gr.State("") # Store user requirements from chatbot
# NOTE(review): the two states below are not used as inputs or outputs of any
# event wired in this function as shown — confirm whether they are still needed.
loaded_repo_content_state = gr.State("") # Store loaded repository content
current_repo_id_state = gr.State("") # Store current repository ID
# Page header banner (raw HTML inside Markdown).
gr.Markdown(
"""
<div style="text-align: center; padding: 40px 20px; background: rgba(255, 255, 255, 0.1); border-radius: 20px; margin: 20px auto; max-width: 900px; backdrop-filter: blur(10px);">
<h1 style="font-size: 3.5rem; font-weight: 800; margin: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text;">
🚀 HF Repo Analyzer
</h1>
<p style="font-size: 1.3rem; color: rgba(255, 255, 255, 0.9); margin: 16px 0 0 0; font-weight: 400; line-height: 1.6;">
Discover, analyze, and evaluate Hugging Face repositories with AI-powered insights
</p>
<div style="height: 4px; width: 80px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 24px auto; border-radius: 2px;"></div>
</div>
"""
)
# The tab ids below are the values handlers pass to gr.update(selected=...) to switch tabs.
with gr.Tabs() as tabs:
# --- Input Tab ---
with gr.TabItem("📝 Input & Search", id="input_tab"):
with gr.Row(equal_height=True):
with gr.Column(scale=1):
gr.Markdown("### 📁 Repository IDs")
repo_id_input = gr.Textbox(
label="Repository IDs",
lines=8,
placeholder="microsoft/DialoGPT-medium\nopenai/whisper\nhuggingface/transformers",
info="Enter repo IDs separated by commas or new lines"
)
submit_repo_btn = gr.Button("🚀 Submit Repositories", variant="primary", size="lg")
with gr.Column(scale=1):
gr.Markdown("### 🔍 Keyword Search")
keyword_input = gr.Textbox(
label="Search Keywords",
lines=8,
placeholder="text generation\nimage classification\nsentiment analysis",
info="Enter keywords to find relevant repositories"
)
search_btn = gr.Button("🔎 Search Repositories", variant="primary", size="lg")
# NOTE(review): status_box_input is not an output of any event wired in this
# function as shown; the input-tab handlers write to status_box_analysis instead.
status_box_input = gr.Textbox(label="📊 Status", interactive=False, lines=2)
# --- Analysis Tab ---
with gr.TabItem("🔬 Analysis", id="analysis_tab"):
gr.Markdown("### 🧪 Repository Analysis Engine")
# Display current user requirements
with gr.Row():
current_requirements_display = gr.Textbox(
label="📋 Current User Requirements",
interactive=False,
lines=3,
info="Requirements extracted from AI chat conversation for relevance rating"
)
with gr.Row():
analyze_next_btn = gr.Button("⚡ Analyze Next Repository", variant="primary", size="lg", scale=1)
analyze_all_btn = gr.Button("🚀 Analyze All Repositories", variant="secondary", size="lg", scale=1)
with gr.Column(scale=2):
status_box_analysis = gr.Textbox(label="📈 Analysis Status", interactive=False, lines=2)
# Progress bar for batch analysis
# NOTE(review): analysis_progress is never referenced again in this function;
# the batch handler declares its own `progress=gr.Progress()` parameter.
with gr.Row():
analysis_progress = gr.Progress()
# progress_display = gr.Textbox(
# label="📊 Batch Analysis Progress",
# interactive=False,
# lines=2,
# visible=False,
# info="Shows progress when analyzing all repositories"
# )
# Placeholder row: its only live statement is `pass`; the content/summary
# textboxes it used to hold are commented out.
with gr.Row(equal_height=True):
# with gr.Column():
# content_output = gr.Textbox(
# label="📄 Repository Content",
# lines=20,
# show_copy_button=True,
# info="Raw content extracted from the repository"
# )
# with gr.Column():
# summary_output = gr.Textbox(
# label="🎯 AI Analysis Summary",
# lines=20,
# show_copy_button=True,
# info="Detailed analysis and insights from AI"
# )
pass
gr.Markdown("### 📊 Results Dashboard")
# Top 3 Most Relevant Repositories (initially hidden)
with gr.Column(visible=False) as top_repos_section:
gr.Markdown("### 🏆 Top 3 Most Relevant Repositories")
gr.Markdown("🎯 **These are the highest-rated repositories based on your requirements:**")
top_repos_df = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
wrap=True,
interactive=False,
height=200,
info="Click on any repository name to explore or visit"
)
gr.Markdown("💡 **Tip:** Click on any repository name to explore it in detail!")
# Modal popup for repository action selection
# (a hidden Column; handlers show/hide it via gr.update(visible=...))
with gr.Row():
with gr.Column():
repo_action_modal = gr.Column(visible=False)
with repo_action_modal:
gr.Markdown("### 🔗 Repository Actions")
selected_repo_display = gr.Textbox(
label="Selected Repository",
interactive=False,
info="Choose what you'd like to do with this repository"
)
with gr.Row():
visit_repo_btn = gr.Button("🌐 Visit Hugging Face Space", variant="primary", size="lg")
explore_repo_btn = gr.Button("🔍 Open in Repo Explorer", variant="secondary", size="lg")
cancel_modal_btn = gr.Button("❌ Cancel", size="lg")
gr.Markdown("### 📋 All Analysis Results")
df_output = gr.Dataframe(
headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
wrap=True,
interactive=False # Prevent editing but allow selection
)
# --- Chatbot Tab ---
with gr.TabItem("🤖 AI Assistant", id="chatbot_tab"):
gr.Markdown("### 💬 Intelligent Repository Discovery")
# type="messages": history is a list of {"role": ..., "content": ...} dicts.
chatbot = gr.Chatbot(
label="🤖 AI Assistant",
height=450,
type="messages",
avatar_images=(
"https://cdn-icons-png.flaticon.com/512/149/149071.png",
"https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
),
show_copy_button=True
)
with gr.Row():
msg_input = gr.Textbox(
label="💭 Your Message",
placeholder="Tell me about your ideal repository...",
lines=1,
scale=4,
info="Describe what you're looking for"
)
send_btn = gr.Button("📤 Send", variant="primary", scale=1)
end_chat_btn = gr.Button("🎯 Extract Keywords", scale=1)
use_keywords_btn = gr.Button("🔎 Search Now", variant="primary", scale=1)
with gr.Row():
with gr.Column():
extracted_keywords_output = gr.Textbox(
label="🏷️ Extracted Keywords",
interactive=False,
show_copy_button=True,
info="AI-generated search terms from our conversation"
)
with gr.Column():
status_box_chatbot = gr.Textbox(
label="📊 Chat Status",
interactive=False,
info="Current conversation status"
)
# --- Repo Explorer Tab ---
# The tab's components and states come from the repo_explorer module; both are
# handed to setup_repo_explorer_events() during event wiring.
with gr.TabItem("🔍 Repo Explorer", id="repo_explorer_tab"):
repo_components, repo_states = create_repo_explorer_tab()
# --- Footer ---
gr.Markdown(
"""
<div style="text-align: center; padding: 30px 20px; margin-top: 40px; background: rgba(255, 255, 255, 0.1); border-radius: 16px; backdrop-filter: blur(10px);">
<p style="margin: 0; color: rgba(255, 255, 255, 0.8); font-size: 0.95rem; font-weight: 500;">
🚀 Powered by <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Gradio</span>
& <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Hugging Face</span>
</p>
<div style="height: 2px; width: 60px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 16px auto; border-radius: 1px;"></div>
</div>
"""
)
# --- Event Handler Functions ---
def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
    """Processes submitted repo IDs, updates state, and prepares for analysis.

    Args:
        text: Repo IDs separated by commas and/or new lines.

    Returns:
        ``(repo_ids, current_index, dataframe, status, tabs_update)``. When no
        usable ID is present (empty, whitespace-only or separator-only input)
        the CSV is left untouched and the input tab stays selected; otherwise
        the CSV is rewritten and the analysis tab is selected.
    """
    # Split on commas/newlines, trim, and de-duplicate while keeping first-seen order.
    repo_ids = list(dict.fromkeys(
        repo.strip() for repo in re.split(r'[\n,]+', text or "") if repo.strip()
    ))
    if not repo_ids:
        return [], 0, pd.DataFrame(), "Status: Please enter repository IDs.", gr.update(selected="input_tab")
    write_repos_to_csv(repo_ids)
    df = read_csv_to_dataframe()
    status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis."
    return repo_ids, 0, df, status, gr.update(selected="analysis_tab")
def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
    """Processes submitted keywords, finds repos, updates state, and prepares for analysis.

    Args:
        keywords: Search keywords separated by commas and/or new lines.

    Returns:
        ``(repo_ids, current_index, dataframe, status, tabs_update)``. When no
        usable keyword is present (empty, whitespace-only or separator-only
        input) nothing is searched or written and the input tab stays selected.
        Otherwise each keyword is searched with a limit of 5, the de-duplicated
        results are written to the CSV, and the analysis tab is selected.
    """
    keyword_list = [k.strip() for k in re.split(r'[\n,]+', keywords or "") if k.strip()]
    if not keyword_list:
        return [], 0, pd.DataFrame(), "Status: Please enter keywords.", gr.update(selected="input_tab")
    repo_ids = []
    for kw in keyword_list:
        repo_ids.extend(search_top_spaces(kw, limit=5))
    # De-duplicate across keywords while keeping first-seen order.
    unique_repo_ids = list(dict.fromkeys(repo_ids))
    write_repos_to_csv(unique_repo_ids)
    df = read_csv_to_dataframe()
    status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis."
    return unique_repo_ids, 0, df, status, gr.update(selected="analysis_tab")
def extract_user_requirements_from_chat(history: List[Dict[str, str]]) -> str:
    """Collect the user's chat messages into a bulleted requirements string.

    Every non-blank message with role 'user' becomes one "- <message>" line;
    an empty history, or one without user messages, yields "".
    """
    if not history:
        return ""
    from_user = [entry.get('content', '') for entry in history if entry.get('role') == 'user']
    # Blank messages carry no requirement and are skipped.
    bullet_lines = [f"- {text}" for text in from_user if text.strip()]
    return "\n".join(bullet_lines)
def handle_analyze_next(repo_ids: List[str], current_idx: int, user_requirements: str) -> Tuple[pd.DataFrame, int, str]:
    """Analyze the repository at ``current_idx`` and advance the index.

    Returns ``(dataframe, next_index, status)``. With no repositories the index
    is reset to 0; once the index is past the end, the current CSV content is
    returned and the index is left unchanged.
    """
    if not repo_ids:
        return pd.DataFrame(), 0, "Status: No repositories to analyze. Please submit repo IDs first."

    total = len(repo_ids)
    if current_idx >= total:
        return read_csv_to_dataframe(), current_idx, "Status: All repositories have been analyzed."

    target_repo = repo_ids[current_idx]
    status_lines = [f"Status: Analyzing repository {current_idx + 1}/{total}: {target_repo}"]
    if user_requirements.strip():
        status_lines.append("Using user requirements for relevance rating.")

    # Only the refreshed table is needed here; content and summary are discarded.
    _, _, df = analyze_and_update_single_repo(target_repo, user_requirements)

    next_idx = current_idx + 1
    if next_idx >= total:
        # The leading newline yields the blank line before the closing notice.
        status_lines.append("\nFinished all analyses.")
    return df, next_idx, "\n".join(status_lines)
def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
    """Add the user's message to the chat history ahead of the bot's reply.

    Returns the history (the same list object when it was non-empty) and an
    empty string used to clear the input box. An empty history is first seeded
    with the assistant's welcome message; an empty message is not appended.
    """
    conversation = history or [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}]
    if user_message:
        conversation.append({"role": "user", "content": user_message})
    return conversation, ""
def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Append the assistant's reply to a history that ends with a user message.

    The history is returned unchanged when it is empty or its last entry is not
    from the user. Earlier turns are converted to (user, assistant) tuples
    because that is the format passed to ``chat_with_user``.
    """
    awaiting_reply = bool(history) and history[-1]["role"] == "user"
    if not awaiting_reply:
        return history

    latest = history[-1]
    # Everything before the pending user message, in tuple form for the API.
    prior_turns = convert_messages_to_tuples(history[:-1])
    reply = chat_with_user(latest["content"], prior_turns)
    history.append({"role": "assistant", "content": reply})
    return history
def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str, str]:
    """Extract search keywords and user requirements from a finished chat.

    Returns ``(keywords, status, user_requirements)``. Keywords are a
    comma-separated string sanitized from the LLM output; both keywords and
    requirements are "" when the chat is empty, has no completed
    user/assistant exchange, or no valid keyword could be extracted.
    """
    if not history:
        return "", "Status: Chat is empty, nothing to analyze.", ""

    # The extraction logic expects (user, assistant) tuples.
    exchanges = convert_messages_to_tuples(history)
    if not exchanges:
        return "", "Status: No completed conversations to analyze.", ""

    raw_keywords_str = extract_keywords_from_conversation(exchanges)

    # Keep only keyword-like runs (letters, digits, underscores, spaces, hyphens),
    # trimmed, dropping anything left empty.
    trimmed_runs = (run.strip() for run in re.findall(r'[\w\s-]+', raw_keywords_str))
    keywords = [run for run in trimmed_runs if run]
    if not keywords:
        return "", f"Status: Could not extract valid keywords. Raw LLM output: '{raw_keywords_str}'", ""

    final_keywords_str = ", ".join(keywords)
    user_requirements = extract_user_requirements_from_chat(history)
    return final_keywords_str, "Status: Keywords extracted. User requirements saved for analysis.", user_requirements
def handle_dataframe_select(evt: gr.SelectData, df_data) -> Tuple[str, Any, Any]:
    """Handle a dataframe cell selection; only the repo ID column opens the modal.

    Args:
        evt: Selection event; ``evt.index`` is read as ``[row, column]``.
        df_data: Current value of the dataframe that was clicked.

    Returns:
        ``(selected_repo_id, modal_update, tabs_update)``. For a click on
        column 0 of a valid row holding a non-empty repo ID, the ID is returned
        and the modal is made visible. In every other case, including any
        error, the ID is "" and the modal is hidden. The tabs update is always
        a no-op.
    """
    def _ignore() -> Tuple[str, Any, Any]:
        # Nothing selected: clear the repo ID, hide the modal, leave the tabs alone.
        return "", gr.update(visible=False), gr.update()

    # Diagnostics go through the module logger at DEBUG level instead of stdout.
    logger.debug("Selection event triggered: evt=%s, df_data type=%s", evt, type(df_data))
    if evt is None:
        return _ignore()

    try:
        # Get the selected row and column from the event
        row_idx = evt.index[0]
        col_idx = evt.index[1]
        logger.debug("Selected row %s, column %s", row_idx, col_idx)

        # Only respond to clicks on the repo ID column (column 0)
        if col_idx != 0:
            logger.debug("Clicked on column %s, ignoring (only repo ID column responds)", col_idx)
            return _ignore()

        if not (isinstance(df_data, pd.DataFrame) and not df_data.empty and row_idx < len(df_data)):
            logger.debug("df_data is not a DataFrame or row_idx %s out of range", row_idx)
            return _ignore()

        # First column contains the repo id
        repo_id = df_data.iloc[row_idx, 0]
        logger.debug("Extracted repo_id = '%s'", repo_id)

        # Only proceed if we actually have a repository ID
        clean_repo_id = str(repo_id).strip()
        if repo_id and clean_repo_id and clean_repo_id != 'nan':
            logger.info(f"Showing modal for repository: {clean_repo_id}")
            # Show modal and populate selected repo
            return clean_repo_id, gr.update(visible=True), gr.update()
    except Exception as e:
        logger.error(f"Error handling dataframe selection: {e}")
    return _ignore()
def handle_analyze_all_repos(repo_ids: List[str], user_requirements: str, progress=gr.Progress()) -> Tuple[pd.DataFrame, str, pd.DataFrame, Any]:
    """Analyzes every repository in ``repo_ids`` with progress tracking.

    Args:
        repo_ids: Repository IDs to analyze, in order.
        user_requirements: Requirements text forwarded to each analysis and to
            the final top-3 selection.
        progress: Gradio progress tracker, called with a fraction and a description.

    Returns:
        ``(results_df, status, top_repos_df, top_section_update)``. The status
        reports how many repositories succeeded, how many analyses failed, and
        how many finished without their result showing up in the CSV. The top
        section is made visible only when top repositories were selected.
    """
    if not repo_ids:
        return pd.DataFrame(), "Status: No repositories to analyze. Please submit repo IDs first.", pd.DataFrame(), gr.update(visible=False)

    analysis_fields = ("strength", "weaknesses", "speciality", "relevance rating")

    def _has_analysis(repo_id: str) -> bool:
        """Re-read the CSV and report whether repo_id's first row has any analysis field filled."""
        for _, row in read_csv_to_dataframe().iterrows():
            if row["repo id"] == repo_id:
                return any(row.get(field, "").strip() for field in analysis_fields)
        return False

    total_repos = len(repo_ids)
    try:
        # Start the progress tracking
        progress(0, desc="Initializing batch analysis...")
        successful_analyses = 0
        failed_analyses = 0
        csv_update_failures = 0

        for i, repo_id in enumerate(repo_ids):
            progress(i / total_repos, desc=f"Analyzing {repo_id} ({i+1}/{total_repos})")
            try:
                logger.info(f"Batch analysis: Processing {repo_id} ({i+1}/{total_repos})")
                _, summary, _ = analyze_and_update_single_repo(repo_id, user_requirements)

                if summary.startswith("Error analyzing repo:"):
                    # The single-repo helper swallows its exceptions and reports them
                    # through the summary, so count those as failed analyses here
                    # rather than as CSV update issues.
                    failed_analyses += 1
                elif _has_analysis(repo_id):
                    successful_analyses += 1
                else:
                    # CSV shows no analysis data for this repo - try once more
                    logger.warning(f"CSV update failed for {repo_id}, attempting retry...")
                    time.sleep(0.5)  # Wait a bit longer
                    retry_success = False
                    # Retrying only makes sense when the LLM produced usable JSON.
                    if summary and "JSON extraction: SUCCESS" in summary:
                        logger.info(f"Attempting to re-update CSV for {repo_id}")
                        analyze_and_update_single_repo(repo_id, user_requirements)
                        retry_success = _has_analysis(repo_id)
                    if retry_success:
                        successful_analyses += 1
                    else:
                        csv_update_failures += 1
                # Delay between repositories to prevent file conflicts
                time.sleep(0.3)
            except Exception as e:
                logger.error(f"Error analyzing {repo_id}: {e}")
                failed_analyses += 1
                # Still wait to prevent rapid failures
                time.sleep(0.2)

        # Complete the progress
        progress(1.0, desc="Batch analysis completed!")

        # Get final updated dataframe
        updated_df = read_csv_to_dataframe()
        # Get top 3 most relevant repositories
        top_repos = get_top_relevant_repos(updated_df, user_requirements, top_n=3)

        # Final status with detailed breakdown
        final_status = f"🎉 Batch Analysis Complete!\n✅ Successful: {successful_analyses}/{total_repos}\n❌ Failed: {failed_analyses}/{total_repos}"
        if csv_update_failures > 0:
            final_status += f"\n⚠️ CSV Update Issues: {csv_update_failures}/{total_repos}"
        # Add top repos info if available
        if not top_repos.empty:
            final_status += f"\n\n🏆 Top {len(top_repos)} most relevant repositories selected!"

        # Show top repos section if we have results
        show_top_section = gr.update(visible=not top_repos.empty)
        logger.info(f"Batch analysis completed: {successful_analyses} successful, {failed_analyses} failed, {csv_update_failures} CSV update issues")
        return updated_df, final_status, top_repos, show_top_section
    except Exception as e:
        logger.error(f"Error in batch analysis: {e}")
        error_status = f"❌ Batch analysis failed: {e}"
        return read_csv_to_dataframe(), error_status, pd.DataFrame(), gr.update(visible=False)
def handle_visit_repo(repo_id: str) -> Tuple[Any, str]:
    """Close the modal and return the Hugging Face Space URL for ``repo_id``.

    A missing or blank repo ID yields an empty URL; the modal is hidden in
    both cases.
    """
    has_repo = bool(repo_id and repo_id.strip())
    if not has_repo:
        return gr.update(visible=False), ""
    hf_url = f"https://huggingface.co/spaces/{repo_id.strip()}"
    logger.info(f"User chose to visit: {hf_url}")
    return gr.update(visible=False), hf_url
def handle_explore_repo(repo_id: str) -> Tuple[Any, Any, str]:
    """Close the modal and hand ``repo_id`` over to the Repo Explorer tab.

    Returns ``(modal_update, tabs_update, repo_id)``. A missing or blank repo
    ID leaves the tabs unchanged and returns "" as the ID.
    """
    has_repo = bool(repo_id and repo_id.strip())
    if not has_repo:
        return gr.update(visible=False), gr.update(), ""
    logger.info(f"User chose to explore: {repo_id.strip()}")
    return gr.update(visible=False), gr.update(selected="repo_explorer_tab"), repo_id.strip()
def handle_cancel_modal() -> Any:
    """Return the update that hides the repository action modal."""
    hide_modal = gr.update(visible=False)
    return hide_modal
# --- Component Event Wiring ---
# Initialize chatbot with welcome message on app load
# Seed the chatbot with the assistant's greeting each time the page loads.
app.load(
fn=lambda: [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
outputs=[chatbot]
)
# Input Tab
# Both input-tab buttons reset the repo list and index, refresh the results table,
# write their status to the analysis tab's status box, and select a tab.
submit_repo_btn.click(
fn=handle_repo_id_submission,
inputs=[repo_id_input],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
search_btn.click(
fn=handle_keyword_search,
inputs=[keyword_input],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
# Analysis Tab
# "Analyze Next" processes one repo per click and stores the advanced index.
analyze_next_btn.click(
fn=handle_analyze_next,
inputs=[repo_ids_state, current_repo_idx_state, user_requirements_state],
outputs=[df_output, current_repo_idx_state, status_box_analysis]
)
# "Analyze All" runs a no-op first step, then the batch handler, which also fills
# the top-repos table and toggles the visibility of its section.
analyze_all_btn.click(
fn=lambda: None, # No need to show progress display since it's commented out
outputs=[]
).then(
fn=handle_analyze_all_repos,
inputs=[repo_ids_state, user_requirements_state],
outputs=[df_output, status_box_analysis, top_repos_df, top_repos_section]
)
# Chatbot Tab
# Enter and the Send button share one two-step chain: append the user message
# (and clear the input box), then append the assistant's reply.
msg_input.submit(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot]
)
send_btn.click(
fn=handle_user_message,
inputs=[msg_input, chatbot],
outputs=[chatbot, msg_input]
).then(
fn=handle_bot_response,
inputs=[chatbot],
outputs=[chatbot]
)
# "Extract Keywords" stores keywords and requirements, then mirrors the
# requirements (or a placeholder sentence) into the analysis tab's display box.
end_chat_btn.click(
fn=handle_end_chat,
inputs=[chatbot],
outputs=[extracted_keywords_output, status_box_chatbot, user_requirements_state]
).then(
fn=lambda req: req if req.strip() else "No specific requirements extracted from conversation.",
inputs=[user_requirements_state],
outputs=[current_requirements_display]
)
# "Search Now" reuses the keyword-search handler with the extracted keywords.
use_keywords_btn.click(
fn=handle_keyword_search,
inputs=[extracted_keywords_output],
outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
)
# Repo Explorer Tab
setup_repo_explorer_events(repo_components, repo_states)
# Modal button events
# The js snippet opens the Space page in a new browser tab; the Python handler
# hides the modal and writes its second return value (the URL, or "") into
# selected_repo_display.
# NOTE(review): the js function returns nothing — confirm what Gradio then passes
# to `fn` as `repo_id` when both `fn` and `js` are supplied.
visit_repo_btn.click(
fn=handle_visit_repo,
inputs=[selected_repo_display],
outputs=[repo_action_modal, selected_repo_display],
js="(repo_id) => { if(repo_id && repo_id.trim()) { window.open('https://huggingface.co/spaces/' + repo_id.trim(), '_blank'); } }"
)
# Hides the modal, selects the Repo Explorer tab and writes the repo ID into the
# explorer component registered under the "repo_explorer_input" key.
explore_repo_btn.click(
fn=handle_explore_repo,
inputs=[selected_repo_display],
outputs=[repo_action_modal, tabs, repo_components["repo_explorer_input"]]
)
cancel_modal_btn.click(
fn=handle_cancel_modal,
outputs=[repo_action_modal]
)
# Add dataframe selection event
# (the handler always returns a no-op update for `tabs`)
df_output.select(
fn=handle_dataframe_select,
inputs=[df_output],
outputs=[selected_repo_display, repo_action_modal, tabs]
)
# Add selection event for top repositories dataframe too
top_repos_df.select(
fn=handle_dataframe_select,
inputs=[top_repos_df],
outputs=[selected_repo_display, repo_action_modal, tabs]
)
return app
# Script entry point: build the interface and start the Gradio server with debug=True.
if __name__ == "__main__":
app = create_ui()
app.launch(debug=True)