# DataForge / agent.py
# ai-puppy
# save
# 3495357
# raw
# history blame
# 8.15 kB
import asyncio
import inspect
import uuid
import os
from typing import Any
from langchain.chat_models import init_chat_model
from langchain_sandbox import PyodideSandbox
from langgraph_codeact import EvalCoroutine, create_codeact
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv())
class FileInjectedPyodideSandbox(PyodideSandbox):
    """PyodideSandbox that recreates a host file inside every execution.

    Before the caller's code runs, a generated prelude is prepended to it.
    The prelude writes the host file's content to ``virtual_path`` inside the
    sandbox and exposes it as the variables ``file_content`` (full text),
    ``log_lines`` (list of lines) and ``total_lines`` (line count).
    """

    def __init__(self, file_path: "str | None" = None, virtual_path: str = "/server.log", **kwargs):
        """Store the injection settings and forward ``kwargs`` to the base class.

        Args:
            file_path: Host path of the file to inject; ``None`` disables
                injection.
            virtual_path: Path at which the file is recreated in the sandbox.
            **kwargs: Passed unchanged to ``PyodideSandbox.__init__``.
        """
        super().__init__(**kwargs)
        self.file_path = file_path
        self.virtual_path = virtual_path
        # Kept for backward compatibility; nothing in this class reads it.
        self._file_injected = False

    def _build_injection_code(self, file_content: str) -> str:
        """Return the sandbox-side prelude that recreates ``file_content``."""
        import base64

        # Base64 keeps arbitrary file content out of Python string-literal
        # syntax (quotes, backslashes, newlines).
        encoded_content = base64.b64encode(file_content.encode("utf-8")).decode("ascii")
        lines = (
            "",
            "# File injection code",
            "import base64",
            "import os",
            # repr() yields a valid, fully escaped literal, so a path with
            # quotes, braces or newlines cannot break or alter the prelude.
            f"_virtual_path = {self.virtual_path!r}",
            "# Decode the log file content from base64",
            f"encoded_content = {encoded_content!r}",
            "file_content = base64.b64decode(encoded_content).decode('utf-8')",
            "# Create the file on disk for compatibility",
            "with open(_virtual_path, 'w') as f:",
            "    f.write(file_content)",
            "# Make the content directly available as variables for analysis",
            "log_lines = file_content.splitlines()",
            "total_lines = len(log_lines)",
            'print(f"[INJECTION] Successfully created {_virtual_path} with {len(file_content)} characters")',
            "print(f\"[INJECTION] File content available as 'file_content' variable ({len(file_content)} chars)\")",
            "print(f\"[INJECTION] Log lines available as 'log_lines' variable ({total_lines} lines)\")",
            "# Verify injection worked",
            "if os.path.exists(_virtual_path):",
            '    print(f"[INJECTION] File {_virtual_path} exists and ready for use")',
            "else:",
            '    print(f"[INJECTION] ERROR: Failed to create {_virtual_path}")',
            "# Variables now available for analysis:",
            "# - file_content: raw file content as string",
            "# - log_lines: list of individual log lines",
            "# - total_lines: number of lines in the log",
            "# End of injection code",
            "",
        )
        return "\n".join(lines)

    def _load_injection_code(self) -> "str | None":
        """Read the host file and build the prelude, or return ``None``.

        ``None`` means "run the caller's code unmodified": no file is
        configured, it does not exist, or it could not be read.
        """
        if not self.file_path or not os.path.exists(self.file_path):
            return None
        print(f"Injecting file {self.file_path} into execution")
        try:
            # Decode as UTF-8 to match the encoding used for the base64
            # payload; undecodable bytes are replaced rather than aborting
            # the injection.
            with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
                file_content = f.read()
        except OSError as e:
            print(f"Error preparing file injection: {e}")
            return None
        return self._build_injection_code(file_content)

    async def execute(self, code: str, **kwargs):
        """Run ``code`` in the sandbox, preceded by the file-injection prelude.

        Only the preparation of the prelude is best-effort. The sandbox call
        itself runs exactly once, so a failure inside the sandbox is never
        answered by silently re-running the user's code without the file.

        Args:
            code: Python source to execute.
            **kwargs: Passed unchanged to ``PyodideSandbox.execute``.

        Returns:
            Whatever ``PyodideSandbox.execute`` returns.
        """
        injection_code = self._load_injection_code()
        if injection_code is not None:
            code = injection_code + "\n" + code
            print(f"Combined code length: {len(code)}")
        return await super().execute(code, **kwargs)
