Spaces:
Sleeping
Sleeping
import openai | |
import os | |
import json | |
import re | |
import logging | |
from typing import Tuple | |
from hf_utils import download_filtered_space_files | |
# Setup logger | |
logger = logging.getLogger(__name__) | |
def analyze_code(code: str) -> str:
    """
    Ask the Qwen2.5-Coder-7B-Instruct-AWQ model for a JSON analysis of ``code``.

    The API key and endpoint are read from the ``modal_api`` and ``base_url``
    environment variables.

    Args:
        code: Source text sent to the model as the user message.

    Returns:
        The raw model reply. The prompt asks for a JSON object with the keys
        'strength', 'weaknesses', 'speciality' and 'relevance rating', but the
        reply is not validated here. An empty string is returned when the
        reply carries no text content.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.getenv("modal_api"))
    client.base_url = os.getenv("base_url")
    system_prompt = (
        "You are a highly precise and strict JSON generator. Analyze the code given to you. "
        "Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
        "If you cannot answer, still return a valid JSON with empty strings for each key. "
        "Example of the ONLY valid output:\n"
        # The example must itself be valid JSON: a single-quoted example
        # invites single-quoted replies that json.loads rejects.
        '{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
    )
    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": code},
        ],
        max_tokens=512,
        temperature=0.4,
    )
    # message.content is optional in the API; honour the declared ``str`` return.
    return response.choices[0].message.content or ""
def parse_llm_json_response(response: str):
    """
    Robust JSON parser with multiple fallback strategies for LLM responses.

    Strategies, tried in order until one yields a JSON object (dict):
      1. Parse the stripped response directly.
      2. Decode the first JSON object that starts at the first '{'.
      3. Take the text from the first '{' to the last '}', convert
         single-quoted strings to double-quoted, repair stray quotes, drop
         trailing commas, then parse.
      4. Regex-extract the four expected fields from free text and coerce
         'relevance rating' onto an allowed value.
      5. Return a placeholder structure carrying an extra 'error' key.

    Args:
        response: Raw model reply. ``None`` is treated as an empty reply and
            any other non-string value is converted with ``str()``.

    Returns:
        A dict. For strategies 1-3 it is whatever object the model produced;
        for strategy 4 it always has the keys 'strength', 'weaknesses',
        'speciality' and 'relevance rating'; for strategy 5 it additionally
        has 'error'.
    """
    # Normalise the input so slicing, find() and re.search() cannot raise.
    if response is None:
        response = ""
    elif not isinstance(response, str):
        response = str(response)

    logger.info(f"Attempting to parse LLM response: {response[:100]}...")

    # Strategy 1: direct parse (cleanest case). Only a JSON object counts;
    # a bare list/number/string falls through to the later strategies.
    try:
        result = json.loads(response.strip())
    except (ValueError, RecursionError):
        result = None
    if isinstance(result, dict):
        logger.info("β Direct JSON parsing successful")
        return result

    # Strategy 2: decode the first object embedded in surrounding text.
    # raw_decode stops at the end of the first complete JSON value, so
    # trailing chatter is ignored and braces inside strings are handled.
    start = response.find('{')
    if start == -1:
        logger.warning("JSON block extraction failed: No opening brace found")
    else:
        try:
            result, _ = json.JSONDecoder().raw_decode(response[start:])
        except (ValueError, RecursionError) as e:
            logger.warning(f"JSON block extraction failed: {e}")
        else:
            logger.info("β JSON block extraction successful")
            return result

    # Strategy 3: clean and fix common JSON issues.
    end = response.rfind('}')
    if start != -1 and end > start:
        json_str = response[start:end + 1]
        # Single-quoted values (followed by ',' or '}') and keys (followed by ':')
        json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*[,}])", r'"\1"', json_str)
        json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*:)", r'"\1"', json_str)
        # Unescaped quote inside a value
        json_str = re.sub(r':\s*"([^"]*)"([^",}]*)"', r': "\1\2"', json_str)
        # Trailing commas before a closing bracket
        json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
        try:
            result = json.loads(json_str)
        except (ValueError, RecursionError) as e:
            logger.warning(f"JSON cleaning failed: {e}")
        else:
            logger.info("β JSON cleaning and fixing successful")
            return result

    # Strategy 4: manual field extraction as last resort. It only counts as
    # a success when at least one field was actually found; otherwise the
    # caller would get an all-empty dict and never see the error structure.
    logger.info("Attempting manual field extraction...")
    fields, matched_any = _extract_llm_fields(response)
    if matched_any:
        fields['relevance rating'] = _coerce_relevance_rating(fields['relevance rating'])
        logger.info("β Manual field extraction successful")
        return fields
    logger.warning("Manual extraction failed: no expected fields found")

    # Strategy 5: complete fallback with placeholder values.
    logger.error("All JSON parsing strategies failed, returning empty structure")
    return {
        "strength": "Analysis could not be completed - please try again",
        "weaknesses": "Analysis could not be completed - please try again",
        "speciality": "Analysis could not be completed - please try again",
        "relevance rating": "low",
        "error": f"Failed to parse LLM response after all strategies. Raw: {response[:200]}..."
    }


# Regexes tried in order for each expected field: strict double-quoted,
# strict single-quoted, then two looser "name: 'value'" forms.
_LLM_FIELD_PATTERNS = {
    'strength': [
        r'"strength"\s*:\s*"([^"]*)"',
        r"'strength'\s*:\s*'([^']*)'",
        r'strength[:\s]+"([^"]*)"',
        r'strength[:\s]+\'([^\']*)\''
    ],
    'weaknesses': [
        r'"weaknesses"\s*:\s*"([^"]*)"',
        r"'weaknesses'\s*:\s*'([^']*)'",
        r'weaknesses[:\s]+"([^"]*)"',
        r'weaknesses[:\s]+\'([^\']*)\''
    ],
    'speciality': [
        r'"speciality"\s*:\s*"([^"]*)"',
        r"'speciality'\s*:\s*'([^']*)'",
        r'speciality[:\s]+"([^"]*)"',
        r'speciality[:\s]+\'([^\']*)\''
    ],
    'relevance rating': [
        r'"relevance rating"\s*:\s*"([^"]*)"',
        r"'relevance rating'\s*:\s*'([^']*)'",
        r'relevance[^:]*rating[:\s]+"([^"]*)"',
        r'relevance[^:]*rating[:\s]+\'([^\']*)\''
    ]
}

_VALID_RELEVANCE_RATINGS = ('very low', 'low', 'high', 'very high')


def _extract_llm_fields(text):
    """Regex-extract the expected fields from ``text``; returns (fields, matched_any)."""
    fields = {}
    matched_any = False
    for field, field_patterns in _LLM_FIELD_PATTERNS.items():
        fields[field] = ""
        for pattern in field_patterns:
            match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
            if match:
                # Collapse any run of backslashes in front of a quote
                fields[field] = re.sub(r'\\+(["\'])', r'\1', match.group(1).strip())
                matched_any = True
                break
    return fields, matched_any


def _coerce_relevance_rating(raw):
    """Map a free-form rating onto an allowed value; allowed values pass through unchanged."""
    rating = raw.lower()
    if rating in _VALID_RELEVANCE_RATINGS:
        return raw
    if 'very' in rating and 'low' in rating:
        return 'very low'
    if 'very' in rating and 'high' in rating:
        return 'very high'
    if 'high' in rating and 'low' not in rating:
        return 'high'
    # Covers 'low' variants and anything unrecognisable (default fallback)
    return 'low'
def combine_repo_files_for_llm(repo_dir="repo_files", output_file="combined_repo.txt"):
    """
    Combines all .py, .md, and .txt files in the given directory (recursively) into a single text file.

    ``app.py``, ``README.md`` and ``requirements.txt`` at the top of
    ``repo_dir`` are written first (when present); every other matching file
    follows in sorted directory/file order. Each file is preceded by a
    ``# ===== File: <name> =====`` header. A file that cannot be read is
    replaced by a single ``# Could not read <path>: <error>`` note.

    Args:
        repo_dir: Directory to scan.
        output_file: Path of the combined file to write. If it lives inside
            ``repo_dir`` it is never included in its own content.

    Returns:
        ``output_file``, unchanged.
    """
    wanted_suffixes = (".py", ".md", ".txt")
    priority_files = ["app.py", "README.md", "requirements.txt"]
    combined_content = []
    # Seed with the output path so a previous run's result inside repo_dir
    # is not folded back into the new output.
    seen_files = {os.path.abspath(output_file)}

    def add_file(path, label):
        abs_path = os.path.abspath(path)
        if abs_path in seen_files:
            return
        # Mark as seen before reading: a file that fails once must not be
        # retried (and reported a second time) by the directory walk.
        seen_files.add(abs_path)
        try:
            with open(path, "r", encoding="utf-8") as f:
                text = f.read()
        except (OSError, UnicodeError) as e:
            combined_content.append(f"\n# Could not read {path}: {e}\n")
            return
        # Header is only emitted once the content is known to be readable.
        combined_content.append(f"\n# ===== File: {label} =====\n")
        combined_content.append(text)

    # Priority files
    for pf in priority_files:
        pf_path = os.path.join(repo_dir, pf)
        if os.path.isfile(pf_path):
            add_file(pf_path, pf)

    # All other .py, .md, and .txt files
    for root, dirs, files in os.walk(repo_dir):
        dirs.sort()  # in-place sort makes the traversal order deterministic
        for file in sorted(files):
            if file.endswith(wanted_suffixes):
                add_file(os.path.join(root, file), file)

    with open(output_file, "w", encoding="utf-8") as out_f:
        out_f.write("\n".join(combined_content))
    return output_file
def analyze_code_chunk(code: str, user_requirements: str = "") -> str:
    """
    Analyzes a code chunk and returns a JSON summary for that chunk.

    The API key and endpoint are read from the ``modal_api`` and ``base_url``
    environment variables.

    Args:
        code: The chunk of source text, sent as the user message.
        user_requirements: Optional free-text requirements. When non-blank
            they are embedded in the system prompt so the model can take
            them into account for the relevance rating.

    Returns:
        The raw model reply (requested to be a JSON object, not validated
        here), or an empty string when the reply carries no text content.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.getenv("modal_api"))
    client.base_url = os.getenv("base_url")

    # Build the user requirements section. It ends with a blank line so it
    # does not run straight into the "Example of ..." sentence that follows.
    requirements_section = ""
    if user_requirements and user_requirements.strip():
        requirements_section = (
            f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\n"
            "When rating relevance, consider how well this code matches the user's stated requirements."
            "\n\n"
        )

    chunk_prompt = (
        "You are a highly precise and strict JSON generator. Analyze the following code chunk. "
        "Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
        "If you cannot answer, still return a valid JSON with empty strings for each key. "
        f"{requirements_section}"
        "Example of the ONLY valid output:\n"
        '{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
    )
    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": chunk_prompt},
            {"role": "user", "content": code},
        ],
        temperature=0.4,
    )
    # message.content is optional in the API; honour the declared ``str`` return.
    return response.choices[0].message.content or ""
