marketing-image-generator / a2a_servers.py
Noo88ear's picture
Create a2a_servers.py
17a51e1 verified
raw
history blame
5.76 kB
"""
A2A Servers for Agent1 (Image Generator) and Agent2 (Marketing Reviewer)
Starts both agents as A2A servers for communication
"""
import asyncio
import logging
import uvicorn
from multiprocessing import Process
import os
# A2A imports
try:
from google_a2a.common.server import A2AServer
from google_a2a.common.types import (
AgentCard,
AgentCapabilities,
AgentSkill
)
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
print("❌ google-a2a not available. Install with: pip install google-a2a")
# Import our task managers
from a2a_agent1_task_manager import Agent1TaskManager
from a2a_agent2_task_manager import Agent2TaskManager
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the functions below.
logger = logging.getLogger(__name__)
def create_agent1_server(host="localhost", port=8001):
    """Build the Agent1 (Image Generator) A2A server and start it.

    Assembles the agent's skill, capabilities and agent card, wires them to
    an ``Agent1TaskManager`` and calls ``start()`` on the resulting
    ``A2AServer``.

    Args:
        host: Hostname handed to ``A2AServer`` and embedded in the agent
            card URL.
        port: Port handed to ``A2AServer`` and embedded in the agent card
            URL.

    Returns:
        None. When the ``google_a2a`` import failed (``A2A_AVAILABLE`` is
        false) an error is logged and nothing is started.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent1")
        return

    # Define Agent1 capabilities and skills
    agent1_skill = AgentSkill(
        id="imagen3-generation-skill",
        name="AI Image Generation",
        description="Generate high-quality marketing images using Imagen3-MCP",
        tags=["image", "generation", "marketing", "ai"],
        examples=[
            "Generate a professional office scene",
            "Create a product showcase image",
        ],
        inputModes=["text"],
        outputModes=["text", "image"],
    )
    capabilities = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )
    agent_card = AgentCard(
        name="Image Generator Agent",
        description="AI agent that generates marketing images using MCP-Imagen3 integration",
        # The advertised URL is derived from the same host/port the server binds to.
        url=f"http://{host}:{port}/",
        version="1.0.0",
        capabilities=capabilities,
        defaultInputModes=["text"],
        defaultOutputModes=["text", "image"],
        skills=[agent1_skill],
    )

    # Create task manager and server
    task_manager = Agent1TaskManager()
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )

    logger.info("🚀 Starting Agent1 (Image Generator) on %s:%s", host, port)
    # NOTE(review): start() presumably blocks while the server is running —
    # confirm against the A2AServer implementation.
    server.start()
def create_agent2_server(host="localhost", port=8002):
    """Build the Agent2 (Marketing Reviewer) A2A server and start it.

    Assembles the agent's skill, capabilities and agent card, wires them to
    an ``Agent2TaskManager`` and calls ``start()`` on the resulting
    ``A2AServer``.

    Args:
        host: Hostname handed to ``A2AServer`` and embedded in the agent
            card URL.
        port: Port handed to ``A2AServer`` and embedded in the agent card
            URL.

    Returns:
        None. When the ``google_a2a`` import failed (``A2A_AVAILABLE`` is
        false) an error is logged and nothing is started.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent2")
        return

    # Define Agent2 capabilities and skills
    agent2_skill = AgentSkill(
        id="marketing-review-skill",
        name="Marketing Review",
        description="Review and analyze marketing images and prompts for quality and compliance",
        tags=["review", "marketing", "analysis", "quality"],
        examples=["Review this marketing image", "Analyze prompt quality"],
        inputModes=["text"],
        outputModes=["text"],
    )
    capabilities = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )
    agent_card = AgentCard(
        name="Marketing Reviewer Agent",
        description="AI agent that reviews marketing images and provides quality analysis",
        # The advertised URL is derived from the same host/port the server binds to.
        url=f"http://{host}:{port}/",
        version="1.0.0",
        capabilities=capabilities,
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[agent2_skill],
    )

    # Create task manager and server
    task_manager = Agent2TaskManager()
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )

    logger.info("🚀 Starting Agent2 (Marketing Reviewer) on %s:%s", host, port)
    # NOTE(review): start() presumably blocks while the server is running —
    # confirm against the A2AServer implementation.
    server.start()
