|
import os |
|
import zipfile |
|
import base64 |
|
import tempfile |
|
import re |
|
import shutil |
|
from typing import Any |
|
from smolagents.tools import Tool |
|
|
|
class FinalAnswerTool(Tool): |
|
name = "final_answer" |
|
description = "Formats and presents final answers in a human-readable format, optionally providing a download option for code." |
|
inputs = { |
|
'answer': {'type': 'any', 'description': 'The final answer, which could be job listings, a general response, or file paths to be zipped'}, |
|
'include_download': {'type': 'boolean', 'description': 'Whether to include a download link for the code (if applicable)', 'required': False, 'nullable': True} |
|
} |
|
output_type = "string" |
|
|
|
def forward(self, answer: Any, include_download: bool = False) -> str: |
|
""" |
|
Determines the type of answer and formats it accordingly, optionally zipping referenced files. |
|
""" |
|
if isinstance(answer, str): |
|
|
|
code_match = re.search(r"```(?:\w+)?\n(.*?)```", answer, re.DOTALL) |
|
if include_download and code_match: |
|
code = code_match.group(1) |
|
try: |
|
|
|
with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmpfile: |
|
tmpfile.write(code.encode("utf-8")) |
|
file_path = tmpfile.name |
|
|
|
|
|
with open(file_path, "rb") as f: |
|
file_bytes = f.read() |
|
base64_encoded = base64.b64encode(file_bytes).decode("utf-8") |
|
|
|
|
|
download_link = f"data:text/python;base64,{base64_encoded}" |
|
return f"📌 **Final Answer:**\n\n{answer}\n\n[Download Code]({download_link})" |
|
except Exception as e: |
|
return f"📌 **Final Answer:**\n\n{answer}\n\nError creating download link: {str(e)}" |
|
|
|
cwd = os.getcwd() |
|
|
|
|
|
file_paths = re.findall(r"("+re.escape(cwd)+r"/[^\s'\"]+)", answer) |
|
|
|
if not file_paths and isinstance(answer, list) and all(isinstance(item, str) for item in answer): |
|
file_paths = answer |
|
|
|
if file_paths: |
|
try: |
|
|
|
with tempfile.TemporaryDirectory() as tmpdir: |
|
|
|
for file_path in file_paths: |
|
|
|
dest_path = os.path.join(tmpdir, os.path.relpath(file_path, cwd)) |
|
os.makedirs(os.path.dirname(dest_path), exist_ok=True) |
|
shutil.copy2(file_path, dest_path) |
|
|
|
|
|
zip_filename = os.path.join(tmpdir, "download.zip") |
|
with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf: |
|
for root, _, files in os.walk(tmpdir): |
|
for file in files: |
|
file_path = os.path.join(root, file) |
|
zipf.write(file_path, os.path.relpath(file_path, tmpdir)) |
|
|
|
|
|
with open(zip_filename, "rb") as f: |
|
zip_bytes = f.read() |
|
base64_encoded = base64.b64encode(zip_bytes).decode("utf-8") |
|
|
|
return "data:application/zip;base64," + base64_encoded |
|
|
|
except Exception as e: |
|
return f"Error creating zip file: {str(e)}" |
|
|
|
else: |
|
return f"📌 **Final Answer:**\n\n{answer}" |
|
|
|
elif isinstance(answer, list) and all(isinstance(job, dict) for job in answer): |
|
if not answer: |
|
return "⚠️ No job listings found." |
|
|
|
formatted_output = "**🔍 Job Listings Found**\n\n" |
|
|
|
for idx, job in enumerate(answer, start=1): |
|
title = job.get("Title", "Unknown Job Title") |
|
company = job.get("Company", "Unknown Company") |
|
location = job.get("Location", "Anywhere") |
|
|
|
formatted_output += ( |
|
f"**{idx}. {title}**\n" |
|
f" - **Company:** {company}\n" |
|
f" - **Location:** {location}\n\n" |
|
) |
|
return formatted_output.strip() |
|
|
|
|
|
else: |
|
return f"📌 **Final Answer:**\n\n```{str(answer)}```" |
|
|