import requests
import time
import os
from urllib.parse import urlparse
from treelib import Tree
from typing import Dict, List, Optional, Tuple
import logging
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from groq import Groq, GroqError
import gradio as gr
from tqdm.auto import tqdm
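# Third-party packages imported above (a suggested install line, versions unpinned):
#   pip install requests treelib groq gradio tqdm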

# --- Basic Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Data Structures ---
@dataclass
class FileInfo:
    """Data class to store file information."""
    path: str
    name: str
    content: str
    explanation: str
    size: int
    file_type: str

# --- Core Application Logic ---
class GitHubRepositoryAnalyzer:
    """
    A class to analyze GitHub repositories by fetching file structures,
    downloading content, and using an LLM to explain the code.
    """

    def __init__(self, github_token: Optional[str] = None, groq_api_key: Optional[str] = None):
        self.github_token = github_token
        self.session = requests.Session()
        self.file_contents: Dict[str, FileInfo] = {}

        # Configure GitHub API access
        if self.github_token:
            logger.info("Using provided GitHub token for higher rate limits.")
            self.session.headers.update({'Authorization': f'token {self.github_token}'})
            # Authenticated GitHub API allows 5,000 requests/hour; stay below it with headroom.
            self.rate_limiter = RateLimiter(max_calls=4500, time_window=3600)
        else:
            logger.warning("No GitHub token. Using unauthenticated access with lower rate limits.")
            # Unauthenticated access allows only 60 requests/hour; stay below it with headroom.
            self.rate_limiter = RateLimiter(max_calls=50, time_window=3600)

        # Configure Groq client
        if groq_api_key:
            self.groq_client = Groq(api_key=groq_api_key)
            logger.info("Groq client initialized.")
        else:
            self.groq_client = None
            logger.warning("Groq API key not provided. Code analysis will be skipped.")

        # File types to analyze. Plain file names (e.g. 'requirements.txt') are
        # matched by name rather than by extension in _should_analyze_file.
        self.analyzable_extensions = {
            '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.cs', '.php',
            '.rb', '.go', '.rs', '.swift', '.kt', '.scala', '.sh', '.bash',
            '.sql', '.html', '.css', '.scss', '.less', '.json', '.xml', '.yaml', '.yml',
            '.md', '.txt', '.cfg', '.conf', '.ini', '.env', '.dockerfile', 'requirements.txt'
        }
    def extract_repo_info(self, repo_url: str) -> Tuple[str, str]:
        """Extract owner and repository name from a GitHub URL."""
        try:
            parsed_url = urlparse(repo_url)
            # Strip only a trailing '.git' suffix; str.replace would also mangle
            # repository names that merely contain '.git'.
            path = parsed_url.path.strip('/').removesuffix('.git')
            parts = path.split('/')
            if len(parts) >= 2:
                return parts[0], parts[1]
            raise ValueError("Invalid repository URL format")
        except Exception as e:
            logger.error(f"Error parsing repository URL: {e}")
            raise
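    # Example: extract_repo_info("https://github.com/octocat/Hello-World.git")
    # returns ("octocat", "Hello-World").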
    def get_repository_structure(self, owner: str, repo: str, path: str = "") -> List[Dict]:
        """Recursively fetch the entire file structure of the repository."""
        all_files = []
        try:
            contents = self._fetch_contents(owner, repo, path)
            for item in contents:
                if item['type'] == 'dir':
                    all_files.extend(self.get_repository_structure(owner, repo, item['path']))
                else:
                    all_files.append(item)
        except Exception as e:
            logger.error(f"Failed to get repository structure for {path}: {e}")
        return all_files
    def _fetch_contents(self, owner: str, repo: str, path: str) -> List[Dict]:
        """Helper to fetch the contents of a specific directory, following pagination."""
        url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
        items = []
        while url:
            self.rate_limiter.wait_if_needed()
            response = self.session.get(url)
            response.raise_for_status()
            items.extend(response.json())
            url = response.links.get('next', {}).get('url')
        return items
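    # GitHub paginates directory listings via the HTTP `Link` header; requests
    # exposes the parsed header as `response.links`, so the loop above keeps
    # following the 'next' URL until the header no longer provides one.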
    def map_reduce_analysis(self, owner: str, repo: str, files: List[Dict], progress: gr.Progress):
        """
        Analyzes files in parallel (Map phase) and aggregates results (Reduce phase).
        This method uses a ThreadPoolExecutor to perform I/O-bound tasks concurrently.
        """
        # --- MAP PHASE ---
        # Each file is processed independently in a separate thread.
        # This is efficient for tasks that wait on network responses (API calls).
        logger.info(f"Starting Map phase: analyzing {len(files)} files in parallel.")
        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_file = {
                executor.submit(self._process_single_file, owner, repo, file_item): file_item
                for file_item in files
            }
            # tqdm progress tracking integrated with Gradio
            pbar = tqdm(as_completed(future_to_file), total=len(files), desc="Analyzing Files")
            for future in pbar:
                try:
                    file_info = future.result()
                    if file_info:
                        # Store the result of the map phase
                        self.file_contents[file_info.path] = file_info
                        pbar.set_description(f"Analyzed {file_info.name}")
                        # Update the Gradio progress bar
                        progress(pbar.n / pbar.total, desc=pbar.desc)
                except Exception as e:
                    file_item = future_to_file[future]
                    logger.error(f"Error processing {file_item['path']}: {e}")

        # --- REDUCE PHASE ---
        # The reduce phase is the aggregation and structuring of the mapped results,
        # which happens after the loop when creating the tree and summary.
        logger.info("Reduce phase: aggregating results.")
        tree = self._create_directory_tree(owner, repo)
        details = self._format_detailed_explanations()
        summary = self._format_summary(owner, repo)
        return tree, details, summary
    def _process_single_file(self, owner: str, repo: str, file_item: Dict) -> Optional[FileInfo]:
        """Processes a single file: download, check, and analyze."""
        file_path = file_item['path']
        file_size = file_item.get('size', 0)
        if not self._should_analyze_file(file_path, file_size):
            return None
        content = self._get_raw_file(owner, repo, file_path)
        if content is None:
            return None
        explanation = self._analyze_code_with_llm(content, file_path)
        return FileInfo(
            path=file_path, name=file_item['name'], content=content,
            explanation=explanation, size=file_size, file_type=os.path.splitext(file_path)[1]
        )
    def _should_analyze_file(self, file_path: str, file_size: int) -> bool:
        """Determine whether a file should be analyzed, based on extension and size."""
        if file_size > 1024 * 1024:
            return False  # Skip files larger than 1 MB
        file_name = os.path.basename(file_path)
        _, file_ext = os.path.splitext(file_name)
        return file_ext.lower() in self.analyzable_extensions or file_name in self.analyzable_extensions
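    # For example: "src/app.py" (2 KB) -> True; "requirements.txt" -> True
    # (matched by name); "assets/logo.png" -> False; a 5 MB ".sql" dump -> False.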
    def _get_raw_file(self, owner: str, repo: str, file_path: str) -> Optional[str]:
        """Fetch raw file content, falling back from the 'main' to the 'master' branch."""
        for branch in ['main', 'master']:
            url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{file_path}"
            try:
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    # Simple heuristic: treat content containing NUL bytes as binary.
                    return response.text if '\x00' not in response.text else None
            except (requests.RequestException, UnicodeDecodeError) as e:
                logger.warning(f"Could not fetch or decode {file_path} from branch {branch}: {e}")
        return None
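    # A more robust alternative (not implemented here) would be to read the
    # repository's actual default branch from the GET /repos/{owner}/{repo}
    # API response instead of guessing 'main' and then 'master'.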
    def _analyze_code_with_llm(self, code: str, file_path: str) -> str:
        """Analyze code using the Groq LLM API."""
        if not self.groq_client:
            return "Analysis skipped: Groq API key not provided."
        max_code_length = 8000
        if len(code) > max_code_length:
            code = code[:max_code_length] + "\n... (truncated)"
        prompt = f"""Analyze the following code from file '{file_path}'. Provide a concise explanation of its functionality, purpose, and key components.
```
{code}
```
Structure your analysis with these points:
1. **Main Purpose**: What is the primary goal of this file?
2. **Key Functions/Classes**: What are the main components and what do they do?
3. **Overall Role**: How does this file fit into the larger project?
"""
        try:
            chat_completion = self.groq_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama3-8b-8192",
                temperature=0.2, max_tokens=1024
            )
            return chat_completion.choices[0].message.content.strip()
        except GroqError as e:
            # str(e) is safe for all GroqError subclasses; not every one carries a .message attribute.
            logger.error(f"Groq API error for {file_path}: {e}")
            return f"Error: Groq API request failed - {e}"
        except Exception as e:
            logger.error(f"Error calling Groq API for {file_path}: {e}")
            return f"Error: {e}"
    def _create_directory_tree(self, owner: str, repo: str) -> str:
        """Creates a string representation of the directory tree."""
        tree = Tree()
        root_id = f"{owner}/{repo}"
        tree.create_node(f"🌳 {root_id}", root_id)
        created_nodes = {root_id}
        for file_path in sorted(self.file_contents.keys()):
            path_parts = file_path.split('/')
            parent_id = root_id
            for i, part in enumerate(path_parts[:-1]):
                node_id = f"{root_id}/{'/'.join(path_parts[:i+1])}"
                if node_id not in created_nodes:
                    tree.create_node(f"📁 {part}", node_id, parent=parent_id)
                    created_nodes.add(node_id)
                parent_id = node_id
            file_name = path_parts[-1]
            file_type = self.file_contents[file_path].file_type
            emoji = self.get_file_emoji(file_type)
            tree.create_node(f"{emoji} {file_name}", f"{root_id}/{file_path}", parent=parent_id)
        return f"```\n{tree.show(line_type='ascii-ex')}\n```"
    def _format_detailed_explanations(self) -> str:
        """Formats all file explanations into a single Markdown string."""
        if not self.file_contents:
            return "No files were analyzed."
        output = []
        for path, info in sorted(self.file_contents.items()):
            output.append(f"### 📄 `{path}`")
            output.append(f"**Size**: {info.size:,} bytes")
            output.append("---")
            output.append(info.explanation)
            output.append("\n---\n")
        return "\n".join(output)
    def _format_summary(self, owner: str, repo: str) -> str:
        """Creates a summary of the analysis."""
        total_files = len(self.file_contents)
        total_size = sum(info.size for info in self.file_contents.values())
        return (
            f"## Analysis Summary for `{owner}/{repo}`\n"
            f"- **Total Files Analyzed**: {total_files}\n"
            f"- **Total Code Size Analyzed**: {total_size:,} bytes"
        )
    @staticmethod
    def get_file_emoji(file_type: str) -> str:
        """Returns an emoji for a given file type."""
        emoji_map = {
            '.py': '🐍', '.js': '🟨', '.ts': '🔷', '.java': '☕', '.html': '🌐',
            '.css': '🎨', '.json': '📋', '.md': '📝', '.sh': '🐚', '.yml': '⚙️',
            '.yaml': '⚙️', '.dockerfile': '🐳', '.sql': '🗄️', 'requirements.txt': '📦'
        }
        return emoji_map.get(file_type.lower(), '📄')

class RateLimiter:
    """Simple sliding-window rate limiter to avoid exceeding API limits."""

    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def wait_if_needed(self):
        now = time.time()
        # Drop timestamps that have aged out of the window.
        self.calls = [t for t in self.calls if now - t < self.time_window]
        if len(self.calls) >= self.max_calls:
            sleep_time = self.time_window - (now - self.calls[0])
            if sleep_time > 0:
                logger.info(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds.")
                time.sleep(sleep_time)
        self.calls.append(time.time())
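
# Usage sketch (numbers are illustrative): a limiter allowing 60 calls per hour.
#   limiter = RateLimiter(max_calls=60, time_window=3600)
#   limiter.wait_if_needed()  # blocks if the window is full, then records this call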

# --- Gradio Interface ---
def analyze_repo_gradio(repo_url: str, github_token: str, groq_key: str, progress=gr.Progress(track_tqdm=True)):
    """The main function executed by the Gradio interface."""
    if not repo_url:
        return "Please enter a GitHub repository URL.", "", ""
    if not groq_key:
        return "Please enter your Groq API Key.", "", ""
    try:
        analyzer = GitHubRepositoryAnalyzer(github_token=github_token, groq_api_key=groq_key)
        progress(0, desc="Extracting repo info...")
        owner, repo = analyzer.extract_repo_info(repo_url)
        progress(0.1, desc="Fetching repository file structure...")
        all_files = analyzer.get_repository_structure(owner, repo)
        if not all_files:
            return "Could not retrieve repository structure. Check the URL or token.", "", ""
        tree, details, summary = analyzer.map_reduce_analysis(owner, repo, all_files, progress)
        return tree, details, summary
    except Exception as e:
        logger.error(f"A critical error occurred: {e}", exc_info=True)
        return f"An error occurred: {e}", "", ""

def create_gradio_interface():
    """Builds and returns the Gradio web interface."""
    with gr.Blocks(theme=gr.themes.Soft(), title="GitHub Repo Analyzer") as demo:
        gr.Markdown("# 🤖 AI-Powered GitHub Repository Analyzer")
        gr.Markdown("Enter a GitHub repository URL to generate a file tree, get AI-powered explanations for each file, and see a summary.")
        with gr.Row():
            with gr.Column(scale=2):
                repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="e.g., https://github.com/google/generative-ai-python")
                with gr.Accordion("API Keys (Optional but Recommended)", open=False):
                    github_token = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token for a higher rate limit", type="password")
                    groq_key = gr.Textbox(label="Groq API Key", placeholder="Enter your Groq API key for code analysis", type="password")
                analyze_btn = gr.Button("Analyze Repository", variant="primary")
        with gr.Tabs():
            with gr.TabItem("📊 Summary"):
                summary_output = gr.Markdown()
            with gr.TabItem("🌳 File Tree"):
                tree_output = gr.Markdown()
            with gr.TabItem("📄 Detailed Analysis"):
                details_output = gr.Markdown()
        analyze_btn.click(
            fn=analyze_repo_gradio,
            inputs=[repo_url, github_token, groq_key],
            outputs=[tree_output, details_output, summary_output]
        )
    return demo

if __name__ == "__main__":
    app = create_gradio_interface()
    app.launch(debug=True)
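
# To run locally (assuming this file is saved as app.py):
#   python app.py
# Gradio prints a local URL to open in a browser.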