Spaces:
Running
Running
File size: 16,638 Bytes
bb43287 230ff5f bb43287 3f2830f bb43287 3f2830f bb43287 230ff5f bb43287 3f2830f bb43287 3f2830f bb43287 3f2830f bb43287 3f2830f bb43287 3f2830f bb43287 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator
from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field
# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn
# Load environment variables (e.g. OPENAI_API_KEY) from the nearest .env file.
load_dotenv(find_dotenv())
# Initialize the language model
# NOTE(review): if OPENAI_API_KEY is unset, api_key is passed as None —
# presumably init_chat_model then falls back to its own env lookup; confirm.
model = init_chat_model(
model="o3-2025-04-16",
api_key=os.getenv("OPENAI_API_KEY"),
)
# Import classes from original graph.py
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question
def streaming_analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> Generator[str, None, str]:
    """
    Streaming version of guided file analysis that yields progress updates.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Yields:
        Markdown-formatted progress updates as strings

    Returns:
        Final analysis summary (carried as the generator's return value)
    """
    try:
        yield "π **Starting Guided File Analysis...**\n\n"
        yield f"π **File:** `{os.path.basename(file_path)}`\n"
        yield f"β **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"

        # Step 1: File Examination
        yield "## π Step 1: Examining File Structure\n\n"
        yield "Reading first 20 lines to understand file format and patterns...\n\n"
        file_examination = examine_file_structure_streaming(file_path)
        # NOTE(review): these status strings were split across physical lines in
        # the mangled source (mojibake after an emoji); re-joined into single
        # f-strings so the module parses.
        yield f"β **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"β **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"β **Data Format:** {file_examination.data_format}\n"
        yield f"β **Complexity:** {file_examination.complexity_level}\n"
        yield f"β **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"

        # Show sample of what AI is analyzing
        if file_examination.sample_lines:
            yield "π **File Sample (First 5 lines):**\n"
            yield "```\n"
            for i, line in enumerate(file_examination.sample_lines[:5], 1):
                # Truncate very long lines
                display_line = line[:100] + "..." if len(line) > 100 else line
                yield f"{i:2d}: {display_line}\n"
            yield "```\n\n"

        # Step 2: Code Guidance Generation
        yield "## π― Step 2: Generating Analysis Strategy\n\n"
        yield "Creating specific code guidance based on file structure and your question...\n\n"
        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)
        yield f"β **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"β **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        yield f"β **Regex Patterns:** {len(code_guidance.specific_patterns)} patterns\n"

        # Show the actual patterns for transparency
        if code_guidance.specific_patterns:
            yield "π **Pattern Details:**\n"
            for i, pattern in enumerate(code_guidance.specific_patterns[:3], 1):  # Show first 3
                yield f" {i}. `{pattern}`\n"
            if len(code_guidance.specific_patterns) > 3:
                yield f" ... and {len(code_guidance.specific_patterns) - 3} more patterns\n"
        yield f"β **Expected Outputs:** {', '.join(code_guidance.expected_outputs)}\n\n"

        # Step 3: Code Execution
        yield "## π Step 3: Executing Analysis\n\n"
        yield "Running guided code analysis with enhanced context...\n\n"

        # Stream the execution results, collecting every chunk for the summary
        execution_results = []
        for chunk in execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query):
            yield chunk
            execution_results.append(chunk)

        # Final Summary
        yield "\n\n## β Analysis Complete!\n\n"
        final_analysis = f"""### π **Analysis Summary**
**File:** `{os.path.basename(file_path)}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}
**Guided Features Used:**
- β Structure-aware examination
- β Question-specific code generation
- β {len(code_guidance.specific_patterns)} targeted patterns
- β Enhanced error handling
---
{''.join(execution_results)}
"""
        yield final_analysis
        return final_analysis
    except Exception as e:
        error_msg = f"β **Error in guided analysis:** {str(e)}\n\n"
        yield error_msg
        return error_msg
