import asyncio
import ast
import os
import re
from typing import Annotated, Dict, List, Optional, Generator

from typing_extensions import TypedDict
from dotenv import find_dotenv, load_dotenv
from langchain.chat_models import init_chat_model
from langgraph.graph import END, START, StateGraph
from pydantic import BaseModel, Field

# Import your existing agent functionality
from agent import create_analysis_agent, FileInjectedPyodideSandbox, create_pyodide_eval_fn

load_dotenv(find_dotenv())

# Initialize the language model
model = init_chat_model(
    model="gpt-4.1-2025-04-14",
    api_key=os.getenv("OPENAI_API_KEY"),
)

# Import classes from the original graph.py
from graph import FileExamination, CodeGuidance, CodeAnalysisState, validate_python_code, analyze_user_question
def streaming_analyze_file_with_guidance(file_path: str, analysis_query: Optional[str] = None) -> Generator[str, None, str]:
    """
    Streaming version of guided file analysis that yields progress updates.

    Args:
        file_path: Path to the file to analyze
        analysis_query: Optional specific analysis request

    Yields:
        Progress updates as strings

    Returns:
        Final analysis results
    """
    try:
        yield "🚀 **Starting Guided File Analysis...**\n\n"
        yield f"📁 **File:** `{os.path.basename(file_path)}`\n"
        yield f"❓ **Question:** {analysis_query or 'Comprehensive analysis'}\n\n"

        # Step 1: File Examination
        yield "## 🔍 Step 1: Examining File Structure\n\n"
        yield "Reading first 20 lines to understand file format and patterns...\n\n"

        file_examination = examine_file_structure_streaming(file_path)

        yield f"✅ **File Type Detected:** `{file_examination.file_type}`\n"
        yield f"✅ **Structure Pattern:** {file_examination.structure_pattern}\n"
        yield f"✅ **Data Format:** {file_examination.data_format}\n"
        yield f"✅ **Complexity:** {file_examination.complexity_level}\n"
        yield f"✅ **Key Patterns Found:** {len(file_examination.key_patterns)} patterns\n\n"

        # Step 2: Code Guidance Generation
        yield "## 🎯 Step 2: Generating Analysis Strategy\n\n"
        yield "Creating specific code guidance based on file structure and your question...\n\n"

        code_guidance = generate_code_guidance_streaming(file_examination, analysis_query)

        yield f"✅ **Analysis Approach:** {code_guidance.analysis_approach}\n"
        yield f"✅ **Required Imports:** {', '.join(code_guidance.required_imports)}\n"
        yield f"✅ **Specific Patterns:** {len(code_guidance.specific_patterns)} regex patterns ready\n"
        yield f"✅ **Expected Outputs:** {len(code_guidance.expected_outputs)} result types\n\n"

        # Step 3: Code Execution
        yield "## 🚀 Step 3: Executing Analysis\n\n"
        yield "Running guided code analysis with enhanced context...\n\n"

        # Stream the execution results, keeping a copy for the final summary
        execution_generator = execute_guided_analysis_streaming(file_path, file_examination, code_guidance, analysis_query)
        execution_results = []
        for chunk in execution_generator:
            yield chunk
            execution_results.append(chunk)

        # Final Summary
        yield "\n\n## ✅ Analysis Complete!\n\n"

        final_analysis = f"""### 📊 **Analysis Summary**

**File:** `{os.path.basename(file_path)}`
**Type:** {file_examination.file_type} ({file_examination.data_format})
**Approach:** {code_guidance.analysis_approach}
**Complexity:** {file_examination.complexity_level}

**Guided Features Used:**
- ✅ Structure-aware examination
- ✅ Question-specific code generation
- ✅ {len(code_guidance.specific_patterns)} targeted patterns
- ✅ Enhanced error handling

---

{''.join(execution_results)}
"""
        yield final_analysis
        return final_analysis

    except Exception as e:
        error_msg = f"❌ **Error in guided analysis:** {str(e)}\n\n"
        yield error_msg
        return error_msg
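
# Because the generator above both yields progress chunks and returns the final
# analysis string, a caller that wants the return value must drive it manually:
# the returned string arrives as StopIteration.value. A minimal consumption
# sketch; the helper name is illustrative, not part of the original module.
def collect_guided_analysis(file_path: str, analysis_query: Optional[str] = None) -> str:
    """Stream progress to stdout and capture the generator's return value."""
    gen = streaming_analyze_file_with_guidance(file_path, analysis_query)
    while True:
        try:
            print(next(gen), end="")  # emit each progress chunk as it arrives
        except StopIteration as stop:
            return stop.value  # the string from the generator's `return`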

def examine_file_structure_streaming(file_path: str) -> FileExamination:
    """Examine file structure with minimal processing for streaming."""
    try:
        if not os.path.exists(file_path):
            return FileExamination(
                file_type="error",
                structure_pattern="File not found",
                sample_lines=[],
                key_patterns=[],
                data_format="unknown",
                complexity_level="Simple"
            )

        # Read first 20 lines
        with open(file_path, 'r', encoding='utf-8') as f:
            sample_lines = []
            for i, line in enumerate(f):
                if i >= 20:
                    break
                sample_lines.append(line.rstrip('\n\r'))

        if not sample_lines:
            sample_lines = ["<empty file>"]

        # Quick analysis based on file extension and content
        file_ext = os.path.splitext(file_path)[1].lower()
        first_lines_text = '\n'.join(sample_lines[:5])

        # Simple file type detection
        if file_ext in ['.log', '.txt']:
            if 'ERROR' in first_lines_text or 'WARN' in first_lines_text:
                file_type = "application_log"
                structure_pattern = "Log entries with timestamps and severity levels"
                key_patterns = ["timestamp", "log_level", "error_codes"]
            else:
                file_type = "text_log"
                structure_pattern = "Plain text with line-based entries"
                key_patterns = ["timestamps", "text_patterns"]
        elif file_ext == '.csv':
            file_type = "csv_data"
            structure_pattern = "Comma-separated values with headers"
            key_patterns = ["column_headers", "data_rows"]
        elif file_ext == '.json':
            file_type = "json_data"
            structure_pattern = "Structured JSON data"
            key_patterns = ["json_objects", "nested_data"]
        else:
            file_type = "generic_file"
            structure_pattern = "Unknown structure"
            key_patterns = ["general_patterns"]

        return FileExamination(
            file_type=file_type,
            structure_pattern=structure_pattern,
            sample_lines=sample_lines,
            key_patterns=key_patterns,
            data_format="structured" if file_ext in ['.csv', '.json'] else "unstructured",
            complexity_level="Medium"
        )

    except Exception as e:
        return FileExamination(
            file_type="error",
            structure_pattern=f"Error reading file: {str(e)}",
            sample_lines=[],
            key_patterns=[],
            data_format="unknown",
            complexity_level="Simple"
        )

def generate_code_guidance_streaming(file_examination: FileExamination, analysis_query: Optional[str] = None) -> CodeGuidance:
    """Generate code guidance with quick processing for streaming."""
    if not file_examination or file_examination.file_type == "error":
        return CodeGuidance(
            analysis_approach="Basic file analysis with error handling",
            required_imports=["re", "os"],
            code_structure="1. Check file exists\n2. Basic error handling\n3. Simple output",
            specific_patterns=[],
            expected_outputs=["Error message"],
            error_handling="Try-catch with informative errors"
        )

    # Quick guidance based on file type
    if "log" in file_examination.file_type:
        approach = "Log file analysis with pattern matching"
        imports = ["re", "datetime", "collections"]
        patterns = [r'\d{4}-\d{2}-\d{2}', r'ERROR|WARN|INFO', r'\d+\.\d+\.\d+\.\d+']
        outputs = ["Error counts", "Timeline analysis", "IP addresses"]
    elif "csv" in file_examination.file_type:
        approach = "CSV data analysis with statistical insights"
        imports = ["pandas", "numpy", "re"]
        patterns = [r'^\w+,', r'\d+', r'\w+@\w+']
        outputs = ["Data summary", "Column analysis", "Statistics"]
    elif "json" in file_examination.file_type:
        approach = "JSON structure analysis and data extraction"
        imports = ["json", "re", "collections"]
        patterns = [r'"[\w]+":', r'\{.*\}', r'\[.*\]']
        outputs = ["Structure overview", "Key extraction", "Value analysis"]
    else:
        approach = "General text analysis with pattern detection"
        imports = ["re", "collections", "os"]
        patterns = [r'\w+', r'\d+', r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}']
        outputs = ["Pattern summary", "Content analysis", "Statistics"]

    return CodeGuidance(
        analysis_approach=approach,
        required_imports=imports,
        code_structure=f"1. Load and validate file\n2. Apply {len(patterns)} specific patterns\n3. Generate insights\n4. Format results",
        specific_patterns=patterns,
        expected_outputs=outputs,
        error_handling="Comprehensive error handling with informative messages"
    )
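
# Illustrative usage (the file name and question are hypothetical): for a .log
# file whose first lines contain "ERROR", examination reports "application_log",
# so the first guidance branch above selects the log-analysis approach.
#
# >>> exam = examine_file_structure_streaming("server.log")
# >>> guidance = generate_code_guidance_streaming(exam, "Count errors per hour")
# >>> guidance.analysis_approach
# 'Log file analysis with pattern matching'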

def execute_guided_analysis_streaming(file_path: str, file_examination: FileExamination,
                                      code_guidance: CodeGuidance, analysis_query: Optional[str] = None) -> Generator[str, None, None]:
    """Execute the analysis with streaming progress updates."""
    try:
        yield "### 🔧 **Initializing Analysis Environment**\n\n"

        # Create analysis agent
        try:
            model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
            agent = create_analysis_agent(file_path, model)
            yield "✅ Analysis agent initialized successfully\n\n"
        except Exception as e:
            yield f"❌ Failed to initialize agent: {str(e)}\n\n"
            return

        yield "### 📝 **Generating Analysis Code**\n\n"

        # Create analysis prompt
        user_analysis = analyze_user_question(analysis_query or "Comprehensive analysis")
        analysis_prompt = f"""
Analyze the uploaded file based on this guidance:

**File Information:**
- Type: {file_examination.file_type}
- Structure: {file_examination.structure_pattern}
- Format: {file_examination.data_format}
- Complexity: {file_examination.complexity_level}

**User Question Analysis:**
- Intent: {user_analysis['intent']}
- Focus Areas: {user_analysis['focus_areas']}
- Analysis Type: {user_analysis['analysis_type']}

**Generated Guidance:**
- Approach: {code_guidance.analysis_approach}
- Required Imports: {code_guidance.required_imports}
- Patterns to Use: {code_guidance.specific_patterns}
- Expected Outputs: {code_guidance.expected_outputs}

**User's Specific Question:** {analysis_query or 'Provide comprehensive analysis'}

Please write Python code that follows this guidance and analyzes the file. The file is available at the virtual path '/uploaded_file.log'.
"""
        yield "✅ Analysis prompt prepared\n\n"
        yield "### ⚡ **Running AI Analysis**\n\n"

        # Execute analysis
        try:
            async def run_analysis():
                result = await agent.ainvoke({"messages": [{"role": "user", "content": analysis_prompt}]})
                return result

            yield "🤖 AI model is analyzing your file...\n\n"
            result = asyncio.run(run_analysis())

            # Extract the final message
            if result and "messages" in result:
                final_message = result["messages"][-1]
                if hasattr(final_message, 'content'):
                    yield "### 📊 **Analysis Results**\n\n"
                    yield final_message.content
                    yield "\n\n"
                else:
                    yield "❌ No content in analysis result\n\n"
            else:
                yield "❌ Invalid analysis result format\n\n"

        except Exception as e:
            yield f"❌ Error during analysis execution: {str(e)}\n\n"

    except Exception as e:
        yield f"❌ Error in analysis setup: {str(e)}\n\n"