GuglielmoTor committed on
Commit d675733 · verified · 1 Parent(s): c7fbc19

Create employer_branding_coordinator.py

insight_and_tasks/employer_branding_coordinator.py ADDED
@@ -0,0 +1,326 @@
# coordinators/employer_branding_coordinator.py
import json
import logging
from datetime import datetime  # Used below for unique run IDs; previously only imported in __main__
from typing import Optional, Dict, Any  # Added Dict, Any
from dataclasses import asdict  # For converting dataclasses (like AgentMetrics) to dict

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner
from google.genai import types as genai_types  # For ADK agent inputs

# Project-specific imports
from agents.follower_agent import EnhancedFollowerAnalysisAgent
from agents.post_agent import EnhancedPostPerformanceAgent
from agents.mentions_agent import EnhancedMentionsAnalysisAgent
from data_models.metrics import AgentMetrics  # To type hint inputs
from utils.retry_mechanism import RetryMechanism  # If ADK calls need retry

# Configure logger for this module
logger = logging.getLogger(__name__)

DEFAULT_COORDINATOR_MODEL = "gemini-2.5-flash-preview-05-20"  # Use a more capable model for synthesis

class EnhancedEmployerBrandingCoordinator:
    """
    Enhanced coordinator for synthesizing insights from multiple agent metrics,
    identifying correlations, and generating integrated strategic recommendations.
    """
    COORDINATOR_AGENT_NAME = "employer_branding_coordinator"
    COORDINATOR_AGENT_DESCRIPTION = (
        "Strategic coordinator that analyzes metrics from Follower, Post Performance, and Mentions agents "
        "to find correlations, suggest potential causal links, and generate integrated strategies."
    )
    COORDINATOR_AGENT_INSTRUCTION = """
    You are the Enhanced Employer Branding Coordinator. Your primary mission is to synthesize analyses and
    structured metrics (TimeSeries, Aggregate, Categorical) from three specialized agents: Follower Analysis,
    Post Performance, and Mentions Analysis. Your goal is to provide a holistic, integrated understanding of
    the LinkedIn employer branding performance.

    You MUST focus on:
    1. Cross-Agent Correlations: Analyze how metrics from different agents relate to each other over time.
       Pay close attention to the 'time_series_metrics' provided by each agent.
       - Identify positive or negative correlations (e.g., "Follower growth rate increased by X% when posts about 'company culture' (Post Agent) were published, coinciding with a Y% rise in positive mentions (Mentions Agent)").
       - Note any leading or lagging indicators (e.g., "A spike in negative mentions often preceded a dip in follower growth by approximately 2 weeks.").
       - Look for relationships between specific content types/topics (from Post Agent) and follower engagement/growth (Follower Agent) or brand sentiment (Mentions Agent).
    2. Potential Causal Insights & Hypotheses: Based on observed correlations and temporal sequences, suggest plausible causal relationships.
       These are hypotheses, not definitive conclusions.
       - Example: "The Q2 campaign focusing on 'employee testimonials' (Post Agent data) likely contributed to the observed 15% increase in organic follower acquisition (Follower Agent data) and the shift towards more positive sentiment in mentions (Mentions Agent data) during the same period."
    3. Root Cause Analysis (Conceptual): For significant performance changes (e.g., sudden engagement drops, unexpected follower spikes, sharp sentiment shifts), attempt to identify potential root causes by cross-referencing data and summaries from all three agents.
    4. Predictive Insights (High-Level): Based on established trends and correlations, what are potential future performance trajectories or risks?
       - Example: "If the current trend of declining engagement on text-only posts continues, overall reach may decrease by X% next quarter unless content strategy is diversified."
    5. Integrated Strategic Recommendations: Formulate actionable, strategic advice that leverages insights from ALL THREE data sources to optimize overall employer branding.
       - Recommendations should be specific (e.g., "Increase frequency of video posts related to 'Team Achievements' as this format shows high engagement and correlates with positive mention spikes.").
       - Prioritize recommendations based on their potential impact, supported by the cross-agent analysis.
       - Suggest A/B tests or further investigations where appropriate.

    INPUT: You will receive structured 'AgentMetrics' data (JSON format) from each of the three agents. This includes their own analysis summaries, time-series data, aggregate figures, and categorical breakdowns.

    OUTPUT: A comprehensive, well-structured report covering:
    I. Overall Executive Summary: A brief (2-3 paragraph) overview of the most critical findings and strategic implications derived from the integrated analysis.
    II. Detailed Cross-Agent Correlation Analysis: Elaborate on specific correlations found, with examples.
    III. Key Causal Hypotheses: Present the most compelling potential causal links.
    IV. Noteworthy Performance Shifts & Potential Root Causes: Discuss any major changes and their likely drivers.
    V. Forward-Looking Predictive Insights: Offer high-level predictions.
    VI. Actionable Integrated Strategic Recommendations: Provide clear, prioritized recommendations.

    Your analysis must be grounded in the provided data. Refer to specific metrics and agent findings to support your conclusions.
    Be insightful and strategic. The goal is to provide a unified view that is more valuable than the sum of the individual agent analyses.
    """

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        self.api_key = api_key  # Stored for LlmAgent or if agents need it passed explicitly
        self.model_name = model_name or DEFAULT_COORDINATOR_MODEL

        # Initialize individual agents. The coordinator uses their output as the
        # raw material for its synthesis.
        self.follower_agent = EnhancedFollowerAnalysisAgent(api_key=api_key, model_name=model_name)  # Pass down model if needed
        self.post_agent = EnhancedPostPerformanceAgent(api_key=api_key, model_name=model_name)
        self.mentions_agent = EnhancedMentionsAnalysisAgent(api_key=api_key, model_name=model_name)

        # The LLM agent for the coordinator itself, responsible for synthesis
        self.coordinator_llm_agent = LlmAgent(
            name=self.COORDINATOR_AGENT_NAME,
            model=self.model_name,  # Use the coordinator's (potentially more powerful) model
            description=self.COORDINATOR_AGENT_DESCRIPTION,
            instruction=self.COORDINATOR_AGENT_INSTRUCTION
        )
        self.retry_mechanism = RetryMechanism()
        logger.info(f"{self.COORDINATOR_AGENT_NAME} initialized with model {self.model_name}.")
        logger.info(f"It internally uses: Follower Agent ({self.follower_agent.model_name}), "
                    f"Post Agent ({self.post_agent.model_name}), Mentions Agent ({self.mentions_agent.model_name}).")

    async def generate_comprehensive_analysis(
        self,
        follower_metrics: AgentMetrics,
        post_metrics: AgentMetrics,
        mentions_metrics: AgentMetrics
    ) -> str:
        """
        Generates a comprehensive analysis by synthesizing metrics from all specialized agents.

        Args:
            follower_metrics: Metrics from the EnhancedFollowerAnalysisAgent.
            post_metrics: Metrics from the EnhancedPostPerformanceAgent.
            mentions_metrics: Metrics from the EnhancedMentionsAnalysisAgent.

        Returns:
            A string containing the comprehensive analysis report.
        """
        # Prepare the input prompt for the coordinator's LlmAgent.
        # Serialize the AgentMetrics dataclasses to dictionaries, then to JSON
        # strings, for clean inclusion in the prompt.
        try:
            follower_metrics_dict = asdict(follower_metrics)
            post_metrics_dict = asdict(post_metrics)
            mentions_metrics_dict = asdict(mentions_metrics)
        except Exception as e:
            logger.error(f"Error converting AgentMetrics to dict: {e}", exc_info=True)
            return "Error: Could not process input metrics for coordination."

        # Truncate individual agent summaries to keep the prompt from growing too large.
        max_summary_len = 500  # Max characters for individual agent summaries in the prompt
        follower_metrics_dict['analysis_summary'] = follower_metrics_dict.get('analysis_summary', '')[:max_summary_len]
        post_metrics_dict['analysis_summary'] = post_metrics_dict.get('analysis_summary', '')[:max_summary_len]
        mentions_metrics_dict['analysis_summary'] = mentions_metrics_dict.get('analysis_summary', '')[:max_summary_len]

        synthesis_prompt = f"""
        Please synthesize the following LinkedIn analytics insights, which are structured as 'AgentMetrics'
        from three specialized agents. Your primary task is to identify cross-metric correlations,
        deduce potential causal relationships, and provide integrated strategic recommendations based on
        your core instructions.

        DATA FROM SPECIALIZED AGENTS:

        1. Follower Analysis Agent Metrics:
           - Agent Name: {follower_metrics_dict.get('agent_name')}
           - Agent's Analysis Summary: {follower_metrics_dict.get('analysis_summary')}
           - Time Series Metrics: {json.dumps([asdict(m) for m in follower_metrics.time_series_metrics], indent=2, default=str)}
           - Aggregate Metrics: {json.dumps(follower_metrics_dict.get('aggregate_metrics'), indent=2, default=str)}
           - Categorical Metrics: {json.dumps(follower_metrics_dict.get('categorical_metrics'), indent=2, default=str)}
           - Time Periods Covered: {json.dumps(follower_metrics_dict.get('time_periods_covered'), default=str)}
           - Key Insights by Agent: {json.dumps(follower_metrics_dict.get('key_insights'), default=str)}

        2. Post Performance Agent Metrics:
           - Agent Name: {post_metrics_dict.get('agent_name')}
           - Agent's Analysis Summary: {post_metrics_dict.get('analysis_summary')}
           - Time Series Metrics: {json.dumps([asdict(m) for m in post_metrics.time_series_metrics], indent=2, default=str)}
           - Aggregate Metrics: {json.dumps(post_metrics_dict.get('aggregate_metrics'), indent=2, default=str)}
           - Categorical Metrics: {json.dumps(post_metrics_dict.get('categorical_metrics'), indent=2, default=str)}
           - Time Periods Covered: {json.dumps(post_metrics_dict.get('time_periods_covered'), default=str)}
           - Key Insights by Agent: {json.dumps(post_metrics_dict.get('key_insights'), default=str)}

        3. Mentions Analysis Agent Metrics:
           - Agent Name: {mentions_metrics_dict.get('agent_name')}
           - Agent's Analysis Summary: {mentions_metrics_dict.get('analysis_summary')}
           - Time Series Metrics: {json.dumps([asdict(m) for m in mentions_metrics.time_series_metrics], indent=2, default=str)}
           - Aggregate Metrics: {json.dumps(mentions_metrics_dict.get('aggregate_metrics'), indent=2, default=str)}
           - Categorical Metrics: {json.dumps(mentions_metrics_dict.get('categorical_metrics'), indent=2, default=str)}
           - Time Periods Covered: {json.dumps(mentions_metrics_dict.get('time_periods_covered'), default=str)}
           - Key Insights by Agent: {json.dumps(mentions_metrics_dict.get('key_insights'), default=str)}

        COORDINATION TASK:
        Based on ALL the data presented above from the three agents, generate a comprehensive synthesis report.
        Follow your core instructions meticulously, focusing on cross-agent correlations (especially using the
        time_series_metrics), causal hypotheses, root cause considerations for major shifts, predictive insights,
        and actionable, integrated strategic recommendations.
        Structure your output as a detailed report with the specified sections.
        """

        user_input_content = genai_types.Content(
            role="user",
            parts=[genai_types.Part(text=synthesis_prompt)]
        )

        runner = InMemoryRunner(agent=self.coordinator_llm_agent, app_name=f"{self.COORDINATOR_AGENT_NAME}Runner")
        user_id = f"system_user_coordinator_{int(datetime.utcnow().timestamp())}"  # Unique ID for the run

        session = await runner.session_service.create_session(
            app_name=f"{self.COORDINATOR_AGENT_NAME}Runner",
            user_id=user_id
        )

        result_text_parts = []
        try:
            logger.info(f"Running {self.COORDINATOR_AGENT_NAME} for synthesis. User ID: {user_id}, Session ID: {session.id}")
            # NOTE: self.retry_mechanism expects a synchronous callable, so the async
            # ADK runner is invoked directly here. If retries become critical for this
            # call, wrap the loop below in async-aware retry logic instead.
            async for event in runner.run(
                user_id=user_id,
                session_id=session.id,
                new_message=user_input_content
            ):
                if hasattr(event, 'content') and event.content and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, 'text'):
                            result_text_parts.append(part.text)

            final_result_text = "".join(result_text_parts)
            if not final_result_text.strip():
                # This can happen if the LLM returns no content, or an error not
                # surfaced by the ADK run.
                logger.warning(f"{self.COORDINATOR_AGENT_NAME} synthesis resulted in an empty string.")
                final_result_text = "Coordinator analysis did not produce output. Please check logs."

        except Exception as e:
            logger.error(f"Error during {self.COORDINATOR_AGENT_NAME} LLM agent execution: {e}", exc_info=True)
            final_result_text = f"Error in coordinator synthesis: {str(e)}"
        finally:
            try:
                await runner.session_service.delete_session(
                    app_name=f"{self.COORDINATOR_AGENT_NAME}Runner", user_id=user_id, session_id=session.id
                )
            except Exception as session_del_e:
                logger.error(f"Error deleting coordinator session: {session_del_e}")

        return final_result_text

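The method above calls the ADK runner directly because `RetryMechanism` expects a synchronous callable. If async retries become necessary, a minimal standalone sketch of an async-aware retry helper with exponential backoff could look like this (`retry_async` is a hypothetical helper, not part of this module or the ADK):

```python
import asyncio
import random

async def retry_async(coro_factory, max_attempts=3, base_delay=1.0):
    """Await coro_factory(), retrying on exception with exponential backoff plus jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: propagate the last error
            # Backoff: base_delay * 2^(attempt-1), with small random jitter
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

The event-collecting loop would then run inside the factory coroutine, so a transient ADK failure restarts the whole run rather than resuming mid-stream.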
if __name__ == '__main__':
    import asyncio
    import os  # For reading GOOGLE_API_KEY below
    from data_models.metrics import TimeSeriesMetric  # For dummy time-series data

    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedEmployerBrandingCoordinator test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logger.warning("logging_config.py not found, using basicConfig for logging.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_coordinator")
    MODEL_NAME = DEFAULT_COORDINATOR_MODEL  # Or a specific test model

    # Create dummy AgentMetrics data for testing
    dummy_ts_metric = TimeSeriesMetric(metric_name="dummy_visits", values=[10.0, 20.0], timestamps=["2023-01", "2023-02"])

    follower_metrics_data = AgentMetrics(
        agent_name="follower_analyst_test",
        analysis_summary="Followers grew steadily. Demographic: Young professionals.",
        time_series_metrics=[dummy_ts_metric],
        aggregate_metrics={"avg_growth_rate": 0.05},
        categorical_metrics={"top_industry": "Tech"},
        time_periods_covered=["2023-01", "2023-02"],
        key_insights=["Organic growth is strong."]
    )
    post_metrics_data = AgentMetrics(
        agent_name="post_analyst_test",
        analysis_summary="Video posts performed best. Engagement rate is 3%.",
        time_series_metrics=[TimeSeriesMetric(metric_name="dummy_engagement", values=[0.03, 0.035], timestamps=["2023-01", "2023-02"], unit="%")],
        aggregate_metrics={"avg_engagement_rate_overall": 0.032},
        categorical_metrics={"top_media_type": "VIDEO"},
        time_periods_covered=["2023-01", "2023-02"],
        key_insights=["Video content is key for engagement."]
    )
    mentions_metrics_data = AgentMetrics(
        agent_name="mentions_analyst_test",
        analysis_summary="Mentions are mostly neutral. Sentiment score avg 0.1.",
        time_series_metrics=[TimeSeriesMetric(metric_name="dummy_sentiment_score", values=[0.1, 0.12], timestamps=["2023-01", "2023-02"])],
        aggregate_metrics={"overall_avg_sentiment": 0.11},
        categorical_metrics={"dominant_sentiment": "Neutral"},
        time_periods_covered=["2023-01", "2023-02"],
        key_insights=["Brand perception is stable but not overly positive."]
    )

    coordinator = EnhancedEmployerBrandingCoordinator(api_key=MOCK_API_KEY, model_name=MODEL_NAME)

    async def run_coordination():
        global InMemoryRunner  # May be temporarily swapped for a mock below
        logger.info("Generating comprehensive analysis from dummy metrics...")

        original_runner = None
        if MOCK_API_KEY == "test_api_key_coordinator":
            logger.warning("Using a mock API key. Mocking the ADK runner for the coordinator's LlmAgent.")

            # Mock the ADK runner. It must expose the same surface the coordinator
            # uses: a session_service with create/delete_session, and an async run()
            # generator yielding events that carry a .content attribute.
            class MockSessionService:
                async def create_session(self, app_name, user_id):
                    class MockSession:
                        id = "mock_coord_session_id"
                    return MockSession()

                async def delete_session(self, app_name, user_id, session_id):
                    pass

            class MockCoordinatorADKRunner:
                def __init__(self, agent, app_name):
                    self.agent = agent
                    self.session_service = MockSessionService()

                async def run(self, user_id, session_id, new_message):
                    # Simulate a response event from the coordinator LLM
                    class MockEvent:
                        content = genai_types.Content(
                            parts=[genai_types.Part(text="Mock Coordinator Synthesis Report: Blah blah correlation. Recommendation: Do X.")]
                        )
                    yield MockEvent()

            original_runner = InMemoryRunner
            InMemoryRunner = MockCoordinatorADKRunner

        report = await coordinator.generate_comprehensive_analysis(
            follower_metrics_data,
            post_metrics_data,
            mentions_metrics_data
        )

        if original_runner is not None:
            InMemoryRunner = original_runner  # Restore the real runner

        print("\n--- EnhancedEmployerBrandingCoordinator Report ---")
        print(report)

    asyncio.run(run_coordination())
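The mock runner in the test exists because the coordinator only ever touches three pieces of the ADK runner surface: `session_service.create_session`, `session_service.delete_session`, and the async `run()` generator whose events expose `.content.parts`. A self-contained sketch of that minimal contract, using hypothetical `Fake*` names and `SimpleNamespace` instead of real ADK or genai types:

```python
import asyncio
from types import SimpleNamespace

class FakeSessionService:
    """Mimics runner.session_service as the coordinator uses it."""
    async def create_session(self, app_name, user_id):
        return SimpleNamespace(id="mock_coord_session_id")

    async def delete_session(self, app_name, user_id, session_id):
        pass

class FakeRunner:
    """Stands in for InMemoryRunner: yields one event carrying .content.parts."""
    def __init__(self, agent, app_name):
        self.agent = agent
        self.session_service = FakeSessionService()

    async def run(self, user_id, session_id, new_message):
        part = SimpleNamespace(text="Mock Coordinator Synthesis Report")
        yield SimpleNamespace(content=SimpleNamespace(parts=[part]))

async def demo():
    # Exercise the same call sequence the coordinator performs
    runner = FakeRunner(agent=None, app_name="demo")
    session = await runner.session_service.create_session(app_name="demo", user_id="u1")
    chunks = []
    async for event in runner.run(user_id="u1", session_id=session.id, new_message=None):
        if event.content and event.content.parts:
            chunks.extend(p.text for p in event.content.parts if hasattr(p, "text"))
    await runner.session_service.delete_session(app_name="demo", user_id="u1", session_id=session.id)
    return "".join(chunks)

print(asyncio.run(demo()))  # -> Mock Coordinator Synthesis Report
```

Patching the module-level `InMemoryRunner` name (by hand as above, or via `unittest.mock.patch` on this module) works because the coordinator resolves the runner class from this module's globals at call time.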