GuglielmoTor committed on
Commit
11f6ec0
·
verified ·
1 Parent(s): 8add36b

Update features/insight_and_tasks/agents/task_extraction_agent.py

Browse files
features/insight_and_tasks/agents/task_extraction_agent.py CHANGED
@@ -1,64 +1,68 @@
1
  # agents/task_extraction_agent.py
 
2
  import logging
 
3
  from typing import Optional
4
- from datetime import datetime, date # Ensure date is imported if used for type hints
5
-
6
  from google.adk.agents import LlmAgent
7
- from google.adk.runners import InMemoryRunner # Assuming this is used for direct agent running
8
- from google.genai import types as genai_types # For constructing ADK agent inputs
9
 
10
  # Project-specific imports
11
  from features.insight_and_tasks.data_models.tasks import (
12
- TaskExtractionOutput,
13
- OKR,
14
- KeyResult,
15
- Task,
16
- EffortLevel,
17
- TimelineCategory,
18
- PriorityLevel,
19
- TaskType,
20
- DataSubject # Ensure all are imported
21
  )
22
- from features.insight_and_tasks.utils.retry_mechanism import RetryMechanism # If retries are needed for ADK calls
23
 
24
  # Configure logger for this module
25
  logger = logging.getLogger(__name__)
26
 
27
- DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Or your specific model
28
 
29
  class TaskExtractionAgent:
30
  """
31
  Agent specialized in extracting actionable tasks and OKRs from analysis insights,
32
  with awareness of the current date and quarter.
33
  """
 
34
  AGENT_NAME = "task_extractor"
35
  AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
36
 
37
  def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
38
  """
39
  Initializes the TaskExtractionAgent.
40
-
41
  Args:
42
  api_key: API key (may be used by LlmAgent configuration or future needs).
43
  model_name: Name of the language model to use.
44
  current_date: The current date to use for quarter calculations. Defaults to today.
45
  """
46
- self.api_key = api_key # Store if needed by LlmAgent or other components
47
  self.model_name = model_name or DEFAULT_AGENT_MODEL
48
- self.current_date = current_date or datetime.utcnow().date() # Use date object for consistency
49
 
50
  # LlmAgent is initialized with dynamic instruction and output schema
51
  self.agent = LlmAgent(
52
  name=self.AGENT_NAME,
53
  model=self.model_name,
54
  description=self.AGENT_DESCRIPTION,
55
- instruction=self._get_instruction_prompt(), # Instruction generated dynamically
56
- output_schema=TaskExtractionOutput, # Pydantic model for structured output
57
- output_key="extracted_tasks_okrs" # Key where LlmAgent stores structured output in state
58
  )
59
- self.retry_mechanism = RetryMechanism() # For retrying ADK runner if needed
 
 
60
  logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
61
- f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")
62
 
63
  def _get_quarter(self, d: date) -> int:
64
  """Calculates the quarter for a given date."""
@@ -68,6 +72,7 @@ class TaskExtractionAgent:
68
  """Calculates the number of days remaining in the current quarter from date d."""
69
  current_q = self._get_quarter(d)
70
  year = d.year
 
71
  if current_q == 1:
72
  quarter_end_date = date(year, 3, 31)
73
  elif current_q == 2:
@@ -78,111 +83,266 @@ class TaskExtractionAgent:
78
  quarter_end_date = date(year, 12, 31)
79
 
80
  days_remaining = (quarter_end_date - d).days
81
- return max(0, days_remaining) # Ensure non-negative
82
 
83
  def _get_instruction_prompt(self) -> str:
84
  """Generates the dynamic instruction string for the LlmAgent."""
85
  quarter = self._get_quarter(self.current_date)
86
  days_remaining = self._days_until_quarter_end(self.current_date)
87
-
88
- # Dynamically include Pydantic model field descriptions for better LLM guidance
89
- # This part can be complex if done fully automatically. For now, manually summarizing key fields.
90
- task_fields_summary = (
91
- "Each Task must include: task_category (e.g., Content Strategy), task_description, "
92
- "objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
93
- "responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
94
- "priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
95
- "task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
96
- )
97
 
