# DataForge / graph_streaming.py
import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator
from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field
# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn
# Load environment variables (e.g. OPENAI_API_KEY) from the nearest .env file.
load_dotenv(find_dotenv())

# Initialize the language model used as the module-level default.
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)

# Import classes from original graph.py (shared state/schema types and helpers).
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question
def streaming_analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> Generator[str, None, str]:
    """
    Streaming version of guided file analysis that yields progress updates.

    Orchestrates a three-step pipeline — (1) examine the file's structure,
    (2) generate code guidance from the structure and the user's question,
    (3) execute the guided analysis — yielding Markdown-formatted chunks
    as each step completes.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Yields:
        Progress updates as strings

    Returns:
        Final analysis results (generator return value, reachable via
        StopIteration.value; the same text is also yielded as the last chunk)
    """
    try:
        yield "πŸ” **Starting Guided File Analysis...**\n\n"
        yield f"πŸ“ **File:** `{os.path.basename(file_path)}`\n"
        yield f"❓ **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"
        # Step 1: File Examination
        yield "## πŸ“‹ Step 1: Examining File Structure\n\n"
        yield "Reading first 20 lines to understand file format and patterns...\n\n"
        file_examination = examine_file_structure_streaming(file_path)
        yield f"βœ… **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"βœ… **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"βœ… **Data Format:** {file_examination.data_format}\n"
        yield f"βœ… **Complexity:** {file_examination.complexity_level}\n"
        yield f"βœ… **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"
        # Step 2: Code Guidance Generation
        yield "## 🎯 Step 2: Generating Analysis Strategy\n\n"
        yield "Creating specific code guidance based on file structure and your question...\n\n"
        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)
        yield f"βœ… **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"βœ… **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        yield f"βœ… **Specific Patterns:** {len(code_guidance.specific_patterns)} regex patterns ready\n"
        yield f"βœ… **Expected Outputs:** {len(code_guidance.expected_outputs)} result types\n\n"
        # Step 3: Code Execution
        yield "## πŸš€ Step 3: Executing Analysis\n\n"
        yield "Running guided code analysis with enhanced context...\n\n"
        # Stream the execution chunks as they arrive while also buffering them,
        # so they can be embedded verbatim in the final summary below.
        execution_generator = execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query)
        execution_results = []
        for chunk in execution_generator:
            yield chunk
            execution_results.append(chunk)
        # Final Summary (note: repeats the buffered execution output on purpose,
        # so the last yielded chunk is a self-contained report).
        yield "\n\n## βœ… Analysis Complete!\n\n"
        final_analysis = f"""### πŸ“Š **Analysis Summary**
**File:** `{os.path.basename(file_path)}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}
**Guided Features Used:**
- βœ… Structure-aware examination
- βœ… Question-specific code generation
- βœ… {len(code_guidance.specific_patterns)} targeted patterns
- βœ… Enhanced error handling
---
{''.join(execution_results)}
"""
        yield final_analysis
        return final_analysis
    except Exception as e:
        # Broad catch at the streaming boundary: surface the failure to the
        # consumer as a final chunk instead of raising mid-stream.
        error_msg = f"❌ **Error in guided analysis:** {str(e)}\n\n"
        yield error_msg
        return error_msg