def start_agent1_process():
    """Run the Agent1 server with its default host/port, logging any failure.

    Intended as a ``multiprocessing.Process`` target: any ``Exception``
    raised by ``create_agent1_server()`` is logged rather than propagated.
    """
    try:
        create_agent1_server()
    except Exception as e:
        # logger.exception keeps the original message and also records the
        # traceback, which would otherwise be lost in the child process.
        logger.exception("Agent1 process failed: %s", e)
def start_agent2_process():
    """Run the Agent2 server with its default host/port, logging any failure.

    Intended as a ``multiprocessing.Process`` target: any ``Exception``
    raised by ``create_agent2_server()`` is logged rather than propagated.
    """
    try:
        create_agent2_server()
    except Exception as e:
        # logger.exception keeps the original message and also records the
        # traceback, which would otherwise be lost in the child process.
        logger.exception("Agent2 process failed: %s", e)
def _terminate_started(processes):
    """Terminate and reap every process in *processes* that was started."""
    # Process.pid is None until start() has spawned the child; terminate()
    # and join() both fail on a process that was never started.
    started = [proc for proc in processes if proc.pid is not None]
    for proc in started:
        proc.terminate()
    for proc in started:
        proc.join()


def start_both_agents():
    """Start Agent1 and Agent2 in separate processes and wait for them.

    Blocks until both child processes exit. On ``KeyboardInterrupt`` both
    children are terminated and joined. If any other exception interrupts
    start-up or the wait, already-started children are terminated before
    the exception is re-raised so that none is left orphaned.

    Returns:
        False if ``google_a2a`` is unavailable (nothing is started),
        True otherwise.
    """
    if not A2A_AVAILABLE:
        logger.error("❌ google-a2a not available. Cannot start A2A servers.")
        logger.info("Install with: pip install google-a2a")
        return False

    logger.info("🚀 Starting A2A Agent System...")

    agent1_process = Process(target=start_agent1_process)
    agent2_process = Process(target=start_agent2_process)
    children = (agent1_process, agent2_process)

    try:
        # Start Agent1 (Image Generator), then Agent2 (Marketing Reviewer)
        agent1_process.start()
        agent2_process.start()

        logger.info("✅ Both A2A agents started successfully!")
        logger.info("Agent1 (Image Generator): http://localhost:8001")
        logger.info("Agent2 (Marketing Reviewer): http://localhost:8002")

        # Keep the main process alive
        agent1_process.join()
        agent2_process.join()
    except KeyboardInterrupt:
        logger.info("🛑 Shutting down A2A agents...")
        _terminate_started(children)
        logger.info("✅ A2A agents shut down successfully")
    except BaseException:
        # E.g. the second start() failed: do not leave the first child running.
        _terminate_started(children)
        raise

    return True
if __name__ == "__main__":
    # Command-line entry point:
    #   python a2a_servers.py          -> start both agents
    #   python a2a_servers.py agent1   -> run only Agent1 (for testing)
    #   python a2a_servers.py agent2   -> run only Agent2 (for testing)
    import sys

    if len(sys.argv) > 1:
        # For testing individual agents
        single_agent_launchers = {
            "agent1": create_agent1_server,
            "agent2": create_agent2_server,
        }
        launcher = single_agent_launchers.get(sys.argv[1])
        if launcher is None:
            # Unknown argument: report usage on stderr and signal the error
            # through the exit status instead of exiting 0.
            print("Usage: python a2a_servers.py [agent1|agent2]", file=sys.stderr)
            sys.exit(2)
        launcher()
    else:
        # Start both agents; a False return means nothing could be started.
        if not start_both_agents():
            sys.exit(1)