GuglielmoTor committed on
Commit
c7fbc19
·
verified ·
1 Parent(s): 3332e5b

Update insight_and_tasks/agents/task_extraction_agent.py

Browse files
insight_and_tasks/agents/task_extraction_agent.py CHANGED
@@ -0,0 +1,387 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agents/task_extraction_agent.py
2
import logging
import uuid
from typing import Optional
from datetime import datetime, date, timezone  # Ensure date is imported if used for type hints

from google.adk.agents import LlmAgent
from google.adk.runners import InMemoryRunner  # Assuming this is used for direct agent running
from google.genai import types as genai_types  # For constructing ADK agent inputs

# Project-specific imports
from data_models.tasks import (
    TaskExtractionOutput,
    OKR,
    KeyResult,
    Task,
    EffortLevel,
    TimelineCategory,
    PriorityLevel,
    TaskType,
    DataSubject,  # Ensure all are imported
)
from utils.retry_mechanism import RetryMechanism  # If retries are needed for ADK calls
23
+
24
+ # Module-level logger, named after this module via __name__
25
+ logger = logging.getLogger(__name__)
26
+
27
+ DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Fallback model id used when TaskExtractionAgent receives no model_name
28
+
29
class TaskExtractionAgent:
    """
    Agent specialized in extracting actionable tasks and OKRs from analysis insights,
    with awareness of the current date and quarter.

    The quarter context is baked into the LlmAgent instruction when the agent is
    built, so changing the date requires rebuilding the agent (see
    ``update_current_date``).
    """
    AGENT_NAME = "task_extractor"
    AGENT_DESCRIPTION = "Specialist in converting strategic insights into specific, time-aware actionable tasks and OKRs."
    # State key under which the LlmAgent stores its structured output.
    OUTPUT_KEY = "extracted_tasks_okrs"
    # Last (month, day) of each calendar quarter.
    _QUARTER_END = {1: (3, 31), 2: (6, 30), 3: (9, 30), 4: (12, 31)}

    def __init__(self, api_key: str, model_name: Optional[str] = None, current_date: Optional[date] = None):
        """
        Initializes the TaskExtractionAgent.

        Args:
            api_key: API key. It is only stored on the instance here; nothing in
                this class passes it on to the LlmAgent.
            model_name: Name of the language model to use. Falls back to
                DEFAULT_AGENT_MODEL when omitted or empty.
            current_date: The current date to use for quarter calculations.
                Defaults to today's date in UTC.
        """
        self.api_key = api_key
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        # datetime.utcnow() is deprecated; an aware "now" yields the same UTC date.
        self.current_date = current_date or datetime.now(timezone.utc).date()

        self.agent = self._build_agent()
        self.retry_mechanism = RetryMechanism()  # Kept for callers; not used inside this class
        logger.info(
            "%s initialized for Q%s, %s days remaining in quarter. Model: %s",
            self.AGENT_NAME,
            self._get_quarter(self.current_date),
            self._days_until_quarter_end(self.current_date),
            self.model_name,
        )

    def _build_agent(self):
        """Creates the LlmAgent whose instruction reflects ``self.current_date``."""
        return LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self._get_instruction_prompt(),  # Instruction generated dynamically
            output_schema=TaskExtractionOutput,  # Pydantic model for structured output
            output_key=self.OUTPUT_KEY,
        )

    def _get_quarter(self, d: date) -> int:
        """Returns the calendar quarter (1-4) that date ``d`` falls in."""
        return (d.month - 1) // 3 + 1

    def _days_until_quarter_end(self, d: date) -> int:
        """Returns the number of days from ``d`` to the last day of its quarter (never negative)."""
        end_month, end_day = self._QUARTER_END[self._get_quarter(d)]
        days_remaining = (date(d.year, end_month, end_day) - d).days
        return max(0, days_remaining)

    def _quarter_info(self) -> str:
        """Returns the canonical 'Q<n>, <days> days remaining' string for ``self.current_date``."""
        return (
            f"Q{self._get_quarter(self.current_date)}, "
            f"{self._days_until_quarter_end(self.current_date)} days remaining"
        )

    @staticmethod
    def _utc_timestamp() -> str:
        """Returns the current UTC time as a naive ISO-8601 string (same format utcnow().isoformat() produced)."""
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _fallback_output(self, strategic_focus: str) -> "TaskExtractionOutput":
        """Builds an empty TaskExtractionOutput whose strategic focus explains why nothing was extracted."""
        return TaskExtractionOutput(
            current_quarter_info=self._quarter_info(),
            okrs=[],
            overall_strategic_focus=strategic_focus,
            generation_timestamp=self._utc_timestamp(),
        )

    def _get_instruction_prompt(self) -> str:
        """Generates the dynamic instruction string for the LlmAgent from ``self.current_date``."""
        quarter = self._get_quarter(self.current_date)
        days_remaining = self._days_until_quarter_end(self.current_date)

        # Manually maintained summary of the Task model fields to guide the LLM.
        task_fields_summary = (
            "Each Task must include: task_category (e.g., Content Strategy), task_description, "
            "objective_deliverable, effort (Small, Medium, Large), timeline (Immediate, Short-term, Medium-term, Long-term), "
            "responsible_party, success_criteria_metrics, dependencies_prerequisites (optional), "
            "priority (High, Medium, Low) with priority_justification, why_proposed (linking to analysis), "
            "task_type (initiative or tracking), data_subject (for tracking tasks: follower_stats, posts, mentions, general)."
        )

        return f"""
        You are a Time-Aware Task Extraction Specialist. Your primary function is to meticulously analyze strategic insights
        derived from LinkedIn analytics and transform them into a structured set of actionable tasks. These tasks should be
        organized within an Objectives and Key Results (OKRs) framework.

        CURRENT CONTEXTUAL INFORMATION (DO NOT CHANGE THIS IN YOUR OUTPUT):
        - Current Quarter: Q{quarter}
        - Days remaining in current quarter: {days_remaining}
        - Today's Date (for context): {self.current_date.isoformat()}

        YOUR MANDATE:
        1. Define clear, aspirational Objectives (qualitative goals).
        2. For each Objective, formulate 2-3 specific, measurable Key Results.
        3. Under each Key Result, list detailed, actionable Tasks required to achieve it.
        4. CRITICAL: Each Task MUST strictly adhere to the 'Task' Pydantic model fields. This means providing values for ALL required fields: {task_fields_summary}
        5. Task Timelines: Must be realistic given the {days_remaining} days left in Q{quarter}. Prioritize actions that can make significant progress or be completed within this timeframe. Use TimelineCategory enum values.
        6. Task Types: Clearly distinguish between 'initiative' (new actions/projects) and 'tracking' (ongoing monitoring/measurement).
        7. Data Subjects: For 'tracking' tasks, accurately specify the relevant 'data_subject'. For 'initiative' tasks, this can be 'general' or null if not specific to one data type.
        8. Rationale ('why_proposed'): This is crucial. Each task's proposal must be explicitly justified by and linked back to specific findings, trends, or recommendations mentioned in the input 'comprehensive_analysis'.
        9. Priority: Assign a priority (High, Medium, Low) to each task and provide a 'priority_justification'.

        INPUT: You will receive a 'comprehensive_analysis' text.

        OUTPUT FORMAT:
        You MUST return a single JSON object that strictly conforms to the 'TaskExtractionOutput' Pydantic schema.
        This JSON object will contain:
        - 'current_quarter_info': A string exactly like "Q{quarter}, {days_remaining} days remaining". (This is fixed based on the context above).
        - 'okrs': A list, where each item is an 'OKR' object.
        - 'overall_strategic_focus': (Optional) A brief summary of the main strategic themes emerging from the OKRs.
        - 'generation_timestamp': (This will be auto-filled if you conform to the schema, or you can provide an ISO timestamp).

        Example of a Task (ensure all fields from the Pydantic model are covered):
        {{
            "task_category": "Content Creation",
            "task_description": "Launch a 3-part blog series on AI in Marketing.",
            "objective_deliverable": "Objective: Increase thought leadership in AI marketing. Deliverable: 3 published blog posts.",
            "effort": "Medium",
            "timeline": "Short-term",
            "responsible_party": "Content Team Lead",
            "success_criteria_metrics": "Average 500 views per post, 10+ shares per post.",
            "dependencies_prerequisites": "Keyword research for AI marketing topics completed.",
            "priority": "High",
            "priority_justification": "Addresses key strategic goal of establishing AI expertise.",
            "why_proposed": "Analysis highlighted a gap in content related to AI, a high-interest area for our target audience.",
            "task_type": "initiative",
            "data_subject": "general"
        }}

        Focus on quality, actionability, and strict adherence to the output schema.
        """

    async def extract_tasks(self, comprehensive_analysis: str) -> "TaskExtractionOutput":
        """
        Extracts time-aware actionable tasks from the comprehensive analysis text.

        Args:
            comprehensive_analysis: The text analysis from which to extract tasks.

        Returns:
            A TaskExtractionOutput instance. When the input is blank, the agent
            run raises, or no valid structured output is produced, an instance
            with an empty ``okrs`` list is returned and
            ``overall_strategic_focus`` describes what went wrong.
        """
        if not comprehensive_analysis or not comprehensive_analysis.strip():
            logger.warning("Comprehensive analysis text is empty. Cannot extract tasks.")
            return self._fallback_output("No analysis provided to extract tasks.")

        # The agent instruction already carries the date context and output format;
        # the user message only supplies the analysis and restates the fixed field.
        prompt_for_adk_agent = f"""
        Comprehensive Analysis for Task Extraction:
        ---
        {comprehensive_analysis}
        ---
        Based on the analysis above, and adhering strictly to your primary instructions (especially regarding current quarter context, task field requirements, and JSON output schema 'TaskExtractionOutput'), generate the OKRs and tasks.
        Ensure the 'current_quarter_info' field in your JSON output is exactly: "{self._quarter_info()}".
        """

        user_input_content = genai_types.Content(
            role="user",
            parts=[genai_types.Part(text=prompt_for_adk_agent)],
        )

        app_name = f"{self.AGENT_NAME}Runner"
        runner = InMemoryRunner(agent=self.agent, app_name=app_name)
        # A random id per run; a whole-second timestamp could collide for runs
        # started within the same second.
        user_id = f"system_user_task_extractor_{uuid.uuid4().hex}"

        session = await runner.session_service.create_session(
            app_name=app_name,
            user_id=user_id,
        )

        extracted_data = None
        raw_text_chunks = []  # Raw model text, kept only for diagnostics

        try:
            logger.info("Running TaskExtractionAgent for user_id: %s, session_id: %s", user_id, session.id)
            # run_async() is the runner's async event stream; the synchronous
            # run() returns a plain generator that cannot be used with `async for`.
            async for event in runner.run_async(
                user_id=user_id,
                session_id=session.id,
                new_message=user_input_content,
            ):
                # With output_key set, the structured result arrives in
                # event.actions.state_delta[output_key].
                state_delta = getattr(getattr(event, "actions", None), "state_delta", None)
                if isinstance(state_delta, dict) and self.agent.output_key in state_delta:
                    extracted_data = state_delta[self.agent.output_key]
                    logger.info("Successfully extracted structured data via LlmAgent state_delta.")
                    break  # Assuming full structured output comes in one event with state_delta

                content = getattr(event, "content", None)
                for part in getattr(content, "parts", None) or []:
                    # The attribute can exist but be None; skip those parts.
                    text = getattr(part, "text", None)
                    if text:
                        raw_text_chunks.append(text)

            raw_text = "".join(raw_text_chunks)
            if not extracted_data and raw_text:
                # No JSON recovery is attempted on the raw text; it is only logged.
                logger.warning(
                    "LlmAgent did not produce structured output in state_delta. Raw text response was: %s",
                    raw_text[:500] + "...",
                )
        except Exception as e:
            logger.error("Error during TaskExtractionAgent execution: %s", e, exc_info=True)
            return self._fallback_output(f"Error during task extraction: {e}")
        finally:
            # Best-effort cleanup: a failed delete must not mask the extraction result.
            try:
                await runner.session_service.delete_session(
                    app_name=app_name, user_id=user_id, session_id=session.id
                )
            except Exception as session_del_e:
                logger.error("Error deleting task extractor session: %s", session_del_e)

        if extracted_data:
            if isinstance(extracted_data, TaskExtractionOutput):  # Already a Pydantic model
                return extracted_data
            if isinstance(extracted_data, dict):
                try:
                    return TaskExtractionOutput(**extracted_data)
                except Exception as pydantic_error:
                    logger.error(
                        "Error parsing extracted dictionary into TaskExtractionOutput: %s",
                        pydantic_error,
                        exc_info=True,
                    )
                    logger.error("Problematic dictionary data: %s", extracted_data)
            else:
                logger.error(
                    "Extracted data is not a dictionary or TaskExtractionOutput model: %s",
                    type(extracted_data),
                )

        logger.warning("No valid structured data extracted by TaskExtractionAgent.")
        return self._fallback_output("Failed to extract tasks or no tasks were identified.")

    def update_current_date(self, new_date: date):
        """
        Updates the current date for the agent and re-initializes the LlmAgent
        to reflect the new date context in its instructions.

        Args:
            new_date: The date to use for subsequent quarter calculations.
        """
        self.current_date = new_date
        # The instruction embeds the quarter context, so the agent is rebuilt.
        self.agent = self._build_agent()
        logger.info(
            "%s date updated. New context: Q%s, %s days remaining.",
            self.AGENT_NAME,
            self._get_quarter(self.current_date),
            self._days_until_quarter_end(self.current_date),
        )