def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming."""

    def _error_examination(reason: str) -> FileExamination:
        # Every failure path returns the same minimal error-shaped result.
        return FileExamination(
            file_type="error",
            structure_pattern=reason,
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple",
        )

    try:
        if not os.path.exists(file_path):
            return _error_examination("File not found")

        # Collect at most the first 20 lines without reading the whole file.
        sample_lines = []
        with open(file_path, 'r', encoding='utf-8') as handle:
            while len(sample_lines) < 20:
                raw = handle.readline()
                if not raw:
                    break
                sample_lines.append(raw.rstrip('\n\r'))
        if not sample_lines:
            sample_lines = ["<empty file>"]

        # Quick classification from the extension plus a peek at the head.
        extension = os.path.splitext(file_path)[1].lower()
        head_text = '\n'.join(sample_lines[:5])

        if extension in ('.log', '.txt'):
            if 'ERROR' in head_text or 'WARN' in head_text:
                file_type = "application_log"
                structure_pattern = "Log entries with timestamps and severity levels"
                key_patterns = ["timestamp", "log_level", "error_codes"]
            else:
                file_type = "text_log"
                structure_pattern = "Plain text with line-based entries"
                key_patterns = ["timestamps", "text_patterns"]
        elif extension == '.csv':
            file_type = "csv_data"
            structure_pattern = "Comma-separated values with headers"
            key_patterns = ["column_headers", "data_rows"]
        elif extension == '.json':
            file_type = "json_data"
            structure_pattern = "Structured JSON data"
            key_patterns = ["json_objects", "nested_data"]
        else:
            file_type = "generic_file"
            structure_pattern = "Unknown structure"
            key_patterns = ["general_patterns"]

        return FileExamination(
            file_type=file_type,
            structure_pattern=structure_pattern,
            sample_lines=sample_lines,
            key_patterns=key_patterns,
            data_format="structured" if extension in ('.csv', '.json') else "unstructured",
            complexity_level="Medium",
        )
    except Exception as exc:
        return _error_examination(f"Error reading file: {str(exc)}")
def generate_code_guidance_streaming(file_examination: FileExamination, analysis_query: str = None) -> CodeGuidance:
    """Generate code guidance with quick processing for streaming."""
    # Degenerate case: no usable examination -> minimal fallback guidance.
    if not file_examination or file_examination.file_type == "error":
        return CodeGuidance(
            analysis_approach="Basic file analysis with error handling",
            required_imports=["re", "os"],
            code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
            specific_patterns=[],
            expected_outputs=["Error message"],
            error_handling="Try-catch with informative errors",
        )

    # Per-file-type presets, checked in order: (marker substring ->
    # (approach, imports, regex patterns, expected outputs)).
    presets = (
        ("log", ("Log file analysis with pattern matching",
                 ["re", "datetime", "collections"],
                 [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+'],
                 ["Error counts", "Timeline analysis", "IP addresses"])),
        ("csv", ("CSV data analysis with statistical insights",
                 ["pandas", "numpy", "re"],
                 [r'^\w+,', r'\d+', r'\w+@\w+'],
                 ["Data summary", "Column analysis", "Statistics"])),
        ("json", ("JSON structure analysis and data extraction",
                  ["json", "re", "collections"],
                  [r'"[\w]+":', r'\{.*\}', r'\[.*\]'],
                  ["Structure overview", "Key extraction", "Value analysis"])),
    )
    fallback = ("General text analysis with pattern detection",
                ["re", "collections", "os"],
                [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}'],
                ["Pattern summary", "Content analysis", "Statistics"])

    approach, imports, patterns, outputs = next(
        (preset for marker, preset in presets if marker in file_examination.file_type),
        fallback,
    )

    return CodeGuidance(
        analysis_approach=approach,
        required_imports=imports,
        code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results",
        specific_patterns=patterns,
        expected_outputs=outputs,
        error_handling="Comprehensive error handling with informative messages",
    )
def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination,
                                      code_guidance: CodeGuidance, analysis_query: str = None) -> Generator[str, None, None]:
    """Execute the analysis with streaming progress updates.

    Builds an analysis agent for the file, composes a guidance-rich prompt
    from the examination and guidance objects, runs the agent once, and
    yields Markdown-formatted status/result chunks.

    Args:
        file_path: Path to the file to analyze
        file_examination: Structure summary produced by examine_file_structure_streaming
        code_guidance: Strategy produced by generate_code_guidance_streaming
        analysis_query: Optional specific analysis request

    Yields:
        Progress and result chunks as strings
    """
    try:
        yield "### πŸ”„ **Initializing Analysis Environment**\n\n"
        # Create analysis agent
        try:
            # NOTE(review): this local `model` shadows the module-level `model`
            # and re-initializes the chat model — consider reusing the module
            # instance; confirm whether a fresh model per call is intentional.
            model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
            agent = create_analysis_agent(file_path, model)
            yield "βœ… Analysis agent initialized successfully\n\n"
        except Exception as e:
            # Without an agent there is nothing to run; end the stream early.
            yield f"❌ Failed to initialize agent: {str(e)}\n\n"
            return
        yield "### πŸ“ **Generating Analysis Code**\n\n"
        # Create analysis prompt
        user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis")
        # NOTE(review): '/uploaded_file.log' below is a hard-coded virtual path —
        # presumably where the sandbox mounts the file regardless of its real
        # extension; verify against create_analysis_agent / the sandbox setup.
        analysis_prompt = f"""
Analyze the uploaded file based on this guidance:
**File Information:**
- Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
**User Question Analysis:**
- Intent: {user_analysis['intent']}
- Focus Areas: {user_analysis['focus_areas']}
- Analysis Type: {user_analysis['analysis_type']}
**Generated Guidance:**
- Approach: {code_guidance.analysis_approach}
- Required Imports: {code_guidance.required_imports}
- Patterns to Use: {code_guidance.specific_patterns}
- Expected Outputs: {code_guidance.expected_outputs}
**User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'}
Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'.
"""
        yield "βœ… Analysis prompt prepared\n\n"
        yield "### ⚑ **Running AI Analysis**\n\n"
        # Execute analysis
        try:
            async def run_analysis():
                # Single-turn invocation of the agent with the composed prompt.
                result = await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]})
                return result

            yield "πŸ€– AI model is analyzing your file...\n\n"
            # NOTE(review): asyncio.run raises RuntimeError if an event loop is
            # already running (e.g. under an async web framework) — confirm all
            # call sites drive this generator from synchronous code.
            result = asyncio.run(run_analysis())
            # Extract the final message
            if result and "messages" in result:
                final_message = result["messages"][-1]
                if hasattr(final_message, 'content'):
                    yield "### πŸ“Š **Analysis Results**\n\n"
                    yield final_message.content
                    yield "\n\n"
                else:
                    yield "❌ No content in analysis result\n\n"
            else:
                yield "❌ Invalid analysis result format\n\n"
        except Exception as e:
            yield f"❌ Error during analysis execution: {str(e)}\n\n"
    except Exception as e:
        # Outer boundary catch: report setup failures as a stream chunk.
        yield f"❌ Error in analysis setup: {str(e)}\n\n"