|
import gradio as gr |
|
import regex as re |
|
import csv |
|
import pandas as pd |
|
from typing import List, Dict, Tuple, Any |
|
import logging |
|
import os |
|
|
|
|
|
from analyzer import combine_repo_files_for_llm, analyze_combined_file, parse_llm_json_response |
|
from hf_utils import download_space_repo, search_top_spaces |
|
from chatbot_page import chat_with_user, extract_keywords_from_conversation |
|
|
|
|
|
# Configure logging once at import time; each record shows timestamp, level and message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# CSV file holding one row per repository together with its analysis columns.
CSV_FILE = "repo_ids.csv"

# Instruction text describing the assistant's role in the repo-finding chat.
CHATBOT_SYSTEM_PROMPT = (
    "You are a helpful assistant. Your goal is to help the user describe their ideal open-source repo. "
    "Ask questions to clarify what they want, their use case, preferred language, features, etc. "
    "When the user clicks 'End Chat', analyze the conversation and return about 5 keywords for repo search. "
    "Return only the keywords as a comma-separated list."
)

# Assistant greeting used as the chatbot's initial message.
CHATBOT_INITIAL_MESSAGE = (
    "Hello! Please tell me about your ideal Hugging Face repo. "
    "What use case, preferred language, or features are you looking for?"
)
|
|
|
|
|
|
|
def write_repos_to_csv(repo_ids: List[str]) -> None:
    """Overwrite the CSV file with a header row and one row per repo ID.

    Each repo row carries the ID followed by four empty analysis cells.
    Any exception raised while writing is logged and suppressed, so the
    caller never sees a failure from this function.

    Args:
        repo_ids: Repository IDs to store, in the order they should appear.
    """
    header = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        with open(CSV_FILE, mode="w", newline='', encoding="utf-8") as handle:
            out = csv.writer(handle)
            out.writerow(header)
            # The analysis columns start out blank for every repo.
            out.writerows([rid, "", "", "", ""] for rid in repo_ids)
        logger.info(f"Wrote {len(repo_ids)} repo IDs to {CSV_FILE}")
    except Exception as e:
        logger.error(f"Error writing to CSV: {e}")
|
|
|
def read_csv_to_dataframe() -> pd.DataFrame:
    """Load the CSV file into a DataFrame of strings.

    Returns:
        The file contents with every column read as ``str`` and missing
        cells replaced by ``''``. When the file does not exist or cannot be
        read, an empty frame that still carries the expected columns, so
        callers always receive the same schema.
    """
    expected_columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]
    try:
        return pd.read_csv(CSV_FILE, dtype=str).fillna('')
    except FileNotFoundError:
        # A missing file is not treated as an error: return an empty table.
        return pd.DataFrame(columns=expected_columns)
    except Exception as e:
        logger.error(f"Error reading CSV: {e}")
        # Keep the schema on failure too, so column lookups by name and a
        # later rewrite of the file do not operate on a column-less frame.
        return pd.DataFrame(columns=expected_columns)
