# DataForge / graph_streaming.py
# ai-puppy
# Revert "update better model"
# b7e87c1
import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator
from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field
# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn
load_dotenv(find_dotenv())
# Initialize the module-level language model.
# The model id is pinned to the "gpt-4.1-2025-04-14" snapshot and the API key is
# read from the OPENAI_API_KEY environment variable (load_dotenv() above runs
# first, so a .env file can supply it). os.getenv returns None when the
# variable is unset.
model = init_chat_model(
model="gpt-4.1-2025-04-14",
api_key=os.getenv("OPENAI_API_KEY"),
)
# Import classes from original graph.py
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question
def streaming_analyze_file_with_guidance(file_path: str, analysis_query: str = None) -> Generator[str, None, str]:
    """
    Drive the guided file analysis and stream its progress as markdown.

    Three stages run in order, each announced in the stream before it starts:
    file examination, code-guidance generation, and guided execution. Every
    fragment produced by the execution stage is forwarded to the caller and
    also collected so it can be embedded in the closing summary.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request; when falsy the
            header shows 'Comprehensive analysis'

    Yields:
        Progress updates as strings

    Returns:
        The final summary (which is also the last fragment yielded), or the
        error message when any stage raised an exception
    """
    try:
        yield "πŸ” **Starting Guided File Analysis...**\n\n"
        file_label = os.path.basename(file_path)
        yield f"πŸ“ **File:** `{file_label}`\n"
        yield f"❓ **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"

        # ---- Stage 1: look at the head of the file -------------------------
        yield from (
            "## πŸ“‹ Step 1: Examining File Structure\n\n",
            "Reading first 20 lines to understand file format and patterns...\n\n",
        )
        file_examination = examine_file_structure_streaming(file_path)
        yield f"βœ… **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"βœ… **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"βœ… **Data Format:** {file_examination.data_format}\n"
        yield f"βœ… **Complexity:** {file_examination.complexity_level}\n"
        yield f"βœ… **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"

        # Echo a short excerpt so the reader can see what is being analyzed.
        preview = file_examination.sample_lines
        if preview:
            yield from (
                "πŸ” **File Sample (First 5 lines):**\n",
                "```\n",
            )
            for line_no, raw in enumerate(preview[:5], 1):
                # Over-long lines are cut at 100 characters and marked.
                if len(raw) > 100:
                    shown = raw[:100] + "..."
                else:
                    shown = raw
                yield f"{line_no:2d}: {shown}\n"
            yield "```\n\n"

        # ---- Stage 2: derive the analysis strategy --------------------------
        yield from (
            "## 🎯 Step 2: Generating Analysis Strategy\n\n",
            "Creating specific code guidance based on file structure and your question...\n\n",
        )
        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)
        yield f"βœ… **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"βœ… **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        patterns = code_guidance.specific_patterns
        yield f"βœ… **Regex Patterns:** {len(patterns)} patterns\n"

        # List at most three patterns, then summarize the remainder.
        if patterns:
            yield "πŸ” **Pattern Details:**\n"
            for idx, pattern in enumerate(patterns[:3], 1):
                yield f" {idx}. `{pattern}`\n"
            hidden = len(patterns) - 3
            if hidden > 0:
                yield f" ... and {hidden} more patterns\n"
        yield f"βœ… **Expected Outputs:** {', '.join(code_guidance.expected_outputs)}\n\n"

        # ---- Stage 3: run the guided analysis --------------------------------
        yield from (
            "## πŸš€ Step 3: Executing Analysis\n\n",
            "Running guided code analysis with enhanced context...\n\n",
        )
        collected = []
        for piece in execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query):
            yield piece
            collected.append(piece)

        # ---- Closing summary -----------------------------------------------
        yield "\n\n## βœ… Analysis Complete!\n\n"
        execution_text = ''.join(collected)
        final_analysis = f"""### πŸ“Š **Analysis Summary**
**File:** `{file_label}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}
**Guided Features Used:**
- βœ… Structure-aware examination
- βœ… Question-specific code generation
- βœ… {len(patterns)} targeted patterns
- βœ… Enhanced error handling
---
{execution_text}
"""
        yield final_analysis
        return final_analysis
    except Exception as exc:
        failure = f"❌ **Error in guided analysis:** {str(exc)}\n\n"
        yield failure
        return failure
