Spaces:
Running
Running
import asyncio | |
import inspect | |
import uuid | |
import os | |
import tempfile | |
import shutil | |
from typing import Any | |
from langchain.chat_models import init_chat_model | |
from langchain_sandbox import PyodideSandbox | |
from langgraph_codeact import EvalCoroutine, create_codeact | |
from dotenv import find_dotenv, load_dotenv | |
load_dotenv(find_dotenv()) | |
class FileInjectedPyodideSandbox(PyodideSandbox):
    """PyodideSandbox that makes a host file available to every execution.

    Before each run, the content of ``file_path`` is base64-encoded and a small
    bootstrap script is prepended to the user code. Inside the sandbox that
    script writes the content to ``virtual_path`` and exposes it through the
    ``file_content``, ``log_lines`` and ``total_lines`` variables.
    """

    def __init__(self, file_path: str = None, virtual_path: str = "/uploaded_file.log", sessions_dir: str = None, **kwargs):
        """Set up the sandbox and remember which file to inject.

        Args:
            file_path: Host path of the file to inject. When it is ``None`` or
                does not exist, ``execute`` runs the code unchanged.
            virtual_path: Path the file is written to inside the sandbox.
            sessions_dir: Sessions directory forwarded to ``PyodideSandbox``.
                When ``None``, a temporary directory is created and later
                removed by ``cleanup``.
            **kwargs: Forwarded unchanged to ``PyodideSandbox.__init__``.
        """
        # Ownership must be decided BEFORE sessions_dir is replaced; checking
        # afterwards is always False and the temp directory would never be
        # cleaned up.
        owns_sessions_dir = sessions_dir is None
        if owns_sessions_dir:
            sessions_dir = tempfile.mkdtemp(prefix="pyodide_sessions_")
        try:
            super().__init__(sessions_dir=sessions_dir, **kwargs)
        except Exception:
            # Do not leak the directory we just created if the base class
            # refuses to initialise.
            if owns_sessions_dir:
                shutil.rmtree(sessions_dir, ignore_errors=True)
            raise
        self.file_path = file_path
        self.virtual_path = virtual_path
        self._file_injected = False
        self._temp_sessions_dir = sessions_dir
        self._created_temp_dir = owns_sessions_dir

    def _build_injection_code(self) -> str:
        """Return the bootstrap script that recreates the file in the sandbox.

        Raises:
            OSError: If the host file cannot be read.
        """
        import base64

        # Decode explicitly as UTF-8 and replace undecodable bytes: log files
        # often contain stray bytes, and a decode error here would silently
        # disable injection altogether.
        with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
            file_content = f.read()
        # Base64 keeps arbitrary content safe inside a Python string literal.
        encoded_content = base64.b64encode(file_content.encode("utf-8")).decode("ascii")

        # The virtual path is embedded once via repr() so quotes, backslashes
        # or braces in it cannot break the generated source.
        script_lines = [
            "",
            "# File injection code",
            "import base64",
            "import os",
            "# Decode the file content from base64",
            f'encoded_content = "{encoded_content}"',
            "file_content = base64.b64decode(encoded_content).decode('utf-8')",
            f"_virtual_path = {self.virtual_path!r}",
            "# Create the file on disk for compatibility",
            "os.makedirs(os.path.dirname(_virtual_path) or '.', exist_ok=True)",
            "with open(_virtual_path, 'w', encoding='utf-8') as f:",
            "    f.write(file_content)",
            "# Make the content directly available as variables for analysis",
            "log_lines = file_content.splitlines()",
            "total_lines = len(log_lines)",
            'print(f"[INJECTION] Successfully created {_virtual_path} with {len(file_content)} characters")',
            "print(f\"[INJECTION] File content available as 'file_content' variable ({len(file_content)} chars)\")",
            "print(f\"[INJECTION] Lines available as 'log_lines' variable ({total_lines} lines)\")",
            "# Verify injection worked",
            "if os.path.exists(_virtual_path):",
            '    print(f"[INJECTION] File {_virtual_path} exists and ready for use")',
            "else:",
            '    print(f"[INJECTION] ERROR: Failed to create {_virtual_path}")',
            "# Variables now available for analysis:",
            "# - file_content: raw file content as string",
            "# - log_lines: list of individual lines",
            "# - total_lines: number of lines in the file",
            "# End of injection code",
            "",
        ]
        return "\n".join(script_lines)

    async def execute(self, code: str, **kwargs):
        """Run ``code`` in the sandbox, prefixed with the file bootstrap script.

        Falls back to running ``code`` unchanged when there is no file to
        inject or the file cannot be read. ``kwargs`` are forwarded to
        ``PyodideSandbox.execute`` and its result is returned as-is.
        """
        if not (self.file_path and os.path.exists(self.file_path)):
            return await super().execute(code, **kwargs)

        print(f"Injecting file {self.file_path} into execution")
        try:
            # Only the preparation step is guarded. Keeping the sandbox call
            # out of this try block prevents a sandbox failure from being
            # misreported as an injection error and from running the user
            # code a second time.
            combined_code = self._build_injection_code() + "\n" + code
        except OSError as e:
            print(f"Error preparing file injection: {e}")
            return await super().execute(code, **kwargs)

        print(f"Combined code length: {len(combined_code)}")
        return await super().execute(combined_code, **kwargs)

    def cleanup(self):
        """Clean up the temporary sessions directory if this object created it."""
        # getattr defaults keep this safe on a partially initialised instance
        # (e.g. when called from __del__ after __init__ raised).
        temp_dir = getattr(self, "_temp_sessions_dir", None)
        if not (getattr(self, "_created_temp_dir", False) and temp_dir and os.path.exists(temp_dir)):
            return
        try:
            shutil.rmtree(temp_dir)
            print(f"Cleaned up temporary sessions directory: {temp_dir}")
        except Exception as e:
            print(f"Warning: Could not clean up temporary directory {temp_dir}: {e}")
        else:
            # Make repeated cleanup() calls (explicit call + __del__) a no-op.
            self._created_temp_dir = False

    def __del__(self):
        """Cleanup when object is destroyed."""
        try:
            self.cleanup()
        except Exception:
            # Finalizers must never raise; during interpreter shutdown the
            # modules used by cleanup() may already be torn down.
            pass