|
|
|
def analyze_and_update_single_repo(repo_id: str) -> Tuple[str, str, pd.DataFrame]:
    """Download and analyze one repo, then record the result in the CSV.

    The repo is downloaded (with ``local_dir="repo_files"``), its files are
    combined into one text file, that file is passed to the LLM analysis,
    and the text between the last ``{`` and the last ``}`` of the LLM output
    is parsed as JSON. The parsed fields are copied into the first CSV row
    whose ``repo id`` equals ``repo_id`` and the CSV is rewritten.

    Args:
        repo_id: ID of the repo to analyze; also the key used to locate the
            CSV row to update.

    Returns:
        ``(combined_content, summary, dataframe)``: the combined file text,
        a human-readable summary of the extraction, and the updated table.
        If any step raises, the content is ``""``, the summary carries the
        error message, and the table is re-read from the CSV.
    """
    result_columns = ("strength", "weaknesses", "speciality", "relevance rating")
    try:
        logger.info(f"Starting analysis for repo: {repo_id}")
        download_space_repo(repo_id, local_dir="repo_files")
        txt_path = combine_repo_files_for_llm()

        with open(txt_path, "r", encoding="utf-8") as f:
            combined_content = f.read()

        llm_output = analyze_combined_file(txt_path)

        # Keep only the span from the last '{' to the last '}'; fall back to
        # an empty JSON object when either brace is absent.
        last_start = llm_output.rfind('{')
        last_end = llm_output.rfind('}')
        if last_start != -1 and last_end != -1:
            final_json_str = llm_output[last_start:last_end + 1]
        else:
            final_json_str = "{}"

        llm_json = parse_llm_json_response(final_json_str)

        # A dict without an "error" key is treated as a successful parse.
        # NOTE(review): presumably parse_llm_json_response signals failure
        # through an "error" key - confirm against its implementation.
        if isinstance(llm_json, dict) and "error" not in llm_json:
            strengths = llm_json.get("strength", "N/A")
            weaknesses = llm_json.get("weaknesses", "N/A")
            summary = f"JSON extraction: SUCCESS\n\nStrengths:\n{strengths}\n\nWeaknesses:\n{weaknesses}"
        else:
            summary = "JSON extraction: FAILED\nRaw response might not be valid JSON."

        df = read_csv_to_dataframe()

        # Locate the repo's row with one vectorized comparison instead of
        # scanning row by row; a frame without the key column has no match.
        if "repo id" in df.columns:
            matching_rows = df.index[df["repo id"] == repo_id]
        else:
            matching_rows = df.index[:0]

        if len(matching_rows) == 0:
            logger.warning(f"Repo ID {repo_id} not found in CSV for updating.")
        elif isinstance(llm_json, dict):
            target = matching_rows[0]  # only the first matching row is updated
            for column in result_columns:
                value = llm_json.get(column, "")
                # The table is all-text (read with dtype=str); coerce lists,
                # numbers and nulls so the cell assignment cannot fail on a
                # non-scalar value and matches what the CSV stores anyway.
                df.at[target, column] = "" if value is None else str(value)

        df.to_csv(CSV_FILE, index=False)
        logger.info(f"Successfully analyzed and updated CSV for {repo_id}")
        return combined_content, summary, df

    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"An error occurred during analysis of {repo_id}: {e}")
        error_summary = f"Error analyzing repo: {e}"
        return "", error_summary, read_csv_to_dataframe()
