Spaces:
Runtime error
Runtime error
import asyncio | |
import inspect | |
import uuid | |
import os | |
from typing import Any | |
from langchain.chat_models import init_chat_model | |
from langchain_sandbox import PyodideSandbox | |
from langgraph_codeact import EvalCoroutine, create_codeact | |
from dotenv import find_dotenv, load_dotenv | |
load_dotenv(find_dotenv()) | |
class FileInjectedPyodideSandbox(PyodideSandbox): | |
"""Custom PyodideSandbox that can inject files into the virtual filesystem.""" | |
def __init__(self, file_path: str = None, virtual_path: str = "/server.log", **kwargs): | |
super().__init__(**kwargs) | |
self.file_path = file_path | |
self.virtual_path = virtual_path | |
self._file_injected = False | |
async def execute(self, code: str, **kwargs): | |
# If we have a file to inject, prepend the injection code to the user code | |
if self.file_path and os.path.exists(self.file_path): | |
print(f"Injecting file {self.file_path} into execution") | |
try: | |
with open(self.file_path, 'r') as f: | |
file_content = f.read() | |
# Use base64 encoding to avoid string literal issues | |
import base64 | |
encoded_content = base64.b64encode(file_content.encode('utf-8')).decode('ascii') | |
# Prepend file injection code to user code | |
injection_code = f''' | |
# File injection code - inject {self.virtual_path} | |
import base64 | |
import os | |
# Decode the log file content from base64 | |
encoded_content = """{encoded_content}""" | |
file_content = base64.b64decode(encoded_content).decode('utf-8') | |
# Create the file on disk for compatibility | |
with open("{self.virtual_path}", 'w') as f: | |
f.write(file_content) | |
# Make the content directly available as variables for analysis | |
log_lines = file_content.splitlines() | |
total_lines = len(log_lines) | |
print(f"[INJECTION] Successfully created {self.virtual_path} with {{len(file_content)}} characters") | |
print(f"[INJECTION] File content available as 'file_content' variable ({{len(file_content)}} chars)") | |
print(f"[INJECTION] Log lines available as 'log_lines' variable ({{total_lines}} lines)") | |
# Verify injection worked | |
if os.path.exists("{self.virtual_path}"): | |
print(f"[INJECTION] File {self.virtual_path} exists and ready for use") | |
else: | |
print(f"[INJECTION] ERROR: Failed to create {self.virtual_path}") | |
# Variables now available for analysis: | |
# - file_content: raw file content as string | |
# - log_lines: list of individual log lines | |
# - total_lines: number of lines in the log | |
# - File also available at: {self.virtual_path} | |
# End of injection code | |
''' | |
# Combine injection code with user code | |
combined_code = injection_code + "\n" + code | |
print(f"Combined code length: {len(combined_code)}") | |
return await super().execute(combined_code, **kwargs) | |
except Exception as e: | |
print(f"Error preparing file injection: {e}") | |
return await super().execute(code, **kwargs) | |
else: | |
return await super().execute(code, **kwargs) | |
def create_pyodide_eval_fn(sandbox: PyodideSandbox) -> EvalCoroutine: | |
"""Create an eval_fn that uses PyodideSandbox. | |
""" | |
async def async_eval_fn( | |
code: str, _locals: dict[str, Any] | |
) -> tuple[str, dict[str, Any]]: | |
# Create a wrapper function that will execute the code and return locals | |
wrapper_code = f""" | |
def execute(): | |
try: | |
# Execute the provided code | |
{chr(10).join(" " + line for line in code.strip().split(chr(10)))} | |
return locals() | |
except Exception as e: | |
return {{"error": str(e)}} | |
execute() | |
""" | |
# Convert functions in _locals to their string representation | |
context_setup = "" | |
for key, value in _locals.items(): | |
if callable(value): | |
# Get the function's source code | |
try: | |
src = inspect.getsource(value) | |
context_setup += f"\n{src}" | |
except: | |
# If we can't get source, skip it | |
pass | |
else: | |
context_setup += f"\n{key} = {repr(value)}" | |
try: | |
# Combine context setup and the actual code | |
full_code = context_setup + "\n\n" + wrapper_code | |
# Execute the code and get the result | |
response = await sandbox.execute(code=full_code) | |
# Check if execution was successful | |
if response.stderr: | |
return f"Error during execution: {response.stderr}", {} | |
# Get the output from stdout | |
output = ( | |
response.stdout | |
if response.stdout | |
else "<Code ran, no output printed to stdout>" | |
) | |
result = response.result | |
# If there was an error in the result, return it | |
if isinstance(result, dict) and "error" in result: | |
return f"Error during execution: {result['error']}", {} | |
# Get the new variables by comparing with original locals | |
new_vars = { | |
k: v | |
for k, v in result.items() | |
if k not in _locals and not k.startswith("_") | |
} | |
return output, new_vars | |
except Exception as e: | |
return f"Error during PyodideSandbox execution: {repr(e)}", {} | |
return async_eval_fn | |
def read_file(file_path: str) -> str: | |
"""Read a file and return its content.""" | |
with open(file_path, "r") as file: | |
return file.read() | |
tools = [] | |
model = init_chat_model("gpt-4.1-2025-04-14", model_provider="openai") | |
# Specify the log file path | |
log_file_path = "/Users/hw/Desktop/codeact_agent/server.log" | |
# Create our custom sandbox with file injection capability | |
sandbox = FileInjectedPyodideSandbox( | |
file_path=log_file_path, | |
virtual_path="/server.log", | |
allow_net=True | |
) | |
eval_fn = create_pyodide_eval_fn(sandbox) | |
code_act = create_codeact(model, tools, eval_fn) | |
agent = code_act.compile() | |
query = """ | |
Analyze these server logs and provide: | |
1. Security threat summary - identify attack patterns, suspicious IPs, and breach attempts | |
2. Performance bottlenecks - find slow endpoints, database issues, and resource constraints | |
3. User behavior analysis - login patterns, most accessed endpoints, session durations | |
4. System health report - error rates, critical alerts, and infrastructure issues | |
5. Recommended actions based on the analysis | |
LOG FORMAT INFORMATION: | |
The server logs follow this format: | |
YYYY-MM-DD HH:MM:SS [LEVEL] event_type: key=value, key=value, ... | |
Sample log entries: | |
- 2024-01-15 08:23:45 [INFO] user_login: user=john_doe, ip=192.168.1.100, success=true | |
- 2024-01-15 08:24:12 [INFO] api_request: endpoint=/api/users, method=GET, user=john_doe, response_time=45ms | |
- 2024-01-15 08:27:22 [WARN] failed_login: user=admin, ip=203.45.67.89, attempts=3 | |
- 2024-01-15 08:38:33 [CRITICAL] security_alert: suspicious_activity, ip=185.234.72.19, pattern=sql_injection_attempt | |
- 2024-01-15 08:26:01 [ERROR] database_connection: host=db-primary, error=timeout, duration=30s | |
Key log levels: INFO, WARN, ERROR, CRITICAL | |
Key event types: user_login, user_logout, api_request, failed_login, security_alert, database_connection, etc. | |
DATA SOURCES AVAILABLE: | |
- `file_content`: Raw log content as a string | |
- `log_lines`: List of individual log lines | |
- `total_lines`: Number of lines in the log | |
- File path: `/server.log` (can be read with open('/server.log', 'r')) | |
Generate python code and run it in the sandbox to get the analysis. | |
""" | |
async def run_agent(query: str): | |
# Stream agent outputs | |
async for typ, chunk in agent.astream( | |
{"messages": query}, | |
stream_mode=["values", "messages"], | |
): | |
if typ == "messages": | |
print(chunk[0].content, end="") | |
elif typ == "values": | |
print("\n\n---answer---\n\n", chunk) | |
if __name__ == "__main__": | |
# Run the agent | |
asyncio.run(run_agent(query)) |