def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming."""
    try:
        # Missing file: report it as an "error" examination rather than raising.
        if not os.path.exists(file_path):
            return FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple",
            )

        # Collect up to the first 20 lines for a quick structural peek.
        preview: List[str] = []
        with open(file_path, 'r', encoding='utf-8') as handle:
            for raw in handle:
                preview.append(raw.rstrip('\n\r'))
                if len(preview) >= 20:
                    break
        if not preview:
            preview = ["<empty file>"]

        # Cheap heuristics: extension plus a glance at the first few lines.
        extension = os.path.splitext(file_path)[1].lower()
        head_text = '\n'.join(preview[:5])

        if extension in ('.log', '.txt'):
            if 'ERROR' in head_text or 'WARN' in head_text:
                detected_type = "application_log"
                pattern_desc = "Log entries with timestamps and severity levels"
                patterns = ["timestamp", "log_level", "error_codes"]
            else:
                detected_type = "text_log"
                pattern_desc = "Plain text with line-based entries"
                patterns = ["timestamps", "text_patterns"]
        elif extension == '.csv':
            detected_type = "csv_data"
            pattern_desc = "Comma-separated values with headers"
            patterns = ["column_headers", "data_rows"]
        elif extension == '.json':
            detected_type = "json_data"
            pattern_desc = "Structured JSON data"
            patterns = ["json_objects", "nested_data"]
        else:
            detected_type = "generic_file"
            pattern_desc = "Unknown structure"
            patterns = ["general_patterns"]

        return FileExamination(
            file_type=detected_type,
            structure_pattern=pattern_desc,
            sample_lines=preview,
            key_patterns=patterns,
            data_format="structured" if extension in ('.csv', '.json') else "unstructured",
            complexity_level="Medium",
        )
    except Exception as exc:
        # Any read/decoding failure degrades to an "error" examination.
        return FileExamination(
            file_type="error",
            structure_pattern=f"Error reading file: {str(exc)}",
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple",
        )
def generate_code_guidance_streaming(file_examination: FileExamination, analysis_query: str = None) -> CodeGuidance:
    """Generate code guidance with quick processing for streaming."""
    # Degenerate case: no examination (or a failed read) gets minimal guidance.
    if not file_examination or file_examination.file_type == "error":
        return CodeGuidance(
            analysis_approach="Basic file analysis with error handling",
            required_imports=["re", "os"],
            code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
            specific_patterns=[],
            expected_outputs=["Error message"],
            error_handling="Try-catch with informative errors"
        )

    # Lookup table: (marker, (approach, imports, regex patterns, outputs)).
    # The first marker found as a substring of file_type wins, matching the
    # original if/elif ordering (log, then csv, then json).
    profiles = (
        ("log", (
            "Log file analysis with pattern matching",
            ["re", "datetime", "collections"],
            [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+'],
            ["Error counts", "Timeline analysis", "IP addresses"],
        )),
        ("csv", (
            "CSV data analysis with statistical insights",
            ["pandas", "numpy", "re"],
            [r'^\w+,', r'\d+', r'\w+@\w+'],
            ["Data summary", "Column analysis", "Statistics"],
        )),
        ("json", (
            "JSON structure analysis and data extraction",
            ["json", "re", "collections"],
            [r'"[\w]+":', r'\{.*\}', r'\[.*\]'],
            ["Structure overview", "Key extraction", "Value analysis"],
        )),
    )
    fallback = (
        "General text analysis with pattern detection",
        ["re", "collections", "os"],
        [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}'],
        ["Pattern summary", "Content analysis", "Statistics"],
    )
    approach, imports, patterns, outputs = next(
        (profile for marker, profile in profiles
         if marker in file_examination.file_type),
        fallback,
    )

    return CodeGuidance(
        analysis_approach=approach,
        required_imports=imports,
        code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results",
        specific_patterns=patterns,
        expected_outputs=outputs,
        error_handling="Comprehensive error handling with informative messages"
    )
def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination,
                                      code_guidance: CodeGuidance, analysis_query: Optional[str] = None) -> Generator[str, None, None]:
    """Execute the analysis with streaming progress updates.

    Args:
        file_path: Path to the file under analysis (exposed to the agent's
            sandbox at the virtual path '/uploaded_file.log' — see prompt).
        file_examination: Structure summary from the examination step.
        code_guidance: Generated guidance from the guidance step.
        analysis_query: Optional user question driving the analysis.

    Yields:
        Markdown chunks: setup status, debug info, generated code, and the
        final AI answer.
    """
    try:
        yield "### π **Initializing Analysis Environment**\n\n"
        # Create analysis agent
        try:
            model = init_chat_model("o3-2025-04-16", model_provider="openai")
            agent = create_analysis_agent(file_path, model)
            # NOTE(review): status strings below were split across physical
            # lines in the mangled source; re-joined so the module parses.
            yield "β Analysis agent initialized successfully\n\n"
        except Exception as e:
            yield f"β Failed to initialize agent: {str(e)}\n\n"
            return

        yield "### π **Generating Analysis Code**\n\n"
        # Create analysis prompt
        user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis")
        analysis_prompt = f"""
