# DataForge / graph.py — guided file-analysis workflow (repo: ai-puppy, commit 3774bab)
import asyncio
import os
import re
from typing import Annotated, Dict, List, Optional
from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field
# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn
# Load environment variables (e.g. OPENAI_API_KEY) from the nearest .env file.
load_dotenv(find_dotenv())
# Initialize the language model
# Shared chat model used by every node in this workflow; requires
# OPENAI_API_KEY to be set in the environment (or the .env file above).
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)
class FileExamination(BaseModel):
    """Structured result of the file-structure examination step (node 1)."""
    # "error" is used as a sentinel file_type when the file is missing or
    # unreadable (see examine_file_structure).
    file_type: str = Field(description="Type of file detected (log, csv, json, etc.)")
    structure_pattern: str = Field(description="Detected structure pattern of the file")
    # Overwritten after the LLM call with the real (up to 20-line) sample,
    # not whatever the model echoed back.
    sample_lines: List[str] = Field(description="First few lines of the file")
    key_patterns: List[str] = Field(description="Important patterns found in sample")
    data_format: str = Field(description="Format of data (structured, unstructured, mixed)")
    complexity_level: str = Field(description="Simple, Medium, or Complex")
class CodeGuidance(BaseModel):
    """Targeted code-generation guidance derived from the file examination
    and the user's question (node 2 output)."""
    analysis_approach: str = Field(description="Recommended analysis approach")
    required_imports: List[str] = Field(description="Python imports needed")
    code_structure: str = Field(description="Step-by-step code structure")
    specific_patterns: List[str] = Field(description="Specific regex patterns to use")
    expected_outputs: List[str] = Field(description="What outputs to generate")
    error_handling: str = Field(description="Error handling recommendations")
class CodeAnalysisState(TypedDict):
    """State for the code analysis workflow, shared across all graph nodes.
    Each node returns a partial dict updating only its own keys."""
    file_path: str  # Input file path
    analysis_query: Optional[str]  # Optional custom analysis query
    # File examination results (written by examine_file_structure)
    file_examination: Optional[FileExamination]
    # Generated guidance (written by generate_code_guidance)
    code_guidance: Optional[CodeGuidance]
    # Final results (written by execute_guided_analysis)
    # NOTE(review): generated_code is never populated by any node in this
    # file -- confirm whether an external consumer relies on it.
    generated_code: Optional[str]
    execution_result: Optional[str]
    final_analysis: Optional[str]
