Spaces:
Sleeping
Sleeping
import gradio as gr | |
import regex as re | |
import csv | |
import pandas as pd | |
from typing import List, Dict, Tuple, Any | |
import logging | |
import os | |
# Import core logic from other modules, as in app_old.py | |
from analyzer import combine_repo_files_for_llm, analyze_combined_file, parse_llm_json_response | |
from hf_utils import download_space_repo, search_top_spaces | |
from chatbot_page import chat_with_user, extract_keywords_from_conversation | |
from repo_explorer import create_repo_explorer_tab, setup_repo_explorer_events | |
# --- Configuration ---
# Log line layout: timestamp, severity, then the message text.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Path of the CSV file that holds one row per repository ID.
CSV_FILE = "repo_ids.csv"

# System prompt for the chatbot; the sentences are joined with single spaces.
CHATBOT_SYSTEM_PROMPT = " ".join([
    "You are a helpful assistant. Your goal is to help the user describe their ideal open-source repo.",
    "Ask questions to clarify what they want, their use case, preferred language, features, etc.",
    "When the user clicks 'End Chat', analyze the conversation and return about 5 keywords for repo search.",
    "Return only the keywords as a comma-separated list.",
])

# Greeting used as the first assistant message of a chat.
CHATBOT_INITIAL_MESSAGE = (
    "Hello! Please tell me about your ideal Hugging Face repo. "
    "What use case, preferred language, or features are you looking for?"
)
# --- Helper Functions (Logic) --- | |
def write_repos_to_csv(repo_ids: List[str]) -> None:
    """Replace the contents of the CSV file with a fresh row per repo ID.

    The file gets a header row followed by one row per entry of *repo_ids*
    whose analysis columns are all empty strings. Failures are logged and
    swallowed, so the function never raises for I/O problems.

    Args:
        repo_ids: Repository IDs to store, in the order they should appear.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    # Every column after the ID starts out empty until the repo is analyzed.
    empty_cells = [""] * (len(header) - 1)
    try:
        with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as handle:
            sheet = csv.writer(handle)
            sheet.writerow(header)
            sheet.writerows([rid, *empty_cells] for rid in repo_ids)
        logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
    except Exception as e:
        logger.error(f"Error writing to CSV: {e}")
def read_csv_to_dataframe() -> pd.DataFrame:
    """Load the CSV file as an all-string DataFrame.

    Every cell is read as text (``dtype=str``) and missing values become
    empty strings.

    Returns:
        The CSV contents. If the file is missing or completely empty, or if
        reading fails for any other reason (which is logged), an empty frame
        with the standard five columns is returned so callers always see the
        same schema.
    """
    columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        return pd.read_csv(CSV_FILE, dtype=str).fillna('')
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # Nothing has been stored yet: not an error, just an empty table.
        return pd.DataFrame(columns=columns)
    except Exception as e:
        logger.error(f"Error reading CSV: {e}")
        # Keep the schema stable even on failure so column lookups still work.
        return pd.DataFrame(columns=columns)
def analyze_and_update_single_repo(repo_id: str, user_requirements: str = "") -> Tuple[str, str, pd.DataFrame]:
    """Download and analyze one repo, record the result in the CSV, and return it.

    The repo is downloaded into ``repo_files``, its files are combined into a
    single text file, and that file is passed to the LLM analysis together
    with *user_requirements*. The JSON object found in the reply is parsed
    and its fields are written into the CSV row whose "repo id" matches.

    Args:
        repo_id: Repository ID to analyze; also the key used to find the CSV row.
        user_requirements: Free-text requirements forwarded to the analysis call.

    Returns:
        A ``(combined_content, summary, dataframe)`` tuple. If anything raises,
        the error is logged and ``("", "Error analyzing repo: ...", <current CSV>)``
        is returned instead.
    """
    def _cell_text(value: Any) -> str:
        # The table is read with dtype=str, so store text only: parsed JSON
        # may contain numbers or lists rather than strings.
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return "; ".join(str(item) for item in value)
        return str(value)

    try:
        logger.info(f"Starting analysis for repo: {repo_id}")
        download_space_repo(repo_id, local_dir="repo_files")
        txt_path = combine_repo_files_for_llm()
        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()

        llm_output = analyze_combined_file(txt_path, user_requirements)

        # Keep only the text from the last "{" to the last "}" of the reply;
        # fall back to an empty JSON object when either brace is missing.
        last_start = llm_output.rfind('{')
        last_end = llm_output.rfind('}')
        final_json_str = llm_output[last_start:last_end+1] if last_start != -1 and last_end != -1 else "{}"
        llm_json = parse_llm_json_response(final_json_str)

        if isinstance(llm_json, dict) and "error" not in llm_json:
            strengths = llm_json.get("strength", "N/A")
            weaknesses = llm_json.get("weaknesses", "N/A")
            relevance = llm_json.get("relevance rating", "N/A")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}\n\nRelevance: {relevance}"
        else:
            summary = "JSON extraction: FAILED\nRaw response might not be valid JSON."

        # Update CSV: only the first row matching repo_id is touched.
        df = read_csv_to_dataframe()
        matching_rows = df.index[df["repo id"] == repo_id] if "repo id" in df.columns else []
        if len(matching_rows) == 0:
            logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")
        elif isinstance(llm_json, dict):
            idx = matching_rows[0]
            for column in ("strength", "weaknesses", "speciality", "relevance rating"):
                df.at[idx, column] = _cell_text(llm_json.get(column, ""))

        df.to_csv(CSV_FILE, index=False)
        logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
        return combined_content, summary, df
    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception(f"An error occurred during analysis of {repo_id}: {e}")
        error_summary = f"Error analyzing repo: {e}"
        return "", error_summary, read_csv_to_dataframe()
# --- NEW: Helper for Chat History Conversion --- | |
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Turn a 'messages'-style chat history into ``(user, assistant)`` pairs.

    A pair is produced for every user message that is immediately followed by
    an assistant message. Anything else (a leading assistant greeting, a user
    message with no reply yet, consecutive user messages) is left out.

    Args:
        history: Chat messages as dicts with 'role' and 'content' keys.

    Returns:
        The matched pairs, in conversation order.
    """
    # Line every message up with its successor; the final message has none.
    successors = list(history[1:]) + [None]
    return [
        (turn['content'], reply['content'])
        for turn, reply in zip(history, successors)
        if turn['role'] == 'user' and reply is not None and reply['role'] == 'assistant'
    ]
