"""
A2A servers for Agent1 (Image Generator) and Agent2 (Marketing Reviewer).

Starts both agents as A2A servers for communication.
"""
|
|
|
import asyncio |
|
import logging |
|
import uvicorn |
|
from multiprocessing import Process |
|
import os |
|
|
|
|
|
# google-a2a is treated as an optional dependency: remember whether the import
# worked so the functions in this module can check A2A_AVAILABLE and bail out
# with a log message instead of hitting a NameError on the missing classes.
try:
    from google_a2a.common.server import A2AServer
    from google_a2a.common.types import (
        AgentCard,
        AgentCapabilities,
        AgentSkill
    )
    A2A_AVAILABLE = True
except ImportError:
    A2A_AVAILABLE = False
    # NOTE(review): the leading "β" looks like a mis-decoded emoji; confirm
    # the intended glyph against the upstream copy of this file.
    print("β google-a2a not available. Install with: pip install google-a2a")
|
|
|
|
|
from a2a_agent1_task_manager import Agent1TaskManager |
|
from a2a_agent2_task_manager import Agent2TaskManager |
|
|
|
# Configure logging at INFO level as soon as this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
def create_agent1_server(host="localhost", port=8001):
    """Build the Image Generator agent's A2A server and start it.

    Assembles the agent's skill, capabilities and card, hands the card to an
    ``A2AServer`` together with a fresh ``Agent1TaskManager``, and calls
    ``start()`` on the server. When the google-a2a import failed, an error
    is logged and the function returns without building anything.

    Args:
        host: Host given to the server and embedded in the card URL.
        port: Port given to the server and embedded in the card URL.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent1")
        return

    # Address advertised on the agent card.
    card_url = f"http://{host}:{port}/"

    image_skill = AgentSkill(
        id="imagen3-generation-skill",
        name="AI Image Generation",
        description="Generate high-quality marketing images using Imagen3-MCP",
        tags=["image", "generation", "marketing", "ai"],
        examples=[
            "Generate a professional office scene",
            "Create a product showcase image",
        ],
        inputModes=["text"],
        outputModes=["text", "image"],
    )

    # Streaming is the only optional capability this agent advertises.
    streaming_only = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )

    card = AgentCard(
        name="Image Generator Agent",
        description="AI agent that generates marketing images using MCP-Imagen3 integration",
        url=card_url,
        version="1.0.0",
        capabilities=streaming_only,
        defaultInputModes=["text"],
        defaultOutputModes=["text", "image"],
        skills=[image_skill],
    )

    a2a_server = A2AServer(
        agent_card=card,
        task_manager=Agent1TaskManager(),
        host=host,
        port=port,
    )

    logger.info(f"π Starting Agent1 (Image Generator) on {host}:{port}")
    a2a_server.start()
|
|
|
def create_agent2_server(host="localhost", port=8002):
    """Build the Marketing Reviewer agent's A2A server and start it.

    Assembles the agent's skill, capabilities and card, hands the card to an
    ``A2AServer`` together with a fresh ``Agent2TaskManager``, and calls
    ``start()`` on the server. When the google-a2a import failed, an error
    is logged and the function returns without building anything.

    Args:
        host: Host given to the server and embedded in the card URL.
        port: Port given to the server and embedded in the card URL.
    """
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent2")
        return

    # Address advertised on the agent card.
    card_url = f"http://{host}:{port}/"

    review_skill = AgentSkill(
        id="marketing-review-skill",
        name="Marketing Review",
        description="Review and analyze marketing images and prompts for quality and compliance",
        tags=["review", "marketing", "analysis", "quality"],
        examples=[
            "Review this marketing image",
            "Analyze prompt quality",
        ],
        inputModes=["text"],
        outputModes=["text"],
    )

    # Streaming is the only optional capability this agent advertises.
    streaming_only = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False,
    )

    card = AgentCard(
        name="Marketing Reviewer Agent",
        description="AI agent that reviews marketing images and provides quality analysis",
        url=card_url,
        version="1.0.0",
        capabilities=streaming_only,
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[review_skill],
    )

    a2a_server = A2AServer(
        agent_card=card,
        task_manager=Agent2TaskManager(),
        host=host,
        port=port,
    )

    logger.info(f"π Starting Agent2 (Marketing Reviewer) on {host}:{port}")
    a2a_server.start()
|
|
|
def start_agent1_process():
    """Run the Agent1 server, logging any failure instead of raising.

    Used as the ``multiprocessing.Process`` target for Agent1. Any
    ``Exception`` raised by ``create_agent1_server()`` is caught and logged
    here, so it never propagates to the caller.
    """
    try:
        create_agent1_server()
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback;
        # with only str(e) a failure inside the child process is very hard
        # to diagnose.
        logger.exception("Agent1 process failed: %s", e)
|
|
|
def start_agent2_process():
    """Run the Agent2 server, logging any failure instead of raising.

    Used as the ``multiprocessing.Process`` target for Agent2. Any
    ``Exception`` raised by ``create_agent2_server()`` is caught and logged
    here, so it never propagates to the caller.
    """
    try:
        create_agent2_server()
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback;
        # with only str(e) a failure inside the child process is very hard
        # to diagnose.
        logger.exception("Agent2 process failed: %s", e)
|
|
|
def start_both_agents():
    """Launch Agent1 and Agent2 in separate child processes and wait for them.

    Each agent runs in its own ``multiprocessing.Process``. The call blocks
    until both children have exited; on Ctrl+C both children are terminated
    and joined before returning.

    Returns:
        False if google-a2a could not be imported (nothing is started),
        True once both child processes have been joined.
    """
    # NOTE(review): the "β" / "π" prefixes on some messages below look like
    # mis-decoded emoji. They are left as found because the original glyph
    # cannot be determined from this file; confirm against the upstream copy.
    if not A2A_AVAILABLE:
        logger.error("β google-a2a not available. Cannot start A2A servers.")
        logger.info("Install with: pip install google-a2a")
        return False

    logger.info("π Starting A2A Agent System...")

    agent1_process = Process(target=start_agent1_process)
    agent1_process.start()

    agent2_process = Process(target=start_agent2_process)
    agent2_process.start()

    # This literal had been split over two lines by a mis-decoded emoji
    # (unterminated string); rejoined with the check mark restored.
    logger.info("✅ Both A2A agents started successfully!")
    logger.info("Agent1 (Image Generator): http://localhost:8001")
    logger.info("Agent2 (Marketing Reviewer): http://localhost:8002")

    try:
        # Block until both servers exit on their own.
        agent1_process.join()
        agent2_process.join()
    except KeyboardInterrupt:
        logger.info("π Shutting down A2A agents...")
        agent1_process.terminate()
        agent2_process.terminate()
        # Join after terminate() so the children are fully reaped before
        # success is reported.
        agent1_process.join()
        agent2_process.join()
        # Same split-literal repair as the start-up message above.
        logger.info("✅ A2A agents shut down successfully")

    return True
|
|
|
if __name__ == "__main__":
    import sys

    # An optional first argument selects a single agent to run in this
    # process; with no argument both agents are launched as child processes.
    cli_args = sys.argv[1:]
    if not cli_args:
        start_both_agents()
    else:
        launchers = {
            "agent1": create_agent1_server,
            "agent2": create_agent2_server,
        }
        launch = launchers.get(cli_args[0])
        if launch is None:
            print("Usage: python a2a_servers.py [agent1|agent2]")
        else:
            launch()