Spaces:
Running
Running
Update features/insight_and_tasks/agents/task_extraction_agent.py
Browse files
features/insight_and_tasks/agents/task_extraction_agent.py
CHANGED
@@ -1,68 +1,63 @@
|
|
1 |
# agents/task_extraction_agent.py
|
2 |
-
|
3 |
import logging
|
4 |
-
import json
|
5 |
from typing import Optional
|
6 |
-
from datetime import datetime, date
|
|
|
7 |
from google.adk.agents import LlmAgent
|
8 |
-
from google.adk.runners import InMemoryRunner
|
9 |
-
from google.genai import types as genai_types
|
10 |
|
11 |
# Project-specific imports
|
12 |
from features.insight_and_tasks.data_models.tasks import (
|
13 |
-
TaskExtractionOutput,
|
14 |
-
OKR,
|
15 |
-
KeyResult,
|
16 |
-
Task,
|
17 |
-
EffortLevel,
|
18 |
-
TimelineCategory,
|
19 |
-
PriorityLevel,
|
20 |
-
TaskType,
|
21 |
-
DataSubject
|
22 |
)
|
23 |
-
from features.insight_and_tasks.utils.retry_mechanism import RetryMechanism
|
24 |
|
25 |
# Configure logger for this module
|
26 |
logger = logging.getLogger(__name__)
|
27 |
|
28 |
-
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
|
29 |
|
30 |
class TaskExtractionAgent:
|
31 |
"""
|
32 |
Agent specialized in extracting actionable tasks and OKRs from analysis insights,
|
33 |
with awareness of the current date and quarter.
|
34 |
"""
|
35 |
-
|
36 |
AGENT_NAME = "task_extractor"
|
37 |
AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
|
38 |
|
39 |
def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
|
40 |
"""
|
41 |
Initializes the TaskExtractionAgent.
|
42 |
-
|
43 |
Args:
|
44 |
api_key: API key (may be used by LlmAgent configuration or future needs).
|
45 |
model_name: Name of the language model to use.
|
46 |
current_date: The current date to use for quarter calculations. Defaults to today.
|
47 |
"""
|
48 |
-
self.api_key = api_key
|
49 |
self.model_name = model_name or DEFAULT_AGENT_MODEL
|
50 |
-
self.current_date = current_date or datetime.utcnow().date()
|
51 |
|
52 |
# LlmAgent is initialized with dynamic instruction and output schema
|
53 |
self.agent = LlmAgent(
|
54 |
name=self.AGENT_NAME,
|
55 |
model=self.model_name,
|
56 |
description=self.AGENT_DESCRIPTION,
|
57 |
-
instruction=self._get_instruction_prompt(),
|
58 |
-
output_schema=TaskExtractionOutput,
|
59 |
-
output_key="extracted_tasks_okrs"
|
60 |
)
|
61 |
-
|
62 |
-
self.retry_mechanism = RetryMechanism()
|
63 |
-
|
64 |
logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
|
65 |
-
|
66 |
|
67 |
def _get_quarter(self, d: date) -> int:
|
68 |
"""Calculates the quarter for a given date."""
|
@@ -72,7 +67,6 @@ class TaskExtractionAgent:
|
|
72 |
"""Calculates the number of days remaining in the current quarter from date d."""
|
73 |
current_q = self._get_quarter(d)
|
74 |
year = d.year
|
75 |
-
|
76 |
if current_q == 1:
|
77 |
quarter_end_date = date(year, 3, 31)
|
78 |
elif current_q == 2:
|
@@ -83,26 +77,46 @@ class TaskExtractionAgent:
|
|
83 |
quarter_end_date = date(year, 12, 31)
|
84 |
|
85 |
days_remaining = (quarter_end_date - d).days
|
86 |
-
return max(0, days_remaining)
|
87 |
|
88 |
def _get_instruction_prompt(self) -> str:
|
89 |
"""Generates the dynamic instruction string for the LlmAgent."""
|
90 |
quarter = self._get_quarter(self.current_date)
|
91 |
days_remaining = self._days_until_quarter_end(self.current_date)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
92 |
|
93 |
return f"""
|
94 |
-
You are a Time-Aware Task Extraction Specialist. Your
|
95 |
-
|
96 |
-
|
97 |
-
|
98 |
-
-
|
99 |
-
-
|
100 |
-
-
|
101 |
-
|
102 |
-
|
103 |
-
|
104 |
-
|
105 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
106 |
{{
|
107 |
"current_quarter_info": "Q{quarter}, {days_remaining} days remaining",
|
108 |
"okrs": [
|
@@ -139,7 +153,6 @@ You MUST return a complete JSON object with this exact structure:
|
|
139 |
"overall_strategic_focus": "Summary of main strategic themes",
|
140 |
"generation_timestamp": "{datetime.utcnow().isoformat()}"
|
141 |
}}
|
142 |
-
```
|
143 |
|
144 |
## CRITICAL REQUIREMENTS:
|
145 |
|
@@ -185,164 +198,44 @@ Given {days_remaining} days left in Q{quarter}:
|
|
185 |
- High: Critical for quarter goals, high impact, urgent
|
186 |
- Medium: Important but not critical, moderate impact
|
187 |
- Low: Nice to have, low impact, can be delayed
|
188 |
-
|
189 |
-
## VALIDATION CHECKLIST:
|
190 |
-
Before submitting your response, verify:
|
191 |
-
□ JSON is valid and complete
|
192 |
-
□ current_quarter_info matches exact format required
|
193 |
-
□ Every OKR has 2-4 key results
|
194 |
-
□ Every key result has 2-5 tasks with ALL required fields
|
195 |
-
□ All enum values match exactly (case-sensitive)
|
196 |
-
□ All tasks have clear connection to analysis in why_proposed
|
197 |
-
□ Target metrics and values are specific and measurable
|
198 |
-
□ Timeline assignments are realistic for remaining quarter days
|
199 |
-
|
200 |
-
## ERROR PREVENTION:
|
201 |
-
- Double-check all field names match the schema exactly
|
202 |
-
- Ensure no null values where fields are required
|
203 |
-
- Verify all enum values are spelled correctly
|
204 |
-
- Make sure every key result has tasks (never empty array)
|
205 |
-
- Confirm JSON syntax is valid
|
206 |
-
|
207 |
-
Your response must be ONLY the JSON object, no additional text or formatting.
|
208 |
-
"""
|
209 |
-
|
210 |
-
def _validate_extracted_data(self, data: dict) -> bool:
|
211 |
-
"""
|
212 |
-
Validates the extracted data structure before creating TaskExtractionOutput.
|
213 |
-
|
214 |
-
Args:
|
215 |
-
data: Dictionary containing extracted data
|
216 |
-
|
217 |
-
Returns:
|
218 |
-
bool: True if data is valid, False otherwise
|
219 |
"""
|
220 |
-
try:
|
221 |
-
# Check required top-level fields
|
222 |
-
if not isinstance(data.get('okrs'), list):
|
223 |
-
logger.error("Missing or invalid 'okrs' field")
|
224 |
-
return False
|
225 |
-
|
226 |
-
if not data.get('current_quarter_info'):
|
227 |
-
logger.error("Missing 'current_quarter_info' field")
|
228 |
-
return False
|
229 |
-
|
230 |
-
# Validate each OKR
|
231 |
-
for i, okr in enumerate(data['okrs']):
|
232 |
-
if not isinstance(okr.get('key_results'), list) or len(okr.get('key_results', [])) == 0:
|
233 |
-
logger.error(f"OKR {i} has empty or missing key_results")
|
234 |
-
return False
|
235 |
-
|
236 |
-
# Validate each key result
|
237 |
-
for j, kr in enumerate(okr['key_results']):
|
238 |
-
if not isinstance(kr.get('tasks'), list) or len(kr.get('tasks', [])) == 0:
|
239 |
-
logger.error(f"OKR {i}, Key Result {j} has empty or missing tasks")
|
240 |
-
return False
|
241 |
-
|
242 |
-
# Validate each task has required fields
|
243 |
-
for k, task in enumerate(kr['tasks']):
|
244 |
-
required_fields = [
|
245 |
-
'task_category', 'task_description', 'objective_deliverable',
|
246 |
-
'effort', 'timeline', 'responsible_party', 'success_criteria_metrics',
|
247 |
-
'priority', 'priority_justification', 'why_proposed', 'task_type'
|
248 |
-
]
|
249 |
-
|
250 |
-
for field in required_fields:
|
251 |
-
if not task.get(field):
|
252 |
-
logger.error(f"Task {k} in OKR {i}, KR {j} missing required field: {field}")
|
253 |
-
return False
|
254 |
-
|
255 |
-
return True
|
256 |
-
|
257 |
-
except Exception as e:
|
258 |
-
logger.error(f"Error validating extracted data: {e}")
|
259 |
-
return False
|
260 |
-
|
261 |
-
def _create_fallback_output(self, analysis_summary: str = "") -> TaskExtractionOutput:
|
262 |
-
"""Creates a fallback output with at least one complete OKR structure."""
|
263 |
-
quarter = self._get_quarter(self.current_date)
|
264 |
-
days_remaining = self._days_until_quarter_end(self.current_date)
|
265 |
-
|
266 |
-
fallback_task = Task(
|
267 |
-
task_category="Analysis Review",
|
268 |
-
task_description="Conduct comprehensive review of LinkedIn analytics insights",
|
269 |
-
objective_deliverable="Complete analysis review report with actionable recommendations",
|
270 |
-
effort=EffortLevel.MEDIUM,
|
271 |
-
timeline=TimelineCategory.SHORT_TERM,
|
272 |
-
responsible_party="Social Media Manager",
|
273 |
-
success_criteria_metrics="Review completed within 1 week, 3+ actionable insights identified",
|
274 |
-
dependencies_prerequisites=None,
|
275 |
-
priority=PriorityLevel.HIGH,
|
276 |
-
priority_justification="Required to understand current performance and identify improvement areas",
|
277 |
-
why_proposed="Based on provided comprehensive analysis requiring strategic review",
|
278 |
-
task_type=TaskType.INITIATIVE,
|
279 |
-
data_subject=DataSubject.GENERAL
|
280 |
-
)
|
281 |
-
|
282 |
-
fallback_key_result = KeyResult(
|
283 |
-
key_result_description="Complete strategic analysis review and identify improvement opportunities",
|
284 |
-
target_metric="Analysis Completion Rate",
|
285 |
-
target_value="100% within 1 week",
|
286 |
-
tasks=[fallback_task]
|
287 |
-
)
|
288 |
-
|
289 |
-
fallback_okr = OKR(
|
290 |
-
objective_description="Establish baseline understanding of current LinkedIn performance",
|
291 |
-
key_results=[fallback_key_result],
|
292 |
-
objective_timeline=TimelineCategory.SHORT_TERM,
|
293 |
-
objective_owner="Social Media Team"
|
294 |
-
)
|
295 |
-
|
296 |
-
return TaskExtractionOutput(
|
297 |
-
current_quarter_info=f"Q{quarter}, {days_remaining} days remaining",
|
298 |
-
okrs=[fallback_okr],
|
299 |
-
overall_strategic_focus=f"Focus on analysis review and strategic planning. {analysis_summary}".strip(),
|
300 |
-
generation_timestamp=datetime.utcnow().isoformat()
|
301 |
-
)
|
302 |
|
303 |
async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
|
304 |
"""
|
305 |
Extracts time-aware actionable tasks from the comprehensive analysis text.
|
306 |
-
|
307 |
Args:
|
308 |
comprehensive_analysis: The text analysis from which to extract tasks.
|
309 |
-
|
310 |
Returns:
|
311 |
A TaskExtractionOutput Pydantic model instance.
|
312 |
"""
|
313 |
if not comprehensive_analysis or not comprehensive_analysis.strip():
|
314 |
logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
|
315 |
-
return
|
|
|
|
|
|
|
|
|
316 |
|
317 |
-
#
|
318 |
-
|
319 |
-
days_remaining = self._days_until_quarter_end(self.current_date)
|
320 |
-
|
321 |
prompt_for_adk_agent = f"""
|
322 |
-
|
323 |
-
|
324 |
-
|
325 |
-
|
326 |
-
Based on the analysis above,
|
327 |
-
|
328 |
-
|
329 |
-
- current_quarter_info must be exactly: "Q{quarter}, {days_remaining} days remaining"
|
330 |
-
- Every OKR must have 2-4 key results
|
331 |
-
- Every key result must have 2-5 tasks
|
332 |
-
- All tasks must include ALL required fields
|
333 |
-
- All enum values must match exactly (case-sensitive)
|
334 |
-
- Response must be ONLY valid JSON, no additional text
|
335 |
-
|
336 |
-
Generate the complete JSON structure now:
|
337 |
-
"""
|
338 |
|
339 |
user_input_content = genai_types.Content(
|
340 |
role="user",
|
341 |
parts=[genai_types.Part(text=prompt_for_adk_agent)]
|
342 |
)
|
343 |
|
344 |
-
# Using InMemoryRunner for LlmAgent
|
345 |
runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
|
|
|
346 |
user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
|
347 |
|
348 |
session = await runner.session_service.create_session(
|
@@ -351,19 +244,21 @@ Generate the complete JSON structure now:
|
|
351 |
)
|
352 |
|
353 |
extracted_data_dict = None
|
354 |
-
full_response_text_for_debug = ""
|
355 |
|
356 |
try:
|
357 |
logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
|
358 |
|
|
|
359 |
run_result = runner.run(
|
360 |
user_id=user_id,
|
361 |
session_id=session.id,
|
362 |
new_message=user_input_content
|
363 |
)
|
364 |
-
|
365 |
-
#
|
366 |
if hasattr(run_result, '__aiter__'):
|
|
|
367 |
async for event in run_result:
|
368 |
if (hasattr(event, 'actions') and event.actions and
|
369 |
hasattr(event.actions, 'state_delta') and
|
@@ -371,15 +266,16 @@ Generate the complete JSON structure now:
|
|
371 |
self.agent.output_key in event.actions.state_delta):
|
372 |
|
373 |
extracted_data_dict = event.actions.state_delta[self.agent.output_key]
|
374 |
-
logger.info("Successfully extracted structured data via LlmAgent state_delta.")
|
375 |
break
|
376 |
-
|
377 |
-
# Capture text for debugging
|
378 |
if hasattr(event, 'content') and event.content and event.content.parts:
|
379 |
for part in event.content.parts:
|
380 |
if hasattr(part, 'text'):
|
381 |
-
|
382 |
else:
|
|
|
383 |
for event in run_result:
|
384 |
if (hasattr(event, 'actions') and event.actions and
|
385 |
hasattr(event.actions, 'state_delta') and
|
@@ -387,70 +283,171 @@ Generate the complete JSON structure now:
|
|
387 |
self.agent.output_key in event.actions.state_delta):
|
388 |
|
389 |
extracted_data_dict = event.actions.state_delta[self.agent.output_key]
|
390 |
-
logger.info("Successfully extracted structured data via LlmAgent state_delta.")
|
391 |
break
|
392 |
-
|
393 |
-
# Capture text for debugging
|
394 |
if hasattr(event, 'content') and event.content and event.content.parts:
|
395 |
for part in event.content.parts:
|
396 |
if hasattr(part, 'text'):
|
397 |
-
|
398 |
|
399 |
-
# If no structured output, try parsing the text response
|
400 |
if not extracted_data_dict and full_response_text_for_debug:
|
401 |
-
logger.
|
402 |
-
|
403 |
-
# Clean the response text
|
404 |
-
cleaned_text = full_response_text_for_debug.strip()
|
405 |
-
if cleaned_text.startswith('```json'):
|
406 |
-
cleaned_text = cleaned_text[7:]
|
407 |
-
if cleaned_text.endswith('```'):
|
408 |
-
cleaned_text = cleaned_text[:-3]
|
409 |
-
|
410 |
-
parsed_json = json.loads(cleaned_text)
|
411 |
-
if self._validate_extracted_data(parsed_json):
|
412 |
-
extracted_data_dict = parsed_json
|
413 |
-
logger.info("Successfully parsed and validated JSON from text response")
|
414 |
-
else:
|
415 |
-
logger.warning("Parsed JSON failed validation")
|
416 |
-
|
417 |
-
except json.JSONDecodeError as je:
|
418 |
-
logger.error(f"Failed to parse JSON from text response: {je}")
|
419 |
-
logger.error(f"Raw response (first 1000 chars): {full_response_text_for_debug[:1000]}")
|
420 |
|
421 |
except Exception as e:
|
422 |
logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
|
423 |
finally:
|
424 |
try:
|
425 |
await runner.session_service.delete_session(
|
426 |
-
app_name=f"{self.AGENT_NAME}Runner",
|
427 |
-
user_id=user_id,
|
428 |
-
session_id=session.id
|
429 |
)
|
430 |
except Exception as session_del_e:
|
431 |
logger.error(f"Error deleting task extractor session: {session_del_e}")
|
432 |
|
433 |
-
# Process the extracted data
|
434 |
if extracted_data_dict:
|
435 |
-
|
436 |
-
|
437 |
-
|
438 |
-
|
439 |
-
|
440 |
-
|
441 |
-
|
442 |
-
|
443 |
-
|
444 |
-
|
445 |
-
|
446 |
-
logger.error(f"Extracted data is unexpected type: {type(extracted_data_dict)}")
|
447 |
-
return self._create_fallback_output("Unexpected data type")
|
448 |
-
|
449 |
-
except Exception as pydantic_error:
|
450 |
-
logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
|
451 |
-
logger.error(f"Problematic dictionary data: {extracted_data_dict}")
|
452 |
-
return self._create_fallback_output("Pydantic parsing error")
|
453 |
-
|
454 |
# Fallback if no valid data extracted
|
455 |
-
logger.warning("No valid structured data extracted by TaskExtractionAgent
|
456 |
-
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
# agents/task_extraction_agent.py
|
|
|
2 |
import logging
|
|
|
3 |
from typing import Optional
|
4 |
+
from datetime import datetime, date # Ensure date is imported if used for type hints
|
5 |
+
|
6 |
from google.adk.agents import LlmAgent
|
7 |
+
from google.adk.runners import InMemoryRunner # Assuming this is used for direct agent running
|
8 |
+
from google.genai import types as genai_types # For constructing ADK agent inputs
|
9 |
|
10 |
# Project-specific imports
|
11 |
from features.insight_and_tasks.data_models.tasks import (
|
12 |
+
TaskExtractionOutput,
|
13 |
+
OKR,
|
14 |
+
KeyResult,
|
15 |
+
Task,
|
16 |
+
EffortLevel,
|
17 |
+
TimelineCategory,
|
18 |
+
PriorityLevel,
|
19 |
+
TaskType,
|
20 |
+
DataSubject # Ensure all are imported
|
21 |
)
|
22 |
+
from features.insight_and_tasks.utils.retry_mechanism import RetryMechanism # If retries are needed for ADK calls
|
23 |
|
24 |
# Configure logger for this module
|
25 |
logger = logging.getLogger(__name__)
|
26 |
|
27 |
+
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Or your specific model
|
28 |
|
29 |
class TaskExtractionAgent:
|
30 |
"""
|
31 |
Agent specialized in extracting actionable tasks and OKRs from analysis insights,
|
32 |
with awareness of the current date and quarter.
|
33 |
"""
|
|
|
34 |
AGENT_NAME = "task_extractor"
|
35 |
AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
|
36 |
|
37 |
def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
|
38 |
"""
|
39 |
Initializes the TaskExtractionAgent.
|
|
|
40 |
Args:
|
41 |
api_key: API key (may be used by LlmAgent configuration or future needs).
|
42 |
model_name: Name of the language model to use.
|
43 |
current_date: The current date to use for quarter calculations. Defaults to today.
|
44 |
"""
|
45 |
+
self.api_key = api_key # Store if needed by LlmAgent or other components
|
46 |
self.model_name = model_name or DEFAULT_AGENT_MODEL
|
47 |
+
self.current_date = current_date or datetime.utcnow().date() # Use date object for consistency
|
48 |
|
49 |
# LlmAgent is initialized with dynamic instruction and output schema
|
50 |
self.agent = LlmAgent(
|
51 |
name=self.AGENT_NAME,
|
52 |
model=self.model_name,
|
53 |
description=self.AGENT_DESCRIPTION,
|
54 |
+
instruction=self._get_instruction_prompt(), # Instruction generated dynamically
|
55 |
+
output_schema=TaskExtractionOutput, # Pydantic model for structured output
|
56 |
+
output_key="extracted_tasks_okrs" # Key where LlmAgent stores structured output in state
|
57 |
)
|
58 |
+
self.retry_mechanism = RetryMechanism() # For retrying ADK runner if needed
|
|
|
|
|
59 |
logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
|
60 |
+
f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")
|
61 |
|
62 |
def _get_quarter(self, d: date) -> int:
|
63 |
"""Calculates the quarter for a given date."""
|
|
|
67 |
"""Calculates the number of days remaining in the current quarter from date d."""
|
68 |
current_q = self._get_quarter(d)
|
69 |
year = d.year
|
|
|
70 |
if current_q == 1:
|
71 |
quarter_end_date = date(year, 3, 31)
|
72 |
elif current_q == 2:
|
|
|
77 |
quarter_end_date = date(year, 12, 31)
|
78 |
|
79 |
days_remaining = (quarter_end_date - d).days
|
80 |
+
return max(0, days_remaining) # Ensure non-negative
|
81 |
|
82 |
def _get_instruction_prompt(self) -> str:
|
83 |
"""Generates the dynamic instruction string for the LlmAgent."""
|
84 |
quarter = self._get_quarter(self.current_date)
|
85 |
days_remaining = self._days_until_quarter_end(self.current_date)
|
86 |
+
|
87 |
+
# Dynamically include Pydantic model field descriptions for better LLM guidance
|
88 |
+
# This part can be complex if done fully automatically. For now, manually summarizing key fields.
|
89 |
+
task_fields_summary = (
|
90 |
+
"Each Task must include: task_category (e.g., Content Strategy), task_description, "
|
91 |
+
"objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
|
92 |
+
"responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
|
93 |
+
"priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
|
94 |
+
"task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
|
95 |
+
)
|
96 |
|
97 |
return f"""
|
98 |
+
You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights
|
99 |
+
derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be
|
100 |
+
organized within an Objectives and Key Results (OKRs) framework.
|
101 |
+
CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
|
102 |
+
- Current Quarter: Q{quarter}
|
103 |
+
- Days remaining in current quarter: {days_remaining}
|
104 |
+
- Today's Date (for context): {self.current_date.isoformat()}
|
105 |
+
YOUR MANDATE:
|
106 |
+
1. Define clear, aspirational Objectives (qualitative goals).
|
107 |
+
2. For each Objective, formulate 2-3 specific, measurable Key Results.
|
108 |
+
3. Under each Key Result, list detailed, actionable Tasks required to achieve it.
|
109 |
+
4. CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
|
110 |
+
5. Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
|
111 |
+
6. Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
|
112 |
+
7. Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
|
113 |
+
8. Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
|
114 |
+
9. Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.
|
115 |
+
INPUT: You will receive a 'comprehensive_analysis' text.
|
116 |
+
OUTPUT FORMAT:
|
117 |
+
You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
|
118 |
+
This JSON object will contain:
|
119 |
+
|
120 |
{{
|
121 |
"current_quarter_info": "Q{quarter}, {days_remaining} days remaining",
|
122 |
"okrs": [
|
|
|
153 |
"overall_strategic_focus": "Summary of main strategic themes",
|
154 |
"generation_timestamp": "{datetime.utcnow().isoformat()}"
|
155 |
}}
|
|
|
156 |
|
157 |
## CRITICAL REQUIREMENTS:
|
158 |
|
|
|
198 |
- High: Critical for quarter goals, high impact, urgent
|
199 |
- Medium: Important but not critical, moderate impact
|
200 |
- Low: Nice to have, low impact, can be delayed
|
201 |
+
Focus on quality, actionability, and strict adherence to the output schema.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
202 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
203 |
|
204 |
async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
|
205 |
"""
|
206 |
Extracts time-aware actionable tasks from the comprehensive analysis text.
|
|
|
207 |
Args:
|
208 |
comprehensive_analysis: The text analysis from which to extract tasks.
|
|
|
209 |
Returns:
|
210 |
A TaskExtractionOutput Pydantic model instance.
|
211 |
"""
|
212 |
if not comprehensive_analysis or not comprehensive_analysis.strip():
|
213 |
logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
|
214 |
+
return TaskExtractionOutput(
|
215 |
+
current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
|
216 |
+
okrs=[],
|
217 |
+
overall_strategic_focus="No analysis provided to extract tasks."
|
218 |
+
)
|
219 |
|
220 |
+
# The LlmAgent's instruction already contains the dynamic date info and output format.
|
221 |
+
# The input to the agent's run method will be the comprehensive_analysis.
|
|
|
|
|
222 |
prompt_for_adk_agent = f"""
|
223 |
+
Comprehensive Analysis for Task Extraction:
|
224 |
+
---
|
225 |
+
{comprehensive_analysis}
|
226 |
+
---
|
227 |
+
Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
|
228 |
+
Ensure the 'current_quarter_info' field in your JSON output is exactly: "Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining".
|
229 |
+
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
230 |
|
231 |
user_input_content = genai_types.Content(
|
232 |
role="user",
|
233 |
parts=[genai_types.Part(text=prompt_for_adk_agent)]
|
234 |
)
|
235 |
|
236 |
+
# Using InMemoryRunner as per original structure for LlmAgent with output_schema
|
237 |
runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
|
238 |
+
# Generate a unique user_id for each run to ensure fresh session state if needed.
|
239 |
user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
|
240 |
|
241 |
session = await runner.session_service.create_session(
|
|
|
244 |
)
|
245 |
|
246 |
extracted_data_dict = None
|
247 |
+
full_response_text_for_debug = "" # To capture raw text if parsing fails
|
248 |
|
249 |
try:
|
250 |
logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
|
251 |
|
252 |
+
# Fix: Use regular for loop instead of async for, since runner.run() returns a generator
|
253 |
run_result = runner.run(
|
254 |
user_id=user_id,
|
255 |
session_id=session.id,
|
256 |
new_message=user_input_content
|
257 |
)
|
258 |
+
|
259 |
+
# Check if it's an async iterator or regular generator
|
260 |
if hasattr(run_result, '__aiter__'):
|
261 |
+
# It's an async iterator, use async for
|
262 |
async for event in run_result:
|
263 |
if (hasattr(event, 'actions') and event.actions and
|
264 |
hasattr(event.actions, 'state_delta') and
|
|
|
266 |
self.agent.output_key in event.actions.state_delta):
|
267 |
|
268 |
extracted_data_dict = event.actions.state_delta[self.agent.output_key]
|
269 |
+
logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
|
270 |
break
|
271 |
+
|
272 |
+
# Capture text parts for debugging if direct structured output isn't found first
|
273 |
if hasattr(event, 'content') and event.content and event.content.parts:
|
274 |
for part in event.content.parts:
|
275 |
if hasattr(part, 'text'):
|
276 |
+
full_response_text_for_debug += part.text
|
277 |
else:
|
278 |
+
# It's a regular generator, use regular for loop
|
279 |
for event in run_result:
|
280 |
if (hasattr(event, 'actions') and event.actions and
|
281 |
hasattr(event.actions, 'state_delta') and
|
|
|
283 |
self.agent.output_key in event.actions.state_delta):
|
284 |
|
285 |
extracted_data_dict = event.actions.state_delta[self.agent.output_key]
|
286 |
+
logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
|
287 |
break
|
288 |
+
|
289 |
+
# Capture text parts for debugging if direct structured output isn't found first
|
290 |
if hasattr(event, 'content') and event.content and event.content.parts:
|
291 |
for part in event.content.parts:
|
292 |
if hasattr(part, 'text'):
|
293 |
+
full_response_text_for_debug += part.text
|
294 |
|
|
|
295 |
if not extracted_data_dict and full_response_text_for_debug:
|
296 |
+
logger.warning("LlmAgent did not produce structured output in state_delta. Raw text response was: %s",
|
297 |
+
full_response_text_for_debug[:500] + "...")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
298 |
|
299 |
except Exception as e:
|
300 |
logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
|
301 |
finally:
|
302 |
try:
|
303 |
await runner.session_service.delete_session(
|
304 |
+
app_name=f"{self.AGENT_NAME}Runner", user_id=user_id, session_id=session.id
|
|
|
|
|
305 |
)
|
306 |
except Exception as session_del_e:
|
307 |
logger.error(f"Error deleting task extractor session: {session_del_e}")
|
308 |
|
|
|
309 |
if extracted_data_dict:
|
310 |
+
if isinstance(extracted_data_dict, TaskExtractionOutput): # Already a Pydantic model
|
311 |
+
return extracted_data_dict
|
312 |
+
elif isinstance(extracted_data_dict, dict): # If it's a dict, parse it
|
313 |
+
try:
|
314 |
+
return TaskExtractionOutput(**extracted_data_dict)
|
315 |
+
except Exception as pydantic_error:
|
316 |
+
logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
|
317 |
+
logger.error(f"Problematic dictionary data: {extracted_data_dict}")
|
318 |
+
else:
|
319 |
+
logger.error(f"Extracted data is not a dictionary or TaskExtractionOutput model: {type(extracted_data_dict)}")
|
320 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
321 |
# Fallback if no valid data extracted
|
322 |
+
logger.warning("No valid structured data extracted by TaskExtractionAgent.")
|
323 |
+
return TaskExtractionOutput(
|
324 |
+
current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
|
325 |
+
okrs=[],
|
326 |
+
overall_strategic_focus="Failed to extract tasks or no tasks were identified.",
|
327 |
+
generation_timestamp=datetime.utcnow().isoformat()
|
328 |
+
)
|
329 |
+
|
330 |
+
def update_current_date(self, new_date: date):
|
331 |
+
"""
|
332 |
+
Updates the current date for the agent and re-initializes the LlmAgent
|
333 |
+
to reflect the new date context in its instructions.
|
334 |
+
"""
|
335 |
+
self.current_date = new_date
|
336 |
+
# Re-initialize the LlmAgent with the new instruction based on the new date
|
337 |
+
self.agent = LlmAgent(
|
338 |
+
name=self.AGENT_NAME,
|
339 |
+
model=self.model_name,
|
340 |
+
description=self.AGENT_DESCRIPTION,
|
341 |
+
instruction=self._get_instruction_prompt(), # Get updated instruction
|
342 |
+
output_schema=TaskExtractionOutput,
|
343 |
+
output_key="extracted_tasks_okrs"
|
344 |
+
)
|
345 |
+
logger.info(f"{self.AGENT_NAME} date updated. New context: Q{self._get_quarter(self.current_date)}, "
|
346 |
+
f"{self._days_until_quarter_end(self.current_date)} days remaining.")
|
347 |
+
|
348 |
+
|
349 |
+
if __name__ == '__main__':
    import asyncio
    import os  # fix: os.environ.get is used below but `os` is not imported at module top

    # (Ensure logging_config.py is importable for this standalone example.)
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for TaskExtractionAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logger.warning("logging_config.py not found, using basicConfig for logging.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_task_extractor")  # Use your actual key or env var
    MODEL_NAME = DEFAULT_AGENT_MODEL

    # Example comprehensive analysis text (replace with actual analysis output).
    sample_analysis_text = """
    Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content
    (avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month
    (-0.2 avg score) due to complaints about customer service response times.
    Key opportunity: Improve customer service communication and leverage video content more effectively.
    Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
    """

    # Test with a specific date (mid-Q2 2025) so quarter math is deterministic.
    test_date = date(2025, 4, 15)
    task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)

    logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")

    async def run_extraction():
        """Run one task-extraction pass, mocking the ADK runner when no real API key is set."""
        global InMemoryRunner  # module-level patch target; declared once, up front

        logger.info("Extracting tasks from sample analysis...")
        # In a real scenario, ensure GOOGLE_API_KEY is set if the LlmAgent makes actual calls.
        # For local tests without real API calls, the runner is monkey-patched below.
        original_runner = None
        patched = False
        if MOCK_API_KEY == "test_api_key_task_extractor":
            logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")

            class MockADKRunner:
                """Minimal stand-in for the ADK runner so no real API call is made."""

                def __init__(self, agent, app_name):
                    self.agent = agent

                async def session_service_create_session(self, app_name, user_id):
                    class MockSession:
                        id = "mock_session_id"
                    return MockSession()

                async def run(self, user_id, session_id, new_message):
                    # Simulate the event structure an LlmAgent with output_schema would produce.
                    mock_okr = OKR(
                        objective_description="Improve Customer Satisfaction",
                        key_results=[KeyResult(
                            key_result_description="Reduce negative mentions by 10%",
                            tasks=[Task(
                                task_category="Customer Service",
                                task_description="Respond to all negative mentions within 2 hours.",
                                objective_deliverable="Improved response time.",
                                effort=EffortLevel.MEDIUM,
                                timeline=TimelineCategory.IMMEDIATE,
                                responsible_party="Support Team",
                                success_criteria_metrics="Avg response time < 2hrs.",
                                priority=PriorityLevel.HIGH,
                                priority_justification="Critical for reputation.",
                                why_proposed="Analysis showed dip in sentiment due to slow responses.",
                                task_type=TaskType.INITIATIVE,
                                data_subject=DataSubject.MENTIONS,
                            )],
                        )],
                        objective_timeline=TimelineCategory.SHORT_TERM,
                    )
                    mock_output = TaskExtractionOutput(
                        current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, {task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
                        okrs=[mock_okr],
                        overall_strategic_focus="Focus on customer service improvement.",
                    )

                    class MockEvent:
                        def __init__(self):
                            # .model_dump() for Pydantic v2
                            self.actions = type('Actions', (), {'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}})()

                    yield MockEvent()

                async def session_service_delete_session(self, app_name, user_id, session_id):
                    pass

            # Monkey-patch the module-level InMemoryRunner so extract_tasks uses the mock.
            # NOTE(review): InMemoryRunner may no longer be imported at module top — using
            # globals().get() avoids a NameError either way; confirm against the imports.
            original_runner = globals().get('InMemoryRunner')
            InMemoryRunner = MockADKRunner
            patched = True

        try:
            extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
        finally:
            # Fix: the original restore checked a *local* name against globals() and so
            # never ran; restore unconditionally whenever we actually patched.
            if patched:
                InMemoryRunner = original_runner

        print("\n--- TaskExtractionAgent Results ---")
        if extracted_okrs_output:
            print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
            print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
            print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
            print("\nOKRs Extracted:")
            # Use .model_dump_json() for Pydantic v2 pretty printing.
            print(extracted_okrs_output.model_dump_json(indent=2))
        else:
            print("No OKRs extracted or an error occurred.")

    # Fix: the original wrapped this call in a second, redundant `if __name__ == '__main__'`.
    asyncio.run(run_extraction())

    # Example of updating the agent's date context (moves it into Q4 2025).
    logger.info("\n--- Updating date for Task Agent ---")
    new_test_date = date(2025, 10, 5)
    task_agent.update_current_date(new_test_date)
    # The instruction within task_agent.agent is now updated; a new call to
    # extract_tasks would use this Q4 context.