mohammedelfeky-ai's picture
Update app.py
6fc9a0b verified
raw
history blame
13.6 kB
from smolagents import CodeAgent, tool
import datetime
import requests
import pytz
import yaml
import os
import tempfile
from tools.final_answer import FinalAnswerTool
from tools.visit_webpage import VisitWebpageTool
from smolagents import GradioUI
import gradio as gr
import json
import os
from typing import Dict, List, Optional, Union, Any
# Create a custom model adapter for Gemini since it's not natively supported in smolagents 1.13.0
from smolagents.models import LLMAdapter
import google.generativeai as genai
class CustomGeminiAdapter(LLMAdapter):
"""
Custom adapter for Google's Gemini model.
This adapter lets us use Gemini with smolagents even if it's not natively supported.
"""
def __init__(
self,
model: str = "gemini-1.5-pro",
temperature: float = 0.7,
max_tokens: int = 2048,
api_key: Optional[str] = None,
):
"""Initialize the Gemini adapter."""
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
# Set up API key
if api_key:
genai.configure(api_key=api_key)
elif os.environ.get("GOOGLE_API_KEY"):
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
else:
raise ValueError("Google API key must be provided either through api_key parameter or GOOGLE_API_KEY environment variable")
# Configure the model
self.generation_config = {
"temperature": temperature,
"max_output_tokens": max_tokens,
"top_p": 0.95,
"top_k": 0,
}
def call(
self,
system_message: str,
messages: List[Dict[str, str]],
functions: Optional[List[Dict]] = None,
function_call: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""
Call the Gemini model with messages and return the response.
Args:
system_message: System message to set context
messages: List of messages in the conversation
functions: Function definitions (for function calling)
function_call: Function to call
Returns:
Dictionary with model response
"""
try:
# Convert messages format to what Gemini expects
gemini_messages = []
# Add system message as user message at the beginning (Gemini doesn't have system)
if system_message:
gemini_messages.append({
"role": "user",
"parts": [{"text": f"System: {system_message}"}]
})
gemini_messages.append({
"role": "model",
"parts": [{"text": "I understand and will follow these instructions."}]
})
# Add the rest of the messages
for message in messages:
if message["role"] == "system":
# Handle system messages as user instructions
gemini_messages.append({
"role": "user",
"parts": [{"text": f"System instruction: {message['content']}"}]
})
else:
role = "user" if message["role"] == "user" else "model"
gemini_messages.append({
"role": role,
"parts": [{"text": message["content"]}]
})
# Create the Gemini model
model = genai.GenerativeModel(
model_name=self.model,
generation_config=self.generation_config
)
# For function calling (tools)
if functions and len(functions) > 0:
# Simulate function calling by adding function descriptions to the prompt
function_descriptions = []
for func in functions:
function_descriptions.append(f"""
Function Name: {func.get('name')}
Description: {func.get('description')}
Parameters: {json.dumps(func.get('parameters', {}))}
""")
function_context = """
You have access to the following functions. When you decide to use a function, respond with a JSON object with 'function_call' key containing 'name' and 'arguments' keys.
Example: {"function_call": {"name": "function_name", "arguments": {"arg1": "value1"}}}
Functions:
""" + "\n\n".join(function_descriptions)
# Add function description as the last user message
gemini_messages.append({
"role": "user",
"parts": [{"text": function_context}]
})
# Create a chat session
chat = model.start_chat(history=gemini_messages[:-1])
# Get the last message content
last_message = gemini_messages[-1]["parts"][0]["text"]
# Generate response
response = chat.send_message(last_message)
content = response.text
# Process the content to see if it contains a function call
function_call_data = None
if functions:
# Check if the response contains a function call format
import re
function_call_match = re.search(r'{\s*"function_call"\s*:\s*{.*?}\s*}', content, re.DOTALL)
if function_call_match:
try:
function_call_text = function_call_match.group(0)
function_call_data = json.loads(function_call_text)
# Remove the function call from the content
content = content.replace(function_call_text, "").strip()
except json.JSONDecodeError:
pass
# Create response format that matches what smolagents expects
result = {
"content": content
}
# Add function call if present
if function_call_data:
result["function_call"] = {
"name": function_call_data.get("function_call", {}).get("name", ""),
"arguments": function_call_data.get("function_call", {}).get("arguments", {})
}
return result
except Exception as e:
return {"content": f"Error calling Gemini model: {str(e)}"}
'''
# Below is an example of a tool that does nothing. Amaze us with your creativity !
@tool
def my_custom_tool(x:str, y:int)-> int: #it's import to specify the return type
#Keep this format for the description / args / args description but feel free to modify the tool
"""A tool that does nothing yet
Args:
arg1: the first argument
arg2: the second argument
"""
return "What magic will you build ?"
@tool
def get_current_time_in_timezone(timezone: str) -> str:
"""A tool that fetches the current local time in a specified timezone.
Args:
timezone: A string representing a valid timezone (e.g., 'America/New_York').
"""
try:
# Create timezone object
tz = pytz.timezone(timezone)
# Get current time in that timezone
local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
return f"The current local time in {timezone} is: {local_time}"
except Exception as e:
return f"Error fetching time for timezone '{timezone}': {str(e)}"
'''
@tool
def create_document(text: str, format: str = "docx") -> str:
"""Creates a document with the provided text and allows download.
Args:
text: The text content to write to the document
format: The output format, either 'docx' or 'pdf'
"""
try:
import docx
from docx.shared import Pt
# Create a temp directory to store files
temp_dir = tempfile.mkdtemp()
# Create a new document
doc = docx.Document()
# Add a heading
doc.add_heading('Generated Document', 0)
# Set font style for regular text
style = doc.styles['Normal']
font = style.font
font.name = 'Calibri'
font.size = Pt(11)
# Add paragraphs from the input text
# Split by newlines to maintain paragraph structure
for paragraph in text.split('\n'):
if paragraph.strip(): # Skip empty paragraphs
doc.add_paragraph(paragraph)
# Save the document
docx_path = os.path.join(temp_dir, "generated_document.docx")
doc.save(docx_path)
# Convert to PDF if requested
if format.lower() == "pdf":
try:
from docx2pdf import convert
pdf_path = os.path.join(temp_dir, "generated_document.pdf")
convert(docx_path, pdf_path)
return pdf_path
except ImportError:
return f"PDF conversion requires docx2pdf package. Document saved as DOCX instead at: {docx_path}"
return docx_path
except Exception as e:
return f"Error creating document: {str(e)}"
# Custom file download tool to help with file handling
@tool
def get_file_download_link(file_path: str) -> str:
"""Creates a download link for a file.
Args:
file_path: Path to the file that should be made available for download
"""
if not os.path.exists(file_path):
return f"Error: File not found at {file_path}"
# Get file extension and set up appropriate mime type
_, file_extension = os.path.splitext(file_path)
mime_types = {
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.pdf': 'application/pdf',
}
mime_type = mime_types.get(file_extension.lower(), 'application/octet-stream')
# Return information that can be used by the agent to instruct the user
return f"File ready for download: {os.path.basename(file_path)} ({mime_type})"
final_answer = FinalAnswerTool()
#web_search=DuckDuckGoSearchTool()
visit_webpage=VisitWebpageTool()
# If the agent does not answer, the model is overloaded, please use another model or the following Hugging Face Endpoint that also contains qwen2.5 coder:
# model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud'
# Load LLM
model = GeminiModel(
model="gemini-1.5-pro", # Using Gemini 1.5 Pro which is powerful but has free tier
temperature=0.5,
max_tokens=2048,
)
# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
with open("prompts.yaml", 'r') as stream:
prompt_templates = yaml.safe_load(stream)
#,web_search
agent = CodeAgent(
model=model,
tools=[final_answer,visit_webpage,create_document,get_file_download_link], ## add your tools here (don't remove final answer)
max_steps=6,
verbosity_level=1,
grammar=None,
planning_interval=None,
name=None,
description=None,
prompt_templates=prompt_templates
)
# Custom Gradio UI with file download capability
class CustomGradioUI(GradioUI):
def build_interface(self):
with gr.Blocks() as interface:
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("# AI Assistant")
chatbot = gr.Chatbot(height=600)
msg = gr.Textbox(
placeholder="Ask me anything...",
container=False,
scale=7,
)
# Add a file download component
download_btn = gr.Button("Download File", visible=False)
file_output = gr.File(label="Generated Document", visible=False)
# Store the latest file path
self._latest_file_path = None
def respond(message, chat_history):
agent_response = self.agent.run(message)
chat_history.append((message, agent_response))
# Check if response contains a file path
import re
file_paths = re.findall(r'File ready for download: .+ \((application/[\w.+-]+)\)', agent_response)
show_download = False
self._latest_file_path = None
# Look for generated file paths in the response
paths = re.findall(r'/tmp/\w+/generated_document\.(docx|pdf)', agent_response)
if paths:
self._latest_file_path = paths[0]
show_download = True
return chat_history, gr.Button.update(visible=show_download), gr.File.update(visible=False)
def prepare_download():
if self._latest_file_path:
return gr.File.update(value=self._latest_file_path, visible=True)
return gr.File.update(visible=False)
# Connect the components
msg.submit(respond, [msg, chatbot], [chatbot, download_btn, file_output])
download_btn.click(prepare_download, [], [file_output])
gr.Markdown("Powered by smolagents and Qwen")
return interface
GradioUI(agent).launch()