98
  return f"""
99
- You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights
100
- derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be
101
- organized within an Objectives and Key Results (OKRs) framework.
102
-
103
- CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
104
- - Current Quarter: Q{quarter}
105
- - Days remaining in current quarter: {days_remaining}
106
- - Today's Date (for context): {self.current_date.isoformat()}
107
-
108
- YOUR MANDATE:
109
- 1. Define clear, aspirational Objectives (qualitative goals).
110
- 2. For each Objective, formulate 2-3 specific, measurable Key Results.
111
- 3. Under each Key Result, list detailed, actionable Tasks required to achieve it.
112
- 4. CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
113
- 5. Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
114
- 6. Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
115
- 7. Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
116
- 8. Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
117
- 9. Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.
118
-
119
- INPUT: You will receive a 'comprehensive_analysis' text.
120
-
121
- OUTPUT FORMAT:
122
- You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
123
- This JSON object will contain:
124
- - 'current_quarter_info': A string exactly like "Q{quarter}, {days_remaining} days remaining". (This is fixed based on the context above).
125
- - 'okrs': A list, where each item is an 'OKR' object.
126
- - 'overall_strategic_focus': (Optional) A brief summary of the main strategic themes emerging from the OKRs.
127
- - 'generation_timestamp': (This will be auto-filled if you conform to the schema, or you can provide an ISO timestamp).
128
-
129
- Example of a Task (ensure all fields from the Pydantic model are covered):
130
  {{
131
- "task_category": "Content Creation",
132
- "task_description": "Launch a 3-part blog series on AI in Marketing.",
133
- "objective_deliverable": "Objective: Increase thought leadership in AI marketing. Deliverable: 3 published blog posts.",
134
- "effort": "Medium",
135
- "timeline": "Short-term",
136
- "responsible_party": "Content Team Lead",
137
- "success_criteria_metrics": "Average 500 views per post, 10+ shares per post.",
138
- "dependencies_prerequisites": "Keyword research for AI marketing topics completed.",
139
- "priority": "High",
140
- "priority_justification": "Addresses key strategic goal of establishing AI expertise.",
141
- "why_proposed": "Analysis highlighted a gap in content related to AI, a high-interest area for our target audience.",
142
- "task_type": "initiative",
143
- "data_subject": "general"
 
 
 
 
 
 
 
144
  }}
145
-
146
- Focus on quality, actionability, and strict adherence to the output schema.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
  async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
150
  """
151
  Extracts time-aware actionable tasks from the comprehensive analysis text.
152
-
153
  Args:
154
  comprehensive_analysis: The text analysis from which to extract tasks.
155
-
156
  Returns:
157
  A TaskExtractionOutput Pydantic model instance.
158
  """
159
  if not comprehensive_analysis or not comprehensive_analysis.strip():
160
  logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
161
- return TaskExtractionOutput(
162
- current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
163
- okrs=[],
164
- overall_strategic_focus="No analysis provided to extract tasks."
165
- )
166
 
167
- # The LlmAgent's instruction already contains the dynamic date info and output format.
168
- # The input to the agent's run method will be the comprehensive_analysis.
 
 
169
  prompt_for_adk_agent = f"""
