# app.py - CodeLab Stage 3: Semantic Analysis - Fixed Version
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModel, T5ForConditionalGeneration
import json
import re
import ast
import time
from typing import Dict, List, Any, Optional
import logging
import traceback

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# `for` / `while` as whole words, so identifiers such as "format" do not count.
_LOOP_KEYWORD_RE = re.compile(r'\b(?:for|while)\b')
# A statement whose first token opens a loop body.
_LOOP_HEADER_RE = re.compile(r'(?:async\s+)?(?:for|while)\b')
# A "[" that opens a list display/comprehension rather than a subscript
# (a subscript bracket directly follows an identifier, ")" or "]").
_LIST_DISPLAY_RE = re.compile(r'(?<![\w)\]])\[')


def _calls_builtin(code: str, name: str) -> bool:
    """Return True if *code* contains a call to the bare name *name*.

    A plain substring test such as ``'max(' in code`` also fires on
    ``find_max(``, ``obj.max(`` and ``def max(``; the look-behinds reject
    those three shapes so only calls like ``max(values)`` are reported.
    """
    pattern = r'(?<![\w.])(?<!def )' + re.escape(name) + r'\s*\('
    return re.search(pattern, code) is not None


def _has_keyword(code: str, word: str) -> bool:
    """Return True if *word* occurs in *code* as a whole word."""
    return re.search(r'\b' + re.escape(word) + r'\b', code) is not None


def _max_loop_depth(code: str) -> int:
    """Estimate the deepest loop nesting in *code* from its indentation.

    Loop headers are tracked on a stack keyed by their indentation column;
    a line that is indented no deeper than a header closes that loop.
    Several loop keywords on one line (e.g. a double comprehension) are
    counted as nested. This is a textual heuristic: keywords inside string
    literals are still counted.
    """
    open_loops: List[int] = []
    deepest = 0
    for raw_line in code.expandtabs(4).splitlines():
        stripped = raw_line.lstrip()
        if not stripped or stripped.startswith('#'):
            continue
        indent = len(raw_line) - len(stripped)
        while open_loops and open_loops[-1] >= indent:
            open_loops.pop()
        # Ignore trailing comments so "# used for ..." is not seen as a loop.
        code_part = stripped.split('#', 1)[0]
        loops_here = len(_LOOP_KEYWORD_RE.findall(code_part))
        deepest = max(deepest, len(open_loops) + loops_here)
        if _LOOP_HEADER_RE.match(code_part):
            open_loops.append(indent)
    return deepest