|
|
|
|
|
def convert_messages_to_tuples(history: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Convert Gradio 'messages' history into ``(user, assistant)`` tuples.

    Every user message that is immediately followed by an assistant message
    becomes one tuple. Messages that are not part of such a pair (a leading
    assistant greeting, a trailing unanswered user message) are skipped.

    Pairs are found by adjacency rather than by stepping two entries at a
    time from index 0: a history that opens with an assistant message would
    otherwise be misaligned and yield no pairs at all.

    Args:
        history: Chat messages as dicts with ``role`` and ``content`` keys.

    Returns:
        The completed exchanges in order, as ``(user_text, assistant_text)``.
    """
    # A (user, assistant) pair can never overlap the next one, because the
    # following candidate pair starts with that assistant message.
    return [
        (current["content"], following["content"])
        for current, following in zip(history, history[1:])
        if current.get("role") == "user" and following.get("role") == "assistant"
    ]
|
|
|
|
|
|
|
def create_ui() -> gr.Blocks:
    """Build the Gradio interface: layout, event handlers and their wiring.

    The app has three tabs: one to submit repo IDs or search keywords, one
    to analyze the submitted repos one at a time, and one to chat with an
    assistant and turn the conversation into search keywords.

    Returns:
        The configured ``gr.Blocks`` app, ready to be launched.
    """
    result_columns = ["repo id", "strength", "weaknesses", "speciality", "relevance rating"]

    with gr.Blocks(theme=gr.themes.Soft(), title="Hugging Face Repo Analyzer") as app:

        # State: the repo IDs queued for analysis and the index of the next one.
        repo_ids_state = gr.State([])
        current_repo_idx_state = gr.State(0)

        gr.Markdown("# Hugging Face Repository Analyzer")

        with gr.Tabs() as tabs:

            with gr.TabItem("1. Input Repositories", id="input_tab"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("## Enter Repository IDs")
                        repo_id_input = gr.Textbox(
                            label="Enter repo IDs (comma or newline separated)",
                            lines=8,
                            placeholder="org/repo1, org/repo2"
                        )
                        submit_repo_btn = gr.Button("Submit Repository IDs", variant="primary")
                    with gr.Column():
                        gr.Markdown("## Or Search by Keywords")
                        keyword_input = gr.Textbox(
                            label="Enter keywords to search",
                            lines=8,
                            placeholder="e.g., text generation, image classification"
                        )
                        search_btn = gr.Button("Search by Keywords", variant="primary")

                status_box_input = gr.Textbox(label="Status", interactive=False)

            with gr.TabItem("2. Analyze Repositories", id="analysis_tab"):
                gr.Markdown("## Repository Analysis")
                analyze_next_btn = gr.Button("Analyze Next Repository", variant="primary")
                status_box_analysis = gr.Textbox(label="Status", interactive=False)

                with gr.Row():
                    content_output = gr.Textbox(label="Repository Content", lines=20)
                    summary_output = gr.Textbox(label="Analysis Summary", lines=20)

                gr.Markdown("### Analysis Results Table")
                df_output = gr.Dataframe(headers=result_columns)

            with gr.TabItem("3. Find Repos with AI", id="chatbot_tab"):
                gr.Markdown("## Chat with an Assistant to Find Repositories")
                chatbot = gr.Chatbot(
                    value=[{"role": "assistant", "content": CHATBOT_INITIAL_MESSAGE}],
                    label="Chat with Assistant",
                    height=400,
                    type="messages"
                )
                msg_input = gr.Textbox(label="Your Message", placeholder="Type your message here...", lines=2)
                with gr.Row():
                    send_btn = gr.Button("Send", variant="primary")
                    end_chat_btn = gr.Button("End Chat & Get Keywords")

                gr.Markdown("### Extracted Keywords")
                extracted_keywords_output = gr.Textbox(label="Keywords", interactive=False)
                use_keywords_btn = gr.Button("Use These Keywords to Search", variant="primary")
                status_box_chatbot = gr.Textbox(label="Status", interactive=False)

        # ------------------------------------------------------------------
        # Event handlers
        # ------------------------------------------------------------------

        def split_entries(text: str) -> List[str]:
            """Split comma/newline separated text into trimmed, non-empty entries."""
            return [part.strip() for part in re.split(r'[\n,]+', text or "") if part.strip()]

        def reject_input(status: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Reset the session and stay on the input tab with a status message."""
            return [], 0, pd.DataFrame(), status, status, gr.update(selected="input_tab")

        def start_session(repo_ids: List[str], status: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Persist the repo list, reload the table and switch to the analysis tab."""
            write_repos_to_csv(repo_ids)
            df = read_csv_to_dataframe()
            return repo_ids, 0, df, status, status, gr.update(selected="analysis_tab")

        def handle_repo_id_submission(text: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Processes submitted repo IDs, updates state, and prepares for analysis."""
            # Parse before validating so whitespace-only or separator-only
            # input is rejected instead of resetting the CSV to zero repos.
            repo_ids = list(dict.fromkeys(split_entries(text)))  # de-duplicate, keep order
            if not repo_ids:
                return reject_input("Status: Please enter repository IDs.")

            status = f"Status: {len(repo_ids)} repositories submitted. Ready for analysis."
            return start_session(repo_ids, status)

        def handle_keyword_search(keywords: str) -> Tuple[List[str], int, pd.DataFrame, str, str, Any]:
            """Processes submitted keywords, finds repos, updates state, and prepares for analysis."""
            keyword_list = split_entries(keywords)
            if not keyword_list:
                return reject_input("Status: Please enter keywords.")

            repo_ids = []
            for kw in keyword_list:
                repo_ids.extend(search_top_spaces(kw, limit=5))

            # The same repo can match several keywords; keep its first occurrence.
            unique_repo_ids = list(dict.fromkeys(repo_ids))
            status = f"Status: Found {len(unique_repo_ids)} repositories. Ready for analysis."
            return start_session(unique_repo_ids, status)

        def handle_analyze_next(repo_ids: List[str], current_idx: int) -> Tuple[str, str, pd.DataFrame, int, str]:
            """Analyzes the next repository in the list."""
            if not repo_ids:
                return "", "", pd.DataFrame(), 0, "Status: No repositories to analyze. Please submit repo IDs first."
            if current_idx >= len(repo_ids):
                return "", "", read_csv_to_dataframe(), current_idx, "Status: All repositories have been analyzed."

            repo_id_to_analyze = repo_ids[current_idx]
            status = f"Status: Analyzing repository {current_idx + 1}/{len(repo_ids)}: {repo_id_to_analyze}"

            content, summary, df = analyze_and_update_single_repo(repo_id_to_analyze)

            # The index advances whether or not the analysis succeeded.
            next_idx = current_idx + 1
            if next_idx >= len(repo_ids):
                status += "\n\nFinished all analyses."

            return content, summary, df, next_idx, status

        def handle_user_message(user_message: str, history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], str]:
            """Appends the user's message to the history, preparing for the bot's response."""
            history = history or []
            # Ignore blank messages; build a new list rather than mutating the input.
            if user_message and user_message.strip():
                history = history + [{"role": "user", "content": user_message}]
            return history, ""  # the second value clears the message box

        def handle_bot_response(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
            """Generates and appends the bot's response using the compatible history format."""
            # Respond only when the latest entry is an unanswered user message.
            if not history or history[-1]["role"] != "user":
                return history

            user_message = history[-1]["content"]
            # Earlier turns are passed as (user, assistant) tuples.
            tuple_history_for_api = convert_messages_to_tuples(history[:-1])

            response = chat_with_user(user_message, tuple_history_for_api)
            return history + [{"role": "assistant", "content": response}]

        def handle_end_chat(history: List[Dict[str, str]]) -> Tuple[str, str]:
            """Ends the chat, extracts and sanitizes keywords from the conversation."""
            if not history:
                return "", "Status: Chat is empty, nothing to analyze."

            tuple_history = convert_messages_to_tuples(history)
            if not tuple_history:
                return "", "Status: No completed conversations to analyze."

            raw_keywords_str = extract_keywords_from_conversation(tuple_history)

            # Keep runs of word characters, whitespace and hyphens; any other
            # character (commas, quotes, ...) acts as a separator.
            cleaned_keywords = re.findall(r'[\w\s-]+', raw_keywords_str)
            cleaned_keywords = [kw.strip() for kw in cleaned_keywords if kw.strip()]

            if not cleaned_keywords:
                return "", f"Status: Could not extract valid keywords. Raw LLM output: '{raw_keywords_str}'"

            final_keywords_str = ", ".join(cleaned_keywords)
            status = "Status: Keywords extracted. You can now use them to search."
            return final_keywords_str, status

        # ------------------------------------------------------------------
        # Event wiring
        # ------------------------------------------------------------------

        # The status goes to both the input and analysis status boxes, so it
        # is visible whichever tab the handler selects.
        session_outputs = [
            repo_ids_state,
            current_repo_idx_state,
            df_output,
            status_box_input,
            status_box_analysis,
            tabs,
        ]

        submit_repo_btn.click(
            fn=handle_repo_id_submission,
            inputs=[repo_id_input],
            outputs=session_outputs
        )
        search_btn.click(
            fn=handle_keyword_search,
            inputs=[keyword_input],
            outputs=session_outputs
        )

        analyze_next_btn.click(
            fn=handle_analyze_next,
            inputs=[repo_ids_state, current_repo_idx_state],
            outputs=[content_output, summary_output, df_output, current_repo_idx_state, status_box_analysis]
        )

        # Pressing Enter in the message box and clicking "Send" behave the
        # same: record the user message, then generate the reply.
        for register in (msg_input.submit, send_btn.click):
            register(
                fn=handle_user_message,
                inputs=[msg_input, chatbot],
                outputs=[chatbot, msg_input]
            ).then(
                fn=handle_bot_response,
                inputs=[chatbot],
                outputs=[chatbot]
            )

        end_chat_btn.click(
            fn=handle_end_chat,
            inputs=[chatbot],
            outputs=[extracted_keywords_output, status_box_chatbot]
        )
        use_keywords_btn.click(
            fn=handle_keyword_search,
            inputs=[extracted_keywords_output],
            outputs=session_outputs
        )

    return app
|
|
|
if __name__ == "__main__":
    # Script entry point: build the interface, then launch it with debug enabled.
    app = create_ui()
    app.launch(debug=True)
|
|