import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator
from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field

# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn

load_dotenv(find_dotenv())

# Initialize the language model
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)

# Import classes and helpers from the original graph.py
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question
def streaming_analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> Generator[str, None, str]:
    """
    Streaming version of guided file analysis that yields progress updates.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Yields:
        Progress updates as strings

    Returns:
        Final analysis results
    """
    try:
        yield "🚀 **Starting Guided File Analysis...**\n\n"
        yield f"📁 **File:** `{os.path.basename(file_path)}`\n"
        yield f"❓ **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"

        # Step 1: File Examination
        yield "## 🔍 Step 1: Examining File Structure\n\n"
        yield "Reading the first 20 lines to understand the file format and patterns...\n\n"

        file_examination = examine_file_structure_streaming(file_path)

        yield f"✅ **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"✅ **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"✅ **Data Format:** {file_examination.data_format}\n"
        yield f"✅ **Complexity:** {file_examination.complexity_level}\n"
        yield f"✅ **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"

        # Show a sample of what the AI is analyzing
        if file_examination.sample_lines:
            yield "📄 **File Sample (First 5 lines):**\n"
            yield "```\n"
            for i, line in enumerate(file_examination.sample_lines[:5], 1):
                # Truncate very long lines
                display_line = line[:100] + "..." if len(line) > 100 else line
                yield f"{i:2d}: {display_line}\n"
            yield "```\n\n"

        # Step 2: Code Guidance Generation
        yield "## 🎯 Step 2: Generating Analysis Strategy\n\n"
        yield "Creating specific code guidance based on the file structure and your question...\n\n"

        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)

        yield f"✅ **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"✅ **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        yield f"✅ **Regex Patterns:** {len(code_guidance.specific_patterns)} patterns\n"

        # Show the actual patterns for transparency
        if code_guidance.specific_patterns:
            yield "📋 **Pattern Details:**\n"
            for i, pattern in enumerate(code_guidance.specific_patterns[:3], 1):  # Show first 3
                yield f"  {i}. `{pattern}`\n"
            if len(code_guidance.specific_patterns) > 3:
                yield f"  ... and {len(code_guidance.specific_patterns) - 3} more patterns\n"

        yield f"✅ **Expected Outputs:** {', '.join(code_guidance.expected_outputs)}\n\n"

        # Step 3: Code Execution
        yield "## 🚀 Step 3: Executing Analysis\n\n"
        yield "Running guided code analysis with enhanced context...\n\n"

        # Stream the execution results
        execution_generator = execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query)
        execution_results = []
        for chunk in execution_generator:
            yield chunk
            execution_results.append(chunk)

        # Final Summary
        yield "\n\n## ✅ Analysis Complete!\n\n"

        final_analysis = f"""### 📊 **Analysis Summary**

**File:** `{os.path.basename(file_path)}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}

**Guided Features Used:**
- ✅ Structure-aware examination
- ✅ Question-specific code generation
- ✅ {len(code_guidance.specific_patterns)} targeted patterns
- ✅ Enhanced error handling

---

{''.join(execution_results)}
"""
        yield final_analysis
        return final_analysis

    except Exception as e:
        error_msg = f"❌ **Error in guided analysis:** {str(e)}\n\n"
        yield error_msg
        return error_msg
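
# Example usage (a minimal sketch, not part of the app): consume the generator
# to stream progress updates into a UI callback or stdout. The file path and
# question below are hypothetical.
#
#   for update in streaming_analyze_file_with_guidance("server.log", "How many errors occurred?"):
#       print(update, end="")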
def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming."""
    try:
        if not os.path.exists(file_path):
            return FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple"
            )

        # Read the first 20 lines
        with open(file_path, 'r', encoding='utf-8') as f:
            sample_lines = []
            for i, line in enumerate(f):
                if i >= 20:
                    break
                sample_lines.append(line.rstrip('\n\r'))

        if not sample_lines:
            sample_lines = ["<empty file>"]

        # Quick analysis based on file extension and content
        file_ext = os.path.splitext(file_path)[1].lower()
        first_lines_text = '\n'.join(sample_lines[:5])

        # Simple file type detection
        if file_ext in ['.log', '.txt']:
            if 'ERROR' in first_lines_text or 'WARN' in first_lines_text:
                file_type = "application_log"
                structure_pattern = "Log entries with timestamps and severity levels"
                key_patterns = ["timestamp", "log_level", "error_codes"]
            else:
                file_type = "text_log"
                structure_pattern = "Plain text with line-based entries"
                key_patterns = ["timestamps", "text_patterns"]
        elif file_ext == '.csv':
            file_type = "csv_data"
            structure_pattern = "Comma-separated values with headers"
            key_patterns = ["column_headers", "data_rows"]
        elif file_ext == '.json':
            file_type = "json_data"
            structure_pattern = "Structured JSON data"
            key_patterns = ["json_objects", "nested_data"]
        else:
            file_type = "generic_file"
            structure_pattern = "Unknown structure"
            key_patterns = ["general_patterns"]

        return FileExamination(
            file_type=file_type,
            structure_pattern=structure_pattern,
            sample_lines=sample_lines,
            key_patterns=key_patterns,
            data_format="structured" if file_ext in ['.csv', '.json'] else "unstructured",
            complexity_level="Medium"
        )

    except Exception as e:
        return FileExamination(
            file_type="error",
            structure_pattern=f"Error reading file: {str(e)}",
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple"
        )
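
# Example of the heuristic mapping above (a sketch; the file paths are
# hypothetical):
#
#   examine_file_structure_streaming("app.log")   # -> file_type "application_log" if
#                                                 #    ERROR/WARN appears in the first
#                                                 #    lines, otherwise "text_log"
#   examine_file_structure_streaming("data.csv")  # -> file_type "csv_data",
#                                                 #    data_format "structured"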
def generate_code_guidance_streaming(file_examination: FileExamination, analysis_query: Optional[str] = None) -> CodeGuidance:
    """Generate code guidance with quick processing for streaming."""
    if not file_examination or file_examination.file_type == "error":
        return CodeGuidance(
            analysis_approach="Basic file analysis with error handling",
            required_imports=["re", "os"],
            code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
            specific_patterns=[],
            expected_outputs=["Error message"],
            error_handling="Try-catch with informative errors"
        )

    # Quick guidance based on file type
    if "log" in file_examination.file_type:
        approach = "Log file analysis with pattern matching"
        imports = ["re", "datetime", "collections"]
        patterns = [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+']
        outputs = ["Error counts", "Timeline analysis", "IP addresses"]
    elif "csv" in file_examination.file_type:
        approach = "CSV data analysis with statistical insights"
        imports = ["pandas", "numpy", "re"]
        patterns = [r'^\w+,', r'\d+', r'\w+@\w+']
        outputs = ["Data summary", "Column analysis", "Statistics"]
    elif "json" in file_examination.file_type:
        approach = "JSON structure analysis and data extraction"
        imports = ["json", "re", "collections"]
        patterns = [r'"[\w]+":', r'\{.*\}', r'\[.*\]']
        outputs = ["Structure overview", "Key extraction", "Value analysis"]
    else:
        approach = "General text analysis with pattern detection"
        imports = ["re", "collections", "os"]
        patterns = [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}']
        outputs = ["Pattern summary", "Content analysis", "Statistics"]

    return CodeGuidance(
        analysis_approach=approach,
        required_imports=imports,
        code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results",
        specific_patterns=patterns,
        expected_outputs=outputs,
        error_handling="Comprehensive error handling with informative messages"
    )
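
# Example (a sketch): for an application log, the guidance carries date,
# severity, and IPv4 regexes. The constructor arguments below mirror the
# fields this module itself populates; the question string is hypothetical.
#
#   exam = FileExamination(file_type="application_log",
#                          structure_pattern="Log entries with timestamps and severity levels",
#                          sample_lines=[], key_patterns=["timestamp", "log_level"],
#                          data_format="unstructured", complexity_level="Medium")
#   guidance = generate_code_guidance_streaming(exam, "Count errors per day")
#   guidance.specific_patterns
#   # -> [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+']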
def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination,
                                      code_guidance: CodeGuidance, analysis_query: Optional[str] = None) -> Generator[str, None, None]:
    """Execute the analysis with streaming progress updates."""
    try:
        yield "### 🔄 **Initializing Analysis Environment**\n\n"

        # Create the analysis agent
        try:
            model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
            agent = create_analysis_agent(file_path, model)
            yield "✅ Analysis agent initialized successfully\n\n"
        except Exception as e:
            yield f"❌ Failed to initialize agent: {str(e)}\n\n"
            return

        yield "### 📝 **Generating Analysis Code**\n\n"

        # Create the analysis prompt
        user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis")
        analysis_prompt = f"""
Analyze the uploaded file based on this guidance:

**File Information:**
- Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}

**User Question Analysis:**
- Intent: {user_analysis['intent']}
- Focus Areas: {user_analysis['focus_areas']}
- Analysis Type: {user_analysis['analysis_type']}

**Generated Guidance:**
- Approach: {code_guidance.analysis_approach}
- Required Imports: {code_guidance.required_imports}
- Patterns to Use: {code_guidance.specific_patterns}
- Expected Outputs: {code_guidance.expected_outputs}

**User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'}

Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'.
"""
        yield "✅ Analysis prompt prepared\n\n"
        yield "### ⚡ **Running AI Analysis**\n\n"

        # Execute the analysis with a plain ainvoke call, the simplest
        # approach that works reliably
        try:
            async def run_simple_analysis():
                result = await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]})
                return result

            yield "🤖 AI model is analyzing your file...\n\n"
            result = asyncio.run(run_simple_analysis())

            # Debug: show what we got back
            yield "### 🔍 **AI Analysis Debug Info**\n\n"
            yield f"**Result Type:** {type(result)}\n"
            yield f"**Result Keys:** {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}\n\n"

            # Display the thought process and generated code
            yield "### 💭 **AI Thought Process & Generated Code**\n\n"
            yield "📋 **AI Analysis Strategy:**\n"
            yield f"- File Type: {file_examination.file_type}\n"
            yield f"- Approach: {code_guidance.analysis_approach}\n"
            yield f"- Patterns: {len(code_guidance.specific_patterns)} regex patterns\n"
            yield f"- Question Focus: {user_analysis['analysis_type']}\n\n"

            # Extract content from the result and find code blocks in ALL messages
            all_content = ""
            code_blocks = []
            final_result = ""

            if result and isinstance(result, dict) and "messages" in result:
                messages = result["messages"]
                yield f"**Messages Count:** {len(messages)}\n\n"

                # Check each message for code blocks and content
                for i, msg in enumerate(messages):
                    if hasattr(msg, 'content'):
                        msg_content = msg.content
                        msg_type = type(msg).__name__
                        yield f"**Message {i+1} Type:** {msg_type}\n"
                        yield f"**Message {i+1} Content Preview:** {msg_content[:100]}...\n\n"

                        # Look for code blocks in this specific message;
                        # generated code is usually in AI messages
                        if msg_type == "AIMessage":
                            # Try fenced ```python blocks first, then generic fences
                            python_blocks = re.findall(r'```python\n(.*?)\n```', msg_content, re.DOTALL)
                            generic_blocks = re.findall(r'```\n(.*?)\n```', msg_content, re.DOTALL)

                            if python_blocks:
                                code_blocks.extend(python_blocks)
                                yield f"  🔧 **Found {len(python_blocks)} Python code blocks in this message!**\n\n"
                            elif generic_blocks:
                                code_blocks.extend(generic_blocks)
                                yield f"  🔧 **Found {len(generic_blocks)} generic code blocks in this message!**\n\n"

                        all_content += msg_content + "\n"

                # Get the final result from the last AI message
                for msg in reversed(messages):
                    if hasattr(msg, 'content') and type(msg).__name__ == "AIMessage":
                        final_result = msg.content
                        break
            else:
                yield "❌ Unexpected result format\n\n"
                all_content = str(result)
                final_result = str(result)

            if code_blocks:
                yield "🔧 **Generated Python Code:**\n\n"
                for i, code_block in enumerate(code_blocks, 1):
                    clean_code = code_block.strip()
                    yield f"**Code Block {i}:**\n"
                    yield f"```python\n{clean_code}\n```\n\n"
                yield "⚡ **Code Execution Results:**\n\n"
            else:
                yield "📝 **No code blocks found in any message**\n\n"
                yield f"**All Content Preview:**\n```\n{all_content[:500]}...\n```\n\n"

            # Show the final results
            yield "### 📊 **Analysis Results**\n\n"
            yield final_result
            yield "\n\n"

        except Exception as e:
            yield f"❌ Error during analysis execution: {str(e)}\n\n"

    except Exception as e:
        yield f"❌ Error in analysis setup: {str(e)}\n\n"