Update features/insight_and_tasks/orchestrators/linkedin_analytics_orchestrator.py

features/insight_and_tasks/orchestrators/linkedin_analytics_orchestrator.py
CHANGED
@@ -1,9 +1,9 @@
 # orchestrators/linkedin_analytics_orchestrator.py
 import pandas as pd
 import logging
-from typing import Dict, Any, Optional
-from datetime import date, datetime
-from dataclasses import asdict
+from typing import Dict, Any, Optional
+from datetime import date, datetime # For TaskExtractionAgent date
+from dataclasses import asdict # For converting AgentMetrics to dict if needed for final output
 import os
 
 os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
@@ -11,115 +11,286 @@ GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
 os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
 
 # Project-specific imports
-from features.insight_and_tasks.utils.pandasai_setup import configure_pandasai
+from features.insight_and_tasks.utils.pandasai_setup import configure_pandasai # Centralized PandasAI config
 from features.insight_and_tasks.coordinators.employer_branding_coordinator import EnhancedEmployerBrandingCoordinator
 from features.insight_and_tasks.agents.task_extraction_agent import TaskExtractionAgent
-from features.insight_and_tasks.data_models.metrics import AgentMetrics
-from features.insight_and_tasks.data_models.tasks import TaskExtractionOutput
+from features.insight_and_tasks.data_models.metrics import AgentMetrics # For type hinting
+from features.insight_and_tasks.data_models.tasks import TaskExtractionOutput # For type hinting
 from features.insight_and_tasks.agents.task_extraction_model import extract_tasks_from_text
 
+# Configure logger for this module
 logger = logging.getLogger(__name__)
 
 class EnhancedLinkedInAnalyticsOrchestrator:
     """
-    Orchestrates the end-to-end LinkedIn analytics process
+    Orchestrates the end-to-end LinkedIn analytics process, from data input through
+    specialized agent analysis, coordinator synthesis, and actionable task extraction.
     """
 
     def __init__(self, api_key: str, llm_model_name: Optional[str] = None, current_date_for_tasks: Optional[date] = None):
+        """
+        Initializes the orchestrator.
+        Args:
+            api_key: The API key for Google services (used by PandasAI and LlmAgents).
+            llm_model_name: Optional. The primary LLM model name to be used by agents.
+                            Specific agents/coordinator might override with their defaults if not set.
+            current_date_for_tasks: Optional. The date to be used by TaskExtractionAgent for quarter calculations. Defaults to today.
+        """
         self.api_key = api_key
-        self.llm_model_name = llm_model_name
+        self.llm_model_name = llm_model_name # Can be passed down or agents use their defaults
 
+        # Configure PandasAI globally at the start of orchestration.
+        # Pass the model_name if specified, otherwise pandasai_setup might use its own default.
         try:
            configure_pandasai(api_key=self.api_key, model_name=self.llm_model_name)
            logger.info(f"PandasAI configured by orchestrator with model hint: {self.llm_model_name or 'default'}.")
        except Exception as e:
            logger.error(f"Failed to configure PandasAI in orchestrator: {e}", exc_info=True)
+            # Decide if this is a critical failure or if agents can proceed (they might try to reconfigure)
 
+        # Initialize the coordinator, which in turn initializes its specialized agents.
+        # Pass the model_name hint to the coordinator.
         self.coordinator = EnhancedEmployerBrandingCoordinator(api_key=self.api_key, model_name=self.llm_model_name)
+
+        # Initialize the TaskExtractionAgent.
+        # It uses its own default model unless overridden here.
         self.task_extractor = TaskExtractionAgent(
             api_key=self.api_key,
-            model_name=self.llm_model_name,
-            current_date=current_date_for_tasks
+            model_name=self.llm_model_name, # Pass model hint
+            current_date=current_date_for_tasks # Defaults to today if None
         )
         logger.info("EnhancedLinkedInAnalyticsOrchestrator initialized.")
 
-    async def
+    async def generate_full_analysis_and_tasks(
         self,
         follower_stats_df: pd.DataFrame,
         post_df: pd.DataFrame,
         mentions_df: pd.DataFrame
-    ) ->
+    ) -> Dict[str, Any]:
         """
-        Executes the full pipeline
-
+        Executes the full pipeline: agent analyses, coordinator synthesis, and task extraction.
+        Args:
+            follower_stats_df: DataFrame containing follower statistics.
+            post_df: DataFrame containing post performance data.
+            mentions_df: DataFrame containing brand mentions data.
+        Returns:
+            A dictionary containing the comprehensive analysis text, actionable tasks (OKRs),
+            and the detailed metrics from each specialized agent.
         """
-        logger.info("Starting
+        logger.info("Starting full analysis and task generation pipeline...")
 
-        # Step 1: Get analyses and metrics from specialized agents
+        # Step 1: Get analyses and metrics from specialized agents.
+        # The coordinator's internal agents are used here.
         logger.info("Running follower analysis...")
         follower_agent_metrics: AgentMetrics = self.coordinator.follower_agent.analyze_follower_data(follower_stats_df)
-        logger.info(f"Follower analysis complete.")
+        logger.info(f"Follower analysis complete. Summary: {follower_agent_metrics.analysis_summary[:100]}...")
 
         logger.info("Running post performance analysis...")
         post_agent_metrics: AgentMetrics = self.coordinator.post_agent.analyze_post_data(post_df)
-        logger.info(f"Post analysis complete.")
+        logger.info(f"Post analysis complete. Summary: {post_agent_metrics.analysis_summary[:100]}...")
 
         logger.info("Running mentions analysis...")
         mentions_agent_metrics: AgentMetrics = self.coordinator.mentions_agent.analyze_mentions_data(mentions_df)
-        logger.info(f"Mentions analysis complete.")
+        logger.info(f"Mentions analysis complete. Summary: {mentions_agent_metrics.analysis_summary[:100]}...")
 
-        # Step 2: Coordinator synthesizes these metrics into a comprehensive analysis text
+        # Step 2: Coordinator synthesizes these metrics into a comprehensive analysis text.
         logger.info("Running coordinator for synthesis...")
         comprehensive_analysis_text: str = await self.coordinator.generate_comprehensive_analysis(
             follower_agent_metrics, post_agent_metrics, mentions_agent_metrics
         )
         logger.info(f"Coordinator synthesis complete. Report length: {len(comprehensive_analysis_text)} chars.")
+        if not comprehensive_analysis_text or comprehensive_analysis_text.startswith("Error"):
+            logger.error(f"Coordinator synthesis failed or produced an error message: {comprehensive_analysis_text}")
+            # Potentially stop here or proceed with task extraction on whatever text was generated.
 
-        #
-        partial_results = {
-            "comprehensive_analysis_report": comprehensive_analysis_text,
-            "actionable_okrs_and_tasks": None, # Not ready yet
-            "detailed_metrics": {
-                "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
-                "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
-                "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
-            },
-            "status": "report_ready" # Indicate what's available
-        }
-        logger.info("Yielding report results...")
-        yield partial_results
-
-        # Step 3: TaskExtractionAgent extracts actionable tasks (OKRs) from the comprehensive text
+        # Step 3: TaskExtractionAgent extracts actionable tasks (OKRs) from the comprehensive text.
         logger.info("Running task extraction...")
+        # actionable_tasks_okrs: TaskExtractionOutput = await self.task_extractor.extract_tasks(comprehensive_analysis_text)
         actionable_tasks_okrs: TaskExtractionOutput = extract_tasks_from_text(comprehensive_analysis_text, GOOGLE_API_KEY)
         logger.info(f"Task extraction complete. Number of OKRs: {len(actionable_tasks_okrs.okrs) if actionable_tasks_okrs else 'Error'}.")
 
-        #
+        # Step 4: Compile and return all results.
+        # Convert Pydantic/dataclass objects to dicts for easier JSON serialization if the final output needs it.
+        # The `actionable_tasks_okrs` is already a Pydantic model, which can be serialized with .model_dump() / .json().
+        # `AgentMetrics` are dataclasses, use `asdict`.
+
        final_results = {
            "comprehensive_analysis_report": comprehensive_analysis_text,
-            "actionable_okrs_and_tasks": actionable_tasks_okrs.model_dump() if actionable_tasks_okrs else None,
+            "actionable_okrs_and_tasks": actionable_tasks_okrs.model_dump() if actionable_tasks_okrs else None, # Pydantic v2
+            # "actionable_okrs_and_tasks": actionable_tasks_okrs.dict() if actionable_tasks_okrs else None, # Pydantic v1
            "detailed_metrics": {
                "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
                "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
                "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
-            }
-            "status": "complete" # Indicate everything is ready
+            }
        }
-        logger.info("
-
+        logger.info("Full analysis and task generation pipeline finished successfully.")
+        return final_results
 
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+# Example usage (similar to the original script's main execution block)
+if __name__ == '__main__':
+    import asyncio
+    import os
+    from utils.logging_config import setup_logging
+    from utils.data_fetching import fetch_linkedin_data_from_bubble, VALID_DATA_TYPES
+
+    setup_logging() # Configure logging for the application
+
+    # --- Configuration ---
+    # Attempt to get API key from environment variable
+    # IMPORTANT: Set GOOGLE_API_KEY and BUBBLE_API_KEY in your environment for this to run.
+    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
+    BUBBLE_API_KEY_ENV = os.environ.get("BUBBLE_API_KEY") # Used by data_fetching
+
+    if not GOOGLE_API_KEY:
+        logger.critical("GOOGLE_API_KEY environment variable not set. Orchestrator cannot initialize LLM agents.")
+        exit(1)
+    if not BUBBLE_API_KEY_ENV: # data_fetching will also check, but good to note here
+        logger.warning("BUBBLE_API_KEY environment variable not set. Data fetching from Bubble will fail.")
+        # You might want to exit or use mock data if Bubble is essential.
+
+    # Set the Google Vertex AI environment variable if not using Vertex AI (as in original)
+    os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
+
+    # Orchestrator settings
+    ORG_URN_EXAMPLE = "urn:li:organization:19010008" # Example, replace with actual
+    # Specify a model or let orchestrator/agents use their defaults
+    # LLM_MODEL_FOR_ORCHESTRATION = "gemini-2.5-flash-preview-05-20" # Example: use a powerful model
+    LLM_MODEL_FOR_ORCHESTRATION = None # Let agents use their defaults or pass a specific one
+
+    # --- Initialize Orchestrator ---
+    orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
+        api_key=GOOGLE_API_KEY,
+        llm_model_name=LLM_MODEL_FOR_ORCHESTRATION,
+        current_date_for_tasks=datetime.utcnow().date() # Use today for task planning
+    )
+
+    # --- Data Fetching ---
+    logger.info(f"Fetching data for organization URN: {ORG_URN_EXAMPLE}")
+
+    # Helper to fetch and log
+    def get_data(data_type: VALID_DATA_TYPES, org_urn: str) -> pd.DataFrame:
+        df, error = fetch_linkedin_data_from_bubble(org_urn=org_urn, data_type=data_type)
+        if error:
+            logger.error(f"Error fetching {data_type}: {error}. Using empty DataFrame.")
+            return pd.DataFrame()
+        if df is None: # Should not happen if error is None, but as a safeguard
+            logger.warning(f"Fetched {data_type} is None but no error reported. Using empty DataFrame.")
+            return pd.DataFrame()
+        logger.info(f"Successfully fetched {data_type} with {len(df)} rows.")
+        return df
+
+    follower_stats_df_raw = get_data("li_follower_stats", ORG_URN_EXAMPLE)
+    posts_df_raw = get_data("LI_posts", ORG_URN_EXAMPLE) # Contains post content, media_type, etc.
+    mentions_df_raw = get_data("Li_mentions", ORG_URN_EXAMPLE)
+    post_stats_df_raw = get_data("LI_post_stats", ORG_URN_EXAMPLE) # Contains engagement numbers for posts
+
+    # --- Data Preprocessing/Merging (as in original example) ---
+
+    # Select relevant columns for follower_stats_df
+    if not follower_stats_df_raw.empty:
+        follower_stats_df = follower_stats_df_raw[[
+            'category_name', "follower_count_organic", "follower_count_paid", "follower_count_type"
+        ]].copy()
+    else:
+        follower_stats_df = pd.DataFrame() # Ensure it's an empty DF if raw is empty
+
+    # Merge posts_df and post_stats_df
+    # This logic assumes 'id' in posts_df_raw and 'post_id' in post_stats_df_raw
+    merged_posts_df = pd.DataFrame()
+    if not posts_df_raw.empty and not post_stats_df_raw.empty:
+        if 'id' in posts_df_raw.columns and 'post_id' in post_stats_df_raw.columns:
+            # Ensure 'id' in posts_df_raw is unique before merge if it's a left table key
+            # posts_df_raw.drop_duplicates(subset=['id'], keep='first', inplace=True)
+            merged_posts_df = pd.merge(posts_df_raw, post_stats_df_raw, left_on='id', right_on='post_id', how='left', suffixes=('', '_stats'))
+            logger.info(f"Merged posts_df ({len(posts_df_raw)}) and post_stats_df ({len(post_stats_df_raw)}) into merged_posts_df ({len(merged_posts_df)}).")
+        else:
+            logger.warning("Cannot merge posts_df and post_stats_df due to missing 'id' or 'post_id'. Using posts_df_raw.")
+            merged_posts_df = posts_df_raw.copy() # Fallback to posts_df_raw
+    elif not posts_df_raw.empty:
+        logger.info("post_stats_df is empty. Using posts_df_raw for post analysis.")
+        merged_posts_df = posts_df_raw.copy()
+    else:
+        logger.warning("Both posts_df_raw and post_stats_df_raw are empty.")
+        merged_posts_df = pd.DataFrame() # Empty DF
+
+    # Select and ensure essential columns for merged_posts_df
+    # These are columns expected by EnhancedPostPerformanceAgent
+    expected_post_cols = [
+        'li_eb_label', 'media_type', 'is_ad', 'id', 'published_at', 'sentiment',
+        'engagement', 'impressionCount', 'clickCount', 'likeCount', 'commentCount', 'shareCount'
+    ]
+    if not merged_posts_df.empty:
+        final_post_df_cols = {}
+        for col in expected_post_cols:
+            if col in merged_posts_df.columns:
+                final_post_df_cols[col] = merged_posts_df[col]
+            elif f"{col}_stats" in merged_posts_df.columns: # Check for suffixed columns from merge
+                final_post_df_cols[col] = merged_posts_df[f"{col}_stats"]
+            else:
+                logger.debug(f"Expected column '{col}' not found in merged_posts_df. Will be created as empty/default by agent if needed.")
+                # Agent preprocessing should handle missing columns by creating them with defaults (0 or 'Unknown')
+
+        # Create the final DataFrame with only the selected/available columns
+        # This ensures that if a column is missing, it doesn't cause an error here,
+        # but the agent's preprocessing will handle it.
+        # However, it's better to ensure they exist with NAs if the agent expects them.
+        temp_post_df = pd.DataFrame(final_post_df_cols)
+        # Ensure all expected columns are present, filling with NA if missing from selection
+        for col in expected_post_cols:
+            if col not in temp_post_df.columns:
+                temp_post_df[col] = pd.NA # Or appropriate default like 0 for numeric, 'Unknown' for categorical
+        merged_posts_df = temp_post_df[expected_post_cols].copy() # Ensure correct order and all columns
+
+    else: # If merged_posts_df started empty and stayed empty
+        merged_posts_df = pd.DataFrame(columns=expected_post_cols)
+
+
+    # Mentions DataFrame - select relevant columns if necessary, or pass as is
+    # Assuming mentions_df_raw is already in the correct shape or agent handles it.
+    # For example, if it needs specific columns:
+    # mentions_df = mentions_df_raw[['date', 'sentiment_label', 'mention_content']].copy() if not mentions_df_raw.empty else pd.DataFrame()
+    mentions_df = mentions_df_raw.copy() # Pass as is, agent will preprocess
+
+
+    # --- Run Orchestration ---
+    async def main_orchestration():
+        if follower_stats_df.empty and merged_posts_df.empty and mentions_df.empty:
+            logger.error("All input DataFrames are empty. Aborting orchestration.")
+            return None
+
+        logger.info("Orchestrator starting generate_full_analysis_and_tasks...")
+        results = await orchestrator.generate_full_analysis_and_tasks(
+            follower_stats_df=follower_stats_df,
+            post_df=merged_posts_df,
+            mentions_df=mentions_df
+        )
+        return results
+
+    orchestration_results = asyncio.run(main_orchestration())
+
+    # --- Output Results ---
+    if orchestration_results:
+        print("\n\n" + "="*30 + " COMPREHENSIVE ANALYSIS REPORT " + "="*30)
+        print(orchestration_results.get("comprehensive_analysis_report", "Report not generated."))
+
+        print("\n\n" + "="*30 + " ACTIONABLE TASKS (OKRs) " + "="*30)
+        okrs_data = orchestration_results.get("actionable_okrs_and_tasks")
+        if okrs_data:
+            # okrs_data is already a dict from .model_dump()
+            print(json.dumps(okrs_data, indent=2))
+        else:
+            print("No actionable tasks (OKRs) generated or an error occurred.")
+
+        print("\n\n" + "="*30 + " DETAILED AGENT METRICS " + "="*30)
+        detailed_metrics = orchestration_results.get("detailed_metrics", {})
+        for agent_name, metrics_dict in detailed_metrics.items():
+            print(f"\n--- {agent_name.replace('_', ' ').title()} Metrics ---")
+            if metrics_dict:
+                print(json.dumps(metrics_dict, indent=2, default=str)) # default=str for any non-serializable types
+            else:
+                print("Metrics not available for this agent.")
+    else:
+        logger.info("Orchestration did not produce results (likely due to empty input data).")
+
+    logger.info("Orchestration example finished.")
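
For reference, a minimal driver sketch for the new single-return entry point. This is a hypothetical example under stated assumptions, not part of the commit: it assumes the module layout above, sets GEMINI_API_KEY before import (the module copies it into GOOGLE_API_KEY at import time and would fail if it were unset), and passes empty placeholder DataFrames instead of real Bubble exports. Note that the __main__ block in the commit calls json.dumps but never imports json; the sketch imports it explicitly.

# driver_sketch.py -- hypothetical, not part of the commit
import asyncio
import json
import os
from datetime import datetime

import pandas as pd

# The orchestrator module reads GEMINI_API_KEY at import time, so it must be
# set before the import below. "<your-gemini-key>" is a placeholder.
os.environ.setdefault("GEMINI_API_KEY", "<your-gemini-key>")

from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import (
    EnhancedLinkedInAnalyticsOrchestrator,
)

orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
    api_key=os.environ["GEMINI_API_KEY"],
    llm_model_name=None,  # let the agents and coordinator fall back to their own defaults
    current_date_for_tasks=datetime.utcnow().date(),
)

# generate_full_analysis_and_tasks() now returns one dict instead of yielding
# partial results, so a single await covers the whole pipeline.
results = asyncio.run(
    orchestrator.generate_full_analysis_and_tasks(
        follower_stats_df=pd.DataFrame(),  # placeholders; use real exports in practice
        post_df=pd.DataFrame(),
        mentions_df=pd.DataFrame(),
    )
)

# OKRs arrive as a plain dict (Pydantic .model_dump()) and AgentMetrics as
# dataclasses.asdict() output, so the payload is JSON-serializable as-is.
print(json.dumps(results["actionable_okrs_and_tasks"], indent=2, default=str))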