def aggregate_chunk_analyses(chunk_jsons: list, user_requirements: str = "") -> str:
    """
    Aggregates a list of chunk JSONs into a single JSON summary using the LLM.

    The API key and endpoint are read from the ``modal_api`` and ``base_url``
    environment variables.

    Args:
        chunk_jsons: Per-chunk analyses. Strings are forwarded as they are,
            ``None`` entries are skipped and any other value is serialised
            with ``json.dumps``.
        user_requirements: Optional free-text requirements. When non-blank
            they are embedded in the system prompt so the model can take
            them into account for the aggregated relevance rating.

    Returns:
        The raw model reply (requested to be a JSON object, not validated
        here), or an empty string when the reply carries no text content.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.getenv("modal_api"))
    client.base_url = os.getenv("base_url")

    # Build the user requirements section. It ends with a blank line so it
    # does not run straight into the "Example of ..." sentence that follows.
    requirements_section = ""
    if user_requirements and user_requirements.strip():
        requirements_section = (
            f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\n"
            "When aggregating the relevance rating, consider how well the overall repository matches the user's stated requirements."
            "\n\n"
        )

    aggregation_prompt = (
        "You are a highly precise and strict, code analyzer and JSON generator. You are given a list of JSON analyses of code chunks. "
        "Aggregate these into a SINGLE overall JSON summary with the same keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Summarize and combine the information from all chunks. Do NOT include any explanation, markdown, or text outside the JSON. "
        "If a key is missing in all chunks, use an empty string. "
        f"{requirements_section}"
        "Example of the ONLY valid output:\n"
        '{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
    )

    # str.join raises TypeError on non-string items, so normalise first: a
    # chunk whose reply had no content (None) must not abort the aggregation.
    chunk_texts = [
        item if isinstance(item, str) else json.dumps(item, default=str)
        for item in chunk_jsons
        if item is not None
    ]
    user_content = "Here are the chunk analyses:\n" + "\n".join(chunk_texts)

    response = client.chat.completions.create(
        model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
        messages=[
            {"role": "system", "content": aggregation_prompt},
            {"role": "user", "content": user_content},
        ],
        max_tokens=512,
        temperature=0.3,
    )
    # message.content is optional in the API; honour the declared ``str`` return.
    return response.choices[0].message.content or ""
def analyze_combined_file(output_file="combined_repo.txt", user_requirements: str = ""):
    """
    Run the chunked LLM analysis over a combined repository file.

    The file is read as lines and cut into slices of 1200 lines. Every slice
    is handed to ``analyze_code_chunk`` and the collected replies are then
    merged by ``aggregate_chunk_analyses``; ``user_requirements`` is passed
    along to both calls.

    Returns one string: the per-chunk replies (for debugging) followed by the
    aggregated summary. If anything raises, the string
    "Error analyzing combined file: <error>" is returned instead.
    """
    lines_per_chunk = 1200
    try:
        with open(output_file, "r", encoding="utf-8") as handle:
            all_lines = handle.readlines()

        # One LLM reply per slice of the file
        replies = [
            analyze_code_chunk("".join(all_lines[offset:offset + lines_per_chunk]), user_requirements)
            for offset in range(0, len(all_lines), lines_per_chunk)
        ]
        merged = aggregate_chunk_analyses(replies, user_requirements)

        chunk_report = "\n\n".join(
            f"Chunk {number} JSON:\n{reply}"
            for number, reply in enumerate(replies, start=1)
        )
        return (
            f"==== Chunk JSON Outputs ===={chunk_report}"
            f"\n\n==== Final Aggregated Summary ====\n{merged}"
        )
    except Exception as err:
        return f"Error analyzing combined file: {err}"
def analyze_repo_chunk_for_context(chunk: str, repo_id: str) -> str:
    """
    Summarise one repository chunk as conversational context for the chatbot.

    The chunk and the repository id are embedded in a prompt that asks the
    model for a 2-3 paragraph summary aimed at answering user questions about
    the repository. The API key and endpoint come from the ``modal_api`` and
    ``base_url`` environment variables.

    Returns the model's reply. On any exception the error is logged and the
    string "Code section analysis unavailable: <error>" is returned instead.
    """
    system_message = "You are an expert code analyst creating conversational summaries for a repository assistant chatbot."
    try:
        from openai import OpenAI

        llm = OpenAI(api_key=os.getenv("modal_api"))
        llm.base_url = os.getenv("base_url")

        prompt_lines = [
            f"You are analyzing a chunk of code from the repository '{repo_id}' to create a conversational summary for a chatbot assistant.",
            "Create a concise but informative summary that helps understand:",
            "- What this code section does",
            "- Key functions, classes, or components",
            "- Important features or capabilities",
            "- How it relates to the overall repository purpose",
            "- Any notable patterns or technologies used",
            "Focus on information that would be useful for answering user questions about the repository.",
            "Repository chunk:",
            f"{chunk}",
            "Provide a clear, conversational summary in 2-3 paragraphs:",
        ]
        context_prompt = "\n".join(prompt_lines)

        completion = llm.chat.completions.create(
            model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": context_prompt},
            ],
            max_tokens=600,  # room for a detailed summary of a large chunk
            temperature=0.3,
        )
        return completion.choices[0].message.content
    except Exception as err:
        logger.error(f"Error analyzing chunk for context: {err}")
        return f"Code section analysis unavailable: {err}"
def create_repo_context_summary(repo_content: str, repo_id: str) -> str:
    """
    Build the chatbot's context for a repository by summarising it in chunks.

    ``repo_content`` is split into slices of 1200 lines; each non-blank slice
    is summarised by ``analyze_repo_chunk_for_context`` and labelled
    "=== Section N ===". The section summaries are then sent to the model
    once more to produce an overall overview.

    Returns the overview followed by the section summaries. If the overview
    request fails, only the header and section summaries are returned; if
    anything else fails, "Repository analysis unavailable: <error>".
    """
    lines_per_chunk = 1200  # large slices: better context and fewer API calls
    try:
        all_lines = repo_content.split('\n')
        chunk_summaries = []
        logger.info(f"Analyzing repository {repo_id} in chunks for chatbot context")

        slices = (
            '\n'.join(all_lines[offset:offset + lines_per_chunk])
            for offset in range(0, len(all_lines), lines_per_chunk)
        )
        # Blank slices are skipped and do not consume a section number
        non_blank = (piece for piece in slices if piece.strip())
        for section_no, piece in enumerate(non_blank, start=1):
            section_summary = analyze_repo_chunk_for_context(piece, repo_id)
            chunk_summaries.append(f"=== Section {section_no} ===\n{section_summary}")

        # Ask for one overview that ties the sections together
        try:
            from openai import OpenAI

            llm = OpenAI(api_key=os.getenv("modal_api"))
            llm.base_url = os.getenv("base_url")

            sections_text = "\n".join(chunk_summaries)
            final_prompt = "\n".join([
                f"Based on the following section summaries of repository '{repo_id}', create a comprehensive overview that a chatbot can use to answer user questions.",
                "Section Summaries:",
                sections_text,
                "Create a well-structured overview covering:",
                "1. Repository Purpose & Main Functionality",
                "2. Key Components & Architecture",
                "3. Important Features & Capabilities",
                "4. Technology Stack & Dependencies",
                "5. Usage Patterns & Examples",
                "Make this comprehensive but conversational - it will be used by a chatbot to answer user questions about the repository.",
            ])
            completion = llm.chat.completions.create(
                model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
                messages=[
                    {"role": "system", "content": "You are creating a comprehensive repository summary for a chatbot assistant."},
                    {"role": "user", "content": final_prompt},
                ],
                max_tokens=1500,  # room for a comprehensive overview
                temperature=0.3,
            )
            final_summary = completion.choices[0].message.content

            # Overview first, then the per-section detail
            full_context = (
                f"=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===\n"
                f"{final_summary}\n"
                "=== DETAILED SECTION SUMMARIES ===\n"
                f"{sections_text}"
            )
            logger.info(f"Created comprehensive context summary for {repo_id}")
            return full_context
        except Exception as err:
            logger.error(f"Error creating final summary: {err}")
            # Fall back to just the section summaries
            return f"=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===\n\n" + '\n\n'.join(chunk_summaries)
    except Exception as err:
        logger.error(f"Error creating repo context summary: {err}")
        return f"Repository analysis unavailable: {err}"
def handle_load_repository(repo_id: str) -> Tuple[str, str]:
    """
    Load a specific repository and prepare it for exploration with chunk-based analysis.

    The repository's .py/.md/.txt files are downloaded into ``repo_files``,
    combined into one text file, and summarised by
    ``create_repo_context_summary``.

    Args:
        repo_id: Repository identifier; ``None``, empty or blank values are
            rejected with a prompt to enter an ID.

    Returns:
        ``(status, context_summary)``. ``context_summary`` is an empty string
        whenever ``status`` reports a problem.
    """
    if not repo_id or not repo_id.strip():
        return "Status: Please enter a repository ID.", ""

    try:
        logger.info(f"Loading repository for exploration: {repo_id}")

        # Download and process the repository
        try:
            download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt'])
            combined_text_path = combine_repo_files_for_llm()
        except Exception as e:
            logger.error(f"Error downloading repository {repo_id}: {e}")
            error_status = f"β Error downloading repository: {e}"
            return error_status, ""

        with open(combined_text_path, "r", encoding="utf-8") as f:
            repo_content = f.read()

        # Real newlines: the escaped form "\\n" would show up as a literal
        # backslash-n in the status text instead of breaking the line.
        status = (
            f"β Repository '{repo_id}' loaded successfully!\n"
            "π Files processed and ready for exploration.\n"
            "π Analyzing repository in chunks for comprehensive context...\n"
            "π¬ You can now ask questions about this repository."
        )

        # Create comprehensive context summary using chunk analysis
        logger.info(f"Creating context summary for {repo_id}")
        context_summary = create_repo_context_summary(repo_content, repo_id)
        logger.info(f"Repository {repo_id} loaded and analyzed successfully for exploration")
        return status, context_summary
    except Exception as e:
        logger.error(f"Error loading repository {repo_id}: {e}")
        error_status = f"β Error loading repository: {e}"
        return error_status, ""