Spaces:
Sleeping
Sleeping
import gradio as gr | |
import regex as re | |
import csv | |
import pandas as pd | |
from typing import Dict, List, Tuple, Optional, Any | |
import logging | |
from pathlib import Path | |
import os | |
from analyzer import ( | |
combine_repo_files_for_llm, | |
analyze_combined_file, | |
parse_llm_json_response, | |
analyze_code | |
) | |
from hf_utils import download_space_repo, search_top_spaces | |
from chatbot_page import chat_with_user, extract_keywords_from_conversation | |
# Configure logging: INFO-level records carrying timestamp, logger name and severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# Constants
# Each prompt is written as a sequence of sentences joined by single spaces.
CHATBOT_SYSTEM_PROMPT = " ".join((
    "You are a helpful assistant. Your goal is to help the user describe their ideal open-source repo.",
    "Ask questions to clarify what they want, their use case, preferred language, features, etc.",
    "When the user clicks 'End Chat', analyze the conversation and return about 5 keywords for repo search.",
    "Return only the keywords as a comma-separated list.",
))
# Greeting text for the chat assistant.
CHATBOT_INITIAL_MESSAGE = " ".join((
    "Hello! Please tell me about your ideal Hugging Face repo.",
    "What use case, preferred language, or features are you looking for?",
))
# State management | |
class AppState:
    """Mutable per-session state shared by the UI event handlers.

    Attributes are documented next to their initialisation in ``_set_defaults``.
    """

    def __init__(self):
        self._set_defaults()

    def _set_defaults(self) -> None:
        """Bind every attribute to a fresh default value."""
        # Repo IDs queued for analysis, in processing order.
        self.repo_ids: List[str] = []
        # Index into ``repo_ids`` of the next repo to analyse.
        self.current_repo_idx: int = 0
        # Keywords produced at the end of a chat session.
        self.generated_keywords: List[str] = []
        # Analysis payloads keyed by repo ID.
        self.analysis_results: Dict[str, Dict[str, Any]] = {}
        # (user, assistant) message pairs.
        self.chat_history: List[Tuple[str, str]] = []

    def reset(self):
        """Restore every attribute to its initial value.

        Uses the shared initialiser rather than calling ``__init__`` again, so a
        subclass whose constructor takes arguments can still be reset.
        """
        self._set_defaults()
# Helper functions | |
def read_csv_as_text(csv_filename: str) -> pd.DataFrame:
    """Read a CSV file so that every cell comes back as text.

    Args:
        csv_filename: Path of the CSV file to load.

    Returns:
        A DataFrame whose cells are all strings; empty cells are ``""``. If the
        file cannot be read, the error is logged and an empty frame with the
        standard analysis columns is returned instead.
    """
    try:
        # dtype=str alone still converts empty cells and tokens such as "NA" or
        # "null" into float NaN; disabling the default NA markers keeps them as text.
        return pd.read_csv(csv_filename, dtype=str, keep_default_na=False)
    except Exception as e:
        logger.error(f"Error reading CSV file {csv_filename}: {e}")
        return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