281
+
282
+
283
if __name__ == '__main__':
    # Manual smoke test: runs the agent against a sample analysis, using a mock
    # runner when no real GOOGLE_API_KEY is present in the environment.
    import asyncio
    import os  # Needed for os.environ below; not imported at module level

    # (Ensure logging_config.py is in the same directory or PYTHONPATH is set for this example to run standalone)
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for TaskExtractionAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        logger.warning("logging_config.py not found, using basicConfig for logging.")

    PLACEHOLDER_API_KEY = "test_api_key_task_extractor"
    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", PLACEHOLDER_API_KEY)  # Use your actual key or env var
    MODEL_NAME = DEFAULT_AGENT_MODEL

    # Example comprehensive analysis text (replace with actual analysis output)
    sample_analysis_text = """
    Overall Summary: Follower growth is steady at 5% MoM. Post engagement is highest for video content
    (avg 8% engagement rate) published on weekdays. However, mentions sentiment dipped in the last month
    (-0.2 avg score) due to complaints about customer service response times.
    Key opportunity: Improve customer service communication and leverage video content more effectively.
    Strategic Recommendation: Launch a 'Customer First' initiative and create a video series showcasing customer success stories.
    """

    # Test with a specific date
    test_date = date(2025, 4, 15)  # Example: Mid-Q2 2025
    task_agent = TaskExtractionAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME, current_date=test_date)

    logger.info(f"Task Agent Instruction for test_date {test_date}:\n{task_agent._get_instruction_prompt()[:500]}...")

    class MockSession:
        """Minimal session object; the agent only reads ``id``."""
        id = "mock_session_id"

    class MockSessionService:
        """Stand-in for runner.session_service with the two coroutines the agent awaits."""

        async def create_session(self, app_name, user_id):
            return MockSession()

        async def delete_session(self, app_name, user_id, session_id):
            pass

    class MockADKRunner:
        """Stand-in for InMemoryRunner that yields one canned structured-output event."""

        def __init__(self, agent, app_name):
            self.agent = agent
            # The agent reaches sessions through this attribute, so the service
            # must be a nested object rather than flat methods on the runner.
            self.session_service = MockSessionService()

        async def run_async(self, user_id, session_id, new_message):
            # Simulate a response structure
            mock_okr = OKR(
                objective_description="Improve Customer Satisfaction",
                key_results=[KeyResult(
                    key_result_description="Reduce negative mentions by 10%",
                    tasks=[Task(
                        task_category="Customer Service", task_description="Respond to all negative mentions within 2 hours.",
                        objective_deliverable="Improved response time.", effort=EffortLevel.MEDIUM, timeline=TimelineCategory.IMMEDIATE,
                        responsible_party="Support Team", success_criteria_metrics="Avg response time < 2hrs.",
                        priority=PriorityLevel.HIGH, priority_justification="Critical for reputation.",
                        why_proposed="Analysis showed dip in sentiment due to slow responses.", task_type=TaskType.INITIATIVE,
                        data_subject=DataSubject.MENTIONS
                    )]
                )],
                objective_timeline=TimelineCategory.SHORT_TERM
            )
            mock_output = TaskExtractionOutput(
                current_quarter_info=f"Q{task_agent._get_quarter(task_agent.current_date)}, {task_agent._days_until_quarter_end(task_agent.current_date)} days remaining",
                okrs=[mock_okr],
                overall_strategic_focus="Focus on customer service improvement."
            )

            # Simulate the event structure LlmAgent with output_schema would produce
            class MockEvent:
                def __init__(self):
                    self.actions = type('Actions', (), {'state_delta': {task_agent.agent.output_key: mock_output.model_dump()}})()  # .model_dump() for Pydantic v2

            yield MockEvent()

        # Same async generator under both names, so the mock works whichever
        # entry point the agent iterates.
        run = run_async

    async def run_extraction():
        global InMemoryRunner

        logger.info("Extracting tasks from sample analysis...")
        # In a real scenario, ensure GOOGLE_API_KEY is set if the LlmAgent makes actual calls.
        original_runner_cls = InMemoryRunner
        if MOCK_API_KEY == PLACEHOLDER_API_KEY:
            logger.warning("Using a mock API key. LlmAgent behavior might be limited or mocked for task extraction.")
            # Monkey patch the InMemoryRunner for this test if using mock key
            InMemoryRunner = MockADKRunner

        try:
            extracted_okrs_output = await task_agent.extract_tasks(sample_analysis_text)
        finally:
            # Always undo the patch, even if extraction raises.
            InMemoryRunner = original_runner_cls

        print("\n--- TaskExtractionAgent Results ---")
        if extracted_okrs_output:
            print(f"Current Quarter Info: {extracted_okrs_output.current_quarter_info}")
            print(f"Overall Strategic Focus: {extracted_okrs_output.overall_strategic_focus}")
            print(f"Generated Timestamp: {extracted_okrs_output.generation_timestamp}")
            print("\nOKRs Extracted:")
            # Use .model_dump_json() for Pydantic v2 for pretty printing
            print(extracted_okrs_output.model_dump_json(indent=2))
        else:
            print("No OKRs extracted or an error occurred.")

    asyncio.run(run_extraction())

    # Example of updating date
    logger.info("\n--- Updating date for Task Agent ---")
    new_test_date = date(2025, 10, 5)  # Q4
    task_agent.update_current_date(new_test_date)
    # The instruction within task_agent.agent is now updated.
    # A new call to extract_tasks would use this updated context.