def create_pyodide_eval_fn(sandbox: PyodideSandbox) -> EvalCoroutine:
    """Create an eval_fn that uses PyodideSandbox.

    Args:
        sandbox: Object exposing ``async execute(code=...)`` whose response has
            ``stdout``, ``stderr`` and ``result`` attributes.

    Returns:
        An async function ``(code, _locals) -> (output, new_vars)``. ``output``
        is the captured stdout (or an error description) and ``new_vars`` holds
        the variables the code created that were not already in ``_locals``.
    """
    # Private sentinel used by the wrapper to report an exception. A plain
    # "error" key would collide with any user variable named ``error`` and turn
    # a successful run into a reported failure.
    error_key = "__codeact_error__"

    async def async_eval_fn(
        code: str, _locals: dict[str, Any]
    ) -> tuple[str, dict[str, Any]]:
        # Indent the user code so it sits inside the try block of the wrapper
        # function below (two levels deep).
        indented_code = "\n".join(
            "        " + line for line in code.strip().split("\n")
        )
        # The wrapper runs the code inside a function so that locals() yields
        # exactly the variables the code defined.
        wrapper_code = f"""
def execute():
    try:
        # Execute the provided code
{indented_code}
        return locals()
    except Exception as e:
        return {{"{error_key}": str(e)}}

execute()
"""

        try:
            # Re-create the caller's context inside the sandbox: functions by
            # source code, everything else by repr().
            context_parts = []
            for key, value in _locals.items():
                if callable(value):
                    try:
                        context_parts.append(inspect.getsource(value))
                    except (OSError, TypeError):
                        # Source unavailable (builtins, C extensions, objects
                        # defined interactively): skip it, best effort.
                        continue
                else:
                    context_parts.append(f"{key} = {repr(value)}")
            context_setup = "".join(f"\n{part}" for part in context_parts)

            # Combine context setup and the actual code
            full_code = context_setup + "\n\n" + wrapper_code

            response = await sandbox.execute(code=full_code)

            if response.stderr:
                return f"Error during execution: {response.stderr}", {}

            output = (
                response.stdout
                if response.stdout
                else "<Code ran, no output printed to stdout>"
            )

            result = response.result
            if not isinstance(result, dict):
                # The sandbox returned no usable locals snapshot; the run
                # itself succeeded, so report its output without new variables.
                return output, {}

            if error_key in result:
                return f"Error during execution: {result[error_key]}", {}

            # Keep only variables that are new relative to the caller's locals
            # and are not private.
            new_vars = {
                k: v
                for k, v in result.items()
                if k not in _locals and not k.startswith("_")
            }
            return output, new_vars
        except Exception as e:
            return f"Error during PyodideSandbox execution: {repr(e)}", {}

    return async_eval_fn