def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming.

    Reads at most the first 20 lines of the file and classifies it from its
    extension plus a keyword check ('ERROR' / 'WARN') on the first five
    lines. No model call is made here.

    Args:
        file_path: Path to the file to examine.

    Returns:
        A FileExamination describing the file. When the path does not exist
        or reading fails, the problem is reported through the return value
        (file_type="error", structure_pattern holding the reason) rather
        than raised.
    """

    def _failed(reason: str) -> FileExamination:
        # Single place that builds the "could not examine" result.
        return FileExamination(
            file_type="error",
            structure_pattern=reason,
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple",
        )

    try:
        if not os.path.exists(file_path):
            return _failed("File not found")

        # errors='replace' substitutes undecodable bytes instead of raising,
        # so one stray non-UTF-8 byte no longer turns a readable file into an
        # "error" examination.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            # zip() asks range(20) first, so iteration stops after 20 lines
            # without pulling a 21st line from the file.
            sample_lines = [line.rstrip('\n\r') for _, line in zip(range(20), f)]

        if not sample_lines:
            sample_lines = ["<empty file>"]

        # Quick analysis based on file extension and content
        file_ext = os.path.splitext(file_path)[1].lower()
        first_lines_text = '\n'.join(sample_lines[:5])

        # Simple file type detection
        if file_ext in ('.log', '.txt'):
            if 'ERROR' in first_lines_text or 'WARN' in first_lines_text:
                file_type = "application_log"
                structure_pattern = "Log entries with timestamps and severity levels"
                key_patterns = ["timestamp", "log_level", "error_codes"]
            else:
                file_type = "text_log"
                structure_pattern = "Plain text with line-based entries"
                key_patterns = ["timestamps", "text_patterns"]
        elif file_ext == '.csv':
            file_type = "csv_data"
            structure_pattern = "Comma-separated values with headers"
            key_patterns = ["column_headers", "data_rows"]
        elif file_ext == '.json':
            file_type = "json_data"
            structure_pattern = "Structured JSON data"
            key_patterns = ["json_objects", "nested_data"]
        else:
            file_type = "generic_file"
            structure_pattern = "Unknown structure"
            key_patterns = ["general_patterns"]

        return FileExamination(
            file_type=file_type,
            structure_pattern=structure_pattern,
            sample_lines=sample_lines,
            key_patterns=key_patterns,
            data_format="structured" if file_ext in ('.csv', '.json') else "unstructured",
            complexity_level="Medium",
        )
    except Exception as e:
        # Best-effort boundary: any failure is surfaced to the caller as an
        # "error" examination.
        return _failed(f"Error reading file: {str(e)}")
def generate_code_guidance_streaming(file_examination: FileExamination, analysis_query: Optional[str] = None) -> CodeGuidance:
    """Generate code guidance with quick processing for streaming.

    The guidance is chosen purely from ``file_examination.file_type`` by
    substring match ("log", then "csv", then "json", otherwise a generic
    text profile); no model call is made.

    Args:
        file_examination: Result of the file examination step. A falsy value
            or one whose file_type is "error" yields a minimal fallback
            guidance with no patterns.
        analysis_query: Optional user question. Currently unused by this
            function.

    Returns:
        A CodeGuidance with the approach, imports, regex patterns and
        expected outputs for the detected file type.
    """
    if not file_examination or file_examination.file_type == "error":
        return CodeGuidance(
            analysis_approach="Basic file analysis with error handling",
            required_imports=["re", "os"],
            code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
            specific_patterns=[],
            expected_outputs=["Error message"],
            error_handling="Try-catch with informative errors",
        )

    # Quick guidance based on file type
    file_type = file_examination.file_type
    if "log" in file_type:
        approach = "Log file analysis with pattern matching"
        imports = ["re", "datetime", "collections"]
        patterns = [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+']
        outputs = ["Error counts", "Timeline analysis", "IP addresses"]
    elif "csv" in file_type:
        approach = "CSV data analysis with statistical insights"
        imports = ["pandas", "numpy", "re"]
        patterns = [r'^\w+,', r'\d+', r'\w+@\w+']
        outputs = ["Data summary", "Column analysis", "Statistics"]
    elif "json" in file_type:
        approach = "JSON structure analysis and data extraction"
        imports = ["json", "re", "collections"]
        patterns = [r'"[\w]+":', r'\{.*\}', r'\[.*\]']
        outputs = ["Structure overview", "Key extraction", "Value analysis"]
    else:
        approach = "General text analysis with pattern detection"
        imports = ["re", "collections", "os"]
        # The TLD class is [A-Za-z]: inside a character class '|' is a literal
        # pipe, not alternation, so '[A-Z|a-z]' would also accept '|'.
        patterns = [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}']
        outputs = ["Pattern summary", "Content analysis", "Statistics"]

    return CodeGuidance(
        analysis_approach=approach,
        required_imports=imports,
        code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results",
        specific_patterns=patterns,
        expected_outputs=outputs,
        error_handling="Comprehensive error handling with informative messages",
    )
# Fenced code-block patterns, compiled once. DOTALL lets '.' span newlines so
# multi-line code bodies are captured.
_PYTHON_FENCE_RE = re.compile(r'```python\n(.*?)\n```', re.DOTALL)
_GENERIC_FENCE_RE = re.compile(r'```\n(.*?)\n```', re.DOTALL)


def _message_text(content) -> str:
    """Flatten a chat message's ``content`` into plain text.

    Strings are returned unchanged. For list content, string items and the
    "text" value of dict items are concatenated; other items are skipped.
    None becomes "" and anything else is passed through str().
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
        return "".join(parts)
    return "" if content is None else str(content)


