File size: 4,909 Bytes
0dd81f9
 
 
 
 
 
2c23242
8fe992b
 
 
 
68114ba
 
 
 
 
2c23242
 
68114ba
2c23242
0dd81f9
2c23242
 
68114ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0dd81f9
 
 
 
 
 
579b38c
 
 
0dd81f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2c23242
8259463
 
2c23242
b171242
 
2c23242
8259463
 
2c23242
8fe992b
2c23242
 
 
d6cadf9
2c23242
b171242
8fe992b
8259463
2c23242
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
import zipfile
import base64
import tempfile
import re
import shutil
from typing import Any
from smolagents.tools import Tool

class FinalAnswerTool(Tool):
    """Tool that turns an agent's final answer into a human-readable string.

    Depending on the shape of the answer, the result is a Markdown-formatted
    answer, a Markdown job listing, or a base64 ``data:`` URI holding a zip
    archive of the files the answer refers to.
    """

    name = "final_answer"
    description = "Formats and presents final answers in a human-readable format, optionally providing a download option for code."
    inputs = {
        'answer': {'type': 'any', 'description': 'The final answer, which could be job listings, a general response, or file paths to be zipped'},
        'include_download': {'type': 'boolean', 'description': 'Whether to include a download link for the code (if applicable)', 'required': False, 'nullable': True}
    }
    output_type = "string"

    def forward(self, answer: Any, include_download: bool = False) -> str:
        """Format ``answer`` according to its type.

        Args:
            answer: One of
                * a string -- returned as a Markdown "Final Answer". If it
                  mentions existing files located under the current working
                  directory, those files are zipped and a ``data:`` URI of
                  the archive is returned instead.
                * a list of dicts -- rendered as a numbered job listing using
                  the ``Title``, ``Company`` and ``Location`` keys.
                * a non-empty list of strings that all name existing files --
                  zipped and returned as a ``data:`` URI.
                * anything else -- stringified inside a code fence.
            include_download: When truthy and a string answer contains a
                fenced code block, append a ``data:`` download link for the
                first code block to the formatted answer.

        Returns:
            The formatted answer, a ``data:application/zip;base64,...`` URI,
            or an error message if the archive could not be created.
        """
        if isinstance(answer, str):
            if include_download:
                code_match = re.search(r"```(?:\w+)?\n(.*?)```", answer, re.DOTALL)
                if code_match:
                    return self._with_code_download(answer, code_match.group(1))

            file_paths = self._existing_paths_in_text(answer)
            if file_paths:
                return self._zip_as_data_uri(file_paths)
            return f"📌 **Final Answer:**\n\n{answer}"

        # NOTE: an empty list satisfies all(...) and is reported as "no jobs".
        if isinstance(answer, list) and all(isinstance(job, dict) for job in answer):
            return self._format_job_listings(answer)

        # A list of file paths. Only treated as such when every entry is an
        # existing file, so arbitrary lists of strings keep the generic format.
        if isinstance(answer, list) and all(
            isinstance(item, str) and os.path.isfile(item) for item in answer
        ):
            return self._zip_as_data_uri(answer)

        # Unexpected format: return it as a formatted string.
        return f"📌 **Final Answer:**\n\n```{str(answer)}```"

    def _with_code_download(self, answer: str, code: str) -> str:
        """Return the formatted answer followed by a data-URI link to ``code``."""
        try:
            # Encode in memory; no temporary file is needed (or left behind).
            base64_encoded = base64.b64encode(code.encode("utf-8")).decode("utf-8")
        except UnicodeError as e:
            return f"📌 **Final Answer:**\n\n{answer}\n\nError creating download link: {str(e)}"
        download_link = f"data:text/python;base64,{base64_encoded}"
        return f"📌 **Final Answer:**\n\n{answer}\n\n[Download Code]({download_link})"

    def _existing_paths_in_text(self, text: str) -> list:
        """Return the distinct existing files under the cwd mentioned in ``text``."""
        cwd = os.getcwd()
        candidates = re.findall(r"("+re.escape(cwd)+r"/[^\s'\"]+)", text)

        found = []
        for candidate in candidates:
            if not os.path.isfile(candidate):
                # The pattern is greedy, so sentence punctuation or Markdown
                # markup directly after a path ends up in the match.
                candidate = candidate.rstrip(".,;:!?)]}`*")
            if os.path.isfile(candidate) and candidate not in found:
                found.append(candidate)
        return found

    def _zip_as_data_uri(self, file_paths: list) -> str:
        """Zip ``file_paths`` and return the archive as a base64 data URI."""
        try:
            cwd = os.getcwd()
            # The archive lives in its own temporary file and the sources are
            # added straight from disk, so the archive never contains itself
            # and nothing is copied to a location derived from the input.
            with tempfile.TemporaryFile() as archive:
                with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zipf:
                    for file_path in dict.fromkeys(file_paths):
                        arcname = os.path.relpath(file_path, cwd)
                        if arcname.split(os.sep, 1)[0] == os.pardir:
                            # File lies outside the cwd: store it flat rather
                            # than with a "../" member name.
                            arcname = os.path.basename(file_path)
                        zipf.write(file_path, arcname)
                archive.seek(0)
                base64_encoded = base64.b64encode(archive.read()).decode("utf-8")
        except Exception as e:
            # Tool boundary: report the failure to the caller as text.
            return f"Error creating zip file: {str(e)}"
        return "data:application/zip;base64," + base64_encoded

    def _format_job_listings(self, jobs: list) -> str:
        """Render a list of job dicts as a numbered Markdown listing."""
        if not jobs:
            return "⚠️ No job listings found."

        entries = [
            (
                f"**{idx}. {job.get('Title', 'Unknown Job Title')}**\n"
                f"   - **Company:** {job.get('Company', 'Unknown Company')}\n"
                f"   - **Location:** {job.get('Location', 'Anywhere')}\n\n"
            )
            for idx, job in enumerate(jobs, start=1)
        ]
        return ("**🔍 Job Listings Found**\n\n" + "".join(entries)).strip()