Analyze the uploaded file based on this guidance:
**File Information:**
- Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
**User Question Analysis:**
- Intent: {user_analysis['intent']}
- Focus Areas: {user_analysis['focus_areas']}
- Analysis Type: {user_analysis['analysis_type']}
**Generated Guidance:**
- Approach: {code_guidance.analysis_approach}
- Required Imports: {code_guidance.required_imports}
- Patterns to Use: {code_guidance.specific_patterns}
- Expected Outputs: {code_guidance.expected_outputs}
**User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'}
Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'.
"""
        yield "β Analysis prompt prepared\n\n"
        yield "### β‘ **Running AI Analysis**\n\n"
        # Execute analysis - let's go back to a simpler approach that works
        try:
            async def run_simple_analysis():
                # Try the original invoke method first to make sure it works
                result = await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]})
                return result

            yield "π€ AI model is analyzing your file...\n\n"
            result = asyncio.run(run_simple_analysis())

            # Debug: Show what we got
            yield "### π **AI Analysis Debug Info**\n\n"
            yield f"**Result Type:** {type(result)}\n"
            yield f"**Result Keys:** {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}\n\n"

            # Display the thought process and generated code
            yield "### π **AI Thought Process & Generated Code**\n\n"
            yield f"π **AI Analysis Strategy:**\n"
            yield f"- File Type: {file_examination.file_type}\n"
            yield f"- Approach: {code_guidance.analysis_approach}\n"
            yield f"- Patterns: {len(code_guidance.specific_patterns)} regex patterns\n"
            yield f"- Question Focus: {user_analysis['analysis_type']}\n\n"

            # Extract content from result and find code blocks in ALL messages
            all_content = ""
            code_blocks = []
            final_result = ""
            if result and isinstance(result, dict) and "messages" in result:
                messages = result["messages"]
                yield f"**Messages Count:** {len(messages)}\n\n"
                # `re` is imported at module level; the redundant inner
                # `import re` from the original was removed.
                # Check each message for code blocks and content
                for i, msg in enumerate(messages):
                    if hasattr(msg, 'content'):
                        msg_content = msg.content
                        msg_type = type(msg).__name__
                        yield f"**Message {i+1} Type:** {msg_type}\n"
                        yield f"**Message {i+1} Content Preview:** {msg_content[:100]}...\n\n"
                        # Look for code blocks in this specific message
                        if msg_type == "AIMessage":  # Code is usually in AI messages
                            # Try multiple patterns
                            python_blocks = re.findall(r'```python\n(.*?)\n```', msg_content, re.DOTALL)
                            generic_blocks = re.findall(r'```\n(.*?)\n```', msg_content, re.DOTALL)
                            if python_blocks:
                                code_blocks.extend(python_blocks)
                                yield f" π§ **Found {len(python_blocks)} Python code blocks in this message!**\n\n"
                            elif generic_blocks:
                                code_blocks.extend(generic_blocks)
                                yield f" π§ **Found {len(generic_blocks)} generic code blocks in this message!**\n\n"
                        all_content += msg_content + "\n"
                # Get final result from last AI message
                for msg in reversed(messages):
                    if hasattr(msg, 'content') and type(msg).__name__ == "AIMessage":
                        final_result = msg.content
                        break
            else:
                yield "β Unexpected result format\n\n"
                all_content = str(result)
                final_result = str(result)

            if code_blocks:
                yield "π§ **Generated Python Code:**\n\n"
                for i, code_block in enumerate(code_blocks, 1):
                    clean_code = code_block.strip()
                    yield f"**Code Block {i}:**\n"
                    yield f"```python\n{clean_code}\n```\n\n"
                yield "β‘ **Code Execution Results:**\n\n"
            else:
                yield "π **No code blocks found in any message**\n\n"
                yield f"**All Content Preview:**\n```\n{all_content[:500]}...\n```\n\n"

            # Show final results
            yield "### π **Analysis Results**\n\n"
            yield final_result
            yield "\n\n"
        except Exception as e:
            yield f"β Error during analysis execution: {str(e)}\n\n"
    except Exception as e:
        yield f"β Error in analysis setup: {str(e)}\n\n"