def read_file(file_path: str) -> str:
    """Read a text file and return its content.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the host's locale-specific default encoding.

    Args:
        file_path: Path of the file to read.

    Returns:
        The full content of the file as a string.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()
def create_analysis_agent(file_path: str, model=None, virtual_path: str = "/uploaded_file.log", sessions_dir: str = None):
    """
    Build and compile a CodeAct agent whose sandbox has the given file injected.

    Args:
        file_path: Path to the file to analyze
        model: Language model to use (when None, the default OpenAI chat model is initialized)
        virtual_path: Path under which the file is exposed inside the sandbox
        sessions_dir: Sessions directory handed to the sandbox (None lets the sandbox create a temp dir)

    Returns:
        Compiled CodeAct agent ready for analysis
    """
    # Fall back to the default chat model only when the caller supplied none.
    chat_model = (
        model
        if model is not None
        else init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
    )

    # Sandbox that prepends the file-injection bootstrap to every execution.
    sandbox_options = {
        "file_path": file_path,
        "virtual_path": virtual_path,
        "sessions_dir": sessions_dir,
        "allow_net": True,
    }
    file_sandbox = FileInjectedPyodideSandbox(**sandbox_options)

    # No extra tools are registered; the agent only evaluates generated code.
    graph = create_codeact(chat_model, [], create_pyodide_eval_fn(file_sandbox))
    return graph.compile()
def get_default_analysis_query(file_extension: str = None) -> str:
    """
    Pick the default analysis prompt for a file type.

    Extensions '.log' and '.txt' (compared case-insensitively) get the
    log-oriented prompt; anything else, including a missing extension, gets
    the generic file-analysis prompt.

    Args:
        file_extension: File extension (e.g., '.log', '.csv', '.txt')

    Returns:
        Analysis query string
    """
    normalized_ext = file_extension.lower() if file_extension else ""

    # Generic prompt for everything that is not a plain log/text file.
    if normalized_ext not in ('.log', '.txt'):
        return """
Analyze this uploaded file and provide comprehensive insights. Follow these reliable patterns:
ANALYSIS REQUIREMENTS:
1. **File Type Analysis** - What type of file this is and its structure
2. **Content Summary** - Overview of the file contents
3. **Key Information** - Important data points or patterns found
4. **Data Quality** - Assessment of data completeness and consistency
5. **Statistical Analysis** - Basic statistics and data distribution
6. **Insights & Findings** - Key takeaways from the analysis
7. **Recommendations** - Suggested next steps or insights
DATA SOURCES AVAILABLE:
- file_content: Raw file content as a string
- log_lines: List of individual lines
- total_lines: Number of lines in the file
- File path: /uploaded_file.log
RELIABLE CODE PATTERNS:
1. Start with basic stats: total_lines, len(file_content), file preview
2. Use Counter() for frequency analysis of patterns
3. Use re.findall() for extracting structured data like emails, IPs, dates
4. Analyze line structure and consistency
5. Calculate data quality metrics
6. Provide clear sections with === headers ===
7. End with actionable recommendations
Focus on reliability over complexity. Use simple, proven Python patterns that work consistently.
Generate Python code following these guidelines for robust file analysis.
"""

    # Log-oriented prompt for '.log' and '.txt' files.
    return """