170
- Comprehensive Analysis for Task Extraction:
171
- ---
172
- {comprehensive_analysis}
173
- ---
174
- Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
175
- Ensure the 'current_quarter_info' field in your JSON output is exactly: "Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining".
176
- """
 
 
 
 
 
 
 
 
 
177
 
178
  user_input_content = genai_types.Content(
179
  role="user",
180
  parts=[genai_types.Part(text=prompt_for_adk_agent)]
181
  )
182
 
183
- # Using InMemoryRunner as per original structure for LlmAgent with output_schema
184
  runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
185
- # Generate a unique user_id for each run to ensure fresh session state if needed.
186
  user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
187
 
188
  session = await runner.session_service.create_session(
@@ -191,21 +351,19 @@ class TaskExtractionAgent:
191
  )
192
 
193
  extracted_data_dict = None
194
- full_response_text_for_debug = "" # To capture raw text if parsing fails
195
 
196
  try:
197
  logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
198
 
199
- # Fix: Use regular for loop instead of async for, since runner.run() returns a generator
200
  run_result = runner.run(
201
  user_id=user_id,
202
  session_id=session.id,
203
  new_message=user_input_content
204
  )
205
-
206
- # Check if it's an async iterator or regular generator
207
  if hasattr(run_result, '__aiter__'):
208
- # It's an async iterator, use async for
209
  async for event in run_result:
210
  if (hasattr(event, 'actions') and event.actions and
211
  hasattr(event.actions, 'state_delta') and
@@ -213,16 +371,15 @@ class TaskExtractionAgent:
213
  self.agent.output_key in event.actions.state_delta):
214
 
215
  extracted_data_dict = event.actions.state_delta[self.agent.output_key]
216
- logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
217
  break
218
-
219
- # Capture text parts for debugging if direct structured output isn't found first
220
  if hasattr(event, 'content') and event.content and event.content.parts:
221
  for part in event.content.parts:
222
  if hasattr(part, 'text'):
223
- full_response_text_for_debug += part.text
224
  else:
225
- # It's a regular generator, use regular for loop
226
  for event in run_result:
227
  if (hasattr(event, 'actions') and event.actions and
228
  hasattr(event.actions, 'state_delta') and
@@ -230,171 +387,70 @@ class TaskExtractionAgent:
230
  self.agent.output_key in event.actions.state_delta):
231
 
232
  extracted_data_dict = event.actions.state_delta[self.agent.output_key]
233
- logger.info(f"Successfully extracted structured data via LlmAgent state_delta.")
234
  break
235
-
236
- # Capture text parts for debugging if direct structured output isn't found first
237
  if hasattr(event, 'content') and event.content and event.content.parts:
238
  for part in event.content.parts:
239
  if hasattr(part, 'text'):
240
- full_response_text_for_debug += part.text
241
 
 
242
  if not extracted_data_dict and full_response_text_for_debug:
243
- logger.warning("LlmAgent did not produce structured output in state_delta. Raw text response was: %s",
244
- full_response_text_for_debug[:500] + "...")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245
 
246
  except Exception as e:
247
  logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
248
  finally:
249
  try:
250
  await runner.session_service.delete_session(
251
- app_name=f"{self.AGENT_NAME}Runner", user_id=user_id, session_id=session.id
 
 
252
  )
253
  except Exception as session_del_e:
254
  logger.error(f"Error deleting task extractor session: {session_del_e}")
255
 
 
256
  if extracted_data_dict:
257
- if isinstance(extracted_data_dict, TaskExtractionOutput): # Already a Pydantic model
258
- return extracted_data_dict
259
- elif isinstance(extracted_data_dict, dict): # If it's a dict, parse it
260
- try:
261
- return TaskExtractionOutput(**extracted_data_dict)
262
- except Exception as pydantic_error:
263
- logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
264
- logger.error(f"Problematic dictionary data: {extracted_data_dict}")
265
- else:
266
- logger.error(f"Extracted data is not a dictionary or TaskExtractionOutput model: {type(extracted_data_dict)}")
267
-
268
- # Fallback if no valid data extracted
269
- logger.warning("No valid structured data extracted by TaskExtractionAgent.")
270
- return TaskExtractionOutput(
271
- current_quarter_info=f"Q{self._get_quarter(self.current_date)}, {self._days_until_quarter_end(self.current_date)} days remaining",
272
- okrs=[],
273
- overall_strategic_focus="Failed to extract tasks or no tasks were identified.",
274
- generation_timestamp=datetime.utcnow().isoformat()
275
- )
276
-
277
- def update_current_date(self, new_date: date):
278
- """
279
- Updates the current date for the agent and re-initializes the LlmAgent
280
- to reflect the new date context in its instructions.
281
- """
282
- self.current_date = new_date
283
- # Re-initialize the LlmAgent with the new instruction based on the new date
284
- self.agent = LlmAgent(
285
- name=self.AGENT_NAME,
286
- model=self.model_name,
287
- description=self.AGENT_DESCRIPTION,
288
- instruction=self._get_instruction_prompt(), # Get updated instruction
289
- output_schema=TaskExtractionOutput,
290
- output_key="extracted_tasks_okrs"
291
- )
292
- logger.info(f"{self.AGENT_NAME} date updated. New context: Q{self._get_quarter(self.current_date)}, "
293
- f"{self._days_until_quarter_end(self.current_date)} days remaining.")
294
-
295
-
296
- if __name__ == '__main__':
297
- import asyncio
298
- # (Ensure logging_config.py is in the same directory or PYTHONPATH is set for this example to run standalone)
299
- try:
300
- from utils.logging_config import setup_logging
301
- setup_logging()
302
- logger.info("Logging setup for TaskExtractionAgent test.")
303
- except ImportError:
304
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
305
- logger.warning("logging_config.py not found, using basicConfig for logging.")
306
-
307
- MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_task_extractor") # Use your actual key or env var
308
- MODEL_NAME = DEFAULT_AGENT_MODEL
309
-
310
- # Example comprehensive analysis text (replace with actual analysis output)
311
- sample_analysis_text = """
312
- Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content
313
- (avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month
314
- (-0.2 avg score) due to complaints about customer service response times.
315
- Key opportunity: Improve customer service communication and leverage video content more effectively.
316
- Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
317
- """
318
 
319
- # Test with a specific date
320
- test_date = date(2025, 4, 15) # Example: Mid-Q2 2025
321
- task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)
322
-
323
- logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")
324
-
325
- async def run_extraction():
326
- logger.info("Extracting tasks from sample analysis...")
327
- # In a real scenario, ensure GOOGLE_API_KEY is set if the LlmAgent makes actual calls.
328
- # For local tests without real API calls, the LlmAgent might behave as a mock or require specific test setup.
329
- if MOCK_API_KEY == "test_api_key_task_extractor":
330
- logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")
331
- # Mock the runner if no real API call should be made
332
- class MockADKRunner:
333
- def __init__(self, agent, app_name): self.agent = agent
334
- async def session_service_create_session(self, app_name, user_id):
335
- class MockSession: id = "mock_session_id"
336
- return MockSession()
337
- async def run(self, user_id, session_id, new_message):
338
- # Simulate a response structure
339
- mock_okr = OKR(
340
- objective_description="Improve Customer Satisfaction",
341
- key_results=[KeyResult(
342
- key_result_description="Reduce negative mentions by 10%",
343
- tasks=[Task(
344
- task_category="Customer Service", task_description="Respond to all negative mentions within 2 hours.",
345
- objective_deliverable="Improved response time.", effort=EffortLevel.MEDIUM, timeline=TimelineCategory.IMMEDIATE,
346
- responsible_party="Support Team", success_criteria_metrics="Avg response time < 2hrs.",
347
- priority=PriorityLevel.HIGH, priority_justification="Critical for reputation.",
348
- why_proposed="Analysis showed dip in sentiment due to slow responses.", task_type=TaskType.INITIATIVE,
349
- data_subject=DataSubject.MENTIONS
350
- )]
351
- )],
352
- objective_timeline=TimelineCategory.SHORT_TERM
353
- )
354
- mock_output = TaskExtractionOutput(
355
- current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, {task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
356
- okrs=[mock_okr],
357
- overall_strategic_focus="Focus on customer service improvement."
358
- )
359
- # Simulate the event structure LlmAgent with output_schema would produce
360
- class MockEvent:
361
- def __init__(self):
362
- self.actions = type('Actions', (), {'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}})() # .model_dump() for Pydantic v2
363
- yield MockEvent()
364
-
365
- async def session_service_delete_session(self, app_name, user_id, session_id): pass
366
-
367
- # Monkey patch the InMemoryRunner for this test if using mock key
368
- global InMemoryRunner
369
- OriginalInMemoryRunner = InMemoryRunner
370
- InMemoryRunner = MockADKRunner
371
-
372
-
373
- extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
374
-
375
- # Restore InMemoryRunner if it was patched
376
- if MOCK_API_KEY == "test_api_key_task_extractor" and 'OriginalInMemoryRunner' in globals():
377
- InMemoryRunner = OriginalInMemoryRunner
378
-
379
-
380
- print("\n--- TaskExtractionAgent Results ---")
381
- if extracted_okrs_output:
382
- print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
383
- print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
384
- print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
385
- print("\nOKRs Extracted:")
386
- # Use .model_dump_json() for Pydantic v2 for pretty printing
387
- print(extracted_okrs_output.model_dump_json(indent=2))
388
- else:
389
- print("No OKRs extracted or an error occurred.")
390
-
391
- if __name__ == '__main__': # This check is technically inside another if __name__ == '__main__'
392
- asyncio.run(run_extraction())
393
-
394
- # Example of updating date
395
- logger.info("\n--- Updating date for Task Agent ---")
396
- new_test_date = date(2025, 10, 5) # Q4
397
- task_agent.update_current_date(new_test_date)
398
- # The instruction within task_agent.agent is now updated.
399
- # logger.info(f"Task Agent NEW Instruction for test_date {new_test_date}:\n{task_agent.agent.instruction[:500]}...")
400
- # A new call to extract_tasks would use this updated context.
 
1
  # agents/task_extraction_agent.py
2
+
3
  import logging
4
+ import json
5
  from typing import Optional
6
+ from datetime import datetime, date
 
7
  from google.adk.agents import LlmAgent
8
+ from google.adk.runners import InMemoryRunner
9
+ from google.genai import types as genai_types
10
 
11
  # Project-specific imports
12
  from features.insight_and_tasks.data_models.tasks import (
13
+ TaskExtractionOutput,
14
+ OKR,
15
+ KeyResult,
16
+ Task,
17
+ EffortLevel,
18
+ TimelineCategory,
19
+ PriorityLevel,
20
+ TaskType,
21
+ DataSubject
22
  )
23
+ from features.insight_and_tasks.utils.retry_mechanism import RetryMechanism
24
 
25
  # Configure logger for this module
26
  logger = logging.getLogger(__name__)
27
 
28
+ DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
29
 
30
  class TaskExtractionAgent:
31
  """
32
  Agent specialized in extracting actionable tasks and OKRs from analysis insights,
33
  with awareness of the current date and quarter.
34
  """
35
+
36
  AGENT_NAME = "task_extractor"
37
  AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
38
 
39
  def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
40
  """
41
  Initializes the TaskExtractionAgent.
42
+
43
  Args:
44
  api_key: API key (may be used by LlmAgent configuration or future needs).
45
  model_name: Name of the language model to use.
46
  current_date: The current date to use for quarter calculations. Defaults to today.
47
  """
48
+ self.api_key = api_key
49
  self.model_name = model_name or DEFAULT_AGENT_MODEL
50
+ self.current_date = current_date or datetime.utcnow().date()
51
 
52
  # LlmAgent is initialized with dynamic instruction and output schema
53
  self.agent = LlmAgent(
54
  name=self.AGENT_NAME,
55
  model=self.model_name,
56
  description=self.AGENT_DESCRIPTION,
57
+ instruction=self._get_instruction_prompt(),
58
+ output_schema=TaskExtractionOutput,
59
+ output_key="extracted_tasks_okrs"
60
  )
61
+
62
+ self.retry_mechanism = RetryMechanism()
63
+
64
  logger.info(f"{self.AGENT_NAME} initialized for Q{self._get_quarter(self.current_date)}, "
65
+ f"{self._days_until_quarter_end(self.current_date)} days remaining in quarter. Model: {self.model_name}")
66
 
67
  def _get_quarter(self, d: date) -> int:
68
  """Calculates the quarter for a given date."""
 
72
  """Calculates the number of days remaining in the current quarter from date d."""
73
  current_q = self._get_quarter(d)
74
  year = d.year
75
+
76
  if current_q == 1:
77
  quarter_end_date = date(year, 3, 31)
78
  elif current_q == 2:
 
83
  quarter_end_date = date(year, 12, 31)
84
 
85
  days_remaining = (quarter_end_date - d).days
86
+ return max(0, days_remaining)
87
 
88
  def _get_instruction_prompt(self) -> str:
89
  """Generates the dynamic instruction string for the LlmAgent."""
90
  quarter = self._get_quarter(self.current_date)
91
  days_remaining = self._days_until_quarter_end(self.current_date)
 
 
 
 
 
 
 
 
 
 
92
 
93
  return f"""
94
+ You are a Time-Aware Task Extraction Specialist. Your role is to analyze strategic insights from LinkedIn analytics and create structured OKRs with actionable tasks.
95
+
96
+ ## CURRENT CONTEXT (CRITICAL - USE EXACTLY AS SPECIFIED):
97
+ - Current Quarter: Q{quarter}
98
+ - Days remaining in current quarter: {days_remaining}
99
+ - Today's Date: {self.current_date.isoformat()}
100
+ - Required current_quarter_info format: "Q{quarter}, {days_remaining} days remaining"
101
+
102
+ ## MANDATORY OUTPUT STRUCTURE:
103
+ You MUST return a complete JSON object with this exact structure:
104
+
105
+ ```json
106
+ {{
107
+ "current_quarter_info": "Q{quarter}, {days_remaining} days remaining",
108
+ "okrs": [
109
+ {{
110
+ "objective_description": "Clear, aspirational objective statement",
111
+ "key_results": [
 
 
 
 
 
 
 
 
 
 
 
 
 
112
  {{
113
+ "key_result_description": "Specific, measurable key result",
114
+ "target_metric": "Metric name (e.g., 'Engagement Rate')",
115
+ "target_value": "Target value (e.g., '15% increase')",
116
+ "tasks": [
117
+ {{
118
+ "task_category": "Category name",
119
+ "task_description": "Specific action to take",
120
+ "objective_deliverable": "What will be delivered",
121
+ "effort": "Small|Medium|Large",
122
+ "timeline": "Immediate|Short-term|Medium-term|Long-term",
123
+ "responsible_party": "Who is responsible",
124
+ "success_criteria_metrics": "How success is measured",
125
+ "dependencies_prerequisites": "What needs to happen first (or null)",
126
+ "priority": "High|Medium|Low",
127
+ "priority_justification": "Why this priority level",
128
+ "why_proposed": "Connection to analysis insights",
129
+ "task_type": "initiative|tracking",
130
+ "data_subject": "follower_stats|posts|mentions|general (or null)"
131
+ }}
132
+ ]
133
  }}
134
+ ],
135
+ "objective_timeline": "Short-term|Medium-term|Long-term",
136
+ "objective_owner": "Team or role responsible"
137
+ }}
138
+ ],
139
+ "overall_strategic_focus": "Summary of main strategic themes",
140
+ "generation_timestamp": "{datetime.utcnow().isoformat()}"
141
+ }}
142
+ ```
143
+
144
+ ## CRITICAL REQUIREMENTS:
145
+
146
+ ### 1. OKR Structure (MANDATORY):
147
+ - Create 2-4 Objectives maximum
148
+ - Each Objective MUST have 2-4 Key Results
149
+ - Each Key Result MUST have 2-5 Tasks
150
+ - NO EMPTY ARRAYS - every section must contain items
151
+
152
+ ### 2. Task Requirements (ALL FIELDS MANDATORY):
153
+ - task_category: Broad theme (e.g., "Content Strategy", "Audience Growth")
154
+ - task_description: Specific action statement
155
+ - objective_deliverable: Clear deliverable description
156
+ - effort: Must be exactly "Small", "Medium", or "Large"
157
+ - timeline: Must be exactly "Immediate", "Short-term", "Medium-term", or "Long-term"
158
+ - responsible_party: Specific role or team
159
+ - success_criteria_metrics: Measurable success indicators
160
+ - dependencies_prerequisites: Prerequisites or null
161
+ - priority: Must be exactly "High", "Medium", or "Low"
162
+ - priority_justification: Brief explanation for priority
163
+ - why_proposed: Direct link to analysis findings
164
+ - task_type: Must be exactly "initiative" or "tracking"
165
+ - data_subject: For tracking tasks, use "follower_stats", "posts", "mentions", or "general"; for initiatives, use "general" or null
166
+
167
+ ### 3. Key Result Requirements:
168
+ - key_result_description: Specific, measurable outcome
169
+ - target_metric: The metric being measured (required)
170
+ - target_value: The target to achieve (required)
171
+ - tasks: Array of tasks (minimum 2 tasks per key result)
172
+
173
+ ### 4. Timeline Considerations:
174
+ Given {days_remaining} days left in Q{quarter}:
175
+ - "Immediate": 1-2 weeks
176
+ - "Short-term": Rest of current quarter
177
+ - "Medium-term": Next quarter (3-6 months)
178
+ - "Long-term": 6+ months
179
+
180
+ ### 5. Task Type Guidelines:
181
+ - "initiative": New projects, campaigns, process changes
182
+ - "tracking": Monitoring, measurement, ongoing analysis
183
+
184
+ ### 6. Priority Assignment Logic:
185
+ - High: Critical for quarter goals, high impact, urgent
186
+ - Medium: Important but not critical, moderate impact
187
+ - Low: Nice to have, low impact, can be delayed
188
+
189
+ ## VALIDATION CHECKLIST:
190
+ Before submitting your response, verify:
191
+ □ JSON is valid and complete
192
+ □ current_quarter_info matches exact format required
193
+ □ Every OKR has 2-4 key results
194
+ □ Every key result has 2-5 tasks with ALL required fields
195
+ □ All enum values match exactly (case-sensitive)
196
+ □ All tasks have clear connection to analysis in why_proposed
197
+ □ Target metrics and values are specific and measurable
198
+ □ Timeline assignments are realistic for remaining quarter days
199
+
200
+ ## ERROR PREVENTION:
201
+ - Double-check all field names match the schema exactly
202
+ - Ensure no null values where fields are required
203
+ - Verify all enum values are spelled correctly
204
+ - Make sure every key result has tasks (never empty array)
205
+ - Confirm JSON syntax is valid
206
+
207
+ Your response must be ONLY the JSON object, no additional text or formatting.
208
+ """
209
+
210
+ def _validate_extracted_data(self, data: dict) -> bool:
211
+ """
212
+ Validates the extracted data structure before creating TaskExtractionOutput.
213
+
214
+ Args:
215
+ data: Dictionary containing extracted data
216
+
217
+ Returns:
218
+ bool: True if data is valid, False otherwise
219
  """
220
+ try:
221
+ # Check required top-level fields
222
+ if not isinstance(data.get('okrs'), list):
223
+ logger.error("Missing or invalid 'okrs' field")
224
+ return False
225
+
226
+ if not data.get('current_quarter_info'):
227
+ logger.error("Missing 'current_quarter_info' field")
228
+ return False
229
+
230
+ # Validate each OKR
231
+ for i, okr in enumerate(data['okrs']):
232
+ if not isinstance(okr.get('key_results'), list) or len(okr.get('key_results', [])) == 0:
233
+ logger.error(f"OKR {i} has empty or missing key_results")
234
+ return False
235
+
236
+ # Validate each key result
237
+ for j, kr in enumerate(okr['key_results']):
238
+ if not isinstance(kr.get('tasks'), list) or len(kr.get('tasks', [])) == 0:
239
+ logger.error(f"OKR {i}, Key Result {j} has empty or missing tasks")
240
+ return False
241
+
242
+ # Validate each task has required fields
243
+ for k, task in enumerate(kr['tasks']):
244
+ required_fields = [
245
+ 'task_category', 'task_description', 'objective_deliverable',
246
+ 'effort', 'timeline', 'responsible_party', 'success_criteria_metrics',
247
+ 'priority', 'priority_justification', 'why_proposed', 'task_type'
248
+ ]
249
+
250
+ for field in required_fields:
251
+ if not task.get(field):
252
+ logger.error(f"Task {k} in OKR {i}, KR {j} missing required field: {field}")
253
+ return False
254
+
255
+ return True
256
+
257
+ except Exception as e:
258
+ logger.error(f"Error validating extracted data: {e}")
259
+ return False
260
+
261
+ def _create_fallback_output(self, analysis_summary: str = "") -> TaskExtractionOutput:
262
+ """Creates a fallback output with at least one complete OKR structure."""
263
+ quarter = self._get_quarter(self.current_date)
264
+ days_remaining = self._days_until_quarter_end(self.current_date)
265
+
266
+ fallback_task = Task(
267
+ task_category="Analysis Review",
268
+ task_description="Conduct comprehensive review of LinkedIn analytics insights",
269
+ objective_deliverable="Complete analysis review report with actionable recommendations",
270
+ effort=EffortLevel.MEDIUM,
271
+ timeline=TimelineCategory.SHORT_TERM,
272
+ responsible_party="Social Media Manager",
273
+ success_criteria_metrics="Review completed within 1 week, 3+ actionable insights identified",
274
+ dependencies_prerequisites=None,
275
+ priority=PriorityLevel.HIGH,
276
+ priority_justification="Required to understand current performance and identify improvement areas",
277
+ why_proposed="Based on provided comprehensive analysis requiring strategic review",
278
+ task_type=TaskType.INITIATIVE,
279
+ data_subject=DataSubject.GENERAL
280
+ )
281
+
282
+ fallback_key_result = KeyResult(
283
+ key_result_description="Complete strategic analysis review and identify improvement opportunities",
284
+ target_metric="Analysis Completion Rate",
285
+ target_value="100% within 1 week",
286
+ tasks=[fallback_task]
287
+ )
288
+
289
+ fallback_okr = OKR(
290
+ objective_description="Establish baseline understanding of current LinkedIn performance",
291
+ key_results=[fallback_key_result],
292
+ objective_timeline=TimelineCategory.SHORT_TERM,
293
+ objective_owner="Social Media Team"
294
+ )
295
+
296
+ return TaskExtractionOutput(
297
+ current_quarter_info=f"Q{quarter}, {days_remaining} days remaining",
298
+ okrs=[fallback_okr],
299
+ overall_strategic_focus=f"Focus on analysis review and strategic planning. {analysis_summary}".strip(),
300
+ generation_timestamp=datetime.utcnow().isoformat()
301
+ )
302
 
303
  async def extract_tasks(self, comprehensive_analysis: str) -> TaskExtractionOutput:
304
  """
305
  Extracts time-aware actionable tasks from the comprehensive analysis text.
306
+
307
  Args:
308
  comprehensive_analysis: The text analysis from which to extract tasks.
309
+
310
  Returns:
311
  A TaskExtractionOutput Pydantic model instance.
312
  """
313
  if not comprehensive_analysis or not comprehensive_analysis.strip():
314
  logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
315
+ return self._create_fallback_output("No analysis provided")
 
 
 
 
316
 
317
+ # Create more structured prompt
318
+ quarter = self._get_quarter(self.current_date)
319
+ days_remaining = self._days_until_quarter_end(self.current_date)
320
+
321
  prompt_for_adk_agent = f"""
322
+ ANALYSIS TO PROCESS:
323
+ {comprehensive_analysis}
324
+
325
+ INSTRUCTIONS:
326
+ Based on the analysis above, create OKRs following the exact JSON structure specified in your system instructions.
327
+
328
+ CRITICAL REMINDERS:
329
+ - current_quarter_info must be exactly: "Q{quarter}, {days_remaining} days remaining"
330
+ - Every OKR must have 2-4 key results
331
+ - Every key result must have 2-5 tasks
332
+ - All tasks must include ALL required fields
333
+ - All enum values must match exactly (case-sensitive)
334
+ - Response must be ONLY valid JSON, no additional text
335
+
336
+ Generate the complete JSON structure now:
337
+ """
338
 
339
  user_input_content = genai_types.Content(
340
  role="user",
341
  parts=[genai_types.Part(text=prompt_for_adk_agent)]
342
  )
343
 
344
+ # Using InMemoryRunner for LlmAgent
345
  runner = InMemoryRunner(agent=self.agent, app_name=f"{self.AGENT_NAME}Runner")
 
346
  user_id = f"system_user_task_extractor_{int(datetime.utcnow().timestamp())}"
347
 
348
  session = await runner.session_service.create_session(
 
351
  )
352
 
353
  extracted_data_dict = None
354
+ full_response_text_for_debug = ""
355
 
356
  try:
357
  logger.info(f"Running TaskExtractionAgent for user_id: {user_id}, session_id: {session.id}")
358
 
 
359
  run_result = runner.run(
360
  user_id=user_id,
361
  session_id=session.id,
362
  new_message=user_input_content
363
  )
364
+
365
+ # Handle both async and sync iterators
366
  if hasattr(run_result, '__aiter__'):
 
367
  async for event in run_result:
368
  if (hasattr(event, 'actions') and event.actions and
369
  hasattr(event.actions, 'state_delta') and
 
371
  self.agent.output_key in event.actions.state_delta):
372
 
373
  extracted_data_dict = event.actions.state_delta[self.agent.output_key]
374
+ logger.info("Successfully extracted structured data via LlmAgent state_delta.")
375
  break
376
+
377
+ # Capture text for debugging
378
  if hasattr(event, 'content') and event.content and event.content.parts:
379
  for part in event.content.parts:
380
  if hasattr(part, 'text'):
381
+ full_response_text_for_debug += part.text
382
  else:
 
383
  for event in run_result:
384
  if (hasattr(event, 'actions') and event.actions and
385
  hasattr(event.actions, 'state_delta') and
 
387
  self.agent.output_key in event.actions.state_delta):
388
 
389
  extracted_data_dict = event.actions.state_delta[self.agent.output_key]
390
+ logger.info("Successfully extracted structured data via LlmAgent state_delta.")
391
  break
392
+
393
+ # Capture text for debugging
394
  if hasattr(event, 'content') and event.content and event.content.parts:
395
  for part in event.content.parts:
396
  if hasattr(part, 'text'):
397
+ full_response_text_for_debug += part.text
398
 
399
+ # If no structured output, try parsing the text response
400
  if not extracted_data_dict and full_response_text_for_debug:
401
+ logger.info("Attempting to parse JSON from text response")
402
+ try:
403
+ # Clean the response text
404
+ cleaned_text = full_response_text_for_debug.strip()
405
+ if cleaned_text.startswith('```json'):
406
+ cleaned_text = cleaned_text[7:]
407
+ if cleaned_text.endswith('```'):
408
+ cleaned_text = cleaned_text[:-3]
409
+
410
+ parsed_json = json.loads(cleaned_text)
411
+ if self._validate_extracted_data(parsed_json):
412
+ extracted_data_dict = parsed_json
413
+ logger.info("Successfully parsed and validated JSON from text response")
414
+ else:
415
+ logger.warning("Parsed JSON failed validation")
416
+
417
+ except json.JSONDecodeError as je:
418
+ logger.error(f"Failed to parse JSON from text response: {je}")
419
+ logger.error(f"Raw response (first 1000 chars): {full_response_text_for_debug[:1000]}")
420
 
421
  except Exception as e:
422
  logger.error(f"Error during TaskExtractionAgent execution: {e}", exc_info=True)
423
  finally:
424
  try:
425
  await runner.session_service.delete_session(
426
+ app_name=f"{self.AGENT_NAME}Runner",
427
+ user_id=user_id,
428
+ session_id=session.id
429
  )
430
  except Exception as session_del_e:
431
  logger.error(f"Error deleting task extractor session: {session_del_e}")
432
 
433
+ # Process the extracted data
434
  if extracted_data_dict:
435
+ try:
436
+ if isinstance(extracted_data_dict, TaskExtractionOutput):
437
+ return extracted_data_dict
438
+ elif isinstance(extracted_data_dict, dict):
439
+ # Validate before creating the model
440
+ if self._validate_extracted_data(extracted_data_dict):
441
+ return TaskExtractionOutput(**extracted_data_dict)
442
+ else:
443
+ logger.error("Extracted data failed validation, using fallback")
444
+ return self._create_fallback_output("Invalid extracted data structure")
445
+ else:
446
+ logger.error(f"Extracted data is unexpected type: {type(extracted_data_dict)}")
447
+ return self._create_fallback_output("Unexpected data type")
448
+
449
+ except Exception as pydantic_error:
450
+ logger.error(f"Error parsing extracted dictionary into TaskExtractionOutput: {pydantic_error}", exc_info=True)
451
+ logger.error(f"Problematic dictionary data: {extracted_data_dict}")
452
+ return self._create_fallback_output("Pydantic parsing error")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
453
 
454
+ # Fallback if no valid data extracted
455
+ logger.warning("No valid structured data extracted by TaskExtractionAgent, using fallback")
456
+ return self._create_fallback_output("No structured data extracted")