class SemanticAnalyzer:
    """Heuristic and model-backed analysis of short Python snippets.

    CodeBERT is used for embeddings and CodeT5 for summaries/generation.
    If the models cannot be loaded, ``models_loaded`` stays False and every
    public method falls back to the regex-based heuristics.
    """

    # Length of the zero vector returned when no embedding can be computed.
    EMBEDDING_SIZE = 768
    # Readability score before any good practice is detected; each detected
    # practice adds one point and the result is clamped to 1..10.
    BASE_READABILITY_SCORE = 6

    def __init__(self):
        """Load both models; on any failure log it and stay in fallback mode."""
        logger.info("🚀 Initializing CodeLab Semantic Analyzer...")
        self.models_loaded = False

        # Initialize models with error handling
        try:
            # CodeBERT for semantic understanding
            logger.info("📖 Loading CodeBERT...")
            self.codebert_tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
            self.codebert_model = AutoModel.from_pretrained("microsoft/codebert-base")

            # CodeT5 for code analysis and generation
            logger.info("🔧 Loading CodeT5...")
            self.codet5_tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-base")  # ✅
            self.codet5_model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-base")

            # Set models to eval mode for inference
            self.codebert_model.eval()
            self.codet5_model.eval()

            self.models_loaded = True
            logger.info("✅ All models loaded successfully!")

        except Exception as e:
            logger.error("❌ Error loading models: %s", e)
            self.models_loaded = False
            # Don't raise - allow fallback functionality

    def generate_code_embedding(self, code: str) -> List[float]:
        """Generate semantic embedding using CodeBERT.

        Returns the L2-normalised hidden state of the first token, or a zero
        vector of ``EMBEDDING_SIZE`` entries when the models are unavailable
        or inference fails.
        """
        if not self.models_loaded:
            logger.warning("⚠️ Models not loaded, returning zero embedding")
            return [0.0] * self.EMBEDDING_SIZE

        try:
            # Clean and prepare code
            cleaned_code = self._clean_code_for_analysis(code)

            # Tokenize code
            inputs = self.codebert_tokenizer(
                cleaned_code,
                return_tensors="pt",
                max_length=512,
                truncation=True,
                padding=True
            )

            # Generate embedding
            with torch.no_grad():
                outputs = self.codebert_model(**inputs)
                # Use [CLS] token embedding (better for semantic representation)
                embedding = outputs.last_hidden_state[:, 0, :].squeeze()
                # Normalize embedding
                embedding_norm = torch.nn.functional.normalize(embedding, dim=0)

            return embedding_norm.tolist()

        except Exception as e:
            logger.error("❌ Error generating embedding: %s", e)
            return [0.0] * self.EMBEDDING_SIZE  # Return zero vector on error

    def analyze_with_codet5(self, code: str, question_text: str) -> Dict[str, Any]:
        """Enhanced code analysis using CodeT5.

        Produces a CodeT5 summary plus the heuristic pattern, approach,
        complexity and quality results. ``question_text`` is accepted for
        interface compatibility and is not used. Falls back to
        ``_fallback_analysis`` when models are missing or inference fails.
        """
        if not self.models_loaded:
            return self._fallback_analysis(code)

        try:
            results = {}

            # Task 1: Code summarization with better prompt
            summarize_input = f"Summarize the following Python function: {code}"
            inputs = self.codet5_tokenizer(
                summarize_input,
                return_tensors="pt",
                max_length=512,
                truncation=True
            )

            with torch.no_grad():
                summary_ids = self.codet5_model.generate(
                    input_ids=inputs.input_ids,
                    # Pass the mask explicitly so generation does not have to
                    # guess which positions are padding.
                    attention_mask=inputs.attention_mask,
                    max_length=100,
                    num_beams=3,  # Increased for better quality
                    early_stopping=True,
                    do_sample=False,  # Deterministic for consistency
                    pad_token_id=self.codet5_tokenizer.pad_token_id
                )

            results['code_summary'] = self.codet5_tokenizer.decode(
                summary_ids[0], skip_special_tokens=True
            )

            # Task 2: Enhanced pattern extraction
            results['logic_patterns'] = self.extract_logic_patterns_enhanced(code)
            results['approach_analysis'] = self.analyze_approach_enhanced(code)
            results['complexity_analysis'] = self.analyze_complexity_enhanced(code)
            results['semantic_quality'] = self.assess_semantic_quality(code)

            return results

        except Exception as e:
            logger.error("❌ Error in CodeT5 analysis: %s", e)
            return self._fallback_analysis(code)

    def _fallback_analysis(self, code: str) -> Dict[str, Any]:
        """Fallback analysis when AI models fail (heuristics only)."""
        lines_count = len(code.split('\n'))
        return {
            'code_summary': f'Python function with {lines_count} lines',
            'logic_patterns': self.extract_logic_patterns_enhanced(code),
            'approach_analysis': self.analyze_approach_enhanced(code),
            'complexity_analysis': self.analyze_complexity_enhanced(code),
            'semantic_quality': self.assess_semantic_quality(code)
        }

    def extract_logic_patterns_enhanced(self, code: str) -> List[str]:
        """Enhanced logical pattern extraction.

        Returns the pattern labels detected in *code*, without duplicates and
        in a stable order.
        """
        patterns = []

        # Basic patterns: real calls only, not names that merely end in "max(".
        builtin_labels = (
            ('max', 'builtin_max'),
            ('min', 'builtin_min'),
            ('sum', 'builtin_sum'),
            ('len', 'length_operations'),
            ('sorted', 'sorting_operations'),
        )
        for builtin, label in builtin_labels:
            if _calls_builtin(code, builtin):
                patterns.append(label)

        # Control flow patterns
        if _has_keyword(code, 'for') and _has_keyword(code, 'if'):
            patterns.append('iterative_conditional')
        if _has_keyword(code, 'while'):
            patterns.append('loop_based')
        if _has_keyword(code, 'def'):
            patterns.append('function_definition')
        if _has_keyword(code, 'return'):
            patterns.append('return_statement')

        # Advanced patterns with regex
        if re.search(r'for\s+\w+\s+in\s+range', code):
            patterns.append('indexed_iteration')
        if re.search(r'for\s+\w+\s+in\s+enumerate', code):
            patterns.append('indexed_enumeration')
        if re.search(r'if\s+.*[<>]=?.*:', code):
            patterns.append('comparison_logic')
        if re.search(r'\[.*\]', code):
            patterns.append('list_operations')

        # Error handling patterns
        if 'try:' in code or _has_keyword(code, 'except'):
            patterns.append('error_handling')
        if 'if not' in code or 'if len(' in code:
            patterns.append('input_validation')

        # Mathematical patterns
        if any(op in code for op in ['**', 'pow(', 'sqrt', 'math.']):
            patterns.append('mathematical_operations')

        # Remove duplicates while keeping a deterministic order.
        return list(dict.fromkeys(patterns))

    def analyze_approach_enhanced(self, code: str) -> str:
        """Enhanced algorithmic approach analysis.

        Returns a single label; builtin-based approaches take precedence over
        loop-based ones, which take precedence over the remaining categories.
        """
        uses_max = _calls_builtin(code, 'max')
        uses_min = _calls_builtin(code, 'min')

        # Built-in function approaches (optimal)
        if uses_max and uses_min:
            return 'dual_builtin_approach'
        if uses_max:
            return 'builtin_maximum_approach'
        if uses_min:
            return 'builtin_minimum_approach'
        if _calls_builtin(code, 'sum'):
            return 'builtin_aggregation_approach'
        if _calls_builtin(code, 'sorted'):
            return 'sorting_based_approach'

        # Loop-based approaches
        if _has_keyword(code, 'for') and _has_keyword(code, 'if'):
            if _has_keyword(code, 'range'):
                return 'indexed_iterative_approach'
            return 'iterative_comparison_approach'
        if _has_keyword(code, 'while'):
            return 'loop_based_approach'

        # Advanced approaches
        if _has_keyword(code, 'enumerate'):
            return 'enumerated_iteration_approach'
        # NOTE: "." does not match newlines here, so this only fires when two
        # `def` statements share one physical line.
        if re.search(r'def\s+\w+.*def\s+\w+', code):
            return 'nested_function_approach'
        return 'custom_logic_approach'

    def analyze_complexity_enhanced(self, code: str) -> Dict[str, str]:
        """Enhanced complexity analysis.

        Returns ``{'time': ..., 'space': ...}`` with Big-O labels estimated
        from loop nesting depth, builtin calls and list construction.
        """
        depth = _max_loop_depth(code)
        uses_sorted = _calls_builtin(code, 'sorted')
        uses_linear_builtin = any(
            _calls_builtin(code, name) for name in ('max', 'min', 'sum')
        )

        # Nesting is checked first: a builtin call does not make nested loops linear.
        if depth >= 3:
            time_estimate = 'O(n³)'
        elif depth == 2:
            time_estimate = 'O(n²)'
        elif uses_sorted:
            time_estimate = 'O(n log n)'
        elif depth == 1 or uses_linear_builtin:
            time_estimate = 'O(n)'
        else:
            time_estimate = 'O(1)'

        # Only building a list (display, comprehension, sorted()) counts as
        # O(n) space; plain indexing such as numbers[0] does not.
        if uses_sorted or _LIST_DISPLAY_RE.search(code):
            space_estimate = 'O(n)'
        else:
            space_estimate = 'O(1)'

        return {'time': time_estimate, 'space': space_estimate}

    def assess_semantic_quality(self, code: str) -> Dict[str, Any]:
        """Assess the semantic quality of code.

        Returns a dict with ``readability_score`` (1-10), ``logic_clarity``,
        ``efficiency_level`` and the list of detected ``best_practices``.
        """
        quality_metrics = {
            'readability_score': 0,
            'logic_clarity': 'unclear',
            'efficiency_level': 'low',
            'best_practices': []
        }

        # Readability assessment
        lines = code.split('\n')
        # Starting at the maximum would make every bonus a no-op after the
        # clamp below, so start from a baseline that bonuses can raise.
        total_score = self.BASE_READABILITY_SCORE

        # Check for comments or docstrings
        if '"""' in code or "'''" in code or '#' in code:
            quality_metrics['best_practices'].append('documented_code')
            total_score += 1

        # Check for meaningful variable names
        if re.search(r'\b(max_val|min_val|result|answer|total)\b', code):
            quality_metrics['best_practices'].append('meaningful_variables')
            total_score += 1

        # Check for input validation
        if 'if not' in code or 'if len(' in code:
            quality_metrics['best_practices'].append('input_validation')
            total_score += 1

        # Efficiency assessment
        if any(_calls_builtin(code, builtin) for builtin in ('max', 'min', 'sum')):
            quality_metrics['efficiency_level'] = 'high'
            quality_metrics['best_practices'].append('builtin_functions')
        elif _has_keyword(code, 'for') and _has_keyword(code, 'if'):
            quality_metrics['efficiency_level'] = 'medium'

        # Logic clarity
        if len(lines) <= 10 and _has_keyword(code, 'def') and _has_keyword(code, 'return'):
            quality_metrics['logic_clarity'] = 'clear'
        elif len(lines) <= 20:
            quality_metrics['logic_clarity'] = 'moderate'

        quality_metrics['readability_score'] = min(10, max(1, total_score))
        return quality_metrics

    def generate_optimal_solution(self, question_text: str, question_type: str = "auto_detect") -> Dict[str, Any]:
        """Enhanced optimal solution generation.

        Chooses a hand-written reference solution when the question mentions
        max, min or sum/total; otherwise uses CodeT5 if loaded, else a
        template. ``question_type`` is accepted for interface compatibility
        and is not used. Any error yields the template solution.
        """
        try:
            question_lower = question_text.lower()
            # Match at word starts so "determine" or "assume" do not count
            # as "min" / "sum".
            wants_max = re.search(r'\bmax', question_lower) is not None
            wants_min = re.search(r'\bmin', question_lower) is not None
            wants_sum = re.search(r'\b(?:sum|total)', question_lower) is not None

            # Pattern-based solution generation (more reliable than AI generation)
            if wants_max and not wants_min:
                return {
                    'code': (
                        'def find_max(numbers):\n'
                        '    """Find maximum value in a list"""\n'
                        '    if not numbers:\n'
                        '        return None\n'
                        '    return max(numbers)'
                    ),
                    'explanation': 'Optimal solution using built-in max() function with input validation',
                    'approach': 'builtin_optimized',
                    'complexity': {'time': 'O(n)', 'space': 'O(1)'},
                    'generated_by': 'pattern_optimized',
                    'quality_score': 10
                }
            elif wants_min and not wants_max:
                return {
                    'code': (
                        'def find_min(numbers):\n'
                        '    """Find minimum value in a list"""\n'
                        '    if not numbers:\n'
                        '        return None\n'
                        '    return min(numbers)'
                    ),
                    'explanation': 'Optimal solution using built-in min() function with input validation',
                    'approach': 'builtin_optimized',
                    'complexity': {'time': 'O(n)', 'space': 'O(1)'},
                    'generated_by': 'pattern_optimized',
                    'quality_score': 10
                }
            elif wants_sum:
                return {
                    'code': (
                        'def calculate_sum(numbers):\n'
                        '    """Calculate sum of numbers in a list"""\n'
                        '    return sum(numbers)'
                    ),
                    'explanation': 'Optimal solution using built-in sum() function',
                    'approach': 'builtin_optimized',
                    'complexity': {'time': 'O(n)', 'space': 'O(1)'},
                    'generated_by': 'pattern_optimized',
                    'quality_score': 10
                }
            else:
                # Try AI generation as fallback
                if self.models_loaded:
                    return self._ai_generate_solution(question_text)
                else:
                    return self._template_solution(question_text)

        except Exception as e:
            logger.error("❌ Error generating optimal solution: %s", e)
            return self._template_solution(question_text)

    def _ai_generate_solution(self, question_text: str) -> Dict[str, Any]:
        """AI-based solution generation using CodeT5.

        Returns the same dict shape as the pattern-based solutions; falls
        back to the template solution on any error.
        """
        try:
            generate_input = f"Generate optimal Python function for: {question_text}"
            inputs = self.codet5_tokenizer(
                generate_input,
                return_tensors="pt",
                max_length=256,
                truncation=True
            )

            with torch.no_grad():
                generated_ids = self.codet5_model.generate(
                    input_ids=inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    max_length=200,
                    num_beams=3,
                    early_stopping=True,
                    do_sample=False,  # Deterministic
                    pad_token_id=self.codet5_tokenizer.pad_token_id
                )

            generated_code = self.codet5_tokenizer.decode(
                generated_ids[0], skip_special_tokens=True
            )

            return {
                'code': generated_code,
                'explanation': 'AI-generated solution using CodeT5',
                'approach': 'ai_generated',
                # Same {'time', 'space'} shape as the pattern-based branches,
                # estimated from the generated code instead of assumed.
                'complexity': self.analyze_complexity_enhanced(generated_code),
                'generated_by': 'codet5',
                'quality_score': 7
            }

        except Exception as e:
            logger.error("❌ Error in AI generation: %s", e)
            return self._template_solution(question_text)

    def _template_solution(self, question_text: str) -> Dict[str, Any]:
        """Template-based fallback solution (independent of *question_text*)."""
        return {
            'code': (
                'def solution(data):\n'
                '    """Template solution"""\n'
                '    # Implementation needed\n'
                '    return data[0] if data else None'
            ),
            'explanation': 'Template solution - implementation needed based on specific requirements',
            'approach': 'template_fallback',
            'complexity': {'time': 'O(1)', 'space': 'O(1)'},
            'generated_by': 'template',
            'quality_score': 5
        }

    def compare_solutions(self, student_code: str, optimal_code: str) -> Dict[str, Any]:
        """Enhanced solution comparison.

        Compares embeddings, detected patterns, approach labels, quality
        metrics and complexity of the two snippets. On any error a dict with
        the same top-level keys and neutral values is returned.
        """
        try:
            # Generate embeddings for semantic comparison
            student_embedding = self.generate_code_embedding(student_code)
            optimal_embedding = self.generate_code_embedding(optimal_code)

            # Calculate semantic similarity
            similarity = self.calculate_cosine_similarity(student_embedding, optimal_embedding)

            # Pattern analysis
            student_patterns = self.extract_logic_patterns_enhanced(student_code)
            optimal_patterns = self.extract_logic_patterns_enhanced(optimal_code)

            # Approach comparison
            student_approach = self.analyze_approach_enhanced(student_code)
            optimal_approach = self.analyze_approach_enhanced(optimal_code)

            # Quality comparison
            student_quality = self.assess_semantic_quality(student_code)
            optimal_quality = self.assess_semantic_quality(optimal_code)

            return {
                'semantic_similarity': float(similarity),
                'student_patterns': student_patterns,
                'optimal_patterns': optimal_patterns,
                'pattern_overlap': len(set(student_patterns) & set(optimal_patterns)),
                'approach_comparison': {
                    'student': student_approach,
                    'optimal': optimal_approach,
                    'matches': student_approach == optimal_approach
                },
                'quality_comparison': {
                    'student_readability': student_quality['readability_score'],
                    'optimal_readability': optimal_quality['readability_score'],
                    'student_efficiency': student_quality['efficiency_level'],
                    'optimal_efficiency': optimal_quality['efficiency_level']
                },
                'complexity_comparison': self.compare_complexity_enhanced(student_code, optimal_code)
            }

        except Exception as e:
            logger.error("❌ Error comparing solutions: %s", e)
            return {
                'semantic_similarity': 0.0,
                'student_patterns': [],
                'optimal_patterns': [],
                'pattern_overlap': 0,
                'approach_comparison': {'error': str(e)},
                'quality_comparison': {'error': str(e)},
                'complexity_comparison': 'unable_to_compare'
            }

    def calculate_cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Enhanced cosine similarity calculation.

        Returns 0.0 for empty or differently sized vectors and on any error.
        """
        try:
            if len(vec1) != len(vec2) or not vec1 or not vec2:
                return 0.0

            # Convert to tensors for more accurate calculation
            vec1_tensor = torch.tensor(vec1)
            vec2_tensor = torch.tensor(vec2)

            # Calculate cosine similarity
            similarity = torch.nn.functional.cosine_similarity(
                vec1_tensor.unsqueeze(0),
                vec2_tensor.unsqueeze(0)
            )

            return float(similarity.item())

        except Exception as e:
            logger.error("❌ Error calculating similarity: %s", e)
            return 0.0

    def compare_complexity_enhanced(self, code1: str, code2: str) -> Dict[str, Any]:
        """Enhanced complexity comparison.

        *code1* is treated as the student solution and *code2* as the
        reference; ``efficiency_comparison`` describes the student relative
        to the reference.
        """
        complexity1 = self.analyze_complexity_enhanced(code1)
        complexity2 = self.analyze_complexity_enhanced(code2)

        # Complexity ranking for comparison
        complexity_rank = {
            'O(1)': 1,
            'O(log n)': 2,
            'O(n)': 3,
            'O(n log n)': 4,
            'O(n²)': 5,
            'O(n³)': 6
        }

        # Unknown labels rank worst.
        rank1 = complexity_rank.get(complexity1['time'], 999)
        rank2 = complexity_rank.get(complexity2['time'], 999)

        if rank1 < rank2:
            verdict = 'better'
        elif rank1 > rank2:
            verdict = 'worse'
        else:
            verdict = 'same'

        return {
            'student_complexity': complexity1,
            'optimal_complexity': complexity2,
            'efficiency_comparison': verdict,
            'recommendation': self._get_complexity_recommendation(complexity1, complexity2)
        }

    def _get_complexity_recommendation(self, student_comp: Dict, optimal_comp: Dict) -> str:
        """Generate complexity-based recommendations from the two time labels."""
        if student_comp['time'] == optimal_comp['time']:
            return "Excellent! Your solution has optimal time complexity"
        elif student_comp['time'] in ['O(n²)', 'O(n³)'] and optimal_comp['time'] == 'O(n)':
            return "Consider using built-in functions to improve from quadratic to linear complexity"
        elif student_comp['time'] == 'O(n)' and optimal_comp['time'] == 'O(1)':
            return "Good approach, but there might be a constant-time solution"
        else:
            return "Your complexity is acceptable, but optimization is possible"

    def _clean_code_for_analysis(self, code: str) -> str:
        """Clean code for better analysis.

        Strips every line and drops blank ones; indentation is therefore
        removed, so the result is only meant for the embedding model.
        """
        # Remove excessive whitespace
        lines = [line.strip() for line in code.split('\n') if line.strip()]
        return '\n'.join(lines)


# Initialize the analyzer (with lazy loading)
analyzer = None


def get_analyzer():
    """Get analyzer instance with lazy initialization."""
    global analyzer
    if analyzer is None:
        analyzer = SemanticAnalyzer()
    return analyzer


def process_semantic_analysis(
    student_code: str,
    question_text: str,
    question_id: str = "default",
    need_optimal_solution: bool = True
) -> str:
    """Enhanced main function for semantic analysis.

    Runs embedding, heuristic/CodeT5 analysis, optional reference-solution
    generation and comparison, and returns the result as a JSON string.
    Failures are reported inside the JSON (``success: false``) rather than
    raised.
    """
    start_time = time.time()

    try:
        logger.info(f"🧠 Starting enhanced semantic analysis for question: {question_id}")

        # Get analyzer instance
        semantic_analyzer = get_analyzer()

        # Input validation
        if not student_code or not student_code.strip():
            return json.dumps({
                'success': False,
                'error': 'Empty code provided',
                'processing_time_ms': int((time.time() - start_time) * 1000)
            })

        # Step 1: Generate code embedding
        logger.info("📊 Generating code embedding...")
        code_embedding = semantic_analyzer.generate_code_embedding(student_code)

        # Step 2: Enhanced analysis with CodeT5
        logger.info("🔍 Performing enhanced analysis...")
        codet5_analysis = semantic_analyzer.analyze_with_codet5(student_code, question_text)

        # Step 3: Generate optimal solution if needed
        optimal_solution = None
        if need_optimal_solution:
            logger.info("💡 Generating optimal solution...")
            optimal_solution = semantic_analyzer.generate_optimal_solution(question_text)

        # Step 4: Enhanced solution comparison
        comparison = None
        if optimal_solution:
            logger.info("⚖️ Performing enhanced comparison...")
            comparison = semantic_analyzer.compare_solutions(student_code, optimal_solution['code'])

        # Step 5: Generate comprehensive insights
        insights = generate_comprehensive_insights(
            student_code, codet5_analysis, comparison, optimal_solution
        )

        processing_time = time.time() - start_time

        # Prepare enhanced results
        results = {
            'success': True,
            'processing_time_ms': int(processing_time * 1000),
            'semantic_analysis': {
                'code_embedding': code_embedding[:100],  # More dimensions for better representation
                'embedding_size': len(code_embedding),
                'logic_patterns': codet5_analysis['logic_patterns'],
                'approach_analysis': codet5_analysis['approach_analysis'],
                'complexity_analysis': codet5_analysis['complexity_analysis'],
                'semantic_quality': codet5_analysis['semantic_quality'],
                'code_summary': codet5_analysis['code_summary']
            },
            'optimal_solution': optimal_solution,
            'solution_comparison': comparison,
            'semantic_insights': insights,
            'recommendations': generate_recommendations(codet5_analysis, comparison),
            'metadata': {
                'question_id': question_id,
                'analysis_version': '3.1-enhanced-ai',
                'models_used': ['CodeBERT', 'CodeT5'] if semantic_analyzer.models_loaded else ['Fallback'],
                'models_status': 'loaded' if semantic_analyzer.models_loaded else 'fallback',
                'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
                'processing_stage': 'semantic_analysis'
            }
        }

        logger.info(f"✅ Enhanced semantic analysis completed in {processing_time:.2f}s")
        # ensure_ascii=False keeps labels such as "O(n²)" readable in the UI.
        return json.dumps(results, indent=2, ensure_ascii=False)

    except Exception as e:
        logger.error("❌ Error in semantic analysis: %s", e)
        logger.error(traceback.format_exc())

        return json.dumps({
            'success': False,
            'error': str(e),
            'processing_time_ms': int((time.time() - start_time) * 1000),
            'fallback_analysis': 'Enhanced analysis unavailable due to error',
            'metadata': {
                'analysis_version': '3.1-enhanced-ai',
                'error_occurred': True,
                'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
            }
        }, ensure_ascii=False)


def generate_comprehensive_insights(
    student_code: str,
    codet5_analysis: Dict,
    comparison: Optional[Dict] = None,
    optimal_solution: Optional[Dict] = None
) -> List[str]:
    """Generate comprehensive insights about the student's code.

    Args:
        student_code: The submitted code (accepted for interface
            compatibility; not read here).
        codet5_analysis: Dict with ``logic_patterns``, ``approach_analysis``,
            ``complexity_analysis`` and ``semantic_quality`` entries.
        comparison: Optional dict with ``semantic_similarity``,
            ``pattern_overlap`` and ``optimal_patterns`` entries.
        optimal_solution: Accepted for interface compatibility; not read here.

    Returns:
        A non-empty list of human-readable insight sentences.
    """
    insights = []

    # Logic understanding insights
    patterns = codet5_analysis['logic_patterns']
    if 'builtin_max' in patterns or 'builtin_min' in patterns or 'builtin_sum' in patterns:
        insights.append("Excellent! Student demonstrates advanced understanding by using Python built-in functions")
    elif 'iterative_conditional' in patterns:
        insights.append("Good logical thinking demonstrated with iterative comparison approach")
    elif 'function_definition' in patterns and 'return_statement' in patterns:
        insights.append("Proper function structure with clear return logic")

    # Approach analysis insights
    approach = codet5_analysis['approach_analysis']
    if 'builtin' in approach:
        insights.append("Optimal algorithmic approach chosen - highly efficient solution")
    elif 'iterative' in approach:
        insights.append("Solid iterative approach, shows good programming fundamentals")
    elif 'custom' in approach:
        insights.append("Creative custom approach, demonstrates independent problem-solving")

    # Complexity insights
    complexity = codet5_analysis['complexity_analysis']
    if complexity['time'] == 'O(n)' and complexity['space'] == 'O(1)':
        insights.append("Excellent time and space complexity - very efficient solution")
    elif complexity['time'] in ['O(n²)', 'O(n³)']:
        insights.append("Solution works correctly but could benefit from complexity optimization")

    # Quality insights
    quality = codet5_analysis['semantic_quality']
    if quality['readability_score'] >= 8:
        insights.append("Code is highly readable with good programming practices")
    elif quality['efficiency_level'] == 'high':
        insights.append("Solution demonstrates awareness of efficient programming techniques")

    # Comparison insights
    if comparison:
        similarity = comparison['semantic_similarity']
        if similarity > 0.8:
            insights.append("Student's solution is semantically very similar to the optimal approach")
        elif similarity > 0.6:
            insights.append("Good understanding shown, with opportunities for further optimization")
        elif similarity > 0.4:
            insights.append("Correct approach with different implementation style")

        # Pattern overlap insights
        overlap = comparison['pattern_overlap']
        total_patterns = len(comparison['optimal_patterns'])
        # Guard against division by zero when the reference has no patterns.
        if total_patterns > 0 and overlap / total_patterns > 0.7:
            insights.append("Strong pattern recognition - matches most optimal solution patterns")

    # Default insight if none found
    if not insights:
        insights.append("Student shows basic understanding of the problem and provides a working solution")

    return insights


def generate_recommendations(codet5_analysis: Dict, comparison: Optional[Dict] = None) -> List[str]:
    """Generate actionable recommendations for improvement.

    Args:
        codet5_analysis: Dict with ``logic_patterns``, ``complexity_analysis``
            and ``semantic_quality`` entries.
        comparison: Optional dict with a ``semantic_similarity`` entry.

    Returns:
        A possibly empty list of recommendation sentences.
    """
    recommendations = []

    # Efficiency recommendations: only suggest max()/min() when neither
    # builtin is already in use.
    patterns = codet5_analysis['logic_patterns']
    already_uses_builtin = 'builtin_max' in patterns or 'builtin_min' in patterns
    if 'iterative_conditional' in patterns and not already_uses_builtin:
        recommendations.append("Consider using built-in max() or min() functions for better efficiency")

    # Complexity recommendations
    complexity = codet5_analysis['complexity_analysis']
    if complexity['time'] in ['O(n²)', 'O(n³)']:
        recommendations.append("Try to reduce algorithmic complexity using more efficient approaches")

    # Quality recommendations
    quality = codet5_analysis['semantic_quality']
    if quality['readability_score'] < 7:
        recommendations.append("Add comments or use more descriptive variable names for better readability")

    if 'input_validation' not in quality['best_practices']:
        recommendations.append("Consider adding input validation for more robust code")

    # Comparison-based recommendations
    if comparison and comparison['semantic_similarity'] < 0.6:
        recommendations.append("Review the optimal solution to learn alternative approaches")

    return recommendations


# Enhanced Gradio Interface
def gradio_interface(student_code, question_text, need_optimal):
    """Enhanced Gradio interface wrapper.

    Rejects missing or whitespace-only code with a JSON error message and
    otherwise returns the JSON string from ``process_semantic_analysis``.
    """
    # A cleared textbox may deliver None, so check before calling .strip().
    if not student_code or not student_code.strip():
        return json.dumps({
            'error': 'Please provide student code for analysis',
            'success': False
        }, indent=2)

    return process_semantic_analysis(
        student_code=student_code,
        question_text=question_text,
        question_id="gradio_test",
        need_optimal_solution=need_optimal
    )


# Create enhanced Gradio interface
# The sample snippets below use real 4-space indentation so that they are
# valid Python when analysed.
demo = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(
            label="Student Code",
            placeholder="Enter Python code here...",
            lines=12,
            value=(
                "def find_max(numbers):\n"
                "    max_val = numbers[0]\n"
                "    for num in numbers:\n"
                "        if num > max_val:\n"
                "            max_val = num\n"
                "    return max_val"
            )
        ),
        gr.Textbox(
            label="Question Text",
            placeholder="Enter the question...",
            lines=2,
            value="Find the maximum number in a list"
        ),
        gr.Checkbox(
            label="Generate Optimal Solution",
            value=True
        )
    ],
    outputs=gr.Textbox(
        label="Semantic Analysis Results (JSON)",
        lines=25,
        show_copy_button=True
    ),
    title="🧠 CodeLab Semantic Analysis - Stage 3 (Fixed)",
    description=(
        " Advanced semantic analysis using CodeBERT and CodeT5 models for educational code evaluation. "
        "This system analyzes code semantics, generates optimal solutions, and provides educational insights. "
    ),
    examples=[
        [
            "def find_max(numbers):\n    return max(numbers)",
            "Find the maximum number in a list",
            True
        ],
        [
            (
                "def find_min(arr):\n"
                "    minimum = arr[0]\n"
                "    for i in range(1, len(arr)):\n"
                "        if arr[i] < minimum:\n"
                "            minimum = arr[i]\n"
                "    return minimum"
            ),
            "Find the minimum number in an array",
            True
        ],
        [
            (
                "def calculate_sum(nums):\n"
                "    total = 0\n"
                "    for num in nums:\n"
                "        total += num\n"
                "    return total"
            ),
            "Calculate the sum of all numbers in a list",
            True
        ]
    ],
    theme=gr.themes.Soft(),
    analytics_enabled=False
)

# Launch the interface
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860
    )