|
import openai |
|
import os |
|
import json |
|
import re |
|
import logging |
|
from typing import Tuple |
|
from hf_utils import download_filtered_space_files |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def analyze_code(code: str) -> str: |
|
""" |
|
    Uses the Qwen2.5-Coder-7B-Instruct-AWQ model to analyze the given code.
|
Returns the analysis as a string. |
|
""" |
|
from openai import OpenAI |
|
client = OpenAI(api_key=os.getenv("modal_api")) |
|
client.base_url = os.getenv("base_url") |
|
system_prompt = ( |
|
"You are a highly precise and strict JSON generator. Analyze the code given to you. " |
|
"Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. " |
|
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. " |
|
"Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. " |
|
"If you cannot answer, still return a valid JSON with empty strings for each key. " |
|
"Example of the ONLY valid output:\n" |
|
"{\n 'strength': '...', \n 'weaknesses': '...', \n 'speciality': '...', \n 'relevance rating': 'high'\n}" |
|
) |
|
response = client.chat.completions.create( |
|
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", |
|
messages=[ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": code} |
|
], |
|
max_tokens=512, |
|
temperature=0.4 |
|
) |
|
return response.choices[0].message.content |
|
|
|
def parse_llm_json_response(response: str): |
|
""" |
|
Robust JSON parser with multiple fallback strategies for LLM responses. |
|
""" |
|
logger.info(f"Attempting to parse LLM response: {response[:100]}...") |
|
|
|
|
|
try: |
|
|
|
cleaned = response.strip() |
|
result = json.loads(cleaned) |
|
logger.info("β
Direct JSON parsing successful") |
|
return result |
|
    except Exception:
|
pass |
|
|
|
|
|
try: |
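        # Strategy 2: extract the first balanced {...} block by counting braces;
        # if the braces never balance, fall back to the last '}' in the response.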
|
|
|
start = response.find('{') |
|
if start == -1: |
|
raise ValueError("No opening brace found") |
|
|
|
|
|
brace_count = 0 |
|
end = start |
|
for i, char in enumerate(response[start:], start): |
|
if char == '{': |
|
brace_count += 1 |
|
elif char == '}': |
|
brace_count -= 1 |
|
if brace_count == 0: |
|
end = i |
|
break |
|
|
|
if brace_count != 0: |
|
|
|
end = response.rfind('}') |
|
if end == -1 or end < start: |
|
raise ValueError("No matching closing brace found") |
|
|
|
json_str = response[start:end+1] |
|
result = json.loads(json_str) |
|
logger.info("β
JSON block extraction successful") |
|
return result |
|
except Exception as e: |
|
logger.warning(f"JSON block extraction failed: {e}") |
|
|
|
|
|
try: |
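        # Strategy 3: take the outermost {...} span and repair common LLM output
        # quirks before handing it to json.loads.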
|
|
|
start = response.find('{') |
|
end = response.rfind('}') |
|
if start != -1 and end != -1 and end > start: |
|
json_str = response[start:end+1] |
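            # Rewrite single-quoted values and keys as double-quoted so the text becomes valid JSON.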
|
|
|
|
|
|
|
json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*[,}])", r'"\1"', json_str) |
|
json_str = re.sub(r"(?<!\\)'([^']*)'(?=\s*:)", r'"\1"', json_str) |
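            # Fold text after an unescaped inner quote back into the value, e.g. ': "abc"def"' -> ': "abcdef"'.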
|
|
|
|
|
json_str = re.sub(r':\s*"([^"]*)"([^",}]*)"', r': "\1\2"', json_str) |
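            # Remove trailing commas before a closing brace or bracket.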
|
|
|
|
|
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) |
|
|
|
|
|
result = json.loads(json_str) |
|
logger.info("β
JSON cleaning and fixing successful") |
|
return result |
|
except Exception as e: |
|
logger.warning(f"JSON cleaning failed: {e}") |
|
|
|
|
|
try: |
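        # Strategy 4: abandon whole-object parsing and regex out each field,
        # accepting either quoting style for keys and values.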
|
logger.info("Attempting manual field extraction...") |
|
result = {} |
|
|
|
|
|
patterns = { |
|
'strength': [ |
|
r'"strength"\s*:\s*"([^"]*)"', |
|
r"'strength'\s*:\s*'([^']*)'", |
|
r'strength[:\s]+"([^"]*)"', |
|
r'strength[:\s]+\'([^\']*)\'' |
|
], |
|
'weaknesses': [ |
|
r'"weaknesses"\s*:\s*"([^"]*)"', |
|
r"'weaknesses'\s*:\s*'([^']*)'", |
|
r'weaknesses[:\s]+"([^"]*)"', |
|
r'weaknesses[:\s]+\'([^\']*)\'' |
|
], |
|
'speciality': [ |
|
r'"speciality"\s*:\s*"([^"]*)"', |
|
r"'speciality'\s*:\s*'([^']*)'", |
|
r'speciality[:\s]+"([^"]*)"', |
|
r'speciality[:\s]+\'([^\']*)\'' |
|
], |
|
'relevance rating': [ |
|
r'"relevance rating"\s*:\s*"([^"]*)"', |
|
r"'relevance rating'\s*:\s*'([^']*)'", |
|
r'relevance[^:]*rating[:\s]+"([^"]*)"', |
|
r'relevance[^:]*rating[:\s]+\'([^\']*)\'' |
|
] |
|
} |
|
|
|
for field, field_patterns in patterns.items(): |
|
found = False |
|
for pattern in field_patterns: |
|
match = re.search(pattern, response, re.IGNORECASE | re.DOTALL) |
|
if match: |
|
value = match.group(1).strip() |
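                    # Strip escape backslashes the model may have added around quotes.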
|
|
|
value = re.sub(r'\\+(["\'])', r'\1', value) |
|
value = value.replace('\\"', '"').replace("\\'", "'") |
|
result[field] = value |
|
found = True |
|
break |
|
|
|
if not found: |
|
result[field] = "" |
|
|
|
|
|
valid_ratings = ['very low', 'low', 'high', 'very high'] |
|
if result.get('relevance rating', '').lower() not in [r.lower() for r in valid_ratings]: |
|
|
|
rating = result.get('relevance rating', '').lower() |
|
if 'very' in rating and 'low' in rating: |
|
result['relevance rating'] = 'very low' |
|
elif 'very' in rating and 'high' in rating: |
|
result['relevance rating'] = 'very high' |
|
elif 'low' in rating: |
|
result['relevance rating'] = 'low' |
|
elif 'high' in rating: |
|
result['relevance rating'] = 'high' |
|
else: |
|
result['relevance rating'] = 'low' |
|
|
|
logger.info("β
Manual field extraction successful") |
|
return result |
|
|
|
except Exception as e: |
|
logger.warning(f"Manual extraction failed: {e}") |
|
|
|
|
|
logger.error("All JSON parsing strategies failed, returning empty structure") |
|
return { |
|
"strength": "Analysis could not be completed - please try again", |
|
"weaknesses": "Analysis could not be completed - please try again", |
|
"speciality": "Analysis could not be completed - please try again", |
|
"relevance rating": "low", |
|
"error": f"Failed to parse LLM response after all strategies. Raw: {response[:200]}..." |
|
} |
|
|
|
def combine_repo_files_for_llm(repo_dir="repo_files", output_file="combined_repo.txt"): |
|
""" |
|
    Combines all .py, .md, and .txt files in the given directory (recursively) into a single text file.
    Priority files (app.py, README.md, requirements.txt) are written first; the remaining files follow.
    Returns the path to the combined file.
|
""" |
|
combined_content = [] |
|
seen_files = set() |
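    # Write the priority files first so the most informative files lead the combined text.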
|
|
|
priority_files = ["app.py", "README.md", "requirements.txt"] |
|
for pf in priority_files: |
|
pf_path = os.path.join(repo_dir, pf) |
|
if os.path.isfile(pf_path): |
|
try: |
|
with open(pf_path, "r", encoding="utf-8") as f: |
|
combined_content.append(f"\n# ===== File: {pf} =====\n") |
|
combined_content.append(f.read()) |
|
seen_files.add(os.path.abspath(pf_path)) |
|
except Exception as e: |
|
combined_content.append(f"\n# Could not read {pf_path}: {e}\n") |
|
|
|
for root, _, files in os.walk(repo_dir): |
|
for file in files: |
|
if file.endswith(".py") or file.endswith(".md") or file.endswith(".txt"): |
|
file_path = os.path.join(root, file) |
|
abs_path = os.path.abspath(file_path) |
|
if abs_path in seen_files: |
|
continue |
|
try: |
|
with open(file_path, "r", encoding="utf-8") as f: |
|
combined_content.append(f"\n# ===== File: {file} =====\n") |
|
combined_content.append(f.read()) |
|
seen_files.add(abs_path) |
|
except Exception as e: |
|
combined_content.append(f"\n# Could not read {file_path}: {e}\n") |
|
with open(output_file, "w", encoding="utf-8") as out_f: |
|
out_f.write("\n".join(combined_content)) |
|
return output_file |
|
|
|
def analyze_code_chunk(code: str, user_requirements: str = "") -> str: |
|
""" |
|
Analyzes a code chunk and returns a JSON summary for that chunk. |
|
""" |
|
from openai import OpenAI |
|
client = OpenAI(api_key=os.getenv("modal_api")) |
|
client.base_url = os.getenv("base_url") |
|
|
|
|
|
requirements_section = "" |
|
if user_requirements.strip(): |
|
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen rating relevance, consider how well this code matches the user's stated requirements." |
|
|
|
chunk_prompt = ( |
|
"You are a highly precise and strict JSON generator. Analyze the following code chunk. " |
|
"Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. " |
|
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. " |
|
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. " |
|
"Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. " |
|
"If you cannot answer, still return a valid JSON with empty strings for each key. " |
|
f"{requirements_section}" |
|
"Example of the ONLY valid output:\n" |
|
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}' |
|
) |
|
|
|
response = client.chat.completions.create( |
|
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", |
|
messages=[ |
|
{"role": "system", "content": chunk_prompt}, |
|
{"role": "user", "content": code} |
|
], |
|
|
|
temperature=0.4 |
|
) |
|
return response.choices[0].message.content |
|
|
|
def aggregate_chunk_analyses(chunk_jsons: list, user_requirements: str = "") -> str: |
|
""" |
|
Aggregates a list of chunk JSONs into a single JSON summary using the LLM. |
|
""" |
|
from openai import OpenAI |
|
client = OpenAI(api_key=os.getenv("modal_api")) |
|
client.base_url = os.getenv("base_url") |
|
|
|
|
|
requirements_section = "" |
|
if user_requirements.strip(): |
|
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen aggregating the relevance rating, consider how well the overall repository matches the user's stated requirements." |
|
|
|
aggregation_prompt = ( |
|
"You are a highly precise and strict, code analyzer and JSON generator. You are given a list of JSON analyses of code chunks. " |
|
"Aggregate these into a SINGLE overall JSON summary with the same keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. " |
|
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. " |
|
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. " |
|
"Summarize and combine the information from all chunks. Do NOT include any explanation, markdown, or text outside the JSON. " |
|
"If a key is missing in all chunks, use an empty string. " |
|
f"{requirements_section}" |
|
"Example of the ONLY valid output:\n" |
|
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}' |
|
) |
|
user_content = "Here are the chunk analyses:\n" + "\n".join(chunk_jsons) |
|
response = client.chat.completions.create( |
|
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", |
|
messages=[ |
|
{"role": "system", "content": aggregation_prompt}, |
|
{"role": "user", "content": user_content} |
|
], |
|
max_tokens=512, |
|
temperature=0.3 |
|
) |
|
return response.choices[0].message.content |
|
|
|
def analyze_combined_file(output_file="combined_repo.txt", user_requirements: str = ""): |
|
""" |
|
    Reads the combined file, splits it into 1200-line chunks, analyzes each chunk, and aggregates the LLM's output into a final summary.
    User requirements are passed through so the relevance rating reflects them.
    Returns the aggregated analysis as a string, or an error message string on failure.
|
""" |
|
try: |
|
with open(output_file, "r", encoding="utf-8") as f: |
|
lines = f.readlines() |
|
chunk_size = 1200 |
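        # Split the combined file into chunk_size-line chunks; each chunk is analyzed
        # independently and the per-chunk JSON strings are aggregated afterwards.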
|
chunk_jsons = [] |
|
for i in range(0, len(lines), chunk_size): |
|
chunk = "".join(lines[i:i+chunk_size]) |
|
analysis = analyze_code_chunk(chunk, user_requirements) |
|
chunk_jsons.append(analysis) |
|
final_summary = aggregate_chunk_analyses(chunk_jsons, user_requirements) |
|
return final_summary |
|
except Exception as e: |
|
return f"Error analyzing combined file: {e}" |
|
|
|
def analyze_repo_chunk_for_context(chunk: str, repo_id: str) -> str: |
|
""" |
|
Analyze a repository chunk to create conversational context for the chatbot. |
|
This creates summaries focused on helping users understand the repository. |
|
""" |
|
try: |
|
from openai import OpenAI |
|
client = OpenAI(api_key=os.getenv("modal_api")) |
|
client.base_url = os.getenv("base_url") |
|
|
|
context_prompt = f"""You are analyzing a chunk of code from the repository '{repo_id}' to create a conversational summary for a chatbot assistant. |
|
|
|
Create a concise but informative summary that helps understand: |
|
- What this code section does |
|
- Key functions, classes, or components |
|
- Important features or capabilities |
|
- How it relates to the overall repository purpose |
|
- Any notable patterns or technologies used |
|
|
|
Focus on information that would be useful for answering user questions about the repository. |
|
|
|
Repository chunk: |
|
{chunk} |
|
|
|
Provide a clear, conversational summary in 2-3 paragraphs:""" |
|
|
|
response = client.chat.completions.create( |
|
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", |
|
messages=[ |
|
{"role": "system", "content": "You are an expert code analyst creating conversational summaries for a repository assistant chatbot."}, |
|
{"role": "user", "content": context_prompt} |
|
], |
|
max_tokens=600, |
|
temperature=0.3 |
|
) |
|
|
|
return response.choices[0].message.content |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing chunk for context: {e}") |
|
return f"Code section analysis unavailable: {e}" |
|
|
|
def create_repo_context_summary(repo_content: str, repo_id: str) -> str: |
|
""" |
|
Create a comprehensive context summary by analyzing the repository in chunks. |
|
Returns a detailed summary that the chatbot can use to answer questions. |
|
""" |
|
try: |
|
lines = repo_content.split('\n') |
|
chunk_size = 1200 |
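        # Summarize the repository in chunk_size-line chunks, then merge the section
        # summaries into a single chatbot-facing overview.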
|
chunk_summaries = [] |
|
|
|
logger.info(f"Analyzing repository {repo_id} in chunks for chatbot context") |
|
|
|
for i in range(0, len(lines), chunk_size): |
|
chunk = '\n'.join(lines[i:i+chunk_size]) |
|
if chunk.strip(): |
|
summary = analyze_repo_chunk_for_context(chunk, repo_id) |
|
chunk_summaries.append(f"=== Section {len(chunk_summaries) + 1} ===\n{summary}") |
|
|
|
|
|
try: |
|
from openai import OpenAI |
|
client = OpenAI(api_key=os.getenv("modal_api")) |
|
client.base_url = os.getenv("base_url") |
|
|
|
final_prompt = f"""Based on the following section summaries of repository '{repo_id}', create a comprehensive overview that a chatbot can use to answer user questions. |
|
|
|
Section Summaries: |
|
{chr(10).join(chunk_summaries)} |
|
|
|
Create a well-structured overview covering: |
|
1. Repository Purpose & Main Functionality |
|
2. Key Components & Architecture |
|
3. Important Features & Capabilities |
|
4. Technology Stack & Dependencies |
|
5. Usage Patterns & Examples |
|
|
|
Make this comprehensive but conversational - it will be used by a chatbot to answer user questions about the repository.""" |
|
|
|
response = client.chat.completions.create( |
|
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", |
|
messages=[ |
|
{"role": "system", "content": "You are creating a comprehensive repository summary for a chatbot assistant."}, |
|
{"role": "user", "content": final_prompt} |
|
], |
|
max_tokens=1500, |
|
temperature=0.3 |
|
) |
|
|
|
final_summary = response.choices[0].message.content |
|
|
|
|
|
full_context = f"""=== REPOSITORY ANALYSIS FOR {repo_id.upper()} === |
|
|
|
{final_summary} |
|
|
|
=== DETAILED SECTION SUMMARIES === |
|
{chr(10).join(chunk_summaries)}""" |
|
|
|
logger.info(f"Created comprehensive context summary for {repo_id}") |
|
return full_context |
|
|
|
except Exception as e: |
|
logger.error(f"Error creating final summary: {e}") |
|
|
|
return f"=== REPOSITORY ANALYSIS FOR {repo_id.upper()} ===\n\n" + '\n\n'.join(chunk_summaries) |
|
|
|
except Exception as e: |
|
logger.error(f"Error creating repo context summary: {e}") |
|
return f"Repository analysis unavailable: {e}" |
|
|
|
def handle_load_repository(repo_id: str) -> Tuple[str, str]: |
|
"""Load a specific repository and prepare it for exploration with chunk-based analysis.""" |
|
if not repo_id.strip(): |
|
return "Status: Please enter a repository ID.", "" |
|
|
|
try: |
|
logger.info(f"Loading repository for exploration: {repo_id}") |
|
|
|
|
|
try: |
|
download_filtered_space_files(repo_id, local_dir="repo_files", file_extensions=['.py', '.md', '.txt']) |
|
combined_text_path = combine_repo_files_for_llm() |
|
|
|
except Exception as e: |
|
logger.error(f"Error downloading repository {repo_id}: {e}") |
|
error_status = f"β Error downloading repository: {e}" |
|
return error_status, "" |
|
|
|
with open(combined_text_path, "r", encoding="utf-8") as f: |
|
repo_content = f.read() |
|
|
|
status = f"β
Repository '{repo_id}' loaded successfully!\\nπ Files processed and ready for exploration.\\nπ Analyzing repository in chunks for comprehensive context...\\nπ¬ You can now ask questions about this repository." |
|
|
|
|
|
logger.info(f"Creating context summary for {repo_id}") |
|
context_summary = create_repo_context_summary(repo_content, repo_id) |
|
|
|
logger.info(f"Repository {repo_id} loaded and analyzed successfully for exploration") |
|
return status, context_summary |
|
|
|
except Exception as e: |
|
logger.error(f"Error loading repository {repo_id}: {e}") |
|
error_status = f"β Error loading repository: {e}" |
|
return error_status, "" |
|
|