def examine_file_structure(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 1: Examine the file structure by reading the first several lines
    and understanding the file format and patterns.

    Returns:
        Partial state update containing only "file_examination". A sentinel
        FileExamination with file_type="error" is returned when the file is
        missing or unreadable so downstream nodes can degrade gracefully
        instead of crashing the graph.
    """
    file_path = state["file_path"]
    if not os.path.exists(file_path):
        return {
            "file_examination": FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple"
            )
        }
    try:
        # Read first 20 lines for examination.
        # errors="replace" keeps mixed-encoding files (common in real-world
        # logs) from aborting the whole workflow with a UnicodeDecodeError;
        # previously decoding was strict.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            sample_lines = []
            for i, line in enumerate(f):
                if i >= 20:  # Read first 20 lines
                    break
                sample_lines.append(line.rstrip('\n\r'))
        if not sample_lines:
            sample_lines = ["<empty file>"]
        # Constrain the LLM to emit a FileExamination object.
        examination_model = model.with_structured_output(FileExamination)
        sample_text = '\n'.join(sample_lines[:10])  # Show first 10 lines in prompt
        message = {
            "role": "user",
            "content": f"""
Examine this file sample and determine its structure and characteristics:
FILE PATH: {file_path}
FILE EXTENSION: {os.path.splitext(file_path)[1]}
FIRST 10 LINES:
```
{sample_text}
```
TOTAL SAMPLE LINES AVAILABLE: {len(sample_lines)}
Analyze and determine:
1. What type of file this is (log file, CSV, JSON, text, etc.)
2. The structure pattern (each line format/pattern)
3. Key patterns that would be important for analysis (timestamps, IPs, error codes, etc.)
4. Data format classification (structured/unstructured/mixed)
5. Complexity level for analysis (Simple/Medium/Complex)
Be specific about patterns you detect - these will guide code generation.
"""
        }
        examination_result = examination_model.invoke([message])
        # Keep the full 20-line sample rather than whatever the model returned.
        examination_result.sample_lines = sample_lines
        print("📋 File Examination Complete:")
        print(f" Type: {examination_result.file_type}")
        print(f" Structure: {examination_result.structure_pattern}")
        print(f" Complexity: {examination_result.complexity_level}")
        print(f" Key Patterns: {examination_result.key_patterns}")
        return {"file_examination": examination_result}
    except Exception as e:
        # Any read/LLM failure collapses into the same sentinel shape as the
        # missing-file case, carrying the error text for diagnostics.
        print(f"❌ Error examining file: {e}")
        return {
            "file_examination": FileExamination(
                file_type="error",
                structure_pattern=f"Error reading file: {str(e)}",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple"
            )
        }
def generate_code_guidance(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 2: Generate specific code guidance based on both the file examination and user question.
    This creates a targeted prompt for the code generation that addresses the user's specific needs.

    Returns:
        Partial state update containing only "code_guidance". A generic
        fallback CodeGuidance is produced when examination failed or the
        LLM call raises.
    """
    file_examination = state["file_examination"]
    analysis_query = state.get("analysis_query", "")
    # Examination failed (or file missing): fall back to minimal generic guidance.
    if not file_examination or file_examination.file_type == "error":
        return {
            "code_guidance": CodeGuidance(
                analysis_approach="Basic file analysis with error handling",
                required_imports=["re", "os"],
                code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
                specific_patterns=[],
                expected_outputs=["Error message"],
                error_handling="Try-catch with informative errors"
            )
        }
    try:
        # Constrain the LLM to emit a CodeGuidance object.
        guidance_model = model.with_structured_output(CodeGuidance)
        sample_preview = '\n'.join(file_examination.sample_lines[:5])
        # Analyze the user's question to understand intent
        # (analyze_user_question is defined later in this module; resolved at call time).
        question_analysis = analyze_user_question(analysis_query or "General comprehensive analysis")
        # NOTE(review): the {analysis_query or "..."} interpolation reuses
        # double quotes inside a double-quoted f-string, which requires
        # Python 3.12+ (PEP 701) -- confirm the deployment interpreter.
        message = {
            "role": "user",
            "content": f"""
Generate QUESTION-SPECIFIC Python code guidance for analyzing this file:
FILE ANALYSIS RESULTS:
- File Type: {file_examination.file_type}
- Structure Pattern: {file_examination.structure_pattern}
- Data Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
- Key Patterns Found: {file_examination.key_patterns}
SAMPLE LINES:
```
{sample_preview}
```
USER'S SPECIFIC QUESTION: "{analysis_query or "General comprehensive analysis"}"
QUESTION ANALYSIS:
- Intent: {question_analysis['intent']}
- Focus Areas: {question_analysis['focus_areas']}
- Expected Analysis Type: {question_analysis['analysis_type']}
- Key Terms: {question_analysis['key_terms']}
Based on BOTH the file structure AND the user's specific question, provide targeted guidance:
1. **Analysis Approach**: What specific method addresses the user's question for this file type
2. **Required Imports**: Exact Python imports needed for this specific analysis
3. **Code Structure**: Step-by-step structure that answers the user's question
4. **Specific Patterns**: Exact regex patterns or operations needed for the user's query
5. **Expected Outputs**: What specific outputs will answer the user's question
6. **Error Handling**: How to handle issues specific to this analysis type
IMPORTANT - Make guidance QUESTION-SPECIFIC:
- If user asks about "security", focus on authentication, IPs, failed logins, errors
- If user asks about "performance", focus on response times, slow operations, bottlenecks
- If user asks about "patterns", focus on frequency analysis, trends, anomalies
- If user asks about "errors", focus on error extraction, categorization, root causes
- If user asks about "statistics", focus on counts, averages, distributions
- If user asks about "time trends", focus on temporal analysis, time-based patterns
Generate code guidance that directly answers their question using the detected file structure.
"""
        }
        guidance_result = guidance_model.invoke([message])
        print(f"🎯 Code Guidance Generated:")
        print(f" Approach: {guidance_result.analysis_approach}")
        print(f" Imports: {guidance_result.required_imports}")
        print(f" Patterns: {len(guidance_result.specific_patterns)} specific patterns")
        return {"code_guidance": guidance_result}
    except Exception as e:
        # LLM/parsing failure: degrade to generic guidance rather than abort.
        print(f"❌ Error generating guidance: {e}")
        return {
            "code_guidance": CodeGuidance(
                analysis_approach="Basic analysis with error recovery",
                required_imports=["re", "os"],
                code_structure="Simple analysis with error handling",
                specific_patterns=[],
                expected_outputs=["Basic file information"],
                error_handling="Comprehensive try-catch blocks"
            )
        }
def execute_guided_analysis(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 3: Execute the file analysis using the generated guidance.
    This replaces the original agent with guided code generation.

    Returns:
        Partial state update with "execution_result" (raw streamed agent
        output) and "final_analysis" (formatted summary wrapped around it).
    """
    file_path = state["file_path"]
    file_examination = state["file_examination"]
    code_guidance = state["code_guidance"]
    analysis_query = state.get("analysis_query", "")
    # Both upstream nodes must have produced results before we can proceed.
    if not file_examination or not code_guidance:
        return {
            "execution_result": "❌ Missing examination or guidance data",
            "final_analysis": "Analysis failed due to missing preliminary data"
        }
    try:
        # Create the guided analysis query
        # NOTE(review): {analysis_query or "..."} nests double quotes inside a
        # double-quoted f-string -- valid only on Python 3.12+ (PEP 701).
        guided_query = f"""
Based on the file examination and guidance, analyze this file with the following SPECIFIC instructions:
FILE CONTEXT:
- File Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Data Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
CODING GUIDANCE TO FOLLOW:
- Analysis Approach: {code_guidance.analysis_approach}
- Required Imports: {', '.join(code_guidance.required_imports)}
- Code Structure: {code_guidance.code_structure}
- Specific Patterns: {code_guidance.specific_patterns}
- Expected Outputs: {', '.join(code_guidance.expected_outputs)}
- Error Handling: {code_guidance.error_handling}
SAMPLE FILE STRUCTURE (first few lines):
```
{chr(10).join(file_examination.sample_lines[:5])}
```
USER REQUEST: {analysis_query or "Comprehensive analysis following the guidance above"}
INSTRUCTIONS:
1. Follow the specified analysis approach exactly
2. Import only the recommended libraries: {', '.join(code_guidance.required_imports)}
3. Use the specific patterns provided: {code_guidance.specific_patterns}
4. Structure your code following: {code_guidance.code_structure}
5. Generate the expected outputs: {', '.join(code_guidance.expected_outputs)}
6. Implement proper error handling: {code_guidance.error_handling}
Since you have detailed guidance about this specific file structure, your code should be highly accurate and efficient.
The file examination shows this is a {file_examination.file_type} with {file_examination.data_format} data format.
Write Python code that leverages this specific knowledge for optimal analysis.
"""
        print(f"🚀 Executing guided analysis...")
        print(f" Using {len(code_guidance.required_imports)} specific imports")
        print(f" Following {file_examination.complexity_level} complexity approach")
        # Use the existing agent with the guided query
        agent = create_analysis_agent(file_path, model)
        async def run_guided_analysis():
            # Collect both the incremental "messages" chunks and the final
            # "values" snapshot into a single transcript.
            result_parts = []
            async for typ, chunk in agent.astream(
                {"messages": guided_query},
                stream_mode=["values", "messages"],
            ):
                if typ == "messages":
                    # Streamed chunk: chunk[0] is the message piece.
                    if hasattr(chunk[0], 'content') and chunk[0].content:
                        result_parts.append(chunk[0].content)
                elif typ == "values":
                    # Full state snapshot: keep the last message as the final answer.
                    if chunk and "messages" in chunk:
                        final_message = chunk["messages"][-1]
                        if hasattr(final_message, 'content') and final_message.content:
                            result_parts.append(f"\n\n=== FINAL ANALYSIS ===\n{final_message.content}")
            return "\n".join(result_parts) if result_parts else "Analysis completed but no output generated."
        # Run the analysis
        # NOTE(review): asyncio.run() requires no event loop in this thread;
        # this relies on the graph running sync nodes off the loop (e.g. a
        # worker thread) when invoked via ainvoke -- confirm.
        execution_result = asyncio.run(run_guided_analysis())
        # Create final analysis summary
        final_analysis = f"""
=== GUIDED FILE ANALYSIS RESULTS ===
File: {file_path}
Type: {file_examination.file_type} ({file_examination.data_format})
Approach: {code_guidance.analysis_approach}
{execution_result}
=== ANALYSIS METADATA ===
- Examination guided approach: ✅
- Specific patterns used: {len(code_guidance.specific_patterns)} patterns
- Complexity level: {file_examination.complexity_level}
- Guided imports: {', '.join(code_guidance.required_imports)}
"""
        print(f"✅ Guided analysis completed successfully!")
        return {
            "execution_result": execution_result,
            "final_analysis": final_analysis
        }
    except Exception as e:
        # Report the failure through state instead of raising, so the graph
        # still completes and the caller gets a string result.
        error_msg = f"❌ Error in guided analysis execution: {str(e)}"
        print(error_msg)
        return {
            "execution_result": error_msg,
            "final_analysis": f"Analysis failed: {str(e)}"
        }
def build_guided_analysis_graph():
    """
    Assemble and compile the guided file-analysis workflow.

    The pipeline is strictly sequential:
    examine_file_structure -> generate_code_guidance -> execute_guided_analysis
    """
    workflow = StateGraph(CodeAnalysisState)

    # Register the three pipeline stages under their canonical names.
    stages = (
        ("examine_file_structure", examine_file_structure),
        ("generate_code_guidance", generate_code_guidance),
        ("execute_guided_analysis", execute_guided_analysis),
    )
    for stage_name, stage_fn in stages:
        workflow.add_node(stage_name, stage_fn)

    # Wire a straight line from START through each stage to END.
    ordering = [START] + [stage_name for stage_name, _ in stages] + [END]
    for source, target in zip(ordering, ordering[1:]):
        workflow.add_edge(source, target)

    return workflow.compile()
# Create the graph instance
# Compiled once at import time so all callers share a single graph object.
guided_analysis_graph = build_guided_analysis_graph()
def analyze_user_question(question: str) -> dict:
    """
    Classify a user's analysis question so code guidance can be tailored to it.

    Args:
        question: Free-form user question (may be empty).

    Returns:
        Dict with keys:
            intent: "general", "security", "performance", "error_analysis",
                "pattern_analysis", "statistical", or "temporal".
            focus_areas: list of focus tags; ["general"] when none matched.
            analysis_type: "comprehensive", "detection", "extraction",
                "deep_analysis", "quantitative", or "comparative".
            key_terms: de-duplicated quoted phrases and technical terms.
    """
    question_lower = question.lower()

    # All keyword checks below are deliberate *substring* matches (so "auth"
    # also matches "authorization"); table order preserves the original
    # if/elif precedence -- first matching row wins.
    intent_table = [
        ("security", ("security", "threat", "attack", "login", "auth", "breach", "suspicious")),
        ("performance", ("performance", "slow", "fast", "speed", "time", "latency", "bottleneck")),
        ("error_analysis", ("error", "exception", "fail", "problem", "issue", "bug")),
        ("pattern_analysis", ("pattern", "trend", "frequent", "common", "anomal", "unusual")),
        ("statistical", ("statistic", "count", "average", "distribution", "summary", "metrics")),
        ("temporal", ("time", "temporal", "timeline", "chronological", "over time")),
    ]
    intent = next(
        (name for name, words in intent_table
         if any(word in question_lower for word in words)),
        "general",
    )

    # Focus areas are cumulative: every matching rule contributes a tag.
    focus_rules = [
        ("ip_analysis", ("ip", "address")),
        ("user_analysis", ("user", "account")),
        ("endpoint_analysis", ("endpoint", "api", "url")),
        ("database_analysis", ("database", "query", "db")),
        ("network_analysis", ("network", "connection")),
    ]
    focus_areas = [
        tag for tag, words in focus_rules
        if any(word in question_lower for word in words)
    ]

    # Analysis type: first matching row wins, defaulting to "comprehensive".
    type_table = [
        ("detection", ("find", "identify", "detect", "search")),
        ("extraction", ("show", "list", "display", "get")),
        ("deep_analysis", ("analyze", "examine", "investigate")),
        ("quantitative", ("count", "how many", "frequency")),
        ("comparative", ("compare", "correlation", "relationship")),
    ]
    analysis_type = next(
        (name for name, words in type_table
         if any(word in question_lower for word in words)),
        "comprehensive",
    )

    # Quoted phrases keep original casing; technical terms are matched on the
    # lowercased question. Uses the module-level `re` import -- the previous
    # in-function `import re` was redundant.
    key_terms = re.findall(r'"([^"]*)"', question)
    key_terms.extend(re.findall(
        r'\b(?:login|logout|auth|api|endpoint|database|query|ip|user|error'
        r'|exception|timeout|response|request|status|code)\b',
        question_lower,
    ))

    return {
        "intent": intent,
        "focus_areas": focus_areas if focus_areas else ["general"],
        "analysis_type": analysis_type,
        # dict.fromkeys de-duplicates while keeping first-seen order, making
        # the result deterministic (list(set(...)) was hash-order dependent).
        "key_terms": list(dict.fromkeys(key_terms)),
    }
async def analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> str:
    """
    Main function to analyze a file using the guided approach.

    Args:
        file_path: Path to the file to analyze.
        analysis_query: Optional specific analysis request; None triggers a
            general comprehensive analysis. (Annotation fixed from a bare
            `str` -- the default has always been None.)

    Returns:
        Final analysis results, or an error message string on failure.
    """
    print(f"🔍 Starting guided analysis for: {file_path}")
    # Initialize state: only the inputs are seeded; each node fills in its
    # own slice of CodeAnalysisState.
    initial_state = {
        "file_path": file_path,
        "analysis_query": analysis_query
    }
    # Run the graph
    try:
        final_state = await guided_analysis_graph.ainvoke(initial_state)
        return final_state.get("final_analysis", "Analysis completed but no results generated.")
    except Exception as e:
        # Surface failure as text rather than raising, matching the
        # workflow's string-result contract.
        return f"❌ Guided analysis failed: {str(e)}"
def analyze_file_with_guidance_sync(file_path: str, analysis_query: Optional[str] = None) -> str:
    """
    Synchronous wrapper for the guided analysis.

    Must not be called from inside a running event loop (asyncio.run would
    raise RuntimeError there); use analyze_file_with_guidance directly in
    async code. (Annotation fixed from a bare `str` -- the default has
    always been None.)
    """
    return asyncio.run(analyze_file_with_guidance(file_path, analysis_query))
# Example usage and testing
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        # CLI form: python graph.py <file_path> [analysis_query]
        test_file_path = sys.argv[1]
        test_query = sys.argv[2] if len(sys.argv) > 2 else None
        print(f"🧪 Testing guided analysis with: {test_file_path}")
        if test_query:
            print(f"📝 Custom query: {test_query}")
        result = analyze_file_with_guidance_sync(test_file_path, test_query)
        print("\n" + "="*80)
        print("GUIDED ANALYSIS RESULT:")
        print("="*80)
        print(result)
    else:
        # No file argument: print usage help instead of failing.
        print("Usage: python graph.py <file_path> [analysis_query]")
        print("\nThis will run the guided analysis workflow that:")
        print("1. 📋 Examines file structure (first ~20 lines)")
        print("2. 🎯 Generates specific code guidance")
        print("3. 🚀 Executes analysis with improved context")