# --- Gradio UI --- | |
def create_ui() -> gr.Blocks:
    """Build the whole Gradio interface and return it without launching.

    The page has a header, a tab set (input/search, analysis, AI assistant,
    plus the tab created by ``create_repo_explorer_tab``) and a footer. The
    event handlers are defined inside the ``gr.Blocks`` context and wired to
    the components at the end.

    Status messages produced by the submit/search handlers are shown in both
    the analysis-tab and the input-tab status boxes, because those handlers
    keep the user on the input tab when the input is empty.

    Returns:
        The configured ``gr.Blocks`` application.
    """
    css = """
    /* Modern sleek design */
    .gradio-container {
        font-family: 'Inter', 'system-ui', sans-serif;
        background: linear-gradient(135deg, #0a0a0a 0%, #1a1a1a 100%);
        min-height: 100vh;
    }
    .gr-form {
        background: rgba(255, 255, 255, 0.95);
        backdrop-filter: blur(10px);
        border-radius: 16px;
        box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
        padding: 24px;
        margin: 16px;
        border: 1px solid rgba(255, 255, 255, 0.2);
    }
    .gr-button {
        background: linear-gradient(45deg, #667eea, #764ba2);
        border: none;
        border-radius: 12px;
        color: white;
        font-weight: 600;
        padding: 12px 24px;
        transition: all 0.3s ease;
        box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4);
    }
    .gr-button:hover {
        transform: translateY(-2px);
        box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6);
    }
    .gr-textbox {
        border: 2px solid rgba(102, 126, 234, 0.2);
        border-radius: 12px;
        background: rgba(255, 255, 255, 0.9);
        transition: all 0.3s ease;
    }
    .gr-textbox:focus {
        border-color: #667eea;
        box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
    }
    .gr-panel {
        background: rgba(255, 255, 255, 0.95);
        border-radius: 16px;
        box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
        border: 1px solid rgba(255, 255, 255, 0.2);
    }
    .gr-tab-nav {
        background: rgba(255, 255, 255, 0.95);
        border-radius: 12px 12px 0 0;
        backdrop-filter: blur(10px);
    }
    .gr-tab-nav button {
        background: transparent;
        border: none;
        padding: 16px 24px;
        font-weight: 600;
        color: #666;
        transition: all 0.3s ease;
    }
    .gr-tab-nav button.selected {
        background: linear-gradient(45deg, #667eea, #764ba2);
        color: white;
        border-radius: 8px;
    }
    .chatbot {
        border-radius: 16px;
        box-shadow: 0 4px 20px rgba(0, 0, 0, 0.1);
    }
    /* Hide Gradio footer */
    footer {
        display: none !important;
    }
    /* Custom scrollbar */
    ::-webkit-scrollbar {
        width: 8px;
    }
    ::-webkit-scrollbar-track {
        background: rgba(255, 255, 255, 0.1);
        border-radius: 4px;
    }
    ::-webkit-scrollbar-thumb {
        background: linear-gradient(45deg, #667eea, #764ba2);
        border-radius: 4px;
    }
    """
    with gr.Blocks(
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="purple",
            neutral_hue="gray",
            font=["Inter", "system-ui", "sans-serif"]
        ),
        css=css,
        title="🚀 HF Repo Analyzer"
    ) as app:
        # --- State Management ---
        # Using simple, separate state objects for robustness.
        repo_ids_state = gr.State([])
        current_repo_idx_state = gr.State(0)
        user_requirements_state = gr.State("")  # Store user requirements from chatbot
        # The two states below are created but not wired to any event in this function.
        loaded_repo_content_state = gr.State("")  # Store loaded repository content
        current_repo_id_state = gr.State("")  # Store current repository ID

        gr.Markdown(
            """
            <div style="text-align: center; padding: 40px 20px; background: rgba(255, 255, 255, 0.1); border-radius: 20px; margin: 20px auto; max-width: 900px; backdrop-filter: blur(10px);">
            <h1 style="font-size: 3.5rem; font-weight: 800; margin: 0; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text;">
            🚀 HF Repo Analyzer
            </h1>
            <p style="font-size: 1.3rem; color: rgba(255, 255, 255, 0.9); margin: 16px 0 0 0; font-weight: 400; line-height: 1.6;">
            Discover, analyze, and evaluate Hugging Face repositories with AI-powered insights
            </p>
            <div style="height: 4px; width: 80px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 24px auto; border-radius: 2px;"></div>
            </div>
            """
        )

        with gr.Tabs() as tabs:
            # --- Input Tab ---
            with gr.TabItem("📝 Input & Search", id="input_tab"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=1):
                        gr.Markdown("### 📁 Repository IDs")
                        repo_id_input = gr.Textbox(
                            label="Repository IDs",
                            lines=8,
                            placeholder="microsoft/DialoGPT-medium\nopenai/whisper\nhuggingface/transformers",
                            info="Enter repo IDs separated by commas or new lines"
                        )
                        submit_repo_btn = gr.Button("🚀 Submit Repositories", variant="primary", size="lg")
                    with gr.Column(scale=1):
                        gr.Markdown("### 🔍 Keyword Search")
                        keyword_input = gr.Textbox(
                            label="Search Keywords",
                            lines=8,
                            placeholder="text generation\nimage classification\nsentiment analysis",
                            info="Enter keywords to find relevant repositories"
                        )
                        search_btn = gr.Button("🔎 Search Repositories", variant="primary", size="lg")
                status_box_input = gr.Textbox(label="📊 Status", interactive=False, lines=2)

            # --- Analysis Tab ---
            with gr.TabItem("🔬 Analysis", id="analysis_tab"):
                gr.Markdown("### 🧪 Repository Analysis Engine")
                # Display current user requirements
                with gr.Row():
                    current_requirements_display = gr.Textbox(
                        label="📋 Current User Requirements",
                        interactive=False,
                        lines=3,
                        info="Requirements extracted from AI chat conversation for relevance rating"
                    )
                with gr.Row():
                    analyze_next_btn = gr.Button("⚡ Analyze Next Repository", variant="primary", size="lg", scale=2)
                    with gr.Column(scale=3):
                        status_box_analysis = gr.Textbox(label="📈 Analysis Status", interactive=False, lines=2)
                with gr.Row(equal_height=True):
                    with gr.Column():
                        content_output = gr.Textbox(
                            label="📄 Repository Content",
                            lines=20,
                            show_copy_button=True,
                            info="Raw content extracted from the repository"
                        )
                    with gr.Column():
                        summary_output = gr.Textbox(
                            label="🎯 AI Analysis Summary",
                            lines=20,
                            show_copy_button=True,
                            info="Detailed analysis and insights from AI"
                        )
                gr.Markdown("### 📊 Results Dashboard")
                df_output = gr.Dataframe(
                    headers=["Repository", "Strengths", "Weaknesses", "Speciality", "Relevance"],
                    wrap=True,
                    interactive=False
                )

            # --- Chatbot Tab ---
            with gr.TabItem("🤖 AI Assistant", id="chatbot_tab"):
                gr.Markdown("### 💬 Intelligent Repository Discovery")
                chatbot = gr.Chatbot(
                    label="🤖 AI Assistant",
                    height=450,
                    type="messages",
                    avatar_images=(
                        "https://cdn-icons-png.flaticon.com/512/149/149071.png",
                        "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.png"
                    ),
                    show_copy_button=True
                )
                with gr.Row():
                    msg_input = gr.Textbox(
                        label="💭 Your Message",
                        placeholder="Tell me about your ideal repository...",
                        lines=1,
                        scale=4,
                        info="Describe what you're looking for"
                    )
                    send_btn = gr.Button("📤 Send", variant="primary", scale=1)
                    end_chat_btn = gr.Button("🎯 Extract Keywords", scale=1)
                    use_keywords_btn = gr.Button("🔎 Search Now", variant="primary", scale=1)
                with gr.Row():
                    with gr.Column():
                        extracted_keywords_output = gr.Textbox(
                            label="🏷️ Extracted Keywords",
                            interactive=False,
                            show_copy_button=True,
                            info="AI-generated search terms from our conversation"
                        )
                    with gr.Column():
                        status_box_chatbot = gr.Textbox(
                            label="📊 Chat Status",
                            interactive=False,
                            info="Current conversation status"
                        )

            # --- Repo Explorer Tab ---
            repo_explorer_tab, repo_components, repo_states = create_repo_explorer_tab()

        # --- Footer ---
        gr.Markdown(
            """
            <div style="text-align: center; padding: 30px 20px; margin-top: 40px; background: rgba(255, 255, 255, 0.1); border-radius: 16px; backdrop-filter: blur(10px);">
            <p style="margin: 0; color: rgba(255, 255, 255, 0.8); font-size: 0.95rem; font-weight: 500;">
            🚀 Powered by <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Gradio</span>
            & <span style="background: linear-gradient(45deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: 700;">Hugging Face</span>
            </p>
            <div style="height: 2px; width: 60px; background: linear-gradient(45deg, #667eea, #764ba2); margin: 16px auto; border-radius: 1px;"></div>
            </div>
            """
        )

        # --- Event Handler Functions ---
        def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
            """Store the submitted repo IDs and switch to the analysis tab.

            IDs are split on commas and newlines, trimmed, and de-duplicated
            in first-seen order. Empty input keeps the user on the input tab.
            """
            if not text:
                return [], 0, pd.DataFrame(), "Status: Please enter repository IDs.", gr.update(selected="input_tab")
            repo_ids = list(dict.fromkeys([repo.strip() for repo in re.split(r'[\n,]+', text) if repo.strip()]))
            write_repos_to_csv(repo_ids)
            df = read_csv_to_dataframe()
            status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis."
            return repo_ids, 0, df, status, gr.update(selected="analysis_tab")

        def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, Any]:
            """Search repos for each keyword, store the results, and switch to the analysis tab.

            Keywords are split on commas and newlines; each one is searched
            with a limit of 5 and the combined hits are de-duplicated in
            first-seen order. Empty input keeps the user on the input tab.
            """
            if not keywords:
                return [], 0, pd.DataFrame(), "Status: Please enter keywords.", gr.update(selected="input_tab")
            keyword_list = [k.strip() for k in re.split(r'[\n,]+', keywords) if k.strip()]
            repo_ids = []
            for kw in keyword_list:
                repo_ids.extend(search_top_spaces(kw, limit=5))
            unique_repo_ids = list(dict.fromkeys(repo_ids))
            write_repos_to_csv(unique_repo_ids)
            df = read_csv_to_dataframe()
            status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis."
            return unique_repo_ids, 0, df, status, gr.update(selected="analysis_tab")

        def extract_user_requirements_from_chat(history: List[Dict[str, str]]) -> str:
            """Return the user's non-blank chat messages as a "- " bulleted list."""
            if not history:
                return ""
            user_messages = [
                msg.get('content', '') for msg in history if msg.get('role') == 'user'
            ]
            # Only plain-text content can be a requirement; skipping anything
            # else avoids calling .strip() on a non-string value.
            return "\n".join(
                f"- {text}" for text in user_messages
                if isinstance(text, str) and text.strip()
            )

        def handle_analyze_next(repo_ids: List[str], current_idx: int, user_requirements: str) -> Tuple[str, str, pd.DataFrame, int, str]:
            """Analyze the repo at *current_idx* and advance the index by one.

            Returns the repo content, the analysis summary, the updated table,
            the next index, and a status message.
            """
            if not repo_ids:
                return "", "", pd.DataFrame(), 0, "Status: No repositories to analyze. Please submit repo IDs first."
            if current_idx >= len(repo_ids):
                return "", "", read_csv_to_dataframe(), current_idx, "Status: All repositories have been analyzed."
            repo_id_to_analyze = repo_ids[current_idx]
            status = f"Status: Analyzing repository {current_idx + 1}/{len(repo_ids)}: {repo_id_to_analyze}"
            if user_requirements.strip():
                status += "\nUsing user requirements for relevance rating."
            content, summary, df = analyze_and_update_single_repo(repo_id_to_analyze, user_requirements)
            next_idx = current_idx + 1
            if next_idx >= len(repo_ids):
                status += "\n\nFinished all analyses."
            return content, summary, df, next_idx, status

        def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
            """Append the user's message to the history and clear the input box."""
            # Initialize chatbot with welcome message if empty
            if not history:
                history = [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}]
            if user_message:
                history.append({"role": "user", "content": user_message})
            return history, ""

        def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
            """Append the assistant's reply to the last user message, if there is one."""
            if not history or history[-1]["role"] != "user":
                return history
            user_message = history[-1]["content"]
            # Convert all messages *before* the last user message into tuples for the API
            tuple_history_for_api = convert_messages_to_tuples(history[:-1])
            response = chat_with_user(user_message, tuple_history_for_api)
            history.append({"role": "assistant", "content": response})
            return history

        def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str, str]:
            """Extract search keywords and user requirements from the finished chat.

            Returns the cleaned comma-separated keywords, a status message,
            and the bulleted user requirements.
            """
            if not history:
                return "", "Status: Chat is empty, nothing to analyze.", ""
            # Convert the full, valid history for the extraction logic
            tuple_history = convert_messages_to_tuples(history)
            if not tuple_history:
                return "", "Status: No completed conversations to analyze.", ""
            # Get raw keywords string from the LLM
            raw_keywords_str = extract_keywords_from_conversation(tuple_history)
            # Sanitize the LLM output to extract only keyword-like parts.
            # A keyword can contain letters, numbers, underscores, spaces, and hyphens.
            cleaned_keywords = re.findall(r'[\w\s-]+', raw_keywords_str)
            # Trim whitespace from each found keyword and filter out any empty strings
            cleaned_keywords = [kw.strip() for kw in cleaned_keywords if kw.strip()]
            if not cleaned_keywords:
                return "", f"Status: Could not extract valid keywords. Raw LLM output: '{raw_keywords_str}'", ""
            # Join them into a clean, comma-separated string for the search tool
            final_keywords_str = ", ".join(cleaned_keywords)
            # Extract user requirements for analysis
            user_requirements = extract_user_requirements_from_chat(history)
            status = "Status: Keywords extracted. User requirements saved for analysis."
            return final_keywords_str, status, user_requirements

        def mirror_status_to_input_tab(status: str) -> str:
            """Pass the analysis-tab status text through so it can also fill the input-tab status box."""
            return status

        # --- Component Event Wiring ---
        # Initialize chatbot with welcome message on app load
        app.load(
            fn=lambda: [{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
            outputs=[chatbot]
        )

        # Input Tab
        # Each submit/search event also copies its status to the input tab,
        # which is where the user stays when the input was empty.
        submit_repo_btn.click(
            fn=handle_repo_id_submission,
            inputs=[repo_id_input],
            outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
        ).then(
            fn=mirror_status_to_input_tab,
            inputs=[status_box_analysis],
            outputs=[status_box_input]
        )
        search_btn.click(
            fn=handle_keyword_search,
            inputs=[keyword_input],
            outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
        ).then(
            fn=mirror_status_to_input_tab,
            inputs=[status_box_analysis],
            outputs=[status_box_input]
        )

        # Analysis Tab
        analyze_next_btn.click(
            fn=handle_analyze_next,
            inputs=[repo_ids_state, current_repo_idx_state, user_requirements_state],
            outputs=[content_output, summary_output, df_output, current_repo_idx_state, status_box_analysis]
        )

        # Chatbot Tab
        msg_input.submit(
            fn=handle_user_message,
            inputs=[msg_input, chatbot],
            outputs=[chatbot, msg_input]
        ).then(
            fn=handle_bot_response,
            inputs=[chatbot],
            outputs=[chatbot]
        )
        send_btn.click(
            fn=handle_user_message,
            inputs=[msg_input, chatbot],
            outputs=[chatbot, msg_input]
        ).then(
            fn=handle_bot_response,
            inputs=[chatbot],
            outputs=[chatbot]
        )
        end_chat_btn.click(
            fn=handle_end_chat,
            inputs=[chatbot],
            outputs=[extracted_keywords_output, status_box_chatbot, user_requirements_state]
        ).then(
            fn=lambda req: req if req.strip() else "No specific requirements extracted from conversation.",
            inputs=[user_requirements_state],
            outputs=[current_requirements_display]
        )
        use_keywords_btn.click(
            fn=handle_keyword_search,
            inputs=[extracted_keywords_output],
            outputs=[repo_ids_state, current_repo_idx_state, df_output, status_box_analysis, tabs]
        ).then(
            fn=mirror_status_to_input_tab,
            inputs=[status_box_analysis],
            outputs=[status_box_input]
        )

        # Repo Explorer Tab
        setup_repo_explorer_events(repo_components, repo_states)

    return app
if __name__ == "__main__":
    # Build the interface and start the Gradio server only when this file is
    # run as a script; debug=True is passed straight through to launch().
    app = create_ui()
    app.launch(debug=True)