def write_repos_to_csv(repo_ids: List[str], csv_filename: str = "repo_ids.csv") -> None:
    """Create (or overwrite) the results CSV with one blank-analysis row per repo.

    The file gets the standard header followed by a row per repo ID in which
    only the ``repo id`` column is filled. Failures are logged, not raised.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        with open(csv_filename, mode="w", newline='', encoding="utf-8") as handle:
            out = csv.writer(handle)
            out.writerow(header)
            # The four analysis columns stay empty until a repo has been analysed.
            out.writerows([rid, "", "", "", ""] for rid in repo_ids)
    except Exception as e:
        logger.error(f"Error writing to CSV file {csv_filename}: {e}")
def process_repo_input(text: str, state: AppState) -> pd.DataFrame:
    """Parse user-entered repo IDs, store them in ``state`` and recreate the CSV.

    Args:
        text: Repo IDs separated by commas and/or newlines.
        state: Session state; its ``repo_ids`` is replaced and
            ``current_repo_idx`` is reset to 0.

    Returns:
        The freshly written ``repo_ids.csv`` as a DataFrame, or an empty frame
        with the standard columns when ``text`` is empty.
    """
    if not text:
        state.repo_ids = []
        state.current_repo_idx = 0
        return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])
    stripped = (part.strip() for part in re.split(r'[\n,]+', text))
    # Drop repeated IDs while keeping first-seen order. A duplicate would be
    # analysed twice, and only the first matching CSV row is ever filled in,
    # leaving the duplicate row permanently blank.
    repo_ids = list(dict.fromkeys(rid for rid in stripped if rid))
    state.repo_ids = repo_ids
    state.current_repo_idx = 0
    write_repos_to_csv(repo_ids)
    return read_csv_as_text("repo_ids.csv")
def analyze_single_repo(repo_id: str) -> Tuple[str, str, Dict[str, Any]]:
    """Download one repo, run the LLM analysis on it and summarise the outcome.

    Returns a ``(combined_content, summary, analysis)`` triple. ``analysis`` is
    whatever ``parse_llm_json_response`` produced. On any exception the triple
    is ``("", "Error analyzing repo: ...", {"error": ...})``.
    """
    try:
        download_space_repo(repo_id, local_dir="repo_files")
        txt_path = combine_repo_files_for_llm()
        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()
        llm_output = analyze_combined_file(txt_path)

        # Keep only the text between the last '{' and the last '}' when that
        # span is well formed; otherwise hand the raw output to the parser.
        open_idx = llm_output.rfind('{')
        close_idx = llm_output.rfind('}')
        if open_idx == -1 or close_idx == -1 or close_idx <= open_idx:
            final_json_str = llm_output
        else:
            final_json_str = llm_output[open_idx:close_idx + 1]
        llm_json = parse_llm_json_response(final_json_str)

        is_mapping = isinstance(llm_json, dict)
        if is_mapping and "error" not in llm_json:
            strengths = llm_json.get("strength", "")
            weaknesses = llm_json.get("weaknesses", "")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}"
        else:
            # Show the parser's raw payload when there is one, else the value itself.
            raw_value = llm_json.get('raw', '') if is_mapping else llm_json
            summary = f"JSON extraction: FAILED\nRaw: {raw_value}"
        return combined_content, summary, llm_json
    except Exception as e:
        logger.error(f"Error analyzing repo {repo_id}: {e}")
        return "", f"Error analyzing repo: {e}", {"error": str(e)}
def update_csv_with_analysis(repo_id: str, analysis_results: Dict[str, Any], csv_filename: str = "repo_ids.csv") -> pd.DataFrame:
    """Merge one repo's analysis into the results CSV and return the table.

    The first row whose ``repo id`` equals ``repo_id`` is updated; if there is
    no such row, a new one is appended. Nothing is merged when
    ``analysis_results`` is not a dict or contains an ``"error"`` key. On any
    exception the error is logged and the CSV is re-read and returned.
    """
    try:
        df = read_csv_as_text(csv_filename)
        analysis_columns = ("strength", "weaknesses", "speciality", "relevance rating")
        usable = isinstance(analysis_results, dict) and "error" not in analysis_results
        matched = False
        for row_label, record in df.iterrows():
            if record["repo id"] != repo_id:
                continue
            if usable:
                for column in analysis_columns:
                    df.at[row_label, column] = analysis_results.get(column, "")
            # Only the first matching row is considered.
            matched = True
            break
        if usable and not matched:
            appended = {"repo id": repo_id}
            for column in analysis_columns:
                appended[column] = analysis_results.get(column, "")
            df = pd.concat([df, pd.DataFrame([appended])], ignore_index=True)
        df.to_csv(csv_filename, index=False)
        return df
    except Exception as e:
        logger.error(f"Error updating CSV for repo {repo_id}: {e}")
        return read_csv_as_text(csv_filename)
def show_combined_repo_and_llm(state: AppState) -> Tuple[str, str, pd.DataFrame]:
    """Analyse the repo at ``state.current_repo_idx`` and advance the index.

    Returns ``(content, summary, table)``. When no repo IDs are queued, or all
    have been processed, ``content`` carries an explanatory message and
    ``summary`` is empty.
    """
    queued = state.repo_ids
    if not queued:
        return "No repo ID available. Please submit repo IDs first.", "", pd.DataFrame()

    position = state.current_repo_idx
    if position >= len(queued):
        return "All repo IDs have been processed.", "", read_csv_as_text("repo_ids.csv")

    repo_id = queued[position]
    combined_content, summary, analysis_results = analyze_single_repo(repo_id)
    table = update_csv_with_analysis(repo_id, analysis_results)
    # Move on to the next repo regardless of whether the analysis succeeded.
    state.current_repo_idx += 1
    return combined_content, summary, table
def keyword_search_and_update(keyword: str, state: AppState) -> pd.DataFrame:
    """Search Spaces for each keyword and queue the unique hits for analysis.

    ``keyword`` may hold several comma- or newline-separated terms; each term
    is searched with a limit of 5. The de-duplicated results replace
    ``state.repo_ids``, the index is reset, and the CSV is rewritten and
    returned. An empty ``keyword`` returns an empty frame and leaves ``state``
    untouched.
    """
    if not keyword:
        return pd.DataFrame(columns=["repo id", "strength", "weaknesses", "speciality", "relevance rating"])

    terms = [piece.strip() for piece in re.split(r'[\n,]+', keyword) if piece.strip()]
    found = []
    for term in terms:
        found.extend(search_top_spaces(term, limit=5))

    # Remove duplicates while preserving order
    unique_repo_ids = list(dict.fromkeys(found))

    state.repo_ids = unique_repo_ids
    state.current_repo_idx = 0
    write_repos_to_csv(unique_repo_ids)
    return read_csv_as_text("repo_ids.csv")
# UI Components | |
def create_ui() -> gr.Blocks:
    """Build the multi-page Gradio interface and wire up its event handlers.

    The app is a single ``gr.Blocks``. Each "page" is a ``gr.Group`` and
    navigation toggles the groups' ``visible`` flag so that one page is shown
    at a time.

    Returns:
        The assembled ``gr.Blocks`` application, ready for ``launch()``.
    """
    with gr.Blocks(title="Hugging Face Repo Analyzer", theme=gr.themes.Soft()) as app:
        # Per-session application state. It is created inside the Blocks context
        # so that it is registered with the app and usable as an event input.
        # Handlers receive the session's AppState instance and mutate it in place.
        state = gr.State(AppState())
        gr.Markdown("# Hugging Face Repository Analyzer")
        # Navigation state (not read by any handler; pages are toggled directly).
        current_page = gr.State("start")

        # Start Page
        with gr.Group(visible=True) as start_page:
            gr.Markdown("""
            # Welcome to the Hugging Face Repository Analyzer!
            This tool helps you analyze and understand Hugging Face repositories. You can:
            - Enter repository IDs directly
            - Search repositories using keywords
            - Chat with an AI assistant to find the perfect repository
            - Get detailed analysis of repositories
            Click 'Start Analysis' to begin!
            """)
            with gr.Row():
                start_btn = gr.Button("Start Analysis", variant="primary")
                help_btn = gr.Button("View Help Guide", variant="secondary")

        # Help Guide
        with gr.Group(visible=False) as help_page:
            gr.Markdown("""
            # Help Guide
            ## Quick Start
            1. Enter repository IDs or search by keywords
            2. Start the analysis
            3. Review the results
            ## Features
            - **Repository Analysis**: Get detailed insights about repositories
            - **Keyword Search**: Find repositories matching your criteria
            - **AI Assistant**: Chat to find the perfect repository
            - **Comparison**: Compare repositories side by side
            ## Keyboard Shortcuts
            - `Ctrl + Enter`: Send message in chat
            - `Ctrl + S`: Start new analysis
            - `Ctrl + H`: Toggle help guide
            """)
            back_btn = gr.Button("Back to Start", variant="primary")

        # Input Page
        with gr.Group(visible=False) as input_page:
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Enter Repository IDs")
                    repo_id_input = gr.Textbox(
                        label="Enter repo IDs (comma or newline separated)",
                        lines=5,
                        placeholder="repo1, repo2\nrepo3"
                    )
                    submit_btn = gr.Button("Submit Repo IDs", variant="primary")
                    submit_status = gr.Textbox(label="Status", visible=False)
                with gr.Column():
                    gr.Markdown("### Or Search by Keywords")
                    keyword_input = gr.Textbox(
                        label="Enter keywords to search",
                        lines=3,
                        placeholder="Enter keywords separated by commas"
                    )
                    search_btn = gr.Button("Search by Keywords", variant="primary")
                    search_status = gr.Textbox(label="Status", visible=False)
            df_output = gr.Dataframe(
                headers=["repo id", "strength", "weaknesses", "speciality", "relevance rating"],
                datatype=["str", "str", "str", "str", "str"]
            )
            with gr.Row():
                analyze_btn = gr.Button("Start Analysis", variant="primary")
                analyze_status = gr.Textbox(label="Status", visible=False)
                compare_btn = gr.Button("Compare Repositories", variant="secondary")

        # Analysis Page
        with gr.Group(visible=False) as analysis_page:
            gr.Markdown("### Repository Analysis")
            progress = gr.Slider(
                minimum=0,
                maximum=100,
                value=0,
                label="Analysis Progress",
                interactive=False
            )
            with gr.Row():
                with gr.Column():
                    content_output = gr.Textbox(label="Repository Content", lines=10)
                with gr.Column():
                    summary_output = gr.Textbox(label="Analysis Summary", lines=10)
            with gr.Row():
                next_btn = gr.Button("Analyze Next Repository", variant="primary")
                next_status = gr.Textbox(label="Status", visible=False)
                finish_btn = gr.Button("Finish Analysis", variant="secondary")
                export_btn = gr.Button("Export Results", variant="secondary")
                export_status = gr.Textbox(label="Status", visible=False)
            # Declared here instead of being created inline in the event wiring.
            export_message = gr.Textbox(label="Export Status", interactive=False)

        # Comparison Page
        with gr.Group(visible=False) as comparison_page:
            gr.Markdown("### Repository Comparison")
            with gr.Row():
                with gr.Column():
                    repo1_select = gr.Dropdown(
                        label="Select First Repository",
                        choices=[],
                        interactive=True
                    )
                    repo1_content = gr.Textbox(label="Repository 1 Content", lines=10)
                    repo1_summary = gr.Textbox(label="Repository 1 Summary", lines=10)
                with gr.Column():
                    repo2_select = gr.Dropdown(
                        label="Select Second Repository",
                        choices=[],
                        interactive=True
                    )
                    repo2_content = gr.Textbox(label="Repository 2 Content", lines=10)
                    repo2_summary = gr.Textbox(label="Repository 2 Summary", lines=10)
            # Named separately from the input page's "Compare Repositories" button;
            # reusing the name `compare_btn` orphaned that first button.
            # NOTE(review): no handler fills the comparison panes yet.
            run_compare_btn = gr.Button("Compare", variant="primary")
            back_to_analysis_btn = gr.Button("Back to Analysis", variant="secondary")

        # Chatbot Page
        # NOTE(review): no button navigates to this page yet.
        with gr.Group(visible=False) as chatbot_page:
            gr.Markdown("### Chat with Assistant")
            gr.Markdown("""
            Tell me about your ideal repository. I'll help you find the perfect match!
            What are you looking for? Consider:
            - Your use case
            - Preferred programming language
            - Required features
            - Any specific requirements
            """)
            chatbot = gr.Chatbot(
                label="Chat with Assistant",
                height=400,
                type="messages"
            )
            msg = gr.Textbox(
                label="Message",
                placeholder="Type your message here...",
                lines=2
            )
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                send_status = gr.Textbox(label="Status", visible=False)
                end_chat_btn = gr.Button("End Chat", variant="secondary")
                end_chat_status = gr.Textbox(label="Status", visible=False)
            # Declared here instead of being created inline in the event wiring.
            keywords_output = gr.Textbox(label="Extracted Keywords", interactive=False)

        # Results Page
        with gr.Group(visible=False) as results_page:
            gr.Markdown("### Analysis Results")
            with gr.Row():
                with gr.Column():
                    results_df = gr.Dataframe(
                        headers=["repo id", "strength", "weaknesses", "speciality", "relevance rating"],
                        datatype=["str", "str", "str", "str", "str"]
                    )
                with gr.Column():
                    gr.Markdown("### Repository Metrics")
                    # NOTE(review): nothing populates this plot yet.
                    metrics_plot = gr.Plot(label="Repository Metrics")
            with gr.Row():
                restart_btn = gr.Button("Start New Analysis", variant="primary")
                # Distinct name so the analysis page's export button keeps its handler.
                results_export_btn = gr.Button("Export Results", variant="secondary")
                history_btn = gr.Button("View History", variant="secondary")
            results_export_message = gr.Textbox(label="Export Status", interactive=False)

        # History Page
        with gr.Group(visible=False) as history_page:
            gr.Markdown("### Analysis History")
            history_df = gr.Dataframe(
                headers=["Date", "Repositories", "Keywords", "Results"],
                datatype=["str", "str", "str", "str"]
            )
            back_to_results_btn = gr.Button("Back to Results", variant="primary")

        # Page names and page containers, kept in the same order so that
        # navigate_to()'s updates line up with `all_pages` as an outputs list.
        page_names = ["start", "input", "analysis", "chatbot", "results", "help", "comparison", "history"]
        all_pages = [start_page, input_page, analysis_page, chatbot_page, results_page, help_page, comparison_page, history_page]

        # Navigation functions
        def navigate_to(page: str) -> List[Any]:
            """Return one visibility update per page, showing only ``page``."""
            return [gr.update(visible=(name == page)) for name in page_names]

        # Helpers used by the handlers below
        def update_progress(current: int, total: int) -> float:
            """Return progress as a percentage; 0.0 when there is nothing to process."""
            if total <= 0:
                return 0.0
            return (current / total) * 100

        def export_results(df: pd.DataFrame) -> str:
            """Export results to a timestamped CSV and return a status message."""
            try:
                filename = f"analysis_results_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}.csv"
                df.to_csv(filename, index=False)
                return f"Results exported to {filename}"
            except Exception as e:
                return f"Error exporting results: {e}"

        def load_history() -> pd.DataFrame:
            """Load analysis history, or an empty table when none is available."""
            empty = pd.DataFrame(columns=["Date", "Repositories", "Keywords", "Results"])
            try:
                return pd.read_csv("analysis_history.csv")
            except FileNotFoundError:
                # Nothing has been saved yet.
                return empty
            except Exception as e:
                logger.error(f"Error loading history: {e}")
                return empty

        def save_to_history(repos: List[str], keywords: List[str], results: pd.DataFrame) -> None:
            """Append the current analysis to ``analysis_history.csv``."""
            try:
                history = load_history()
                new_row = {
                    "Date": pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "Repositories": ", ".join(repos),
                    "Keywords": ", ".join(keywords),
                    "Results": results.to_json()
                }
                history = pd.concat([history, pd.DataFrame([new_row])], ignore_index=True)
                history.to_csv("analysis_history.csv", index=False)
            except Exception as e:
                logger.error(f"Error saving to history: {e}")

        def hide_status() -> Any:
            """Update that clears and hides a transient status textbox."""
            return gr.update(value="", visible=False)

        # Event handlers (each also clears its transient status box)
        def process_repo_input_with_status(text: str, app_state: AppState) -> Tuple[pd.DataFrame, Any]:
            """Process repo input with status update."""
            return process_repo_input(text, app_state), hide_status()

        def keyword_search_with_status(keyword: str, app_state: AppState) -> Tuple[pd.DataFrame, Any]:
            """Search keywords with status update."""
            return keyword_search_and_update(keyword, app_state), hide_status()

        def analyze_with_status(app_state: AppState) -> Tuple[str, str, pd.DataFrame, float, Any]:
            """Analyze the next repo and report progress."""
            content, summary, table = show_combined_repo_and_llm(app_state)
            percent = update_progress(app_state.current_repo_idx, len(app_state.repo_ids))
            return content, summary, table, percent, hide_status()

        def send_message_with_status(user_message: str, history: List[Dict[str, str]], app_state: AppState) -> Tuple[List[Dict[str, str]], str, Any]:
            """Send a chat message and append the assistant's reply."""
            history = list(history) if history else []
            if not user_message:
                return history, "", hide_status()
            history.append({"role": "user", "content": user_message})
            response = chat_with_user(user_message, history, CHATBOT_SYSTEM_PROMPT)
            history.append({"role": "assistant", "content": response})
            return history, "", hide_status()

        def end_chat_with_status(history: List[Dict[str, str]], app_state: AppState) -> Tuple[str, Any, Any]:
            """End chat, extract keywords and show them as comma-separated text."""
            if not history:
                return "", gr.update(visible=True), hide_status()
            keywords = extract_keywords_from_conversation(history)
            # Accept either a list or a comma-separated string from the extractor.
            if isinstance(keywords, str):
                keywords = [part.strip() for part in keywords.split(",") if part.strip()]
            else:
                keywords = [str(item) for item in keywords]
            app_state.generated_keywords = keywords
            return ", ".join(keywords), gr.update(visible=True), hide_status()

        def export_with_status(df: pd.DataFrame) -> Tuple[str, Any]:
            """Export with status update."""
            return export_results(df), hide_status()

        def finish_analysis(app_state: AppState) -> List[Any]:
            """Load the results table, record the run in history, show the results page."""
            results = read_csv_as_text("repo_ids.csv")
            if app_state.repo_ids:
                save_to_history(app_state.repo_ids, app_state.generated_keywords, results)
            return [results, *navigate_to("results")]

        def restart_analysis(app_state: AppState) -> List[Any]:
            """Reset the session state and return to the start page."""
            app_state.reset()
            return navigate_to("start")

        def open_comparison(app_state: AppState) -> List[Any]:
            """Fill both repo dropdowns with the queued repos and show the comparison page."""
            first_choices, second_choices = update_repo_choices(app_state)
            return [
                gr.update(choices=first_choices),
                gr.update(choices=second_choices),
                *navigate_to("comparison"),
            ]

        # Navigation wiring
        start_btn.click(fn=lambda: navigate_to("input"), inputs=[], outputs=all_pages)
        help_btn.click(fn=lambda: navigate_to("help"), inputs=[], outputs=all_pages)
        back_btn.click(fn=lambda: navigate_to("start"), inputs=[], outputs=all_pages)
        back_to_results_btn.click(fn=lambda: navigate_to("results"), inputs=[], outputs=all_pages)
        back_to_analysis_btn.click(fn=lambda: navigate_to("analysis"), inputs=[], outputs=all_pages)

        # Action wiring: show a status message first, then run the handler.
        submit_btn.click(
            fn=lambda: gr.update(value="Processing...", visible=True),
            inputs=[],
            outputs=[submit_status]
        ).then(
            fn=process_repo_input_with_status,
            inputs=[repo_id_input, state],
            outputs=[df_output, submit_status]
        )
        search_btn.click(
            fn=lambda: gr.update(value="Searching...", visible=True),
            inputs=[],
            outputs=[search_status]
        ).then(
            fn=keyword_search_with_status,
            inputs=[keyword_input, state],
            outputs=[df_output, search_status]
        )
        # "Start Analysis" opens the analysis page and analyses the first queued repo.
        analyze_btn.click(
            fn=lambda: gr.update(value="Analyzing...", visible=True),
            inputs=[],
            outputs=[analyze_status]
        ).then(
            fn=lambda: navigate_to("analysis"),
            inputs=[],
            outputs=all_pages
        ).then(
            fn=analyze_with_status,
            inputs=[state],
            outputs=[content_output, summary_output, df_output, progress, analyze_status]
        )
        next_btn.click(
            fn=lambda: gr.update(value="Analyzing...", visible=True),
            inputs=[],
            outputs=[next_status]
        ).then(
            fn=analyze_with_status,
            inputs=[state],
            outputs=[content_output, summary_output, df_output, progress, next_status]
        )
        finish_btn.click(
            fn=finish_analysis,
            inputs=[state],
            outputs=[results_df, *all_pages]
        )
        send_btn.click(
            fn=lambda: gr.update(value="Sending...", visible=True),
            inputs=[],
            outputs=[send_status]
        ).then(
            fn=send_message_with_status,
            inputs=[msg, chatbot, state],
            outputs=[chatbot, msg, send_status]
        )
        end_chat_btn.click(
            fn=lambda: gr.update(value="Processing...", visible=True),
            inputs=[],
            outputs=[end_chat_status]
        ).then(
            fn=end_chat_with_status,
            inputs=[chatbot, state],
            outputs=[keywords_output, results_page, end_chat_status]
        )
        # Analysis page export: writes the working table shown during analysis.
        export_btn.click(
            fn=lambda: gr.update(value="Exporting...", visible=True),
            inputs=[],
            outputs=[export_status]
        ).then(
            fn=export_with_status,
            inputs=[df_output],
            outputs=[export_message, export_status]
        )
        # Results page export: writes the final results table.
        results_export_btn.click(
            fn=export_results,
            inputs=[results_df],
            outputs=[results_export_message]
        )
        restart_btn.click(
            fn=restart_analysis,
            inputs=[state],
            outputs=all_pages
        )
        history_btn.click(
            fn=lambda: [load_history(), *navigate_to("history")],
            inputs=[],
            outputs=[history_df, *all_pages]
        )
        compare_btn.click(
            fn=open_comparison,
            inputs=[state],
            outputs=[repo1_select, repo2_select, *all_pages]
        )
    return app
def update_repo_choices(state: AppState) -> Tuple[List[str], List[str]]:
    """Return the queued repo IDs twice, once per comparison dropdown.

    Both tuple items are the same list object held by ``state.repo_ids``.
    """
    repo_options = state.repo_ids
    return (repo_options, repo_options)
if __name__ == "__main__":
    # Build the interface and start the Gradio server only when run as a script.
    app = create_ui()
    app.launch()