import os
import json
import re
import logging
from typing import Tuple

from openai import OpenAI

from hf_utils import download_filtered_space_files

# Set up module-level logger
logger = logging.getLogger(__name__)


def analyze_code(code: str) -> str:
    """
    Uses the Qwen2.5-Coder-7B-Instruct-AWQ model to analyze the given code.
    Returns the raw model output (expected to be a JSON string).
    """
    client = OpenAI(
        api_key=os.getenv("modal_api"),
        base_url=os.getenv("base_url"),
    )
    system_prompt = (
        "You are a highly precise and strict JSON generator. Analyze the code given to you. "
        "Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
        "If you cannot answer, still return a valid JSON with empty strings for each key. "
        "Example of the ONLY valid output:\n"
        '{\n  "strength": "...",\n  "weaknesses": "...",\n  "speciality": "...",\n  "relevance rating": "high"\n}'
    )
    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": code},
        ],
        max_tokens=512,
        temperature=0.4,
    )
    return response.choices[0].message.content
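
# Hedged usage sketch (not part of the original module): analyze_code assumes the
# environment variables "modal_api" and "base_url" point at an OpenAI-compatible
# endpoint serving the Qwen coder model named above, e.g.
#
#     raw = analyze_code("def add(a, b):\n    return a + b")
#     parsed = parse_llm_json_response(raw)  # defined below; tolerates malformed JSON
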
def parse_llm_json_response(response: str):
    """
    Robust JSON parser with multiple fallback strategies for LLM responses.
    """
    logger.info(f"Attempting to parse LLM response: {response[:100]}...")

    # Strategy 1: direct JSON parsing (cleanest case)
    try:
        cleaned = response.strip()
        result = json.loads(cleaned)
        logger.info("Direct JSON parsing successful")
        return result
    except json.JSONDecodeError:
        pass

    # Strategy 2: extract the first complete JSON block from the response
    try:
        start = response.find('{')
        if start == -1:
            raise ValueError("No opening brace found")
        # Find the matching closing brace by tracking brace depth
        brace_count = 0
        end = start
        for i, char in enumerate(response[start:], start):
            if char == '{':
                brace_count += 1
            elif char == '}':
                brace_count -= 1
                if brace_count == 0:
                    end = i
                    break
        if brace_count != 0:
            # Fall back to the last closing brace
            end = response.rfind('}')
            if end == -1 or end < start:
                raise ValueError("No matching closing brace found")
        json_str = response[start:end + 1]
        result = json.loads(json_str)
        logger.info("JSON block extraction successful")
        return result
    except Exception as e:
        logger.warning(f"JSON block extraction failed: {e}")

    # Strategy 3: clean up common JSON issues and retry
    try:
        start = response.find('{')
        end = response.rfind('}')
        if start != -1 and end != -1 and end > start:
            json_str = response[start:end + 1]
            # Replace single quotes with double quotes (careful with contractions)
            json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*[,}])", r'"\1"', json_str)
            json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*:)", r'"\1"', json_str)
            # Merge unescaped quotes inside values
            json_str = re.sub(r':\s*"([^"]*)"([^",}]*)"', r': "\1\2"', json_str)
            # Remove trailing commas
            json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
            # Try parsing the cleaned version
            result = json.loads(json_str)
            logger.info("JSON cleaning and fixing successful")
            return result
    except Exception as e:
        logger.warning(f"JSON cleaning failed: {e}")

    # Strategy 4: manual field extraction as a last resort
    try:
        logger.info("Attempting manual field extraction...")
        result = {}
        # Extract each field using regex patterns
        patterns = {
            'strength': [
                r'"strength"\s*:\s*"([^"]*)"',
                r"'strength'\s*:\s*'([^']*)'",
                r'strength[:\s]+"([^"]*)"',
                r"strength[:\s]+'([^']*)'",
            ],
            'weaknesses': [
                r'"weaknesses"\s*:\s*"([^"]*)"',
                r"'weaknesses'\s*:\s*'([^']*)'",
                r'weaknesses[:\s]+"([^"]*)"',
                r"weaknesses[:\s]+'([^']*)'",
            ],
            'speciality': [
                r'"speciality"\s*:\s*"([^"]*)"',
                r"'speciality'\s*:\s*'([^']*)'",
                r'speciality[:\s]+"([^"]*)"',
                r"speciality[:\s]+'([^']*)'",
            ],
            'relevance rating': [
                r'"relevance rating"\s*:\s*"([^"]*)"',
                r"'relevance rating'\s*:\s*'([^']*)'",
                r'relevance[^:]*rating[:\s]+"([^"]*)"',
                r"relevance[^:]*rating[:\s]+'([^']*)'",
            ],
        }
        for field, field_patterns in patterns.items():
            found = False
            for pattern in field_patterns:
                match = re.search(pattern, response, re.IGNORECASE | re.DOTALL)
                if match:
                    value = match.group(1).strip()
                    # Clean up excessive escaping in the extracted value
                    value = re.sub(r'\\+(["\'])', r'\1', value)
                    value = value.replace('\\"', '"').replace("\\'", "'")
                    result[field] = value
                    found = True
                    break
            if not found:
                result[field] = ""
        # Validate the relevance rating and normalize common variations
        valid_ratings = ['very low', 'low', 'high', 'very high']
        if result.get('relevance rating', '').lower() not in valid_ratings:
            rating = result.get('relevance rating', '').lower()
            if 'very' in rating and 'low' in rating:
                result['relevance rating'] = 'very low'
            elif 'very' in rating and 'high' in rating:
                result['relevance rating'] = 'very high'
            elif 'low' in rating:
                result['relevance rating'] = 'low'
            elif 'high' in rating:
                result['relevance rating'] = 'high'
            else:
                result['relevance rating'] = 'low'  # Default fallback
        logger.info("Manual field extraction successful")
        return result
    except Exception as e:
        logger.warning(f"Manual extraction failed: {e}")

    # Strategy 5: complete fallback with placeholder values
    logger.error("All JSON parsing strategies failed, returning empty structure")
    return {
        "strength": "Analysis could not be completed - please try again",
        "weaknesses": "Analysis could not be completed - please try again",
        "speciality": "Analysis could not be completed - please try again",
        "relevance rating": "low",
        "error": f"Failed to parse LLM response after all strategies. Raw: {response[:200]}...",
    }
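
# Hedged usage sketch (not part of the original module): the fallback chain is
# meant to recover a dict even when the model wraps the JSON in prose or uses
# single quotes, e.g.
#
#     messy = "Here is the analysis: {'strength': 'clean code', 'weaknesses': 'no tests', 'speciality': 'CLI tool', 'relevance rating': 'high'}"
#     parsed = parse_llm_json_response(messy)
#     parsed["relevance rating"]  # -> "high", recovered via the quote-fixing strategy
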
def combine_repo_files_for_llm(repo_dir="repo_files", output_file="combined_repo.txt"):
    """
    Combines all .py, .md, and .txt files in the given directory (recursively) into a single text file.
    Returns the path to the combined file.
    """
    combined_content = []
    seen_files = set()
    # Priority files go first so the most informative content leads the combined file
    priority_files = ["app.py", "README.md", "requirements.txt"]
    for pf in priority_files:
        pf_path = os.path.join(repo_dir, pf)
        if os.path.isfile(pf_path):
            try:
                with open(pf_path, "r", encoding="utf-8") as f:
                    combined_content.append(f"\n# ===== File: {pf} =====\n")
                    combined_content.append(f.read())
                seen_files.add(os.path.abspath(pf_path))
            except Exception as e:
                combined_content.append(f"\n# Could not read {pf_path}: {e}\n")
    # All other .py, .md, and .txt files
    for root, _, files in os.walk(repo_dir):
        for file in files:
            if file.endswith((".py", ".md", ".txt")):
                file_path = os.path.join(root, file)
                abs_path = os.path.abspath(file_path)
                if abs_path in seen_files:
                    continue
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        combined_content.append(f"\n# ===== File: {file} =====\n")
                        combined_content.append(f.read())
                    seen_files.add(abs_path)
                except Exception as e:
                    combined_content.append(f"\n# Could not read {file_path}: {e}\n")
    with open(output_file, "w", encoding="utf-8") as out_f:
        out_f.write("\n".join(combined_content))
    return output_file
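
# Hedged usage sketch (assumes repository files were already downloaded into
# "repo_files/", e.g. via download_filtered_space_files):
#
#     combined_path = combine_repo_files_for_llm()
#     with open(combined_path, encoding="utf-8") as f:
#         parsed = parse_llm_json_response(analyze_code(f.read()))
#     # fine for small repos; larger repos go through analyze_combined_file's chunked path below
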
def analyze_code_chunk(code: str, user_requirements: str = "") -> str:
    """
    Analyzes a single code chunk and returns a JSON summary for that chunk.
    """
    client = OpenAI(
        api_key=os.getenv("modal_api"),
        base_url=os.getenv("base_url"),
    )
    # Build the optional user-requirements section of the prompt
    requirements_section = ""
    if user_requirements.strip():
        requirements_section = (
            f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\n"
            "When rating relevance, consider how well this code matches the user's stated requirements.\n"
        )
    chunk_prompt = (
        "You are a highly precise and strict JSON generator. Analyze the following code chunk. "
        "Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
        "If you cannot answer, still return a valid JSON with empty strings for each key. "
        f"{requirements_section}"
        "Example of the ONLY valid output:\n"
        '{\n  "strength": "...",\n  "weaknesses": "...",\n  "speciality": "...",\n  "relevance rating": "high"\n}'
    )
    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": chunk_prompt},
            {"role": "user", "content": code},
        ],
        temperature=0.4,
    )
    return response.choices[0].message.content
def aggregate_chunk_analyses(chunk_jsons: list, user_requirements: str = "") -> str:
    """
    Aggregates a list of chunk JSONs into a single JSON summary using the LLM.
    """
    client = OpenAI(
        api_key=os.getenv("modal_api"),
        base_url=os.getenv("base_url"),
    )
    # Build the optional user-requirements section of the prompt
    requirements_section = ""
    if user_requirements.strip():
        requirements_section = (
            f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\n"
            "When aggregating the relevance rating, consider how well the overall repository matches the user's stated requirements.\n"
        )
    aggregation_prompt = (
        "You are a highly precise and strict code analyzer and JSON generator. You are given a list of JSON analyses of code chunks. "
        "Aggregate these into a SINGLE overall JSON summary with the same keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Summarize and combine the information from all chunks. Do NOT include any explanation, markdown, or text outside the JSON. "
        "If a key is missing in all chunks, use an empty string. "
        f"{requirements_section}"
        "Example of the ONLY valid output:\n"
        '{\n  "strength": "...",\n  "weaknesses": "...",\n  "speciality": "...",\n  "relevance rating": "high"\n}'
    )
    user_content = "Here are the chunk analyses:\n" + "\n".join(chunk_jsons)
    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": aggregation_prompt},
            {"role": "user", "content": user_content},
        ],
        max_tokens=512,
        temperature=0.3,
    )
    return response.choices[0].message.content
def analyze_combined_file(output_file="combined_repo.txt", user_requirements: str = ""):
    """
    Reads the combined file, splits it into 1200-line chunks, analyzes each chunk,
    and aggregates the per-chunk output into a final summary.
    User requirements are passed through so the relevance rating reflects them.
    Returns a single debug string containing each chunk's JSON and the aggregated analysis.
    """
    try:
        with open(output_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        chunk_size = 1200
        chunk_jsons = []
        for i in range(0, len(lines), chunk_size):
            chunk = "".join(lines[i:i + chunk_size])
            analysis = analyze_code_chunk(chunk, user_requirements)
            chunk_jsons.append(analysis)
        final_summary = aggregate_chunk_analyses(chunk_jsons, user_requirements)
        debug_output = (
            "==== Chunk JSON Outputs ===="
            + "\n\n".join([f"Chunk {i + 1} JSON:\n{chunk_jsons[i]}" for i in range(len(chunk_jsons))])
            + "\n\n==== Final Aggregated Summary ===="
            + f"\n{final_summary}"
        )
        return debug_output
    except Exception as e:
        return f"Error analyzing combined file: {e}"
def analyze_repo_chunk_for_context(chunk: str, repo_id: str) -> str:
    """
    Analyzes a repository chunk to create conversational context for the chatbot.
    These summaries focus on helping users understand the repository.
    """
    try:
        client = OpenAI(
            api_key=os.getenv("modal_api"),
            base_url=os.getenv("base_url"),
        )
        context_prompt = f"""You are analyzing a chunk of code from the repository '{repo_id}' to create a conversational summary for a chatbot assistant.

Create a concise but informative summary that helps understand:
- What this code section does
- Key functions, classes, or components
- Important features or capabilities
- How it relates to the overall repository purpose
- Any notable patterns or technologies used

Focus on information that would be useful for answering user questions about the repository.

Repository chunk:
{chunk}

Provide a clear, conversational summary in 2-3 paragraphs:"""
        response = client.chat.completions.create(
            model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
            messages=[
                {"role": "system", "content": "You are an expert code analyst creating conversational summaries for a repository assistant chatbot."},
                {"role": "user", "content": context_prompt},
            ],
            max_tokens=600,  # Room for detailed analysis of the larger chunks
            temperature=0.3,
        )
        return response.choices[0].message.content
    except Exception as e:
        logger.error(f"Error analyzing chunk for context: {e}")
        return f"Code section analysis unavailable: {e}"
def create_repo_context_summary(repo_content: str, repo_id: str) -> str:
    """
    Creates a comprehensive context summary by analyzing the repository in chunks.
    Returns a detailed summary that the chatbot can use to answer questions.
    """
    try:
        lines = repo_content.split('\n')
        chunk_size = 1200  # Larger chunks give better context and fewer API calls
        chunk_summaries = []
        logger.info(f"Analyzing repository {repo_id} in chunks for chatbot context")
        for i in range(0, len(lines), chunk_size):
            chunk = '\n'.join(lines[i:i + chunk_size])
            if chunk.strip():  # Only analyze non-empty chunks
                summary = analyze_repo_chunk_for_context(chunk, repo_id)
                chunk_summaries.append(f"=== Section {len(chunk_summaries) + 1} ===\n{summary}")
        # Create the final comprehensive summary from the per-section summaries
        try:
            client = OpenAI(
                api_key=os.getenv("modal_api"),
                base_url=os.getenv("base_url"),
            )
            final_prompt = f"""Based on the following section summaries of repository '{repo_id}', create a comprehensive overview that a chatbot can use to answer user questions.

Section Summaries:
{chr(10).join(chunk_summaries)}

Create a well-structured overview covering:
1. Repository Purpose & Main Functionality
2. Key Components & Architecture
3. Important Features & Capabilities
4. Technology Stack & Dependencies
5. Usage Patterns & Examples

Make this comprehensive but conversational - it will be used by a chatbot to answer user questions about the repository."""
            response = client.chat.completions.create(
                model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
                messages=[
                    {"role": "system", "content": "You are creating a comprehensive repository summary for a chatbot assistant."},
                    {"role": "user", "content": final_prompt},
                ],
                max_tokens=1500,  # Room for a comprehensive summary
                temperature=0.3,
            )
            final_summary = response.choices[0].message.content
            # Combine the overview and the per-section summaries into the chatbot context
            full_context = f"""=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===

{final_summary}

=== DETAILED SECTION SUMMARIES ===

{chr(10).join(chunk_summaries)}"""
            logger.info(f"Created comprehensive context summary for {repo_id}")
            return full_context
        except Exception as e:
            logger.error(f"Error creating final summary: {e}")
            # Fall back to just the section summaries
            return f"=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===\n\n" + '\n\n'.join(chunk_summaries)
    except Exception as e:
        logger.error(f"Error creating repo context summary: {e}")
        return f"Repository analysis unavailable: {e}"
def handle_load_repository(repo_id: str) -> Tuple[str, str]:
    """Load a specific repository and prepare it for exploration with chunk-based analysis."""
    if not repo_id.strip():
        return "Status: Please enter a repository ID.", ""
    try:
        logger.info(f"Loading repository for exploration: {repo_id}")
        # Download and combine the repository files
        try:
            download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
            combined_text_path = combine_repo_files_for_llm()
        except Exception as e:
            logger.error(f"Error downloading repository {repo_id}: {e}")
            error_status = f"❌ Error downloading repository: {e}"
            return error_status, ""
        with open(combined_text_path, "r", encoding="utf-8") as f:
            repo_content = f.read()
        status = (
            f"✅ Repository '{repo_id}' loaded successfully!\n"
            "📁 Files processed and ready for exploration.\n"
            "🔍 Analyzing repository in chunks for comprehensive context...\n"
            "💬 You can now ask questions about this repository."
        )
        # Create a comprehensive context summary using chunk analysis
        logger.info(f"Creating context summary for {repo_id}")
        context_summary = create_repo_context_summary(repo_content, repo_id)
        logger.info(f"Repository {repo_id} loaded and analyzed successfully for exploration")
        return status, context_summary
    except Exception as e:
        logger.error(f"Error loading repository {repo_id}: {e}")
        error_status = f"❌ Error loading repository: {e}"
        return error_status, ""