def create_pyodide_eval_fn(sandbox: "PyodideSandbox") -> "EvalCoroutine":
    """Create a CodeAct ``eval_fn`` that executes code through ``sandbox``.

    The returned coroutine wraps the code in an ``execute()`` function whose
    ``locals()`` are returned as the execution result, prepends definitions
    for the entries of ``_locals``, and runs the whole thing with
    ``sandbox.execute``.

    Args:
        sandbox: Object exposing ``async execute(code=...)`` whose response
            has ``stdout``, ``stderr`` and ``result`` attributes.

    Returns:
        ``async_eval_fn(code, _locals) -> (output, new_vars)`` where
        ``output`` is the captured stdout (or an error description) and
        ``new_vars`` holds variables created by the code that are not in
        ``_locals`` and do not start with an underscore.
    """
    # Reserved key marking a failed run. A plain "error" key would collide
    # with user code that happens to define a variable named ``error``.
    error_key = "__codeact_error__"

    async def async_eval_fn(
        code: str, _locals: dict[str, Any]
    ) -> tuple[str, dict[str, Any]]:
        # Indent the user code so that it sits inside the ``try`` body of
        # ``execute()`` (two levels, eight spaces).
        indented_code = "\n".join(
            " " * 8 + line for line in code.strip().split("\n")
        )
        wrapper_code = (
            "\n"
            "def execute():\n"
            "    try:\n"
            "        # Execute the provided code\n"
            f"{indented_code}\n"
            "        return locals()\n"
            "    except Exception as e:\n"
            f"        return {{{error_key!r}: str(e)}}\n"
            "\n"
            "execute()\n"
        )

        # Re-create the caller's context inside the sandbox: callables by
        # their source code, everything else by its repr().
        context_parts = []
        for key, value in _locals.items():
            if callable(value):
                try:
                    context_parts.append(inspect.getsource(value))
                except (OSError, TypeError):
                    # No retrievable source (builtins, C extensions,
                    # interactively defined objects): skip it.
                    continue
            else:
                context_parts.append(f"{key} = {value!r}")
        context_setup = "".join(f"\n{part}" for part in context_parts)

        try:
            full_code = context_setup + "\n\n" + wrapper_code
            response = await sandbox.execute(code=full_code)

            if response.stderr:
                return f"Error during execution: {response.stderr}", {}

            output = response.stdout or "<Code ran, no output printed to stdout>"
            result = response.result

            # The sandbox may hand back no result (or a non-dict one); the
            # captured output is still valid, there are just no new variables.
            if not isinstance(result, dict):
                return output, {}

            if error_key in result:
                return f"Error during execution: {result[error_key]}", {}

            new_vars = {
                k: v
                for k, v in result.items()
                if k not in _locals and not k.startswith("_")
            }
            return output, new_vars
        except Exception as e:
            # Boundary handler: the agent loop expects a message, not a raise.
            return f"Error during PyodideSandbox execution: {repr(e)}", {}

    return async_eval_fn
def read_file(file_path: str) -> str:
    """Return the entire text content of the file at ``file_path``."""
    with open(file_path, mode="r") as handle:
        contents = handle.read()
    return contents
# No extra tools are registered with the CodeAct agent.
tools = []
model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai")
# Host log file to analyze. Set the SERVER_LOG_PATH environment variable
# (the .env file is loaded at import time) to point at another file; the
# previous hard-coded location remains the default.
log_file_path = os.environ.get(
    "SERVER_LOG_PATH", "/Users/hw/Desktop/codeact_agent/server.log"
)
# Create our custom sandbox with file injection capability
sandbox = FileInjectedPyodideSandbox(
    file_path=log_file_path,
    virtual_path="/server.log",
    allow_net=True,
)
# Wire the sandbox into a CodeAct graph and compile it into a runnable agent.
eval_fn = create_pyodide_eval_fn(sandbox)
code_act = create_codeact(model, tools, eval_fn)
agent = code_act.compile()
# Task prompt sent to the agent as its initial message. It lists the five
# analyses to produce, describes the log line format with sample entries, and
# names the data sources the generated code can use (`file_content`,
# `log_lines`, `total_lines` and the `/server.log` file).
query = """
Analyze these server logs and provide:
1. Security threat summary - identify attack patterns, suspicious IPs, and breach attempts
2. Performance bottlenecks - find slow endpoints, database issues, and resource constraints
3. User behavior analysis - login patterns, most accessed endpoints, session durations
4. System health report - error rates, critical alerts, and infrastructure issues
5. Recommended actions based on the analysis
LOG FORMAT INFORMATION:
The server logs follow this format:
YYYY-MM-DD HH:MM:SS [LEVEL] event_type: key=value, key=value, ...
Sample log entries:
- 2024-01-15 08:23:45 [INFO] user_login: user=john_doe, ip=192.168.1.100, success=true
- 2024-01-15 08:24:12 [INFO] api_request: endpoint=/api/users, method=GET, user=john_doe, response_time=45ms
- 2024-01-15 08:27:22 [WARN] failed_login: user=admin, ip=203.45.67.89, attempts=3
- 2024-01-15 08:38:33 [CRITICAL] security_alert: suspicious_activity, ip=185.234.72.19, pattern=sql_injection_attempt
- 2024-01-15 08:26:01 [ERROR] database_connection: host=db-primary, error=timeout, duration=30s
Key log levels: INFO, WARN, ERROR, CRITICAL
Key event types: user_login, user_logout, api_request, failed_login, security_alert, database_connection, etc.
DATA SOURCES AVAILABLE:
- `file_content`: Raw log content as a string
- `log_lines`: List of individual log lines
- `total_lines`: Number of lines in the log
- File path: `/server.log` (can be read with open('/server.log', 'r'))
Generate python code and run it in the sandbox to get the analysis.
"""
async def run_agent(query: str):
    """Stream the agent's response to ``query`` and print it as it arrives.

    Chunks tagged ``"messages"`` are printed inline without a trailing
    newline; chunks tagged ``"values"`` are printed under an ``---answer---``
    separator. Chunks with any other tag are ignored.
    """
    stream = agent.astream(
        {"messages": query},
        stream_mode=["values", "messages"],
    )
    async for mode, payload in stream:
        if mode == "values":
            print("\n\n---answer---\n\n", payload)
            continue
        if mode == "messages":
            # First element of the payload is the message chunk.
            print(payload[0].content, end="")
if __name__ == "__main__":
    # Script entry point: drive the streaming coroutine to completion.
    main_coroutine = run_agent(query)
    asyncio.run(main_coroutine)