File size: 5,761 Bytes
17a51e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
"""
A2A Servers for Agent1 (Image Generator) and Agent2 (Marketing Reviewer)
Starts both agents as A2A servers for communication
"""
import asyncio
import logging
import uvicorn
from multiprocessing import Process
import os
# A2A imports
# google-a2a is treated as optional here: a failed import is recorded in
# A2A_AVAILABLE (and reported on stdout) instead of aborting module import.
# The server factories below check this flag before touching any A2A type.
try:
    from google_a2a.common.server import A2AServer
    from google_a2a.common.types import (
        AgentCard,
        AgentCapabilities,
        AgentSkill
    )
    A2A_AVAILABLE = True
except ImportError:
    A2A_AVAILABLE = False
    print("β google-a2a not available. Install with: pip install google-a2a")

# Import our task managers
# NOTE: these imports are unconditional, so they run even when google-a2a
# could not be imported above.
from a2a_agent1_task_manager import Agent1TaskManager
from a2a_agent2_task_manager import Agent2TaskManager

# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_agent1_server(host="localhost", port=8001):
    """Build the Image Generator agent's A2A server and start it.

    Args:
        host: Host name handed to ``A2AServer`` and embedded in the URL
            advertised on the agent card.
        port: Port handed to ``A2AServer`` and embedded in the same URL.

    Returns:
        None. When google-a2a could not be imported, an error is logged and
        nothing is started.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent1")
        return None

    # URL under which this agent advertises itself on its card.
    endpoint = f"http://{host}:{port}/"

    # The single skill this agent exposes: text in, text and image out.
    image_skill = AgentSkill(
        id="imagen3-generation-skill",
        name="AI Image Generation",
        description="Generate high-quality marketing images using Imagen3-MCP",
        tags=["image", "generation", "marketing", "ai"],
        examples=["Generate a professional office scene", "Create a product showcase image"],
        inputModes=["text"],
        outputModes=["text", "image"],
    )

    # Streaming is enabled; push notifications and state history are not.
    caps = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )

    card = AgentCard(
        name="Image Generator Agent",
        description="AI agent that generates marketing images using MCP-Imagen3 integration",
        url=endpoint,
        version="1.0.0",
        capabilities=caps,
        defaultInputModes=["text"],
        defaultOutputModes=["text", "image"],
        skills=[image_skill],
    )

    # Wire the card to the Agent1 task manager.
    manager = Agent1TaskManager()
    a2a_server = A2AServer(
        agent_card=card,
        task_manager=manager,
        host=host,
        port=port,
    )

    logger.info(f"π Starting Agent1 (Image Generator) on {host}:{port}")
    # NOTE(review): start() presumably blocks while serving — confirm against google_a2a.
    a2a_server.start()
def create_agent2_server(host="localhost", port=8002):
    """Build the Marketing Reviewer agent's A2A server and start it.

    Args:
        host: Host name handed to ``A2AServer`` and embedded in the URL
            advertised on the agent card.
        port: Port handed to ``A2AServer`` and embedded in the same URL.

    Returns:
        None. When google-a2a could not be imported, an error is logged and
        nothing is started.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent2")
        return None

    # URL under which this agent advertises itself on its card.
    endpoint = f"http://{host}:{port}/"

    # The single skill this agent exposes: text in, text out.
    review_skill = AgentSkill(
        id="marketing-review-skill",
        name="Marketing Review",
        description="Review and analyze marketing images and prompts for quality and compliance",
        tags=["review", "marketing", "analysis", "quality"],
        examples=["Review this marketing image", "Analyze prompt quality"],
        inputModes=["text"],
        outputModes=["text"],
    )

    # Streaming is enabled; push notifications and state history are not.
    caps = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )

    card = AgentCard(
        name="Marketing Reviewer Agent",
        description="AI agent that reviews marketing images and provides quality analysis",
        url=endpoint,
        version="1.0.0",
        capabilities=caps,
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[review_skill],
    )

    # Wire the card to the Agent2 task manager.
    manager = Agent2TaskManager()
    a2a_server = A2AServer(
        agent_card=card,
        task_manager=manager,
        host=host,
        port=port,
    )

    logger.info(f"π Starting Agent2 (Marketing Reviewer) on {host}:{port}")
    # NOTE(review): start() presumably blocks while serving — confirm against google_a2a.
    a2a_server.start()
def start_agent1_process():
    """Run the Agent1 server and log any failure instead of propagating it.

    Takes no arguments so it can be used directly as the ``target`` of a
    ``multiprocessing.Process``. Any ``Exception`` raised while creating or
    running the server is caught and reported through the module logger.
    """
    try:
        create_agent1_server()
    except Exception as exc:
        # logger.exception logs at ERROR level and appends the traceback, so a
        # crash in the child process can be diagnosed from the log alone.
        logger.exception("Agent1 process failed: %s", exc)
def start_agent2_process():
    """Run the Agent2 server and log any failure instead of propagating it.

    Takes no arguments so it can be used directly as the ``target`` of a
    ``multiprocessing.Process``. Any ``Exception`` raised while creating or
    running the server is caught and reported through the module logger.
    """
    try:
        create_agent2_server()
    except Exception as exc:
        # logger.exception logs at ERROR level and appends the traceback, so a
        # crash in the child process can be diagnosed from the log alone.
        logger.exception("Agent2 process failed: %s", exc)
def start_both_agents():
    """Launch Agent1 and Agent2 in separate child processes and wait for them.

    Returns:
        False, without starting anything, when google-a2a could not be
        imported. True once both child processes have been joined, either
        because they exited on their own or because Ctrl+C triggered a
        terminate-and-join shutdown.
    """
    if not A2A_AVAILABLE:
        logger.error("β google-a2a not available. Cannot start A2A servers.")
        logger.info("Install with: pip install google-a2a")
        return False

    logger.info("π Starting A2A Agent System...")

    # Start Agent1 (Image Generator)
    agent1_process = Process(target=start_agent1_process)
    agent1_process.start()

    # Start Agent2 (Marketing Reviewer)
    agent2_process = Process(target=start_agent2_process)
    agent2_process.start()

    # Logged right after Process.start(); the children are not probed here.
    logger.info("✅ Both A2A agents started successfully!")
    logger.info("Agent1 (Image Generator): http://localhost:8001")
    logger.info("Agent2 (Marketing Reviewer): http://localhost:8002")

    try:
        # Keep the main process alive until both children exit.
        agent1_process.join()
        agent2_process.join()
    except KeyboardInterrupt:
        logger.info("π Shutting down A2A agents...")
        # Ask both children to stop, then wait so none is left behind.
        agent1_process.terminate()
        agent2_process.terminate()
        agent1_process.join()
        agent2_process.join()
        logger.info("✅ A2A agents shut down successfully")

    return True
if __name__ == "__main__":
    # Script entry point. An optional first argument selects a single agent to
    # run in this process (for testing individual agents); with no argument,
    # both agents are started in child processes.
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        launchers = {
            "agent1": create_agent1_server,
            "agent2": create_agent2_server,
        }
        launch = launchers.get(cli_args[0])
        if launch is None:
            # Unrecognised agent name: show usage and start nothing.
            print("Usage: python a2a_servers.py [agent1|agent2]")
        else:
            launch()
    else:
        # Start both agents
        start_both_agents()