Spaces:
Running
Running
import os | |
import logging | |
from typing import List, Dict | |
import sympy as sp | |
import numpy as np | |
import scipy.linalg as la | |
import scipy.special as special | |
from llama_index.tools.code_interpreter import CodeInterpreterToolSpec | |
from scipy.integrate import quad | |
from scipy.stats import binom, norm, poisson | |
import numpy.fft as fft | |
from llama_index.core.agent.workflow import ReActAgent | |
from llama_index.core.tools import FunctionTool | |
from llama_index.llms.google_genai import GoogleGenAI | |
from llama_index.tools.wolfram_alpha import WolframAlphaToolSpec | |
# Setup logging | |
logger = logging.getLogger(__name__) | |
# --- Math Tool Functions (with enhanced logging and error handling) --- | |
# Helper decorator for error handling and logging | |
def math_tool_handler(func): | |
def wrapper(*args, **kwargs): | |
func_name = func.__name__ | |
logger.info(f"Executing math tool: {func_name} with args: {args}, kwargs: {kwargs}") | |
try: | |
result = func(*args, **kwargs) | |
logger.info(f"Tool {func_name} executed successfully. Result: {str(result)[:200]}...") | |
# Ensure result is serializable (convert numpy types if necessary) | |
if isinstance(result, np.ndarray): | |
return result.tolist() | |
if isinstance(result, (np.int_, np.intc, np.intp, np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64)): | |
return int(result) | |
if isinstance(result, (np.float_, np.float16, np.float32, np.float64)): | |
return float(result) | |
if isinstance(result, (np.complex_, np.complex64, np.complex128)): | |
return complex(result) | |
if isinstance(result, np.bool_): | |
return bool(result) | |
if isinstance(result, dict): | |
return {k: wrapper(v) if isinstance(v, (np.ndarray, np.number, np.bool_)) else v for k, v in result.items()} # Recursively handle dicts | |
return result | |
except (sp.SympifyError, TypeError, ValueError, np.linalg.LinAlgError, ZeroDivisionError) as e: | |
logger.warning(f"Math error in {func_name}: {e}") | |
return f"Error in {func_name}: {e}" | |
except Exception as e: | |
logger.error(f"Unexpected error in {func_name}: {e}", exc_info=True) | |
return f"Unexpected error in {func_name}: {e}" | |
return wrapper | |
# --- Symbolic math functions --- | |
def solve_symbolic_equation(equation: str, variable: str = "x") -> str: | |
"""Solve a symbolic equation (e.g., 'x**2 - 4') for the given variable.""" | |
symbol = sp.symbols(variable) | |
# Ensure equation is treated as expression == 0 if no equality sign | |
if "=" not in equation: | |
expr = sp.sympify(equation) | |
else: | |
lhs, rhs = equation.split("=", 1) | |
expr = sp.Eq(sp.sympify(lhs.strip()), sp.sympify(rhs.strip())) | |
solutions = sp.solve(expr, symbol) | |
return f"Solutions: {solutions}" | |
def compute_derivative(expression: str, variable: str = "x") -> str: | |
"""Compute the symbolic derivative of an expression (e.g., 'sin(x)*x**2').""" | |
symbol = sp.symbols(variable) | |
expr = sp.sympify(expression) | |
deriv = sp.diff(expr, symbol) | |
return f"Derivative: {deriv}" | |
def compute_integral(expression: str, variable: str = "x") -> str: | |
"""Compute the symbolic indefinite integral of an expression (e.g., '1/x').""" | |
symbol = sp.symbols(variable) | |
expr = sp.sympify(expression) | |
integ = sp.integrate(expr, symbol) | |
return f"Integral: {integ} + C" | |
def compute_limit( | |
expression: str, variable: str = "x", point: str = "oo" | |
) -> str: | |
"""Compute the limit of an expression (e.g., 'sin(x)/x') as variable approaches point (e.g., '0', 'oo').""" | |
symbol = sp.symbols(variable) | |
expr = sp.sympify(expression) | |
# Handle 'oo', '-oo', 'zoo' for infinity, or numerical points | |
if point.lower() == "oo": | |
pt = sp.oo | |
elif point.lower() == "-oo": | |
pt = -sp.oo | |
elif point.lower() == "zoo": | |
pt = sp.zoo # Complex infinity | |
else: | |
pt = sp.sympify(point) | |
lim = sp.limit(expr, symbol, pt) | |
return f"Limit at {point}: {lim}" | |
def simplify_expression(expression: str) -> str: | |
"""Simplify a symbolic expression (e.g., 'sin(x)**2 + cos(x)**2').""" | |
expr = sp.sympify(expression) | |
simp = sp.simplify(expr) | |
return f"Simplified expression: {simp}" | |
def expand_expression(expression: str) -> str: | |
"""Expand a symbolic expression (e.g., '(x+y)**2').""" | |
expr = sp.sympify(expression) | |
exp = sp.expand(expr) | |
return f"Expanded expression: {exp}" | |
def factor_expression(expression: str) -> str: | |
"""Factor a symbolic expression (e.g., 'x**2 - y**2').""" | |
expr = sp.sympify(expression) | |
fact = sp.factor(expr) | |
return f"Factored expression: {fact}" | |
# --- Matrix math functions --- | |
def matrix_addition(a: List[List[float]], b: List[List[float]]) -> List[List[float]]: | |
"""Add two matrices element-wise. Input: [[1, 2], [3, 4]], [[5, 6], [7, 8]].""" | |
A = np.array(a) | |
B = np.array(b) | |
if A.shape != B.shape: | |
raise ValueError("Matrices must have the same shape for addition.") | |
return (A + B) | |
def matrix_subtraction(a: List[List[float]], b: List[List[float]]) -> List[List[float]]: | |
"""Subtract matrix B from matrix A element-wise. Input: [[5, 6], [7, 8]], [[1, 2], [3, 4]].""" | |
A = np.array(a) | |
B = np.array(b) | |
if A.shape != B.shape: | |
raise ValueError("Matrices must have the same shape for subtraction.") | |
return (A - B) | |
def matrix_multiplication(a: List[List[float]], b: List[List[float]]) -> List[List[float]]: | |
"""Multiply two matrices. Input: [[1, 2], [3, 4]], [[5, 6], [7, 8]].""" | |
A = np.array(a) | |
B = np.array(b) | |
if A.shape[1] != B.shape[0]: | |
raise ValueError("Inner dimensions must match for matrix multiplication.") | |
return np.matmul(A, B) | |
def matrix_inverse(matrix: List[List[float]]) -> List[List[float]]: | |
"""Compute the inverse of a square matrix. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
if M.shape[0] != M.shape[1]: | |
raise ValueError("Matrix must be square to compute inverse.") | |
return np.linalg.inv(M) | |
def matrix_determinant(matrix: List[List[float]]) -> float: | |
"""Compute the determinant of a square matrix. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
if M.shape[0] != M.shape[1]: | |
raise ValueError("Matrix must be square to compute determinant.") | |
return np.linalg.det(M) | |
def matrix_transpose(matrix: List[List[float]]) -> List[List[float]]: | |
"""Transpose a matrix. Input: [[1, 2, 3], [4, 5, 6]].""" | |
M = np.array(matrix) | |
return M.T | |
def matrix_rank(matrix: List[List[float]]) -> int: | |
"""Compute the rank of a matrix. Input: [[1, 2], [2, 4]].""" | |
M = np.array(matrix) | |
return np.linalg.matrix_rank(M) | |
def matrix_trace(matrix: List[List[float]]) -> float: | |
"""Compute the trace of a square matrix. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
if M.shape[0] != M.shape[1]: | |
raise ValueError("Matrix must be square to compute trace.") | |
return np.trace(M) | |
def matrix_norm(matrix: List[List[float]], ord_str: str = "fro") -> float: | |
"""Compute the norm of a matrix. ord_str can be 'fro' (Frobenius), 'nuc' (nuclear), inf, -inf, 1, -1, 2, -2. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
ord_map = {"fro": "fro", "nuc": "nuc", "inf": np.inf, "-inf": -np.inf, "1": 1, "-1": -1, "2": 2, "-2": -2} | |
ord_val = ord_map.get(ord_str) | |
if ord_val is None: | |
raise ValueError(f"Invalid ord_str: {ord_str}. Must be one of {list(ord_map.keys())}") | |
return np.linalg.norm(M, ord=ord_val) | |
def eigenvalues(matrix: List[List[float]]) -> List[complex]: | |
"""Compute eigenvalues of a square matrix. Input: [[1, -1], [1, 1]].""" | |
M = np.array(matrix) | |
if M.shape[0] != M.shape[1]: | |
raise ValueError("Matrix must be square to compute eigenvalues.") | |
vals = np.linalg.eigvals(M) | |
return vals | |
def eigenvectors(matrix: List[List[float]]) -> List[List[complex]]: | |
"""Compute eigenvectors of a square matrix. Returns list of eigenvectors. Input: [[1, -1], [1, 1]].""" | |
M = np.array(matrix) | |
if M.shape[0] != M.shape[1]: | |
raise ValueError("Matrix must be square to compute eigenvectors.") | |
vals, vecs = np.linalg.eig(M) | |
# Return eigenvectors as rows or columns? Let's return as list of column vectors | |
return vecs.T # Transpose to get eigenvectors as list items | |
def svd_decompose(matrix: List[List[float]]) -> Dict[str, List]: | |
"""Compute the singular value decomposition (U, S, Vh) of a matrix. Input: [[1, 2], [3, 4], [5, 6]].""" | |
M = np.array(matrix) | |
U, S, Vh = np.linalg.svd(M) | |
return {"U": U, "S": S, "Vh": Vh} | |
def lu_decompose(matrix: List[List[float]]) -> Dict[str, List]: | |
"""Compute the LU decomposition (P, L, U) of a matrix. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
P, L, U = la.lu(M) | |
return {"P": P, "L": L, "U": U} | |
def qr_decompose(matrix: List[List[float]]) -> Dict[str, List]: | |
"""Compute the QR decomposition (Q, R) of a matrix. Input: [[1, 2], [3, 4]].""" | |
M = np.array(matrix) | |
Q, R = np.linalg.qr(M) | |
return {"Q": Q, "R": R} | |
# --- Statistics functions --- | |
def mean(values: List[float]) -> float: | |
"""Compute the mean of a list of numbers. Input: [1, 2, 3, 4, 5].""" | |
if not values: | |
raise ValueError("Input list cannot be empty for mean calculation.") | |
return np.mean(np.array(values)) | |
def median(values: List[float]) -> float: | |
"""Compute the median of a list of numbers. Input: [1, 3, 2, 4, 5].""" | |
if not values: | |
raise ValueError("Input list cannot be empty for median calculation.") | |
return np.median(np.array(values)) | |
def std_dev(values: List[float], ddof: int = 1) -> float: | |
"""Compute the sample standard deviation (ddof=1) or population (ddof=0) of a list. Input: [1, 2, 3, 4, 5].""" | |
if not values or len(values) < ddof: | |
raise ValueError(f"Input list must have at least {ddof} elements for std dev with ddof={ddof}.") | |
return np.std(np.array(values), ddof=ddof) | |
def variance(values: List[float], ddof: int = 1) -> float: | |
"""Compute the sample variance (ddof=1) or population (ddof=0) of a list. Input: [1, 2, 3, 4, 5].""" | |
if not values or len(values) < ddof: | |
raise ValueError(f"Input list must have at least {ddof} elements for variance with ddof={ddof}.") | |
return np.var(np.array(values), ddof=ddof) | |
def percentile(values: List[float], percent: float) -> float: | |
"""Compute the q-th percentile (0<=q<=100) of a list. Input: [1, 2, 3, 4, 5], 75.""" | |
if not values: | |
raise ValueError("Input list cannot be empty for percentile calculation.") | |
if not (0 <= percent <= 100): | |
raise ValueError("Percent must be between 0 and 100.") | |
return np.percentile(np.array(values), percent) | |
def covariance(x: List[float], y: List[float], ddof: int = 1) -> float: | |
"""Compute sample covariance (ddof=1) or population (ddof=0) between two lists. Input: [1, 2, 3], [4, 5, 6].""" | |
X = np.array(x) | |
Y = np.array(y) | |
if X.size != Y.size: | |
raise ValueError("Input lists must have the same length for covariance.") | |
if X.size == 0 or X.size < ddof: | |
raise ValueError(f"Input lists must have at least {ddof} elements for covariance with ddof={ddof}.") | |
# np.cov returns the covariance matrix, we want the off-diagonal element | |
return np.cov(X, Y, ddof=ddof)[0, 1] | |
def correlation(x: List[float], y: List[float]) -> float: | |
"""Compute Pearson correlation coefficient between two lists. Input: [1, 2, 3], [1, 2, 3.1].""" | |
X = np.array(x) | |
Y = np.array(y) | |
if X.size != Y.size: | |
raise ValueError("Input lists must have the same length for correlation.") | |
if X.size < 2: | |
raise ValueError("Need at least 2 data points for correlation.") | |
# np.corrcoef returns the correlation matrix | |
corr_matrix = np.corrcoef(X, Y) | |
# Handle case where std dev is zero (results in nan) | |
if np.isnan(corr_matrix[0, 1]): | |
logger.warning("Correlation resulted in NaN, likely due to zero standard deviation in one or both inputs.") | |
# Return 0 or raise error? Let's return 0 for now. | |
return 0.0 | |
return corr_matrix[0, 1] | |
def linear_regression(x: List[float], y: List[float]) -> Dict[str, float]: | |
"""Perform simple linear regression (y = mx + c). Returns slope (m) and intercept (c). Input: [1, 2, 3], [2, 4.1, 5.9].""" | |
X = np.array(x) | |
Y = np.array(y) | |
if X.size != Y.size: | |
raise ValueError("Input lists must have the same length for linear regression.") | |
if X.size < 2: | |
raise ValueError("Need at least 2 data points for linear regression.") | |
slope, intercept = np.polyfit(X, Y, 1) | |
return {"slope": slope, "intercept": intercept} | |
# --- Numerical functions --- | |
def find_polynomial_roots(coefficients: List[float]) -> List[complex]: | |
"""Find roots of a polynomial given coefficients [a_n, a_n-1, ..., a_0]. Input: [1, -3, 2] for x^2-3x+2.""" | |
if not coefficients: | |
raise ValueError("Coefficient list cannot be empty.") | |
return np.roots(coefficients) | |
def interpolate_value(x_vals: List[float], y_vals: List[float], x: float) -> float: | |
"""Linear interpolate a value at x given data points (x_vals, y_vals). Input: [0, 1, 2], [0, 1, 4], 1.5.""" | |
if len(x_vals) != len(y_vals): | |
raise ValueError("x_vals and y_vals must have the same length.") | |
if len(x_vals) < 2: | |
raise ValueError("Need at least 2 data points for interpolation.") | |
# Ensure x_vals are sorted for np.interp | |
sorted_indices = np.argsort(x_vals) | |
x_sorted = np.array(x_vals)[sorted_indices] | |
y_sorted = np.array(y_vals)[sorted_indices] | |
return np.interp(x, x_sorted, y_sorted) | |
def numerical_integration( | |
func_str: str, a: float, b: float, variable: str = "x" | |
) -> float: | |
"""Numerically integrate func_str (e.g., 'x**2 * sin(x)') from a to b. Input: 'x**2', 0, 1.""" | |
symbol = sp.symbols(variable) | |
# Security Note: Using sympify/lambdify can be risky if func_str is untrusted. | |
# Consider using a safer evaluation method if input is external. | |
try: | |
func = sp.sympify(func_str) | |
f_lambdified = sp.lambdify(symbol, func, modules=["numpy"]) | |
except (sp.SympifyError, SyntaxError) as sym_err: | |
raise ValueError(f"Invalid function string: {func_str}. Error: {sym_err}") | |
result, abserr = quad(f_lambdified, a, b) | |
logger.info(f"Numerical integration estimated absolute error: {abserr}") | |
return result | |
def solve_ode( | |
func_str: str, y0: float, t_eval: List[float], args: tuple = () | |
) -> List[float]: | |
"""Solve a first-order ODE dy/dt = f(t, y) using scipy.integrate.solve_ivp. | |
func_str should define f(t, y), e.g., '-y + sin(t)'. | |
y0 is the initial condition y(t_eval[0]). | |
t_eval is the list of time points to evaluate the solution at. | |
args are optional additional arguments passed to f(t, y, *args). | |
Input: func_str='-y', y0=1, t_eval=[0, 1, 2, 3, 4].""" | |
from scipy.integrate import solve_ivp | |
import math # Make math functions available | |
# Security Note: Using eval is dangerous with untrusted input. | |
# A safer approach would parse the expression or use a restricted environment. | |
def ode_func(t, y, *args): | |
try: | |
# Provide t, y, args, and safe math functions in the eval context | |
local_vars = {"t": t, "y": y, "math": math, "np": np} | |
# Add args if provided | |
if args: | |
# Assuming args correspond to p1, p2, ... in the func_str | |
for i, arg_val in enumerate(args): | |
local_vars[f"p{i+1}"] = arg_val | |
return eval(func_str, {"__builtins__": {}}, local_vars) | |
except Exception as e: | |
# Log the error and raise it to be caught by the handler | |
logger.error(f"Error evaluating ODE function {func_str} at t={t}, y={y}: {e}") | |
raise ValueError(f"Error in ODE function definition: {e}") | |
if not t_eval: | |
raise ValueError("t_eval list cannot be empty.") | |
t_span = (min(t_eval), max(t_eval)) | |
sol = solve_ivp(ode_func, t_span, [y0], t_eval=t_eval, args=args) | |
if not sol.success: | |
raise RuntimeError(f"ODE solver failed: {sol.message}") | |
return sol.y[0] # Return the solution for y | |
# --- Vector functions --- | |
def dot_product(a: List[float], b: List[float]) -> float: | |
"""Compute dot product of two vectors. Input: [1, 2, 3], [4, 5, 6].""" | |
A = np.array(a) | |
B = np.array(b) | |
if A.shape != B.shape: | |
raise ValueError("Vectors must have the same dimension for dot product.") | |
return np.dot(A, B) | |
def cross_product(a: List[float], b: List[float]) -> List[float]: | |
"""Compute cross product of two 3D vectors. Input: [1, 0, 0], [0, 1, 0].""" | |
A = np.array(a) | |
B = np.array(b) | |
if A.size != 3 or B.size != 3: | |
raise ValueError("Cross product is only defined for 3D vectors.") | |
return np.cross(A, B) | |
def vector_magnitude(a: List[float]) -> float: | |
"""Compute magnitude (Euclidean norm) of a vector. Input: [3, 4].""" | |
if not a: | |
raise ValueError("Input vector cannot be empty.") | |
return np.linalg.norm(np.array(a)) | |
def vector_normalize(a: List[float]) -> List[float]: | |
"""Normalize a vector to unit length. Input: [3, 4].""" | |
A = np.array(a) | |
norm = np.linalg.norm(A) | |
if norm == 0: | |
raise ValueError("Cannot normalize a zero vector.") | |
return (A / norm) | |
def vector_angle(a: List[float], b: List[float], degrees: bool = False) -> float: | |
"""Compute the angle (in radians or degrees) between two vectors. Input: [1, 0], [0, 1].""" | |
dot = dot_product(a, b) # Use our handled dot_product | |
norm_a = vector_magnitude(a) | |
norm_b = vector_magnitude(b) | |
if norm_a == 0 or norm_b == 0: | |
raise ValueError("Cannot compute angle with zero vector(s).") | |
# Clip argument to arccos to avoid domain errors due to floating point inaccuracies | |
cos_theta = np.clip(dot / (norm_a * norm_b), -1.0, 1.0) | |
angle_rad = np.arccos(cos_theta) | |
return np.degrees(angle_rad) if degrees else angle_rad | |
# --- Probability functions --- | |
def binomial_pmf(k: int, n: int, p: float) -> float: | |
"""Compute binomial probability mass function P(X=k | n, p). Input: k=2, n=5, p=0.5.""" | |
if not (0 <= p <= 1): | |
raise ValueError("Probability p must be between 0 and 1.") | |
if not (0 <= k <= n): | |
raise ValueError("k must be between 0 and n (inclusive).") | |
return binom.pmf(k, n, p) | |
def normal_pdf(x: float, mu: float = 0, sigma: float = 1) -> float: | |
"""Compute normal distribution probability density function N(x | mu, sigma). Input: x=0, mu=0, sigma=1.""" | |
if sigma <= 0: | |
raise ValueError("Standard deviation sigma must be positive.") | |
return norm.pdf(x, mu, sigma) | |
def normal_cdf(x: float, mu: float = 0, sigma: float = 1) -> float: | |
"""Compute normal distribution cumulative distribution function P(X<=x | mu, sigma). Input: x=0, mu=0, sigma=1.""" | |
if sigma <= 0: | |
raise ValueError("Standard deviation sigma must be positive.") | |
return norm.cdf(x, mu, sigma) | |
def poisson_pmf(k: int, lam: float) -> float: | |
"""Compute Poisson probability mass function P(X=k | lambda). Input: k=2, lam=3.""" | |
if lam < 0: | |
raise ValueError("Rate parameter lambda must be non-negative.") | |
if k < 0 or not isinstance(k, int): | |
raise ValueError("k must be a non-negative integer.") | |
return poisson.pmf(k, lam) | |
# --- Special functions --- | |
def gamma_function(x: float) -> float: | |
"""Compute the gamma function Gamma(x). Input: 5.""" | |
return special.gamma(x) | |
def beta_function(x: float, y: float) -> float: | |
"""Compute the beta function B(x, y). Input: 2, 3.""" | |
return special.beta(x, y) | |
def erf_function(x: float) -> float: | |
"""Compute the error function erf(x). Input: 1.""" | |
return special.erf(x) | |
# --- Fourier Transform functions --- | |
def fft_transform(y: List[float]) -> List[complex]: | |
"""Compute the Fast Fourier Transform (FFT) of a real sequence y. Input: [0, 1, 0, -1].""" | |
if not y: | |
raise ValueError("Input list cannot be empty for FFT.") | |
return fft.fft(np.array(y)) | |
def ifft_transform(y_complex: List[complex]) -> List[complex]: | |
"""Compute the inverse Fast Fourier Transform (IFFT) of a complex sequence. Input: result from fft_transform.""" | |
if not y_complex: | |
raise ValueError("Input list cannot be empty for IFFT.") | |
return fft.ifft(np.array(y_complex)) | |
# --- Tool List Creation --- | |
def get_python_math_tools() -> List[FunctionTool]: | |
"""Returns a list of FunctionTools for the Python math functions.""" | |
py_tools = [ | |
# Symbolic | |
FunctionTool.from_defaults(fn=solve_symbolic_equation), | |
FunctionTool.from_defaults(fn=compute_derivative), | |
FunctionTool.from_defaults(fn=compute_integral), | |
FunctionTool.from_defaults(fn=compute_limit), | |
FunctionTool.from_defaults(fn=simplify_expression), | |
FunctionTool.from_defaults(fn=expand_expression), | |
FunctionTool.from_defaults(fn=factor_expression), | |
# Matrix | |
FunctionTool.from_defaults(fn=matrix_addition), | |
FunctionTool.from_defaults(fn=matrix_subtraction), | |
FunctionTool.from_defaults(fn=matrix_multiplication), | |
FunctionTool.from_defaults(fn=matrix_inverse), | |
FunctionTool.from_defaults(fn=matrix_determinant), | |
FunctionTool.from_defaults(fn=matrix_transpose), | |
FunctionTool.from_defaults(fn=matrix_rank), | |
FunctionTool.from_defaults(fn=matrix_trace), | |
FunctionTool.from_defaults(fn=matrix_norm), | |
FunctionTool.from_defaults(fn=eigenvalues), | |
FunctionTool.from_defaults(fn=eigenvectors), | |
FunctionTool.from_defaults(fn=svd_decompose), | |
FunctionTool.from_defaults(fn=lu_decompose), | |
FunctionTool.from_defaults(fn=qr_decompose), | |
# Statistics | |
FunctionTool.from_defaults(fn=mean), | |
FunctionTool.from_defaults(fn=median), | |
FunctionTool.from_defaults(fn=std_dev), | |
FunctionTool.from_defaults(fn=variance), | |
FunctionTool.from_defaults(fn=percentile), | |
FunctionTool.from_defaults(fn=covariance), | |
FunctionTool.from_defaults(fn=correlation), | |
FunctionTool.from_defaults(fn=linear_regression), | |
# Numerical | |
FunctionTool.from_defaults(fn=find_polynomial_roots), | |
FunctionTool.from_defaults(fn=interpolate_value), | |
FunctionTool.from_defaults(fn=numerical_integration), | |
FunctionTool.from_defaults(fn=solve_ode), | |
# Vector | |
FunctionTool.from_defaults(fn=dot_product), | |
FunctionTool.from_defaults(fn=cross_product), | |
FunctionTool.from_defaults(fn=vector_magnitude), | |
FunctionTool.from_defaults(fn=vector_normalize), | |
FunctionTool.from_defaults(fn=vector_angle), | |
# Probability | |
FunctionTool.from_defaults(fn=binomial_pmf), | |
FunctionTool.from_defaults(fn=normal_pdf), | |
FunctionTool.from_defaults(fn=normal_cdf), | |
FunctionTool.from_defaults(fn=poisson_pmf), | |
# Special Functions | |
FunctionTool.from_defaults(fn=gamma_function), | |
FunctionTool.from_defaults(fn=beta_function), | |
FunctionTool.from_defaults(fn=erf_function), | |
# Fourier | |
FunctionTool.from_defaults(fn=fft_transform), | |
FunctionTool.from_defaults(fn=ifft_transform), | |
] | |
# Update descriptions for clarity if needed (optional) | |
for tool in py_tools: | |
tool.metadata.description = f"(Python) {tool.metadata.description}" | |
logger.info(f"Created {len(py_tools)} Python math tools.") | |
return py_tools | |
# --- Wolfram Alpha Tool --- | |
_wolfram_alpha_tools = None | |
def get_wolfram_alpha_tools() -> List[FunctionTool]: | |
"""Initializes and returns Wolfram Alpha tools (singleton).""" | |
global _wolfram_alpha_tools | |
if _wolfram_alpha_tools is None: | |
logger.info("Initializing WolframAlphaToolSpec...") | |
wolfram_alpha_app_id = os.getenv("WOLFRAM_ALPHA_APP_ID") | |
if not wolfram_alpha_app_id: | |
logger.warning("WOLFRAM_ALPHA_APP_ID not set. Wolfram Alpha tools will be unavailable.") | |
_wolfram_alpha_tools = [] | |
else: | |
try: | |
spec = WolframAlphaToolSpec(app_id=wolfram_alpha_app_id) | |
_wolfram_alpha_tools = spec.to_tool_list() | |
# Add prefix to description for clarity | |
for tool in _wolfram_alpha_tools: | |
tool.metadata.description = f"(WolframAlpha) {tool.metadata.description}" | |
logger.info(f"WolframAlpha tools initialized: {len(_wolfram_alpha_tools)} tools.") | |
except Exception as e: | |
logger.error(f"Failed to initialize WolframAlpha tools: {e}", exc_info=True) | |
_wolfram_alpha_tools = [] | |
return _wolfram_alpha_tools | |
# Use LlamaIndex's built-in Code Interpreter Tool Spec for safe execution | |
# This assumes the necessary environment (e.g., docker) for the spec is available | |
try: | |
code_interpreter_spec = CodeInterpreterToolSpec() | |
# Get the tool(s) from the spec. It might return multiple tools. | |
code_interpreter_tools = code_interpreter_spec.to_tool_list() | |
if not code_interpreter_tools: | |
raise RuntimeError("CodeInterpreterToolSpec did not return any tools.") | |
# Assuming the primary tool is the first one, or find by name if necessary | |
code_interpreter_tool = next((t for t in code_interpreter_tools if t.metadata.name == "code_interpreter"), None) | |
if code_interpreter_tool is None: | |
raise RuntimeError("Could not find 'code_interpreter' tool in CodeInterpreterToolSpec results.") | |
logger.info("CodeInterpreterToolSpec initialized successfully.") | |
except Exception as e: | |
logger.error(f"Failed to initialize CodeInterpreterToolSpec: {e}", exc_info=True) | |
# Fallback: Define a dummy tool or raise error to prevent agent start? | |
# For now, let initialization fail if the safe interpreter isn't available. | |
raise RuntimeError("CodeInterpreterToolSpec failed to initialize. Cannot create code_agent.") from e | |
# --- Agent Initialization --- | |
def initialize_math_agent() -> ReActAgent: | |
"""Initializes the Math Agent with Python and Wolfram Alpha tools.""" | |
logger.info("Initializing MathAgent...") | |
# Configuration | |
agent_llm_model = os.getenv("MATH_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25") | |
gemini_api_key = os.getenv("GEMINI_API_KEY") | |
if not gemini_api_key: | |
logger.error("GEMINI_API_KEY not found in environment variables for MathAgent.") | |
raise ValueError("GEMINI_API_KEY must be set for MathAgent") | |
try: | |
llm = GoogleGenAI( | |
api_key=gemini_api_key, | |
model=agent_llm_model, | |
temperature=0.05 | |
) | |
logger.info(f"Using agent LLM: {agent_llm_model}") | |
# Combine Python tools and Wolfram Alpha tools | |
all_tools = get_python_math_tools() + get_wolfram_alpha_tools() + [code_interpreter_tool] | |
if not all_tools: | |
logger.warning("No math tools available (Python or WolframAlpha). MathAgent may be ineffective.") | |
# System prompt (consider loading from file) | |
system_prompt = """\ | |
You are MathAgent, a powerful mathematical problem solver. Your goal is to accurately answer mathematical questions using the available tools. | |
Available Tools: | |
- Python Tools: A comprehensive suite for symbolic math (SymPy), numerical computation (NumPy/SciPy), statistics, linear algebra, calculus, ODEs, and transforms. Prefixed with '(Python)'. Use these for precise calculations when the method is clear. | |
- WolframAlpha Tool: Accesses Wolfram Alpha for complex queries, natural language math questions, data, and real-world facts. Prefixed with '(WolframAlpha)'. Use this for broader questions, knowledge-based math, or when Python tools are insufficient. | |
Workflow: | |
1. **Thought**: Analyze the question. Determine the mathematical concepts involved. Decide the best tool or sequence of tools to use. Prefer Python tools for specific, well-defined calculations. Use WolframAlpha for complex, ambiguous, or knowledge-based queries. | |
2. **Action**: Call the chosen tool with the correct arguments. Ensure inputs match the tool's requirements (e.g., list of lists for matrices, strings for symbolic expressions). | |
3. **Observation**: Examine the tool's output. Check for errors or unexpected results. | |
4. **Iteration**: If the result is incorrect or incomplete, rethink the approach. Try a different tool, adjust parameters, or break the problem down further. If a Python tool fails, consider rephrasing for WolframAlpha. | |
5. **Final Answer**: Once the correct answer is obtained, state it clearly and concisely. Provide the numerical result, symbolic expression, or explanation as requested. | |
6. **Hand-Off**: Pass the final mathematical result or analysis to **planner_agent** for integration into the overall response. | |
Constraints: | |
- Always use a tool for calculations; do not perform calculations yourself. | |
- Clearly state which tool you are using and why. | |
- Handle potential errors gracefully and report them if they prevent finding a solution. | |
- Pay close attention to input formats required by each tool (e.g., lists for vectors/matrices, strings for symbolic expressions). | |
If your response exceeds the maximum token limit and cannot be completed in a single reply, please conclude your output with the marker [CONTINUE]. In subsequent interactions, I will prompt you with “continue” to receive the next portion of the response. | |
""" | |
agent = ReActAgent( | |
name="math_agent", | |
description=( | |
"MathAgent solves mathematical problems using a suite of Python tools (SymPy, NumPy, SciPy) and WolframAlpha. " | |
"It handles symbolic math, numerical computation, statistics, linear algebra, calculus, and more." | |
), | |
tools=all_tools, | |
llm=llm, | |
system_prompt=system_prompt, | |
can_handoff_to=["planner_agent", "reasoning_agent"], | |
) | |
logger.info("MathAgent initialized successfully.") | |
return agent | |
except Exception as e: | |
logger.error(f"Error during MathAgent initialization: {e}", exc_info=True) | |
raise | |
# Example usage (for testing if run directly) | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger.info("Running math_agent.py directly for testing...") | |
# Ensure API keys are set for testing | |
required_keys = ["GEMINI_API_KEY"] # WOLFRAM_ALPHA_APP_ID is optional | |
missing_keys = [key for key in required_keys if not os.getenv(key)] | |
if missing_keys: | |
print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.") | |
else: | |
if not os.getenv("WOLFRAM_ALPHA_APP_ID"): | |
print("Warning: WOLFRAM_ALPHA_APP_ID not set. WolframAlpha tools will be unavailable for testing.") | |
try: | |
test_agent = initialize_math_agent() | |
print("Math Agent initialized successfully for testing.") | |
# Example test | |
# result = test_agent.chat("What is the integral of x**2 from 0 to 1?") | |
# print(f"Test query result: {result}") | |
# result2 = test_agent.chat("what is the population of france?") # Test WolframAlpha | |
# print(f"Test query 2 result: {result2}") | |
except Exception as e: | |
print(f"Error during testing: {e}") | |