# HF_RepoSense / analyzer.py
import os
import json
import re

from openai import OpenAI
def analyze_code(code: str) -> str:
    """
    Uses the Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ model to analyze the given code.
    Returns the analysis as a string.
    """
    # The model is served from an OpenAI-compatible endpoint; the API key and
    # URL are read from the modal_api and base_url environment variables.
    client = OpenAI(api_key=os.getenv("modal_api"), base_url=os.getenv("base_url"))
    system_prompt = (
        "You are a highly precise and strict JSON generator. Analyze the code given to you. "
        "Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
        "All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
        "For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
        "Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
        "If you cannot answer, still return a valid JSON with empty strings for each key. "
        "Example of the ONLY valid output:\n"
        '{\n  "strength": "...", \n  "weaknesses": "...", \n  "speciality": "...", \n  "relevance rating": "high"\n}'
    )
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ", # Updated model
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": code}
],
max_tokens=512,
temperature=0.4
)
return response.choices[0].message.content
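
# Hedged usage sketch for analyze_code: the input snippet is an arbitrary
# illustration, and the call only succeeds once modal_api and base_url point
# at an OpenAI-compatible server hosting the model above.
#
#   raw = analyze_code("def add(a, b):\n    return a + b")
#   parsed = parse_llm_json_response(raw)  # parser defined below
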
def parse_llm_json_response(response: str):
    """
    Best-effort parser for the model's reply: extracts the outermost JSON
    object, normalizes single quotes to double quotes, and escapes stray
    double quotes inside string values before handing off to json.loads.
    """
    try:
        print("parse_llm_json_response: raw response:", response)
        # 1. Extract the JSON object part of the string.
        start = response.find('{')
        end = response.rfind('}')
        if start == -1 or end == -1 or end < start:
            raise ValueError("No valid JSON object found in the response.")
        json_str = response[start:end + 1]
        # 2. Replace single quotes with double quotes so replies like
        # {'key': 'value'} become valid JSON. Note this also rewrites
        # apostrophes inside values, which can still defeat the parse.
        json_str = json_str.replace("'", '"')
# 3. Find all string values and escape any unescaped double quotes inside them.
# This uses a function as the replacement in re.sub
def escape_inner_quotes(match):
# The match object gives us the full string matched by the regex.
# We take the part between the outer quotes (group 1)
# and replace any \" with a temporary unique placeholder.
# Then, we replace any remaining " with \", and finally
# restore the original escaped quotes.
inner_content = match.group(1)
placeholder = "___TEMP_QUOTE___"
inner_content = inner_content.replace('\\"', placeholder)
inner_content = inner_content.replace('"', '\\"')
inner_content = inner_content.replace(placeholder, '\\"')
return f'"{inner_content}"'
# This regex finds a double quote, captures everything until the next double quote,
# and then applies the function to that captured group.
json_str = re.sub(r'"(.*?)"', escape_inner_quotes, json_str)
return json.loads(json_str)
    except Exception as e:
        print("parse_llm_json_response: parse error:", e)
        return {"error": f"Failed to parse JSON: {e}", "raw": response}
def combine_repo_files_for_llm(repo_dir="repo_files", output_file="combined_repo.txt"):
"""
Combines all .py and .md files in the given directory (recursively) into a single text file.
Returns the path to the combined file.
"""
combined_content = []
seen_files = set()
    # Priority files go first so the model sees the entry point and README
    # before the rest of the repository.
    priority_files = ["app.py", "README.md"]
for pf in priority_files:
pf_path = os.path.join(repo_dir, pf)
if os.path.isfile(pf_path):
try:
with open(pf_path, "r", encoding="utf-8") as f:
combined_content.append(f"\n# ===== File: {pf} =====\n")
combined_content.append(f.read())
seen_files.add(os.path.abspath(pf_path))
except Exception as e:
combined_content.append(f"\n# Could not read {pf_path}: {e}\n")
# All other .py and .md files
for root, _, files in os.walk(repo_dir):
for file in files:
if file.endswith(".py") or file.endswith(".md"):
file_path = os.path.join(root, file)
abs_path = os.path.abspath(file_path)
if abs_path in seen_files:
continue
try:
with open(file_path, "r", encoding="utf-8") as f:
combined_content.append(f"\n# ===== File: {file} =====\n")
combined_content.append(f.read())
seen_files.add(abs_path)
except Exception as e:
combined_content.append(f"\n# Could not read {file_path}: {e}\n")
with open(output_file, "w", encoding="utf-8") as out_f:
out_f.write("\n".join(combined_content))
return output_file
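
# Hedged usage sketch: assuming the target repository has already been cloned
# into ./repo_files (this module's default, not a fixed convention), this
# writes every .py and .md file into one annotated text file:
#
#   combined_path = combine_repo_files_for_llm("repo_files", "combined_repo.txt")
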
def analyze_code_chunk(code: str, user_requirements: str = "") -> str:
    """
    Analyzes a single code chunk and returns a JSON summary for that chunk.
    """
    client = OpenAI(api_key=os.getenv("modal_api"), base_url=os.getenv("base_url"))
# Build the user requirements section
requirements_section = ""
if user_requirements.strip():
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen rating relevance, consider how well this code matches the user's stated requirements."
chunk_prompt = (
"You are a highly precise and strict JSON generator. Analyze the following code chunk. "
"Your ONLY output must be a valid JSON object with the following keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
"Do NOT include any explanation, markdown, or text outside the JSON. Do NOT add any commentary, preamble, or postscript. "
"If you cannot answer, still return a valid JSON with empty strings for each key. "
f"{requirements_section}"
"Example of the ONLY valid output:\n"
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
)
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": chunk_prompt},
{"role": "user", "content": code}
],
temperature=0.4
)
return response.choices[0].message.content
def aggregate_chunk_analyses(chunk_jsons: list, user_requirements: str = "") -> str:
    """
    Aggregates a list of per-chunk JSON analyses into a single JSON summary using the LLM.
    """
    client = OpenAI(api_key=os.getenv("modal_api"), base_url=os.getenv("base_url"))
# Build the user requirements section
requirements_section = ""
if user_requirements.strip():
requirements_section = f"\n\nUSER REQUIREMENTS:\n{user_requirements}\n\nWhen aggregating the relevance rating, consider how well the overall repository matches the user's stated requirements."
aggregation_prompt = (
"You are a highly precise and strict, code analyzer and JSON generator. You are given a list of JSON analyses of code chunks. "
"Aggregate these into a SINGLE overall JSON summary with the same keys: 'strength', 'weaknesses', 'speciality', 'relevance rating'. "
"All property names and string values MUST use double quotes (\"). Do NOT use single quotes. "
"For 'relevance rating', you MUST use ONLY one of these exact values: 'very low', 'low', 'high', 'very high'. "
"Summarize and combine the information from all chunks. Do NOT include any explanation, markdown, or text outside the JSON. "
"If a key is missing in all chunks, use an empty string. "
f"{requirements_section}"
"Example of the ONLY valid output:\n"
'{\n "strength": "...", \n "weaknesses": "...", \n "speciality": "...", \n "relevance rating": "high"\n}'
)
user_content = "Here are the chunk analyses:\n" + "\n".join(chunk_jsons)
response = client.chat.completions.create(
model="Orion-zhen/Qwen2.5-Coder-7B-Instruct-AWQ",
messages=[
{"role": "system", "content": aggregation_prompt},
{"role": "user", "content": user_content}
],
max_tokens=512,
temperature=0.3
)
return response.choices[0].message.content
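
# Input-shape sketch: aggregate_chunk_analyses expects the raw JSON strings
# returned by analyze_code_chunk (the values below are illustrative only):
#
#   chunk_jsons = [
#       '{"strength": "modular", "weaknesses": "sparse docs", "speciality": "data loading", "relevance rating": "high"}',
#       '{"strength": "typed", "weaknesses": "no tests", "speciality": "API layer", "relevance rating": "low"}',
#   ]
#   summary = aggregate_chunk_analyses(chunk_jsons)
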
def analyze_combined_file(output_file="combined_repo.txt", user_requirements: str = ""):
    """
    Reads the combined file, splits it into 1200-line chunks, analyzes each chunk,
    and aggregates the LLM's output into a final summary. User requirements, when
    provided, feed into each chunk's relevance rating.
    Returns a single debug string containing every chunk's JSON followed by the
    aggregated summary.
    """
try:
with open(output_file, "r", encoding="utf-8") as f:
lines = f.readlines()
chunk_size = 1200
chunk_jsons = []
for i in range(0, len(lines), chunk_size):
chunk = "".join(lines[i:i+chunk_size])
analysis = analyze_code_chunk(chunk, user_requirements)
chunk_jsons.append(analysis)
final_summary = aggregate_chunk_analyses(chunk_jsons, user_requirements)
        debug_output = (
            "==== Chunk JSON Outputs ====\n"
            + "\n\n".join(f"Chunk {i + 1} JSON:\n{chunk_json}" for i, chunk_json in enumerate(chunk_jsons))
            + "\n\n==== Final Aggregated Summary ===="
            + f"\n{final_summary}"
        )
return debug_output
except Exception as e:
return f"Error analyzing combined file: {e}"