HF_RepoSense / analyzer.py
naman1102's picture
deployment
083f41a
import openai
import os
import json
import re
import logging
from typing import Tuple
from hf_utils import download_filtered_space_files
# Setup logger
logger = logging.getLogger(__name__)
def analyze_code(code: str) -> str:
"""
Uses qwen2.5-coder-7b-instruct-awq model to analyze the given code.
Returns the analysis as a string.
"""
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
system_prompt = (
"You are a highly precise and strict JSON generator. Analyze the code given to you. "
"Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
"Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
"If you cannot answer, still return a valid JSON with empty strings for each key. "
"Example of the ONLY valid output:\n"
"{\n 'strength': '...', \n 'weaknesses': '...', \n 'speciality': '...', \n 'relevance rating': 'high'\n}"
)
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", # Updated model
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": code}
],
max_tokens=512,
temperature=0.4
)
return response.choices[0].message.content
def parse_llm_json_response(response: str):
"""
Robust JSON parser with multiple fallback strategies for LLM responses.
"""
logger.info(f"Attempting to parse LLM response: {response[:100]}...")
# Strategy 1: Try direct JSON parsing (cleanest case)
try:
# Clean the response first
cleaned = response.strip()
result = json.loads(cleaned)
logger.info("βœ… Direct JSON parsing successful")
return result
except:
pass
# Strategy 2: Extract JSON block from response
try:
# Find the first complete JSON object
start = response.find('{')
if start == -1:
raise ValueError("No opening brace found")
# Find matching closing brace
brace_count = 0
end = start
for i, char in enumerate(response[start:], start):
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
end = i
break
if brace_count != 0:
# Fallback to last closing brace
end = response.rfind('}')
if end == -1 or end < start:
raise ValueError("No matching closing brace found")
json_str = response[start:end+1]
result = json.loads(json_str)
logger.info("βœ… JSON block extraction successful")
return result
except Exception as e:
logger.warning(f"JSON block extraction failed: {e}")
# Strategy 3: Clean and fix common JSON issues
try:
# Extract JSON part
start = response.find('{')
end = response.rfind('}')
if start != -1 and end != -1 and end > start:
json_str = response[start:end+1]
# Fix common issues
# Replace single quotes with double quotes (but be careful with contractions)
json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*[,}])", r'"\1"', json_str)
json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*:)", r'"\1"', json_str)
# Fix unescaped quotes in values
json_str = re.sub(r':\s*"([^"]*)"([^",}]*)"', r': "\1\2"', json_str)
# Remove trailing commas
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
# Try parsing the cleaned version
result = json.loads(json_str)
logger.info("βœ… JSON cleaning and fixing successful")
return result
except Exception as e:
logger.warning(f"JSON cleaning failed: {e}")
# Strategy 4: Manual field extraction as last resort
try:
logger.info("Attempting manual field extraction...")
result = {}
# Extract each field using regex patterns
patterns = {
'strength': [
r'"strength"\s*:\s*"([^"]*)"',
r"'strength'\s*:\s*'([^']*)'",
r'strength[:\s]+"([^"]*)"',
r'strength[:\s]+\'([^\']*)\''
],
'weaknesses': [
r'"weaknesses"\s*:\s*"([^"]*)"',
r"'weaknesses'\s*:\s*'([^']*)'",
r'weaknesses[:\s]+"([^"]*)"',
r'weaknesses[:\s]+\'([^\']*)\''
],
'speciality': [
r'"speciality"\s*:\s*"([^"]*)"',
r"'speciality'\s*:\s*'([^']*)'",
r'speciality[:\s]+"([^"]*)"',
r'speciality[:\s]+\'([^\']*)\''
],
'relevance rating': [
r'"relevance rating"\s*:\s*"([^"]*)"',
r"'relevance rating'\s*:\s*'([^']*)'",
r'relevance[^:]*rating[:\s]+"([^"]*)"',
r'relevance[^:]*rating[:\s]+\'([^\']*)\''
]
}
for field, field_patterns in patterns.items():
found = False
for pattern in field_patterns:
match = re.search(pattern, response, re.IGNORECASE | re.DOTALL)
if match:
value = match.group(1).strip()
# Clean up the extracted value
value = re.sub(r'\\+(["\'])', r'\1', value) # Remove excessive escaping
value = value.replace('\\"', '"').replace("\\'", "'")
result[field] = value
found = True
break
if not found:
result[field] = ""
# Validate relevance rating
valid_ratings = ['very low', 'low', 'high', 'very high']
if result.get('relevance rating', '').lower() not in [r.lower() for r in valid_ratings]:
# Try to fix common variations
rating = result.get('relevance rating', '').lower()
if 'very' in rating and 'low' in rating:
result['relevance rating'] = 'very low'
elif 'very' in rating and 'high' in rating:
result['relevance rating'] = 'very high'
elif 'low' in rating:
result['relevance rating'] = 'low'
elif 'high' in rating:
result['relevance rating'] = 'high'
else:
result['relevance rating'] = 'low' # Default fallback
logger.info("βœ… Manual field extraction successful")
return result
except Exception as e:
logger.warning(f"Manual extraction failed: {e}")
# Strategy 5: Complete fallback with empty values
logger.error("All JSON parsing strategies failed, returning empty structure")
return {
"strength": "Analysis could not be completed - please try again",
"weaknesses": "Analysis could not be completed - please try again",
"speciality": "Analysis could not be completed - please try again",
"relevance rating": "low",
"error": f"Failed to parse LLM response after all strategies. Raw: {response[:200]}..."
}
def combine_repo_files_for_llm(repo_dir="repo_files", output_file="combined_repo.txt"):
"""
Combines all .py, .md, and .txt files in the given directory (recursively) into a single text file.
Returns the path to the combined file.
"""
combined_content = []
seen_files = set()
# Priority files
priority_files = ["app.py", "README.md", "requirements.txt"]
for pf in priority_files:
pf_path = os.path.join(repo_dir, pf)
if os.path.isfile(pf_path):
try:
with open(pf_path, "r", encoding="utf-8") as f:
combined_content.append(f"\n# ===== File: {pf} =====\n")
combined_content.append(f.read())
seen_files.add(os.path.abspath(pf_path))
except Exception as e:
combined_content.append(f"\n# Could not read {pf_path}: {e}\n")
# All other .py, .md, and .txt files
for root, _, files in os.walk(repo_dir):
for file in files:
if file.endswith(".py") or file.endswith(".md") or file.endswith(".txt"):
file_path = os.path.join(root, file)
abs_path = os.path.abspath(file_path)
if abs_path in seen_files:
continue
try:
with open(file_path, "r", encoding="utf-8") as f:
combined_content.append(f"\n# ===== File: {file} =====\n")
combined_content.append(f.read())
seen_files.add(abs_path)
except Exception as e:
combined_content.append(f"\n# Could not read {file_path}: {e}\n")
with open(output_file, "w", encoding="utf-8") as out_f:
out_f.write("\n".join(combined_content))
return output_file
def analyze_code_chunk(code: str, user_requirements: str = "") -> str:
"""
Analyzes a code chunk and returns a JSON summary for that chunk.
"""
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
# Build the user requirements section
requirements_section = ""
if user_requirements.strip():
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen rating relevance, consider how well this code matches the user's stated requirements."
chunk_prompt = (
"You are a highly precise and strict JSON generator. Analyze the following code chunk. "
"Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
"Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
"If you cannot answer, still return a valid JSON with empty strings for each key. "
f"{requirements_section}"
"Example of the ONLY valid output:\n"
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
)
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": chunk_prompt},
{"role": "user", "content": code}
],
temperature=0.4
)
return response.choices[0].message.content
def aggregate_chunk_analyses(chunk_jsons: list, user_requirements: str = "") -> str:
"""
Aggregates a list of chunk JSONs into a single JSON summary using the LLM.
"""
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
# Build the user requirements section
requirements_section = ""
if user_requirements.strip():
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen aggregating the relevance rating, consider how well the overall repository matches the user's stated requirements."
aggregation_prompt = (
"You are a highly precise and strict, code analyzer and JSON generator. You are given a list of JSON analyses of code chunks. "
"Aggregate these into a SINGLE overall JSON summary with the same keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
"Summarize and combine the information from all chunks. Do NOT include any explanation, markdown, or text outside the JSON. "
"If a key is missing in all chunks, use an empty string. "
f"{requirements_section}"
"Example of the ONLY valid output:\n"
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
)
user_content = "Here are the chunk analyses:\n" + "\n".join(chunk_jsons)
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": aggregation_prompt},
{"role": "user", "content": user_content}
],
max_tokens=512,
temperature=0.3
)
return response.choices[0].message.content
def analyze_combined_file(output_file="combined_repo.txt", user_requirements: str = ""):
"""
Reads the combined file, splits it into 500-line chunks, analyzes each chunk, and aggregates the LLM's output into a final summary.
Now includes user requirements for better relevance rating.
Returns the chunk JSONs (for debugging) and the aggregated analysis as a string.
"""
try:
with open(output_file, "r", encoding="utf-8") as f:
lines = f.readlines()
chunk_size = 1200
chunk_jsons = []
for i in range(0, len(lines), chunk_size):
chunk = "".join(lines[i:i+chunk_size])
analysis = analyze_code_chunk(chunk, user_requirements)
chunk_jsons.append(analysis)
final_summary = aggregate_chunk_analyses(chunk_jsons, user_requirements)
return final_summary
except Exception as e:
return f"Error analyzing combined file: {e}"
def analyze_repo_chunk_for_context(chunk: str, repo_id: str) -> str:
"""
Analyze a repository chunk to create conversational context for the chatbot.
This creates summaries focused on helping users understand the repository.
"""
try:
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
context_prompt = f"""You are analyzing a chunk of code from the repository '{repo_id}' to create a conversational summary for a chatbot assistant.
Create a concise but informative summary that helps understand:
- What this code section does
- Key functions, classes, or components
- Important features or capabilities
- How it relates to the overall repository purpose
- Any notable patterns or technologies used
Focus on information that would be useful for answering user questions about the repository.
Repository chunk:
{chunk}
Provide a clear, conversational summary in 2-3 paragraphs:"""
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": "You are an expert code analyst creating conversational summaries for a repository assistant chatbot."},
{"role": "user", "content": context_prompt}
],
max_tokens=600, # Increased for more detailed analysis with larger chunks
temperature=0.3
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Error analyzing chunk for context: {e}")
return f"Code section analysis unavailable: {e}"
def create_repo_context_summary(repo_content: str, repo_id: str) -> str:
"""
Create a comprehensive context summary by analyzing the repository in chunks.
Returns a detailed summary that the chatbot can use to answer questions.
"""
try:
lines = repo_content.split('\n')
chunk_size = 1200 # Increased for better context and fewer API calls
chunk_summaries = []
logger.info(f"Analyzing repository {repo_id} in chunks for chatbot context")
for i in range(0, len(lines), chunk_size):
chunk = '\n'.join(lines[i:i+chunk_size])
if chunk.strip(): # Only analyze non-empty chunks
summary = analyze_repo_chunk_for_context(chunk, repo_id)
chunk_summaries.append(f"=== Section {len(chunk_summaries) + 1} ===\n{summary}")
# Create final comprehensive summary
try:
from openai import OpenAI
client = OpenAI(api_key=os.getenv("modal_api"))
client.base_url = os.getenv("base_url")
final_prompt = f"""Based on the following section summaries of repository '{repo_id}', create a comprehensive overview that a chatbot can use to answer user questions.
Section Summaries:
{chr(10).join(chunk_summaries)}
Create a well-structured overview covering:
1. Repository Purpose & Main Functionality
2. Key Components & Architecture
3. Important Features & Capabilities
4. Technology Stack & Dependencies
5. Usage Patterns & Examples
Make this comprehensive but conversational - it will be used by a chatbot to answer user questions about the repository."""
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": "You are creating a comprehensive repository summary for a chatbot assistant."},
{"role": "user", "content": final_prompt}
],
max_tokens=1500, # Increased for more comprehensive summaries
temperature=0.3
)
final_summary = response.choices[0].message.content
# Combine everything for the chatbot context
full_context = f"""=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===
{final_summary}
=== DETAILED SECTION SUMMARIES ===
{chr(10).join(chunk_summaries)}"""
logger.info(f"Created comprehensive context summary for {repo_id}")
return full_context
except Exception as e:
logger.error(f"Error creating final summary: {e}")
# Fallback to just section summaries
return f"=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===\n\n" + '\n\n'.join(chunk_summaries)
except Exception as e:
logger.error(f"Error creating repo context summary: {e}")
return f"Repository analysis unavailable: {e}"
def handle_load_repository(repo_id: str) -> Tuple[str, str]:
"""Load a specific repository and prepare it for exploration with chunk-based analysis."""
if not repo_id.strip():
return "Status: Please enter a repository ID.", ""
try:
logger.info(f"Loading repository for exploration: {repo_id}")
# Download and process the repository
try:
download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
combined_text_path = combine_repo_files_for_llm()
except Exception as e:
logger.error(f"Error downloading repository {repo_id}: {e}")
error_status = f"❌ Error downloading repository: {e}"
return error_status, ""
with open(combined_text_path, "r", encoding="utf-8") as f:
repo_content = f.read()
status = f"βœ… Repository '{repo_id}' loaded successfully!\\nπŸ“ Files processed and ready for exploration.\\nπŸ”„ Analyzing repository in chunks for comprehensive context...\\nπŸ’¬ You can now ask questions about this repository."
# Create comprehensive context summary using chunk analysis
logger.info(f"Creating context summary for {repo_id}")
context_summary = create_repo_context_summary(repo_content, repo_id)
logger.info(f"Repository {repo_id} loaded and analyzed successfully for exploration")
return status, context_summary
except Exception as e:
logger.error(f"Error loading repository {repo_id}: {e}")
error_status = f"❌ Error loading repository: {e}"
return error_status, ""