Noo88ear commited on
Commit
204138a
·
verified ·
1 Parent(s): 57ccf92

Delete a2a_servers.py

Browse files
Files changed (1) hide show
  1. a2a_servers.py +0 -186
a2a_servers.py DELETED
@@ -1,186 +0,0 @@
1
- """
2
- A2A Servers for Agent1 (Image Generator) and Agent2 (Marketing Reviewer)
3
- Starts both agents as A2A servers for communication
4
- """
5
-
6
- import asyncio
7
- import logging
8
- import uvicorn
9
- from multiprocessing import Process
10
- import os
11
-
12
- # A2A imports
13
- try:
14
- from google_a2a.common.server import A2AServer
15
- from google_a2a.common.types import (
16
- AgentCard,
17
- AgentCapabilities,
18
- AgentSkill
19
- )
20
- A2A_AVAILABLE = True
21
- except ImportError:
22
- A2A_AVAILABLE = False
23
- print("❌ google-a2a not available. Install with: pip install google-a2a")
24
-
25
- # Import our task managers
26
- from a2a_agent1_task_manager import Agent1TaskManager
27
- from a2a_agent2_task_manager import Agent2TaskManager
28
-
29
- logging.basicConfig(level=logging.INFO)
30
- logger = logging.getLogger(__name__)
31
-
32
- def create_agent1_server(host="localhost", port=8001):
33
- """Create and start Agent1 (Image Generator) A2A server"""
34
- if not A2A_AVAILABLE:
35
- logger.error("A2A not available for Agent1")
36
- return
37
-
38
- # Define Agent1 capabilities and skills
39
- agent1_skill = AgentSkill(
40
- id="imagen3-generation-skill",
41
- name="AI Image Generation",
42
- description="Generate high-quality marketing images using Imagen3-MCP",
43
- tags=["image", "generation", "marketing", "ai"],
44
- examples=["Generate a professional office scene", "Create a product showcase image"],
45
- inputModes=["text"],
46
- outputModes=["text", "image"]
47
- )
48
-
49
- capabilities = AgentCapabilities(
50
- streaming=True,
51
- pushNotifications=False,
52
- stateTransitionHistory=False
53
- )
54
-
55
- agent_card = AgentCard(
56
- name="Image Generator Agent",
57
- description="AI agent that generates marketing images using MCP-Imagen3 integration",
58
- url=f"http://{host}:{port}/",
59
- version="1.0.0",
60
- capabilities=capabilities,
61
- defaultInputModes=["text"],
62
- defaultOutputModes=["text", "image"],
63
- skills=[agent1_skill]
64
- )
65
-
66
- # Create task manager and server
67
- task_manager = Agent1TaskManager()
68
- server = A2AServer(
69
- agent_card=agent_card,
70
- task_manager=task_manager,
71
- host=host,
72
- port=port,
73
- )
74
-
75
- logger.info(f"🚀 Starting Agent1 (Image Generator) on {host}:{port}")
76
- server.start()
77
-
78
- def create_agent2_server(host="localhost", port=8002):
79
- """Create and start Agent2 (Marketing Reviewer) A2A server"""
80
- if not A2A_AVAILABLE:
81
- logger.error("A2A not available for Agent2")
82
- return
83
-
84
- # Define Agent2 capabilities and skills
85
- agent2_skill = AgentSkill(
86
- id="marketing-review-skill",
87
- name="Marketing Review",
88
- description="Review and analyze marketing images and prompts for quality and compliance",
89
- tags=["review", "marketing", "analysis", "quality"],
90
- examples=["Review this marketing image", "Analyze prompt quality"],
91
- inputModes=["text"],
92
- outputModes=["text"]
93
- )
94
-
95
- capabilities = AgentCapabilities(
96
- streaming=True,
97
- pushNotifications=False,
98
- stateTransitionHistory=False
99
- )
100
-
101
- agent_card = AgentCard(
102
- name="Marketing Reviewer Agent",
103
- description="AI agent that reviews marketing images and provides quality analysis",
104
- url=f"http://{host}:{port}/",
105
- version="1.0.0",
106
- capabilities=capabilities,
107
- defaultInputModes=["text"],
108
- defaultOutputModes=["text"],
109
- skills=[agent2_skill]
110
- )
111
-
112
- # Create task manager and server
113
- task_manager = Agent2TaskManager()
114
- server = A2AServer(
115
- agent_card=agent_card,
116
- task_manager=task_manager,
117
- host=host,
118
- port=port,
119
- )
120
-
121
- logger.info(f"🚀 Starting Agent2 (Marketing Reviewer) on {host}:{port}")
122
- server.start()
123
-
124
- def start_agent1_process():
125
- """Start Agent1 in a separate process"""
126
- try:
127
- create_agent1_server()
128
- except Exception as e:
129
- logger.error(f"Agent1 process failed: {e}")
130
-
131
- def start_agent2_process():
132
- """Start Agent2 in a separate process"""
133
- try:
134
- create_agent2_server()
135
- except Exception as e:
136
- logger.error(f"Agent2 process failed: {e}")
137
-
138
- def start_both_agents():
139
- """Start both agents in separate processes"""
140
- if not A2A_AVAILABLE:
141
- logger.error("❌ google-a2a not available. Cannot start A2A servers.")
142
- logger.info("Install with: pip install google-a2a")
143
- return False
144
-
145
- logger.info("🚀 Starting A2A Agent System...")
146
-
147
- # Start Agent1 (Image Generator)
148
- agent1_process = Process(target=start_agent1_process)
149
- agent1_process.start()
150
-
151
- # Start Agent2 (Marketing Reviewer)
152
- agent2_process = Process(target=start_agent2_process)
153
- agent2_process.start()
154
-
155
- logger.info("✅ Both A2A agents started successfully!")
156
- logger.info("Agent1 (Image Generator): http://localhost:8001")
157
- logger.info("Agent2 (Marketing Reviewer): http://localhost:8002")
158
-
159
- try:
160
- # Keep the main process alive
161
- agent1_process.join()
162
- agent2_process.join()
163
- except KeyboardInterrupt:
164
- logger.info("🛑 Shutting down A2A agents...")
165
- agent1_process.terminate()
166
- agent2_process.terminate()
167
- agent1_process.join()
168
- agent2_process.join()
169
- logger.info("✅ A2A agents shut down successfully")
170
-
171
- return True
172
-
173
- if __name__ == "__main__":
174
- # For testing individual agents
175
- import sys
176
-
177
- if len(sys.argv) > 1:
178
- if sys.argv[1] == "agent1":
179
- create_agent1_server()
180
- elif sys.argv[1] == "agent2":
181
- create_agent2_server()
182
- else:
183
- print("Usage: python a2a_servers.py [agent1|agent2]")
184
- else:
185
- # Start both agents
186
- start_both_agents()