nguyennp86's picture
Deploy Email Classifier API
1291f7a
#!/usr/bin/env python3
"""
Automated deployment script for Hugging Face Spaces
"""
import os
import subprocess
import sys
import shutil
from pathlib import Path
import json
import requests
from datetime import datetime
class HuggingFaceDeployer:
def __init__(self, username: str, space_name: str, token: str = None):
self.username = username
self.space_name = space_name
self.token = token
self.space_url = f"https://huggingface.co/spaces/{username}/{space_name}"
self.git_url = f"https://huggingface.co/spaces/{username}/{space_name}.git"
def check_requirements(self):
"""Check if all required files exist"""
required_files = [
'app.py',
'requirements.txt',
'Dockerfile',
'README.md',
'Synthetic_Email_Dataset.csv'
]
missing_files = []
for file in required_files:
if not os.path.exists(file):
missing_files.append(file)
if missing_files:
print(f"❌ Missing required files: {', '.join(missing_files)}")
return False
print("βœ… All required files found")
return True
def setup_huggingface_space(self):
"""Create Hugging Face Space via API"""
if not self.token:
print("⚠️ No Hugging Face token provided. You'll need to create the space manually.")
print(f"Go to: https://huggingface.co/new-space")
input("Press Enter after creating the space...")
return True
headers = {"Authorization": f"Bearer {self.token}"}
data = {
"type": "space",
"name": self.space_name,
"sdk": "docker",
"private": False
}
response = requests.post(
f"https://huggingface.co/api/repos/create",
headers=headers,
json=data
)
if response.status_code == 201:
print(f"βœ… Space created successfully: {self.space_url}")
return True
elif response.status_code == 409:
print(f"ℹ️ Space already exists: {self.space_url}")
return True
else:
print(f"❌ Failed to create space: {response.text}")
return False
def prepare_files(self):
"""Prepare files for deployment"""
print("πŸ“ Preparing files...")
# Create README.md with proper header
readme_header = f'''---
title: {self.space_name.replace('-', ' ').title()}
emoji: πŸ“§
colorFrom: blue
colorTo: green
sdk: docker
app_port: 7860
pinned: false
license: mit
tags:
- email-classification
- naive-bayes
- text-classification
- nlp
- fastapi
---
'''
# Read existing README and add header
if os.path.exists('README.md'):
with open('README.md', 'r', encoding='utf-8') as f:
existing_content = f.read()
# Remove existing header if present
if existing_content.startswith('---'):
parts = existing_content.split('---', 2)
if len(parts) > 2:
existing_content = parts[2].strip()
with open('README.md', 'w', encoding='utf-8') as f:
f.write(readme_header + existing_content)
print("βœ… Files prepared")
return True
def clone_and_deploy(self):
"""Clone the space repository and deploy"""
print("πŸ”„ Cloning repository...")
# Create temporary directory
temp_dir = f"temp_{self.space_name}"
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
try:
# Clone the repository
result = subprocess.run([
'git', 'clone', self.git_url, temp_dir
], capture_output=True, text=True)
if result.returncode != 0:
print(f"❌ Failed to clone repository: {result.stderr}")
return False
# Copy files to the cloned repository
files_to_copy = [
'app.py',
'requirements.txt',
'Dockerfile',
'README.md',
'train_model.py',
'Synthetic_Email_Dataset.csv'
]
for file in files_to_copy:
if os.path.exists(file):
shutil.copy2(file, temp_dir)
print(f"πŸ“„ Copied {file}")
# Change to the repository directory
os.chdir(temp_dir)
# Configure git
subprocess.run(['git', 'config', 'user.email', '[email protected]'])
subprocess.run(['git', 'config', 'user.name', 'Auto Deployer'])
# Add all files
subprocess.run(['git', 'add', '.'])
# Commit changes
commit_message = f"Deploy Email Classifier API - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
result = subprocess.run(['git', 'commit', '-m', commit_message],
capture_output=True, text=True)
if result.returncode != 0:
print(f"ℹ️ Nothing to commit or commit failed: {result.stderr}")
# Push changes
if self.token:
# Use token for authentication
auth_url = self.git_url.replace('https://', f'https://oauth2:{self.token}@')
result = subprocess.run(['git', 'push', auth_url, 'main'],
capture_output=True, text=True)
else:
result = subprocess.run(['git', 'push', 'origin', 'main'],
capture_output=True, text=True)
if result.returncode == 0:
print("βœ… Successfully pushed to Hugging Face!")
print(f"πŸš€ Your API will be available at: {self.space_url}")
return True
else:
print(f"❌ Failed to push: {result.stderr}")
return False
except Exception as e:
print(f"❌ Deployment failed: {str(e)}")
return False
finally:
# Clean up
os.chdir('..')
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
def test_deployment(self):
"""Test the deployed API"""
print("πŸ§ͺ Testing deployed API...")
api_url = f"{self.space_url.replace('/spaces/', '/spaces/')}"
# Wait a bit for deployment
import time
print("⏳ Waiting for deployment to complete...")
time.sleep(30)
try:
# Test health endpoint
response = requests.get(f"{api_url}/health", timeout=30)
if response.status_code == 200:
print("βœ… Health check passed")
# Test prediction endpoint
test_response = requests.post(
f"{api_url}/predict",
json={"message": "Please find attached the document."},
timeout=30
)
if test_response.status_code == 200:
result = test_response.json()
print(f"βœ… Prediction test passed: {result['prediction_label']}")
return True
else:
print(f"❌ Prediction test failed: {test_response.status_code}")
else:
print(f"❌ Health check failed: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"⚠️ API not yet accessible (this is normal for new deployments): {str(e)}")
print("πŸ• The API may take a few minutes to become available")
return False
def deploy(self):
"""Full deployment process"""
print("πŸš€ Starting deployment to Hugging Face Spaces...")
print(f"πŸ“¦ Space: {self.space_url}")
print("="*60)
# Check requirements
if not self.check_requirements():
return False
# Setup Hugging Face Space
if not self.setup_huggingface_space():
return False
# Prepare files
if not self.prepare_files():
return False
# Deploy
if not self.clone_and_deploy():
return False
print("\n" + "="*60)
print("πŸŽ‰ DEPLOYMENT COMPLETED!")
print("="*60)
print(f"πŸ“§ API URL: {self.space_url}")
print(f"πŸ“š Documentation: {self.space_url}/docs")
print(f"πŸ” Interactive API: {self.space_url}/redoc")
print("\n⏳ Note: It may take a few minutes for the API to become fully available.")
# Test deployment
self.test_deployment()
return True
def main():
"""Main deployment function"""
print("πŸ”§ Hugging Face Spaces Deployment Tool")
print("="*40)
# Get user input
username = input("Enter your Hugging Face username: ").strip()
if not username:
print("❌ Username is required")
sys.exit(1)
space_name = input("Enter space name (e.g., email-classifier-api): ").strip()
if not space_name:
print("❌ Space name is required")
sys.exit(1)
token = input("Enter your Hugging Face token (optional, press Enter to skip): ").strip()
if not token:
print("ℹ️ No token provided. You'll need to create the space manually.")
# Create deployer and deploy
deployer = HuggingFaceDeployer(username, space_name, token)
if deployer.deploy():
print("\nβœ… Deployment successful!")
print("\nπŸ“‹ Next steps:")
print("1. Wait 2-3 minutes for the build to complete")
print("2. Check the build logs in your Hugging Face Space")
print("3. Test your API endpoints")
print("4. Share your API with others!")
else:
print("\n❌ Deployment failed!")
print("\nπŸ”§ Troubleshooting:")
print("1. Check that all required files are present")
print("2. Verify your Hugging Face credentials")
print("3. Ensure git is installed and configured")
print("4. Check the build logs in your Hugging Face Space")
if __name__ == "__main__":
main()