import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator

from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field

# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn

load_dotenv(find_dotenv())

# Initialize the language model
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)

# Import classes from original graph.py
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question


def streaming_analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> Generator[str, None, str]:
    """
    Streaming version of guided file analysis that yields progress updates.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Yields:
        Progress updates as strings

    Returns:
        Final analysis results
    """
    try:
        yield "🔍 **Starting Guided File Analysis...**\n\n"
        yield f"📁 **File:** `{os.path.basename(file_path)}`\n"
        yield f"❓ **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"

        # Step 1: File Examination
        yield "## 📋 Step 1: Examining File Structure\n\n"
        yield "Reading first 20 lines to understand file format and patterns...\n\n"

        file_examination = examine_file_structure_streaming(file_path)

        yield f"✅ **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"✅ **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"✅ **Data Format:** {file_examination.data_format}\n"
        yield f"✅ **Complexity:** {file_examination.complexity_level}\n"
        yield f"✅ **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"

        # Show a sample of what the AI is analyzing
        if file_examination.sample_lines:
            yield "🔍 **File Sample (First 5 lines):**\n"
            yield "```\n"
            for i, line in enumerate(file_examination.sample_lines[:5], 1):
                # Truncate very long lines
                display_line = line[:100] + "..." if len(line) > 100 else line
                yield f"{i:2d}: {display_line}\n"
            yield "```\n\n"

        # Step 2: Code Guidance Generation
        yield "## 🎯 Step 2: Generating Analysis Strategy\n\n"
        yield "Creating specific code guidance based on file structure and your question...\n\n"

        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)

        yield f"✅ **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"✅ **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        yield f"✅ **Regex Patterns:** {len(code_guidance.specific_patterns)} patterns\n"

        # Show the actual patterns for transparency
        if code_guidance.specific_patterns:
            yield "🔍 **Pattern Details:**\n"
            for i, pattern in enumerate(code_guidance.specific_patterns[:3], 1):  # Show first 3
                yield f"  {i}. `{pattern}`\n"
            if len(code_guidance.specific_patterns) > 3:
                yield f"  ... and {len(code_guidance.specific_patterns) - 3} more patterns\n"
        yield f"✅ **Expected Outputs:** {', '.join(code_guidance.expected_outputs)}\n\n"

        # Step 3: Code Execution
        yield "## 🚀 Step 3: Executing Analysis\n\n"
        yield "Running guided code analysis with enhanced context...\n\n"

        # Stream the execution results
        execution_generator = execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query)
        execution_results = []
        for chunk in execution_generator:
            yield chunk
            execution_results.append(chunk)

        # Final Summary
        yield "\n\n## ✅ Analysis Complete!\n\n"

        final_analysis = f"""### 📊 **Analysis Summary**

**File:** `{os.path.basename(file_path)}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}

**Guided Features Used:**
- ✅ Structure-aware examination
- ✅ Question-specific code generation
- ✅ {len(code_guidance.specific_patterns)} targeted patterns
- ✅ Enhanced error handling

---

{''.join(execution_results)}
"""
        yield final_analysis
        return final_analysis

    except Exception as e:
        error_msg = f"❌ **Error in guided analysis:** {str(e)}\n\n"
        yield error_msg
        return error_msg
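
# Usage sketch (the helper name below is illustrative, not part of the original
# module): the generator is typed Generator[str, None, str], so the final
# summary also travels back as the generator's *return* value, which surfaces
# as StopIteration.value when the generator is driven by hand. A plain for-loop
# silently discards it; this shows how a caller could capture it.
def _collect_final_analysis(file_path: str, analysis_query: Optional[str] = None) -> str:
    gen = streaming_analyze_file_with_guidance(file_path, analysis_query)
    while True:
        try:
            next(gen)  # progress chunks could be forwarded to a UI here
        except StopIteration as stop:
            return stop.value or ""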

def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming."""
    try:
        if not os.path.exists(file_path):
            return FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple"
            )

        # Read the first 20 lines
        with open(file_path, 'r', encoding='utf-8') as f:
            sample_lines = []
            for i, line in enumerate(f):
                if i >= 20:
                    break
                sample_lines.append(line.rstrip('\n\r'))

        if not sample_lines:
            sample_lines = [""]

        # Quick analysis based on file extension and content
        file_ext = os.path.splitext(file_path)[1].lower()
        first_lines_text = '\n'.join(sample_lines[:5])

        # Simple file type detection
        if file_ext in ['.log', '.txt']:
            if 'ERROR' in first_lines_text or 'WARN' in first_lines_text:
                file_type = "application_log"
                structure_pattern = "Log entries with timestamps and severity levels"
                key_patterns = ["timestamp", "log_level", "error_codes"]
            else:
                file_type = "text_log"
                structure_pattern = "Plain text with line-based entries"
                key_patterns = ["timestamps", "text_patterns"]
        elif file_ext == '.csv':
            file_type = "csv_data"
            structure_pattern = "Comma-separated values with headers"
            key_patterns = ["column_headers", "data_rows"]
        elif file_ext == '.json':
            file_type = "json_data"
            structure_pattern = "Structured JSON data"
            key_patterns = ["json_objects", "nested_data"]
        else:
            file_type = "generic_file"
            structure_pattern = "Unknown structure"
            key_patterns = ["general_patterns"]

        return FileExamination(
            file_type=file_type,
            structure_pattern=structure_pattern,
            sample_lines=sample_lines,
            key_patterns=key_patterns,
            data_format="structured" if file_ext in ['.csv', '.json'] else "unstructured",
            complexity_level="Medium"
        )

    except Exception as e:
        return FileExamination(
            file_type="error",
            structure_pattern=f"Error reading file: {str(e)}",
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple"
        )
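
# Illustrative check of the heuristics above (the helper and demo file name are
# hypothetical; calling it writes a small file to the working directory): a
# .log file whose first lines contain "ERROR" should come back classified as an
# application_log with an unstructured data format.
def _demo_examination(tmp_path: str = "demo_server.log") -> FileExamination:
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write("2024-01-01 00:00:01 ERROR Boot sequence failed\n")
    exam = examine_file_structure_streaming(tmp_path)
    assert exam.file_type == "application_log"
    assert exam.data_format == "unstructured"
    return exam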
required_imports=["re", "os"], code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output", specific_patterns=[], expected_outputs=["Error message"], error_handling="Try-catch with informative errors" ) # Quick guidance based on file type if "log" in file_examination.file_type: approach = "Log file analysis with pattern matching" imports = ["re", "datetime", "collections"] patterns = [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+'] outputs = ["Error counts", "Timeline analysis", "IP addresses"] elif "csv" in file_examination.file_type: approach = "CSV data analysis with statistical insights" imports = ["pandas", "numpy", "re"] patterns = [r'^\w+,', r'\d+', r'\w+@\w+'] outputs = ["Data summary", "Column analysis", "Statistics"] elif "json" in file_examination.file_type: approach = "JSON structure analysis and data extraction" imports = ["json", "re", "collections"] patterns = [r'"[\w]+":', r'\{.*\}', r'\[.*\]'] outputs = ["Structure overview", "Key extraction", "Value analysis"] else: approach = "General text analysis with pattern detection" imports = ["re", "collections", "os"] patterns = [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}'] outputs = ["Pattern summary", "Content analysis", "Statistics"] return CodeGuidance( analysis_approach=approach, required_imports=imports, code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results", specific_patterns=patterns, expected_outputs=outputs, error_handling="Comprehensive error handling with informative messages" ) def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination, code_guidance: CodeGuidance, analysis_query: str = None) -> Generator[str, None, None]: """Execute the analysis with streaming progress updates.""" try: yield "### 🔄 **Initializing Analysis Environment**\n\n" # Create analysis agent try: model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai") agent = create_analysis_agent(file_path, model) yield "✅ Analysis agent initialized successfully\n\n" except Exception as e: yield f"❌ Failed to initialize agent: {str(e)}\n\n" return yield "### 📝 **Generating Analysis Code**\n\n" # Create analysis prompt user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis") analysis_prompt = f""" Analyze the uploaded file based on this guidance: **File Information:** - Type: {file_examination.file_type} - Structure: {file_examination.structure_pattern} - Format: {file_examination.data_format} - Complexity: {file_examination.complexity_level} **User Question Analysis:** - Intent: {user_analysis['intent']} - Focus Areas: {user_analysis['focus_areas']} - Analysis Type: {user_analysis['analysis_type']} **Generated Guidance:** - Approach: {code_guidance.analysis_approach} - Required Imports: {code_guidance.required_imports} - Patterns to Use: {code_guidance.specific_patterns} - Expected Outputs: {code_guidance.expected_outputs} **User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'} Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'. 
""" yield "✅ Analysis prompt prepared\n\n" yield "### ⚡ **Running AI Analysis**\n\n" # Execute analysis - let's go back to a simpler approach that works try: async def run_simple_analysis(): # Try the original invoke method first to make sure it works result = await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]}) return result yield "🤖 AI model is analyzing your file...\n\n" result = asyncio.run(run_simple_analysis()) # Debug: Show what we got yield "### 🔍 **AI Analysis Debug Info**\n\n" yield f"**Result Type:** {type(result)}\n" yield f"**Result Keys:** {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}\n\n" # Display the thought process and generated code yield "### 🔍 **AI Thought Process & Generated Code**\n\n" yield f"💭 **AI Analysis Strategy:**\n" yield f"- File Type: {file_examination.file_type}\n" yield f"- Approach: {code_guidance.analysis_approach}\n" yield f"- Patterns: {len(code_guidance.specific_patterns)} regex patterns\n" yield f"- Question Focus: {user_analysis['analysis_type']}\n\n" # Extract content from result and find code blocks in ALL messages all_content = "" code_blocks = [] final_result = "" if result and isinstance(result, dict) and "messages" in result: messages = result["messages"] yield f"**Messages Count:** {len(messages)}\n\n" import re # Check each message for code blocks and content for i, msg in enumerate(messages): if hasattr(msg, 'content'): msg_content = msg.content msg_type = type(msg).__name__ yield f"**Message {i+1} Type:** {msg_type}\n" yield f"**Message {i+1} Content Preview:** {msg_content[:100]}...\n\n" # Look for code blocks in this specific message if msg_type == "AIMessage": # Code is usually in AI messages # Try multiple patterns python_blocks = re.findall(r'```python\n(.*?)\n```', msg_content, re.DOTALL) generic_blocks = re.findall(r'```\n(.*?)\n```', msg_content, re.DOTALL) if python_blocks: code_blocks.extend(python_blocks) yield f" 🔧 **Found {len(python_blocks)} Python code blocks in this message!**\n\n" elif generic_blocks: code_blocks.extend(generic_blocks) yield f" 🔧 **Found {len(generic_blocks)} generic code blocks in this message!**\n\n" all_content += msg_content + "\n" # Get final result from last AI message for msg in reversed(messages): if hasattr(msg, 'content') and type(msg).__name__ == "AIMessage": final_result = msg.content break else: yield "❌ Unexpected result format\n\n" all_content = str(result) final_result = str(result) if code_blocks: yield "🔧 **Generated Python Code:**\n\n" for i, code_block in enumerate(code_blocks, 1): clean_code = code_block.strip() yield f"**Code Block {i}:**\n" yield f"```python\n{clean_code}\n```\n\n" yield "⚡ **Code Execution Results:**\n\n" else: yield "🔍 **No code blocks found in any message**\n\n" yield f"**All Content Preview:**\n```\n{all_content[:500]}...\n```\n\n" # Show final results yield "### 📊 **Analysis Results**\n\n" yield final_result yield "\n\n" except Exception as e: yield f"❌ Error during analysis execution: {str(e)}\n\n" except Exception as e: yield f"❌ Error in analysis setup: {str(e)}\n\n"