import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional

from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field

# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn

load_dotenv(find_dotenv())

# Initialize the language model
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)

class FileExamination(BaseModel):
    """File examination results"""
    file_type: str = Field(description="Type of file detected (log, csv, json, etc.)")
    structure_pattern: str = Field(description="Detected structure pattern of the file")
    sample_lines: List[str] = Field(description="First few lines of the file")
    key_patterns: List[str] = Field(description="Important patterns found in sample")
    data_format: str = Field(description="Format of data (structured, unstructured, mixed)")
    complexity_level: str = Field(description="Simple, Medium, or Complex")

class CodeGuidance(BaseModel):
    """Code generation guidance"""
    analysis_approach: str = Field(description="Recommended analysis approach")
    required_imports: List[str] = Field(description="Python imports needed")
    code_structure: str = Field(description="Step-by-step code structure")
    specific_patterns: List[str] = Field(description="Specific regex patterns to use")
    expected_outputs: List[str] = Field(description="What outputs to generate")
    error_handling: str = Field(description="Error handling recommendations")
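
# Illustrative example of a populated FileExamination (hypothetical values for a
# web-server access log) - the model fills these fields via structured output:
#
#     FileExamination(
#         file_type="log",
#         structure_pattern="<timestamp> <ip> <method> <path> <status>",
#         sample_lines=["2024-01-01T00:00:00Z 10.0.0.1 GET /index 200"],
#         key_patterns=["ISO timestamps", "IPv4 addresses", "HTTP status codes"],
#         data_format="structured",
#         complexity_level="Simple",
#     )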

class CodeAnalysisState(TypedDict):
    """State for the code analysis workflow"""
    file_path: str  # Input file path
    analysis_query: Optional[str]  # Optional custom analysis query

    # File examination results
    file_examination: Optional[FileExamination]

    # Generated guidance
    code_guidance: Optional[CodeGuidance]

    # Final results
    generated_code: Optional[str]
    execution_result: Optional[str]
    final_analysis: Optional[str]

def validate_python_code(code: str) -> tuple[bool, str]:
    """
    Validate Python code for syntax errors and potential issues.

    Returns (is_valid, error_message).
    """
    try:
        # Try to parse the code into an AST
        ast.parse(code)

        # Check for common problematic patterns
        lines = code.split('\n')
        for i, line in enumerate(lines, 1):
            line_stripped = line.strip()

            # Heuristic: flag print statements with unbalanced quotes
            if line_stripped.startswith('print(') and not line_stripped.endswith(')'):
                if line_stripped.count('"') % 2 != 0 or line_stripped.count("'") % 2 != 0:
                    return False, f"Line {i}: Potentially unterminated string in print statement"

            # Flag very long lines that might get truncated
            if len(line) > 100:
                return False, f"Line {i}: Line too long ({len(line)} chars) - may cause truncation"

        return True, "Code validation passed"
    except SyntaxError as e:
        return False, f"Syntax error: {e.msg} at line {e.lineno}"
    except Exception as e:
        return False, f"Validation error: {str(e)}"

def examine_file_structure(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 1: Examine the file structure by reading the first several lines
    and understanding the file format and patterns.
    """
    file_path = state["file_path"]

    if not os.path.exists(file_path):
        return {
            "file_examination": FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple",
            )
        }

    try:
        # Read the first 20 lines for examination
        with open(file_path, 'r', encoding='utf-8') as f:
            sample_lines = []
            for i, line in enumerate(f):
                if i >= 20:
                    break
                sample_lines.append(line.rstrip('\n\r'))

        if not sample_lines:
            sample_lines = ["<empty file>"]

        # Create the examination prompt
        examination_model = model.with_structured_output(FileExamination)
        sample_text = '\n'.join(sample_lines[:10])  # Show first 10 lines in the prompt

        message = {
            "role": "user",
            "content": f"""
Examine this file sample and determine its structure and characteristics:

FILE PATH: {file_path}
FILE EXTENSION: {os.path.splitext(file_path)[1]}

FIRST 10 LINES:
```
{sample_text}
```

TOTAL SAMPLE LINES AVAILABLE: {len(sample_lines)}

Analyze and determine:
1. What type of file this is (log file, CSV, JSON, text, etc.)
2. The structure pattern (each line format/pattern)
3. Key patterns that would be important for analysis (timestamps, IPs, error codes, etc.)
4. Data format classification (structured/unstructured/mixed)
5. Complexity level for analysis (Simple/Medium/Complex)

Be specific about the patterns you detect - these will guide code generation.
"""
        }

        examination_result = examination_model.invoke([message])
        examination_result.sample_lines = sample_lines  # Keep the full sample

        print("🔍 File Examination Complete:")
        print(f"   Type: {examination_result.file_type}")
        print(f"   Structure: {examination_result.structure_pattern}")
        print(f"   Complexity: {examination_result.complexity_level}")
        print(f"   Key Patterns: {examination_result.key_patterns}")

        return {"file_examination": examination_result}

    except Exception as e:
        print(f"❌ Error examining file: {e}")
        return {
            "file_examination": FileExamination(
                file_type="error",
                structure_pattern=f"Error reading file: {str(e)}",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple",
            )
        }
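
# The node can also be exercised in isolation for debugging - a minimal sketch,
# assuming a local file named "server.log" exists:
#
#     state = {"file_path": "server.log", "analysis_query": None}
#     out = examine_file_structure(state)
#     print(out["file_examination"].file_type)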

def generate_code_guidance(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 2: Generate specific code guidance based on both the file examination
    and the user's question. This creates a targeted prompt for code generation
    that addresses the user's specific needs.
    """
    file_examination = state["file_examination"]
    analysis_query = state.get("analysis_query", "")

    if not file_examination or file_examination.file_type == "error":
        return {
            "code_guidance": CodeGuidance(
                analysis_approach="Basic file analysis with error handling",
                required_imports=["re", "os"],
                code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
                specific_patterns=[],
                expected_outputs=["Error message"],
                error_handling="Try-catch with informative errors",
            )
        }

    try:
        guidance_model = model.with_structured_output(CodeGuidance)
        sample_preview = '\n'.join(file_examination.sample_lines[:5])

        # Analyze the user's question to understand intent
        question_analysis = analyze_user_question(analysis_query or "General comprehensive analysis")

        message = {
            "role": "user",
            "content": f"""
Generate QUESTION-SPECIFIC Python code guidance for analyzing this file:

FILE ANALYSIS RESULTS:
- File Type: {file_examination.file_type}
- Structure Pattern: {file_examination.structure_pattern}
- Data Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}
- Key Patterns Found: {file_examination.key_patterns}

SAMPLE LINES:
```
{sample_preview}
```

USER'S SPECIFIC QUESTION: "{analysis_query or "General comprehensive analysis"}"

QUESTION ANALYSIS:
- Intent: {question_analysis['intent']}
- Focus Areas: {question_analysis['focus_areas']}
- Expected Analysis Type: {question_analysis['analysis_type']}
- Key Terms: {question_analysis['key_terms']}

Based on BOTH the file structure AND the user's specific question, provide targeted guidance:
1. **Analysis Approach**: What specific method addresses the user's question for this file type
2. **Required Imports**: Exact Python imports needed for this specific analysis
3. **Code Structure**: Step-by-step structure that answers the user's question
4. **Specific Patterns**: Exact regex patterns or operations needed for the user's query
5. **Expected Outputs**: What specific outputs will answer the user's question
6. **Error Handling**: How to handle issues specific to this analysis type

IMPORTANT - Make guidance QUESTION-SPECIFIC:
- If the user asks about "security", focus on authentication, IPs, failed logins, errors
- If the user asks about "performance", focus on response times, slow operations, bottlenecks
- If the user asks about "patterns", focus on frequency analysis, trends, anomalies
- If the user asks about "errors", focus on error extraction, categorization, root causes
- If the user asks about "statistics", focus on counts, averages, distributions
- If the user asks about "time trends", focus on temporal analysis, time-based patterns

Generate code guidance that directly answers their question using the detected file structure.
"""
        }

        guidance_result = guidance_model.invoke([message])

        print("🎯 Code Guidance Generated:")
        print(f"   Approach: {guidance_result.analysis_approach}")
        print(f"   Imports: {guidance_result.required_imports}")
        print(f"   Patterns: {len(guidance_result.specific_patterns)} specific patterns")

        return {"code_guidance": guidance_result}

    except Exception as e:
        print(f"❌ Error generating guidance: {e}")
        return {
            "code_guidance": CodeGuidance(
                analysis_approach="Basic analysis with error recovery",
                required_imports=["re", "os"],
                code_structure="Simple analysis with error handling",
                specific_patterns=[],
                expected_outputs=["Basic file information"],
                error_handling="Comprehensive try-catch blocks",
            )
        }

def execute_guided_analysis(state: CodeAnalysisState) -> CodeAnalysisState:
    """
    Node 3: Execute the file analysis using the generated guidance,
    with code quality validation.
    """
    file_path = state["file_path"]
    file_examination = state["file_examination"]
    code_guidance = state["code_guidance"]
    analysis_query = state.get("analysis_query", "")

    if not file_examination or not code_guidance:
        return {
            "execution_result": "❌ Missing examination or guidance data",
            "final_analysis": "Analysis failed due to missing preliminary data"
        }

    try:
        # Create the guided analysis query with strict code quality requirements
        guided_query = f"""Based on the file examination and guidance, analyze this file with the following SPECIFIC instructions:

FILE CONTEXT:
- File Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Data Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}

CODING GUIDANCE TO FOLLOW:
- Analysis Approach: {code_guidance.analysis_approach}
- Required Imports: {', '.join(code_guidance.required_imports)}
- Code Structure: {code_guidance.code_structure}
- Specific Patterns: {code_guidance.specific_patterns}
- Expected Outputs: {', '.join(code_guidance.expected_outputs)}
- Error Handling: {code_guidance.error_handling}

SAMPLE FILE STRUCTURE (first few lines):
```
{chr(10).join(file_examination.sample_lines[:5])}
```

USER REQUEST: {analysis_query or "Comprehensive analysis following the guidance above"}

CRITICAL CODE QUALITY REQUIREMENTS:
1. ALL print statements MUST be on single lines with properly closed quotes
2. NO multi-line strings or f-strings that span multiple lines
3. NO print statements longer than 80 characters - break them into multiple prints instead
4. ALL strings must be properly terminated with matching quotes
5. Use short variable names and concise output formatting
6. If you need to print long text, use multiple short print() calls
7. Always close parentheses, brackets, and quotes on the same line they open
8. Use simple string concatenation instead of complex f-strings for long output
9. NEVER use triple quotes for multi-line strings in limited execution environments
10. Test each print statement individually to ensure it executes without truncation

EXAMPLE OF SAFE CODING PRACTICES:
```python
# GOOD - Short, single-line prints
print("=== Results ===")
print(f"Count: {{count}}")
print(f"User: {{user}}")

# BAD - Long print that could be truncated
print(f"This is a very long print statement that could get truncated...")

# GOOD - Break long output into multiple prints
print("Analysis complete:")
print(f"Found {{count}} items")
print(f"Top user: {{user}}")
```

MANDATORY CODE GENERATION PROCESS:
1. Generate your analysis code following the above requirements
2. Before presenting the code, internally validate each line for potential issues
3. Ensure ALL print statements are under 80 characters
4. Verify all quotes and parentheses are properly closed
5. If any line might cause issues, rewrite it using multiple shorter statements

INSTRUCTIONS:
1. Follow the specified analysis approach exactly
2. Import only the recommended libraries: {', '.join(code_guidance.required_imports)}
3. Use the specific patterns provided: {code_guidance.specific_patterns}
4. Structure your code following: {code_guidance.code_structure}
5. Generate the expected outputs: {', '.join(code_guidance.expected_outputs)}
6. Implement proper error handling: {code_guidance.error_handling}
7. ENSURE ALL CODE FOLLOWS THE QUALITY REQUIREMENTS ABOVE

Since you have detailed guidance about this specific file structure, your code should be highly accurate and efficient.
The file examination shows this is a {file_examination.file_type} with {file_examination.data_format} data format.
Write Python code that leverages this specific knowledge for optimal analysis and follows strict code quality standards.
"""

        print("🚀 Executing guided analysis...")
        print(f"   Using {len(code_guidance.required_imports)} specific imports")
        print(f"   Following {file_examination.complexity_level} complexity approach")

        # Use the existing agent with the guided query
        agent = create_analysis_agent(file_path, model)

        async def run_guided_analysis():
            result_parts = []
            # With multiple stream modes, astream yields (mode, chunk) tuples
            async for typ, chunk in agent.astream(
                {"messages": guided_query},
                stream_mode=["values", "messages"],
            ):
                if typ == "messages":
                    if hasattr(chunk[0], 'content') and chunk[0].content:
                        result_parts.append(chunk[0].content)
                elif typ == "values":
                    if chunk and "messages" in chunk:
                        final_message = chunk["messages"][-1]
                        if hasattr(final_message, 'content') and final_message.content:
                            result_parts.append(f"\n\n=== FINAL ANALYSIS ===\n{final_message.content}")
            return "\n".join(result_parts) if result_parts else "Analysis completed but no output generated."

        # Run the analysis
        execution_result = asyncio.run(run_guided_analysis())

        # Create the final analysis summary
        final_analysis = f"""=== GUIDED FILE ANALYSIS RESULTS ===
File: {file_path}
Type: {file_examination.file_type} ({file_examination.data_format})
Approach: {code_guidance.analysis_approach}

{execution_result}

=== ANALYSIS METADATA ===
- Examination-guided approach: ✅
- Specific patterns used: {len(code_guidance.specific_patterns)} patterns
- Complexity level: {file_examination.complexity_level}
- Guided imports: {', '.join(code_guidance.required_imports)}
"""

        print("✅ Guided analysis completed successfully!")

        return {
            "execution_result": execution_result,
            "final_analysis": final_analysis
        }

    except Exception as e:
        error_msg = f"❌ Error in guided analysis execution: {str(e)}"
        print(error_msg)
        return {
            "execution_result": error_msg,
            "final_analysis": f"Analysis failed: {str(e)}"
        }

def build_guided_analysis_graph():
    """
    Build the guided file analysis workflow graph.

    The workflow:
    1. Examine file structure (first ~20 lines)
    2. Generate specific code guidance based on structure
    3. Execute analysis with the improved guidance
    """
    builder = StateGraph(CodeAnalysisState)

    # Add nodes
    builder.add_node("examine_file_structure", examine_file_structure)
    builder.add_node("generate_code_guidance", generate_code_guidance)
    builder.add_node("execute_guided_analysis", execute_guided_analysis)

    # Add edges - linear workflow
    builder.add_edge(START, "examine_file_structure")
    builder.add_edge("examine_file_structure", "generate_code_guidance")
    builder.add_edge("generate_code_guidance", "execute_guided_analysis")
    builder.add_edge("execute_guided_analysis", END)

    return builder.compile()


# Create the graph instance
guided_analysis_graph = build_guided_analysis_graph()
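
# Minimal invocation sketch (hypothetical file path and query). The compiled
# graph is a standard LangGraph runnable, so it can also be driven directly:
#
#     state = guided_analysis_graph.invoke({
#         "file_path": "access.log",
#         "analysis_query": "Which IPs have the most failed logins?",
#     })
#     print(state["final_analysis"])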

def analyze_user_question(question: str) -> dict:
    """
    Analyze the user's question to understand their intent and focus areas.
    This helps generate more targeted code guidance.
    """
    question_lower = question.lower()

    # Determine the primary intent
    intent = "general"
    if any(word in question_lower for word in ["security", "threat", "attack", "login", "auth", "breach", "suspicious"]):
        intent = "security"
    elif any(word in question_lower for word in ["performance", "slow", "fast", "speed", "time", "latency", "bottleneck"]):
        intent = "performance"
    elif any(word in question_lower for word in ["error", "exception", "fail", "problem", "issue", "bug"]):
        intent = "error_analysis"
    elif any(word in question_lower for word in ["pattern", "trend", "frequent", "common", "anomal", "unusual"]):
        intent = "pattern_analysis"
    elif any(word in question_lower for word in ["statistic", "count", "average", "distribution", "summary", "metrics"]):
        intent = "statistical"
    elif any(word in question_lower for word in ["time", "temporal", "timeline", "chronological", "over time"]):
        intent = "temporal"

    # Identify focus areas
    focus_areas = []
    if "ip" in question_lower or "address" in question_lower:
        focus_areas.append("ip_analysis")
    if "user" in question_lower or "account" in question_lower:
        focus_areas.append("user_analysis")
    if "endpoint" in question_lower or "api" in question_lower or "url" in question_lower:
        focus_areas.append("endpoint_analysis")
    if "database" in question_lower or "query" in question_lower or "db" in question_lower:
        focus_areas.append("database_analysis")
    if "network" in question_lower or "connection" in question_lower:
        focus_areas.append("network_analysis")

    # Determine the analysis type
    analysis_type = "comprehensive"
    if any(word in question_lower for word in ["find", "identify", "detect", "search"]):
        analysis_type = "detection"
    elif any(word in question_lower for word in ["show", "list", "display", "get"]):
        analysis_type = "extraction"
    elif any(word in question_lower for word in ["analyze", "examine", "investigate"]):
        analysis_type = "deep_analysis"
    elif any(word in question_lower for word in ["count", "how many", "frequency"]):
        analysis_type = "quantitative"
    elif any(word in question_lower for word in ["compare", "correlation", "relationship"]):
        analysis_type = "comparative"

    # Extract key terms: quoted phrases first, then common technical terms
    # (re is imported at module level)
    key_terms = []
    quoted_terms = re.findall(r'"([^"]*)"', question)
    key_terms.extend(quoted_terms)
    tech_terms = re.findall(
        r'\b(?:login|logout|auth|api|endpoint|database|query|ip|user|error|exception|timeout|response|request|status|code)\b',
        question_lower
    )
    key_terms.extend(tech_terms)

    return {
        "intent": intent,
        "focus_areas": focus_areas if focus_areas else ["general"],
        "analysis_type": analysis_type,
        "key_terms": list(set(key_terms))  # Remove duplicates
    }
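
# Illustrative output of the heuristic above (set order of key_terms may vary):
#
#     analyze_user_question("Show failed login attempts per ip")
#     # -> {"intent": "security", "focus_areas": ["ip_analysis"],
#     #     "analysis_type": "extraction", "key_terms": ["login", "ip"]}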

async def analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> str:
    """
    Main function to analyze a file using the guided approach.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Returns:
        Final analysis results
    """
    print(f"🚀 Starting guided analysis for: {file_path}")

    # Initialize state
    initial_state = {
        "file_path": file_path,
        "analysis_query": analysis_query
    }

    # Run the graph
    try:
        final_state = await guided_analysis_graph.ainvoke(initial_state)
        return final_state.get("final_analysis", "Analysis completed but no results generated.")
    except Exception as e:
        return f"❌ Guided analysis failed: {str(e)}"


def analyze_file_with_guidance_sync(file_path: str, analysis_query: Optional[str] = None) -> str:
    """
    Synchronous wrapper for the guided analysis.
    """
    return asyncio.run(analyze_file_with_guidance(file_path, analysis_query))
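
# Usage sketch (hypothetical path and query):
#
#     result = analyze_file_with_guidance_sync("app.log", "What errors occur most often?")
#     print(result)
#
# From already-async code, await analyze_file_with_guidance directly instead of
# the sync wrapper: asyncio.run cannot be nested inside a running event loop.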

# Example usage and testing
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        test_file_path = sys.argv[1]
        test_query = sys.argv[2] if len(sys.argv) > 2 else None

        print(f"🧪 Testing guided analysis with: {test_file_path}")
        if test_query:
            print(f"📝 Custom query: {test_query}")

        result = analyze_file_with_guidance_sync(test_file_path, test_query)

        print("\n" + "=" * 80)
        print("GUIDED ANALYSIS RESULT:")
        print("=" * 80)
        print(result)
    else:
        print("Usage: python graph.py <file_path> [analysis_query]")
        print("\nThis will run the guided analysis workflow that:")
        print("1. 🔍 Examines file structure (first ~20 lines)")
        print("2. 🎯 Generates specific code guidance")
        print("3. 🚀 Executes analysis with improved context")