def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination,
                                      code_guidance: CodeGuidance, analysis_query: Optional[str] = None) -> Generator[str, None, None]:
    """Execute the analysis with streaming progress updates.

    Builds an analysis agent for ``file_path``, sends it one prompt assembled
    from the file examination, the code guidance and the user's question,
    then streams debug information, any fenced code blocks found in the
    agent's AI messages, and finally the text of the last AI message.

    Args:
        file_path: Path to the file; passed to create_analysis_agent.
        file_examination: Examination result embedded in the prompt.
        code_guidance: Guidance embedded in the prompt.
        analysis_query: Optional user question; "Comprehensive analysis" is
            used for question analysis when it is falsy.

    Yields:
        Markdown fragments. Failures are reported as fragments in the stream
        rather than raised.
    """
    try:
        yield "### πŸ”„ **Initializing Analysis Environment**\n\n"

        # Create analysis agent
        try:
            # Named chat_model so the module-level `model` is not shadowed.
            chat_model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
            agent = create_analysis_agent(file_path, chat_model)
        except Exception as e:
            yield f"❌ Failed to initialize agent: {str(e)}\n\n"
            return
        else:
            yield "βœ… Analysis agent initialized successfully\n\n"

        yield "### πŸ“ **Generating Analysis Code**\n\n"

        # Create analysis prompt
        user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis")
        # NOTE(review): the prompt always names '/uploaded_file.log' as the
        # virtual path, whatever the real extension is; presumably this is
        # where create_analysis_agent exposes the file. Confirm.
        analysis_prompt = f"""
Analyze the uploaded file based on this guidance:
**File Information:**
- Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
**User Question Analysis:**
- Intent: {user_analysis['intent']}
- Focus Areas: {user_analysis['focus_areas']}
- Analysis Type: {user_analysis['analysis_type']}
**Generated Guidance:**
- Approach: {code_guidance.analysis_approach}
- Required Imports: {code_guidance.required_imports}
- Patterns to Use: {code_guidance.specific_patterns}
- Expected Outputs: {code_guidance.expected_outputs}
**User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'}
Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'.
"""
        yield "βœ… Analysis prompt prepared\n\n"
        yield "### ⚑ **Running AI Analysis**\n\n"

        try:
            async def _invoke_agent():
                # Single non-streaming invocation of the agent.
                return await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]})

            yield "πŸ€– AI model is analyzing your file...\n\n"
            # asyncio.run() raises RuntimeError when this thread already runs
            # an event loop; that case is reported by the except below.
            result = asyncio.run(_invoke_agent())

            # Debug: Show what we got
            yield "### πŸ” **AI Analysis Debug Info**\n\n"
            yield f"**Result Type:** {type(result)}\n"
            yield f"**Result Keys:** {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}\n\n"

            # Display the thought process and generated code
            yield "### πŸ” **AI Thought Process & Generated Code**\n\n"
            yield "πŸ’­ **AI Analysis Strategy:**\n"
            yield f"- File Type: {file_examination.file_type}\n"
            yield f"- Approach: {code_guidance.analysis_approach}\n"
            yield f"- Patterns: {len(code_guidance.specific_patterns)} regex patterns\n"
            yield f"- Question Focus: {user_analysis['analysis_type']}\n\n"

            # Extract content from result and find code blocks in ALL messages
            content_parts = []
            code_blocks = []
            final_result = ""
            if result and isinstance(result, dict) and "messages" in result:
                messages = result["messages"]
                yield f"**Messages Count:** {len(messages)}\n\n"

                for i, msg in enumerate(messages):
                    if not hasattr(msg, 'content'):
                        continue
                    # Content may be a list of parts rather than a string;
                    # flatten it so slicing, regex search and concatenation
                    # below always operate on text.
                    msg_content = _message_text(msg.content)
                    msg_type = type(msg).__name__
                    yield f"**Message {i+1} Type:** {msg_type}\n"
                    yield f"**Message {i+1} Content Preview:** {msg_content[:100]}...\n\n"

                    if msg_type == "AIMessage":  # Code is usually in AI messages
                        # The last AI message seen becomes the final result.
                        final_result = msg_content
                        # Prefer explicitly tagged python fences; fall back to
                        # untagged fences only when none are present.
                        python_blocks = _PYTHON_FENCE_RE.findall(msg_content)
                        generic_blocks = _GENERIC_FENCE_RE.findall(msg_content)
                        if python_blocks:
                            code_blocks.extend(python_blocks)
                            yield f" πŸ”§ **Found {len(python_blocks)} Python code blocks in this message!**\n\n"
                        elif generic_blocks:
                            code_blocks.extend(generic_blocks)
                            yield f" πŸ”§ **Found {len(generic_blocks)} generic code blocks in this message!**\n\n"

                    content_parts.append(msg_content + "\n")
                all_content = "".join(content_parts)
            else:
                yield "❌ Unexpected result format\n\n"
                all_content = str(result)
                final_result = str(result)

            if code_blocks:
                yield "πŸ”§ **Generated Python Code:**\n\n"
                for i, code_block in enumerate(code_blocks, 1):
                    clean_code = code_block.strip()
                    yield f"**Code Block {i}:**\n"
                    yield f"```python\n{clean_code}\n```\n\n"
                yield "⚑ **Code Execution Results:**\n\n"
            else:
                yield "πŸ” **No code blocks found in any message**\n\n"
                yield f"**All Content Preview:**\n```\n{all_content[:500]}...\n```\n\n"

            # Show final results
            yield "### πŸ“Š **Analysis Results**\n\n"
            yield final_result
            yield "\n\n"
        except Exception as e:
            yield f"❌ Error during analysis execution: {str(e)}\n\n"
    except Exception as e:
        yield f"❌ Error in analysis setup: {str(e)}\n\n"