# llama-models / app / email_tool.py
# Uploaded by deniskiplimo816 ("Upload 27 files", commit 293ab16, verified).
import os
import smtplib
import ssl
from datetime import datetime, timezone
from email.message import EmailMessage
from typing import Optional, List, Union

from pydantic import BaseModel, EmailStr, Field
# === ENV SETTINGS ===
def _env_flag(name: str, default: str) -> bool:
    """Return True when env var ``name`` (or ``default``) is "true", "1" or "yes".

    The comparison is case-insensitive and ignores surrounding whitespace.
    """
    return os.getenv(name, default).strip().lower() in ("true", "1", "yes")


# Sender identity and credentials. The defaults are placeholders only; real
# values are expected to come from the environment.
EMAIL_SENDER = os.getenv("EMAIL_SENDER", "your_email@example.com")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "yourpassword")

# SMTP endpoint. Port 465 pairs with the implicit-TLS default of USE_SSL.
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465"))

# USE_SSL selects an implicit-TLS connection; when off, STARTTLS is used.
USE_SSL = _env_flag("USE_SSL", "true")
# In test mode messages are built but never handed to an SMTP server.
EMAIL_TEST_MODE = _env_flag("EMAIL_TEST_MODE", "false")
# === Models ===
class EmailInput(BaseModel):
    """Describe one outgoing email: recipients, subject, body and extras."""

    # Either a single address or a list of addresses, typed as EmailStr.
    to: Union[EmailStr, List[EmailStr]] = Field(
        ...,
        description="Recipient(s) email address",
    )
    subject: str = Field(
        ...,
        description="Email subject line",
    )
    # Interpreted as HTML only when `html` is truthy.
    body: str = Field(
        ...,
        description="Plaintext or HTML body",
    )
    html: Optional[bool] = Field(
        False,
        description="Send as HTML?",
    )
    # Filesystem paths; None means the message carries no attachments.
    attachments: Optional[List[str]] = Field(
        None,
        description="List of file paths to attach",
    )
# === Email Body Generator ===
def generate_email(recipient_name: str, product_name: str, discount: float) -> str:
    """Build the plaintext body of a promotional discount email.

    Args:
        recipient_name: Name used in the greeting line.
        product_name: Product the discount applies to.
        discount: Discount percentage. It is interpolated as-is, so ``25``
            renders as ``25%`` and ``25.0`` as ``25.0%``.

    Returns:
        The message body, one sentence per line, ending with a newline.
    """
    # The emoji is written as a real Unicode character; the previous literal
    # was a mis-decoded byte sequence that recipients would see as garbage.
    return (
        f"Dear {recipient_name},\n"
        f"🎉 Great news! We're offering you an exclusive {discount}% discount on {product_name}.\n"
        "Visit our site today and don't miss out!\n"
        "Best regards,\n"
        "Your AI Assistant\n"
    )
# === Email Sending Core ===
def _attach_files(msg: EmailMessage, paths: List[str]) -> List[str]:
    """Attach every readable file in ``paths`` to ``msg``; return the skipped paths."""
    skipped = []
    for path in paths:
        # Open directly instead of checking existence first: this avoids the
        # check-then-open race and also covers directories and permission
        # errors, which would otherwise abort the whole send.
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError:
            skipped.append(path)
            continue
        msg.add_attachment(
            data,
            maintype="application",
            subtype="octet-stream",
            filename=os.path.basename(path),
        )
    return skipped


def _delivery_report(msg: EmailMessage, status: str, skipped: List[str]) -> dict:
    """Build the result dict returned by ``send_email`` for the given status."""
    report = {
        "status": status,
        "to": msg["To"],
        "subject": msg["Subject"],
        # Naive UTC in ISO 8601, the same format datetime.utcnow() produced,
        # obtained without the deprecated call.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }
    if skipped:
        report["skipped_attachments"] = list(skipped)
    return report


def send_email(email_input: EmailInput) -> dict:
    """Compose an email from ``email_input`` and deliver it over SMTP.

    The message is sent from ``EMAIL_SENDER``. When ``EMAIL_TEST_MODE`` is
    enabled the message is still fully built (attachments included) but no
    SMTP connection is opened. Otherwise the connection uses implicit TLS
    when ``USE_SSL`` is set and STARTTLS when it is not.

    Args:
        email_input: Recipient(s), subject, body, HTML flag and optional
            attachment paths.

    Returns:
        A dict with ``status``, ``to``, ``subject`` and ``timestamp`` (naive
        UTC, ISO 8601). When one or more attachments could not be read, the
        dict also carries ``skipped_attachments`` listing those paths.
        Delivery errors are reported through ``status``; they are not raised.
    """
    msg = EmailMessage()
    msg["From"] = EMAIL_SENDER
    recipients = email_input.to
    msg["To"] = ", ".join(recipients) if isinstance(recipients, list) else recipients
    msg["Subject"] = email_input.subject

    if email_input.html:
        msg.add_alternative(email_input.body, subtype="html")
    else:
        msg.set_content(email_input.body)

    # Add attachments (best effort: unreadable files are skipped and reported)
    skipped = _attach_files(msg, email_input.attachments or [])

    # Early exit in test mode
    if EMAIL_TEST_MODE:
        return _delivery_report(msg, "✅ TEST MODE — email not actually sent", skipped)

    # Send email via SMTP
    try:
        tls_context = ssl.create_default_context()
        if USE_SSL:
            with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=tls_context) as smtp:
                smtp.login(EMAIL_SENDER, EMAIL_PASSWORD)
                smtp.send_message(msg)
        else:
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as smtp:
                smtp.ehlo()
                smtp.starttls(context=tls_context)
                # Re-identify after the TLS upgrade before authenticating.
                smtp.ehlo()
                smtp.login(EMAIL_SENDER, EMAIL_PASSWORD)
                smtp.send_message(msg)
    except Exception as e:
        # Deliberately broad: callers receive a status dict, never an exception.
        return _delivery_report(msg, f"❌ Failed to send email: {e}", skipped)

    return _delivery_report(msg, "✅ Email sent successfully", skipped)
# === Example usage ===
if __name__ == "__main__":
    # Demo: build a promotional message and pass it through send_email().
    email_body = generate_email("Alice", "Super Serum", 25)
    email_data = EmailInput(
        # example.com is reserved for documentation, so these are
        # syntactically valid addresses that belong to no one. The previous
        # literals were redaction residue and failed email validation.
        to=["alice@example.com", "bob@example.com"],
        subject="🎁 Special Deal Just For You!",
        body=email_body,
        html=False,
        # Placeholder path; replace with a real file to test attachments.
        attachments=["/path/to/brochure.pdf"],
    )
    result = send_email(email_data)
    print(result)