Analyze this uploaded file and provide comprehensive insights. Follow the example code patterns below for reliable analysis.
ANALYSIS REQUIREMENTS:
1. **Content Overview** - What type of data/logs this file contains
2. **Security Analysis** - Identify any security-related events, threats, or suspicious activities
3. **Performance Insights** - Find bottlenecks, slow operations, or performance issues
4. **Error Analysis** - Identify and categorize errors, warnings, and critical issues
5. **Statistical Summary** - Basic statistics (line count, data distribution, time ranges)
6. **Key Patterns** - Important patterns, trends, or anomalies found
7. **Recommendations** - Suggested actions based on the analysis
DATA SOURCES AVAILABLE:
- `file_content`: Raw file content as a string
- `log_lines`: List of individual lines
- `total_lines`: Number of lines in the file
- File path: `/uploaded_file.log`
EXAMPLE CODE PATTERNS TO FOLLOW:
Start with basic analysis, then add specific patterns based on your file type:
1. Import required libraries: re, Counter, defaultdict, datetime
2. Basic file statistics: total_lines, file_content length, sample lines
3. Pattern analysis using regex for security, performance, errors
4. Data extraction and frequency analysis
5. Clear formatted output with sections
6. Actionable recommendations
Use these code snippets as templates:
- Counter() for frequency analysis
- re.search() and re.findall() for pattern matching
- enumerate(log_lines, 1) for line-by-line processing
- defaultdict(list) for grouping findings
- Clear print statements with section headers
Generate Python code following these patterns. Always include proper error handling, clear output formatting, and actionable insights.
"""
async def run_file_analysis(file_path: str, query: str = None, model=None) -> str:
    """
    Run file analysis using CodeAct agent.

    The agent is streamed in both "messages" and "values" modes: the streamed
    message chunks are concatenated into the running transcript, and the last
    message of the final state snapshot is appended once as the final analysis.

    Args:
        file_path: Path to the file to analyze
        query: Analysis query (if None, will use default based on file type)
        model: Language model to use

    Returns:
        Analysis results as string. Failures are reported in the returned
        string (prefixed with "❌") rather than raised.
    """
    if not os.path.exists(file_path):
        return f"❌ File not found: {file_path}"

    try:
        # Create the agent
        agent = create_analysis_agent(file_path, model)

        # Use default query if none provided
        if query is None:
            file_ext = os.path.splitext(file_path)[1]
            query = get_default_analysis_query(file_ext)

        streamed_chunks = []
        final_message = None
        async for typ, chunk in agent.astream(
            {"messages": query},
            stream_mode=["values", "messages"],
        ):
            if typ == "messages":
                streamed_chunks.append(chunk[0].content)
            elif typ == "values":
                # A state snapshot is emitted after every step (including the
                # initial input), so only remember the latest one instead of
                # emitting a "Final Analysis" section per snapshot.
                if chunk and "messages" in chunk and chunk["messages"]:
                    final_message = chunk["messages"][-1]

        result_parts = []
        # Message chunks are token fragments: join them without a separator so
        # the text is not broken onto one line per token.
        streamed_text = "".join(streamed_chunks)
        if streamed_text:
            result_parts.append(streamed_text)
        if final_message is not None and hasattr(final_message, 'content'):
            result_parts.append(f"\n\n**Final Analysis:**\n{final_message.content}")

        return "\n".join(result_parts) if result_parts else "Analysis completed but no output generated."
    except Exception as e:
        return f"❌ Error analyzing file: {str(e)}"
# Command-line entry point for manual testing
if __name__ == "__main__":
    # Intended for ad-hoc testing only - remove or comment out in production
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # No file given: show how the script is meant to be used.
        print("Usage: python agent.py <file_path>")
        print("Or import this module and use the functions directly.")
    else:
        test_file_path = cli_args[0]
        print(f"Testing with file: {test_file_path}")

        async def test_analysis():
            # Run the default analysis and dump the result to stdout.
            result = await run_file_analysis(test_file_path)
            print("Analysis Result:")
            print("=" * 50)
            print(result)

        asyncio.run(test_analysis())