# --- Earlier draft (kept for reference): chat-model reviewer returning JSON ---
# import json
#
# def analyze_code(language, code, tokenizer, model):
#     messages = [
#         {
#             "role": "system",
#             "content": (
#                 "You are a helpful and expert-level AI code reviewer and bug fixer. "
#                 "Your task is to analyze the given buggy code in the specified programming language, "
#                 "identify bugs (logical, syntax, runtime, etc.), and fix them. "
#                 "Return a JSON object with the following keys:\n\n"
#                 "1. 'bug_analysis': a list of objects, each containing:\n"
#                 "   - 'line_number': the line number (approximate if needed)\n"
#                 "   - 'error_message': a short name of the bug\n"
#                 "   - 'explanation': short explanation of the problem\n"
#                 "   - 'fix_suggestion': how to fix it\n"
#                 "2. 'corrected_code': the entire corrected code block.\n\n"
#                 "Respond with ONLY the raw JSON object, no extra commentary or markdown."
#             )
#         },
#         {
#             "role": "user",
#             "content": f"💻 Language: {language}\n🐞 Buggy Code:\n```{language.lower()}\n{code.strip()}\n```"
#         }
#     ]
#     inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt").to(model.device)
#     attention_mask = (inputs != tokenizer.pad_token_id).long()
#     outputs = model.generate(
#         inputs,
#         attention_mask=attention_mask,
#         max_new_tokens=1024,
#         do_sample=False,
#         pad_token_id=tokenizer.eos_token_id,
#         eos_token_id=tokenizer.eos_token_id
#     )
#     response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
#     # Try parsing the response as JSON
#     try:
#         json_output = json.loads(response)
#         return json_output
#     except json.JSONDecodeError:
#         print("⚠️ Could not decode response into JSON. Here's the raw output:\n")
#         print(response)
#         return None
# --- Second draft (kept for reference): same chat-model flow, with logging and fallbacks ---
# import json
# import logging
# import time
# import torch
#
# # Configure logging
# logger = logging.getLogger(__name__)
#
# def analyze_code(language, code, tokenizer, model):
#     """
#     Analyze code and return bug analysis with improved logging and error handling.
#     """
#     start_time = time.time()
#     logger.info(f"🔍 Starting analysis for {language} code ({len(code)} characters)")
#     try:
#         # Prepare messages
#         messages = [
#             {
#                 "role": "system",
#                 "content": (
#                     "You are a helpful and expert-level AI code reviewer and bug fixer. "
#                     "Your task is to analyze the given buggy code in the specified programming language, "
#                     "identify bugs (logical, syntax, runtime, etc.), and fix them. "
#                     "Return a JSON object with the following keys:\n\n"
#                     "1. 'bug_analysis': a list of objects, each containing:\n"
#                     "   - 'line_number': the line number (approximate if needed)\n"
#                     "   - 'error_message': a short name of the bug\n"
#                     "   - 'explanation': short explanation of the problem\n"
#                     "   - 'fix_suggestion': how to fix it\n"
#                     "2. 'corrected_code': the entire corrected code block.\n\n"
#                     "Respond with ONLY the raw JSON object, no extra commentary or markdown."
#                 )
#             },
#             {
#                 "role": "user",
#                 "content": f"💻 Language: {language}\n🐞 Buggy Code:\n```{language.lower()}\n{code.strip()}\n```"
#             }
#         ]
#         logger.info("🔧 Applying chat template...")
#         inputs = tokenizer.apply_chat_template(
#             messages,
#             add_generation_prompt=True,
#             return_tensors="pt"
#         ).to(model.device)
#         # Fall back to eos_token_id when the tokenizer defines no pad token
#         pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id
#         attention_mask = (inputs != pad_id).long()
#         logger.info(f"📏 Input length: {inputs.shape[1]} tokens")
#         logger.info("🚀 Starting model generation...")
#         generation_start = time.time()
#         # Generate with more conservative settings
#         with torch.no_grad():  # Ensure no gradients are computed
#             outputs = model.generate(
#                 inputs,
#                 attention_mask=attention_mask,
#                 max_new_tokens=512,  # Reduced from 1024 for faster inference
#                 do_sample=False,  # Greedy decoding for consistent output (temperature is ignored here)
#                 pad_token_id=tokenizer.eos_token_id,
#                 eos_token_id=tokenizer.eos_token_id,
#                 use_cache=True,  # Enable KV cache for efficiency
#             )
#         generation_time = time.time() - generation_start
#         logger.info(f"⚡ Generation completed in {generation_time:.2f} seconds")
#         logger.info("📝 Decoding response...")
#         response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
#         logger.info(f"📏 Response length: {len(response)} characters")
#         logger.info(f"🔍 First 100 chars: {response[:100]}...")
#         # Try parsing the response as JSON
#         logger.info("🔍 Attempting to parse JSON...")
#         try:
#             # Clean up response - remove any markdown formatting
#             cleaned_response = response.strip()
#             if cleaned_response.startswith('```json'):
#                 cleaned_response = cleaned_response[7:]
#             if cleaned_response.startswith('```'):
#                 cleaned_response = cleaned_response[3:]
#             if cleaned_response.endswith('```'):
#                 cleaned_response = cleaned_response[:-3]
#             cleaned_response = cleaned_response.strip()
#             json_output = json.loads(cleaned_response)
#             total_time = time.time() - start_time
#             logger.info(f"✅ Analysis completed successfully in {total_time:.2f} seconds")
#             # Validate the JSON structure
#             if not isinstance(json_output, dict):
#                 raise ValueError("Response is not a dictionary")
#             if 'bug_analysis' not in json_output:
#                 logger.warning("⚠️ Missing 'bug_analysis' key, adding empty list")
#                 json_output['bug_analysis'] = []
#             if 'corrected_code' not in json_output:
#                 logger.warning("⚠️ Missing 'corrected_code' key, adding original code")
#                 json_output['corrected_code'] = code
#             return json_output
#         except json.JSONDecodeError as e:
#             logger.error(f"❌ JSON decode error: {e}")
#             logger.error(f"📄 Raw response: {repr(response)}")
#             # Return a fallback structure with the raw response
#             fallback_response = {
#                 "bug_analysis": [{
#                     "line_number": 1,
#                     "error_message": "Analysis parsing failed",
#                     "explanation": "The AI model returned a response that couldn't be parsed as JSON",
#                     "fix_suggestion": "Please try again or check the code format"
#                 }],
#                 "corrected_code": code,
#                 "raw_output": response,
#                 "parsing_error": str(e)
#             }
#             return fallback_response
#     except Exception as e:
#         total_time = time.time() - start_time
#         logger.error(f"❌ Analysis failed after {total_time:.2f} seconds: {str(e)}")
#         logger.error(f"💥 Exception type: {type(e).__name__}")
#         # Return error response
#         return {
#             "bug_analysis": [{
#                 "line_number": 1,
#                 "error_message": "Analysis failed",
#                 "explanation": f"An error occurred during analysis: {str(e)}",
#                 "fix_suggestion": "Please try again or contact support"
#             }],
#             "corrected_code": code,
#             "error": str(e),
#             "error_type": type(e).__name__
#         }
# analyzer.py
import torch
import json
import time
import logging

# Configure logger (guard against attaching duplicate handlers on re-import)
logger = logging.getLogger("CodeAnalyzer")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)

def analyze_code(tokenizer, model, language, code):
    """
    Analyze and fix buggy code using a CodeT5+ model with the 'fix:' prompt prefix.
    Works across multiple programming languages.
    """
    start_time = time.time()
    # Prepare the prompt in CodeT5+ style
    prompt = f"fix: {code.strip()}"
    logger.info(f"🔍 Starting analysis for language: {language}")
    logger.info(f"🧾 Prompt: {prompt[:80]}...")
    try:
        # Tokenize the prompt and generate the repaired code
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512).to(model.device)
        with torch.no_grad():
            output = model.generate(**inputs, max_new_tokens=1024)
        # Decode the generated sequence
        response = tokenizer.decode(output[0], skip_special_tokens=True).strip()
        elapsed = round(time.time() - start_time, 2)
        logger.info(f"✅ Inference completed in {elapsed}s")
        return {
            "bug_analysis": [],  # Optional: heuristics could populate this; see the naive_bug_analysis sketch below
            "corrected_code": response
        }
    except Exception as e:
        logger.error(f"❌ Error during analysis: {e}")
        return {
            "bug_analysis": [{
                "line_number": 0,
                "error_message": "Inference failed",
                "explanation": str(e),
                "fix_suggestion": "Try again with simpler code or retry later"
            }],
            "corrected_code": code
        }
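
# A minimal heuristic sketch for populating 'bug_analysis', assuming a plain
# line diff between the input and the model output is informative enough.
# The helper name and its output fields are hypothetical, not part of any
# library API; they simply mirror the dict shape used above.
import difflib

def naive_bug_analysis(original: str, corrected: str):
    """Flag regions the model changed between the original and corrected code."""
    findings = []
    original_lines = original.strip().splitlines()
    corrected_lines = corrected.strip().splitlines()
    matcher = difflib.SequenceMatcher(None, original_lines, corrected_lines)
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == "equal":
            continue  # Unchanged region, nothing to report
        findings.append({
            "line_number": i1 + 1,  # 1-based line number in the original snippet
            "error_message": f"Region rewritten by model ({tag})",
            "explanation": "The model changed this part of the code during repair.",
            "fix_suggestion": "\n".join(corrected_lines[j1:j2]) or "(lines removed)"
        })
    return findings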
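
# Usage sketch. Assumptions: the 'transformers' package is installed, and the
# checkpoint name below is only an example -- any CodeT5-style seq2seq checkpoint
# fine-tuned for code repair with a 'fix:' prefix should slot in the same way.
if __name__ == "__main__":
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

    checkpoint = "Salesforce/codet5p-220m"  # example checkpoint, not necessarily the one this Space uses
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)

    buggy = "def add(a, b):\n    return a - b"
    result = analyze_code(tokenizer, model, "Python", buggy)
    result["bug_analysis"] = naive_bug_analysis(buggy, result["corrected_code"])
    print(json.dumps(result, indent=2))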