File size: 5,761 Bytes
17a51e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
A2A Servers for Agent1 (Image Generator) and Agent2 (Marketing Reviewer)
Starts both agents as A2A servers for communication
"""

import asyncio
import logging
import uvicorn
from multiprocessing import Process
import os

# A2A imports
try:
    from google_a2a.common.server import A2AServer
    from google_a2a.common.types import (
        AgentCard,
        AgentCapabilities,
        AgentSkill
    )
    A2A_AVAILABLE = True
except ImportError:
    A2A_AVAILABLE = False
    print("❌ google-a2a not available. Install with: pip install google-a2a")

# Import our task managers
from a2a_agent1_task_manager import Agent1TaskManager
from a2a_agent2_task_manager import Agent2TaskManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_agent1_server(host="localhost", port=8001):
    """Create and start Agent1 (Image Generator) A2A server"""
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent1")
        return
    
    # Define Agent1 capabilities and skills
    agent1_skill = AgentSkill(
        id="imagen3-generation-skill",
        name="AI Image Generation",
        description="Generate high-quality marketing images using Imagen3-MCP",
        tags=["image", "generation", "marketing", "ai"],
        examples=["Generate a professional office scene", "Create a product showcase image"],
        inputModes=["text"],
        outputModes=["text", "image"]
    )
    
    capabilities = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False
    )
    
    agent_card = AgentCard(
        name="Image Generator Agent",
        description="AI agent that generates marketing images using MCP-Imagen3 integration",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        capabilities=capabilities,
        defaultInputModes=["text"],
        defaultOutputModes=["text", "image"],
        skills=[agent1_skill]
    )
    
    # Create task manager and server
    task_manager = Agent1TaskManager()
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    
    logger.info(f"πŸš€ Starting Agent1 (Image Generator) on {host}:{port}")
    server.start()

def create_agent2_server(host="localhost", port=8002):
    """Create and start Agent2 (Marketing Reviewer) A2A server"""
    if not A2A_AVAILABLE:
        logger.error("A2A not available for Agent2")
        return
    
    # Define Agent2 capabilities and skills
    agent2_skill = AgentSkill(
        id="marketing-review-skill",
        name="Marketing Review",
        description="Review and analyze marketing images and prompts for quality and compliance",
        tags=["review", "marketing", "analysis", "quality"],
        examples=["Review this marketing image", "Analyze prompt quality"],
        inputModes=["text"],
        outputModes=["text"]
    )
    
    capabilities = AgentCapabilities(
        streaming=True,
        pushNotifications=False,
        stateTransitionHistory=False
    )
    
    agent_card = AgentCard(
        name="Marketing Reviewer Agent",
        description="AI agent that reviews marketing images and provides quality analysis",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        capabilities=capabilities,
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        skills=[agent2_skill]
    )
    
    # Create task manager and server
    task_manager = Agent2TaskManager()
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    
    logger.info(f"πŸš€ Starting Agent2 (Marketing Reviewer) on {host}:{port}")
    server.start()

def start_agent1_process():
    """Start Agent1 in a separate process"""
    try:
        create_agent1_server()
    except Exception as e:
        logger.error(f"Agent1 process failed: {e}")

def start_agent2_process():
    """Start Agent2 in a separate process"""
    try:
        create_agent2_server()
    except Exception as e:
        logger.error(f"Agent2 process failed: {e}")

def start_both_agents():
    """Start both agents in separate processes"""
    if not A2A_AVAILABLE:
        logger.error("❌ google-a2a not available. Cannot start A2A servers.")
        logger.info("Install with: pip install google-a2a")
        return False
    
    logger.info("πŸš€ Starting A2A Agent System...")
    
    # Start Agent1 (Image Generator)
    agent1_process = Process(target=start_agent1_process)
    agent1_process.start()
    
    # Start Agent2 (Marketing Reviewer) 
    agent2_process = Process(target=start_agent2_process)
    agent2_process.start()
    
    logger.info("βœ… Both A2A agents started successfully!")
    logger.info("Agent1 (Image Generator): http://localhost:8001")
    logger.info("Agent2 (Marketing Reviewer): http://localhost:8002")
    
    try:
        # Keep the main process alive
        agent1_process.join()
        agent2_process.join()
    except KeyboardInterrupt:
        logger.info("πŸ›‘ Shutting down A2A agents...")
        agent1_process.terminate()
        agent2_process.terminate()
        agent1_process.join()
        agent2_process.join()
        logger.info("βœ… A2A agents shut down successfully")
    
    return True

if __name__ == "__main__":
    # For testing individual agents
    import sys
    
    if len(sys.argv) > 1:
        if sys.argv[1] == "agent1":
            create_agent1_server()
        elif sys.argv[1] == "agent2":
            create_agent2_server()
        else:
            print("Usage: python a2a_servers.py [agent1|agent2]")
    else:
        # Start both agents
        start_both_agents()