Spaces:
Running
Running
Update data_processing/analytics_data_processing.py
Browse files
data_processing/analytics_data_processing.py
CHANGED
@@ -7,149 +7,170 @@ import numpy as np
|
|
7 |
# Configure logging for this module
|
8 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
|
9 |
|
|
|
|
|
10 |
def filter_dataframe_by_date(df, date_column, start_date, end_date):
|
11 |
-
"""
|
|
|
|
|
|
|
|
|
|
|
12 |
if df is None or df.empty or not date_column:
|
13 |
-
logging.warning(f"Filter by date: DataFrame is None, empty, or no date_column provided.
|
14 |
return pd.DataFrame()
|
|
|
15 |
if date_column not in df.columns:
|
16 |
logging.warning(f"Filter by date: Date column '{date_column}' not found in DataFrame columns: {df.columns.tolist()}.")
|
17 |
return pd.DataFrame()
|
|
|
|
|
|
|
|
|
|
|
18 |
|
19 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
20 |
try:
|
21 |
-
# Ensure the date column is pandas datetime objects
|
22 |
if not pd.api.types.is_datetime64_any_dtype(df_copy[date_column]):
|
23 |
df_copy[date_column] = pd.to_datetime(df_copy[date_column], errors='coerce')
|
24 |
-
|
25 |
-
# Drop rows where date conversion might have failed (NaT) or was originally NaT
|
26 |
df_copy.dropna(subset=[date_column], inplace=True)
|
|
|
27 |
if df_copy.empty:
|
28 |
logging.info(f"Filter by date: DataFrame empty after to_datetime and dropna for column '{date_column}'.")
|
29 |
return pd.DataFrame()
|
30 |
|
31 |
-
# Normalize to midnight. This preserves timezone information if present.
|
32 |
df_copy[date_column] = df_copy[date_column].dt.normalize()
|
33 |
|
34 |
-
# If the column is timezone-aware, convert its values to naive UTC equivalent.
|
35 |
-
# This allows comparison with naive filter dates.
|
36 |
if hasattr(df_copy[date_column].dt, 'tz') and df_copy[date_column].dt.tz is not None:
|
37 |
-
logging.info(f"Column '{date_column}' is timezone-aware ({df_copy[date_column].dt.tz}). Converting to naive (from UTC) for comparison.")
|
38 |
df_copy[date_column] = df_copy[date_column].dt.tz_convert('UTC').dt.tz_localize(None)
|
39 |
-
|
40 |
except Exception as e:
|
41 |
logging.error(f"Error processing date column '{date_column}': {e}", exc_info=True)
|
42 |
-
return pd.DataFrame()
|
43 |
|
44 |
-
#
|
45 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
46 |
start_dt_obj = pd.to_datetime(start_date, errors='coerce').normalize() if start_date else None
|
47 |
end_dt_obj = pd.to_datetime(end_date, errors='coerce').normalize() if end_date else None
|
48 |
|
49 |
-
|
50 |
-
|
51 |
-
|
52 |
-
|
53 |
-
|
54 |
-
|
55 |
-
|
56 |
-
|
|
|
|
|
|
|
|
|
|
|
57 |
else:
|
58 |
-
|
59 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
60 |
if df_filtered_final.empty:
|
61 |
logging.info(f"Filter by date: DataFrame became empty after applying date range to column '{date_column}'.")
|
62 |
-
|
63 |
return df_filtered_final
|
|
|
|
|
64 |
|
65 |
def prepare_filtered_analytics_data(token_state_value, date_filter_option, custom_start_date, custom_end_date):
|
66 |
"""
|
67 |
Retrieves data from token_state, determines date range, filters posts, mentions, and follower time-series data.
|
68 |
Merges posts with post stats.
|
69 |
-
Returns:
|
70 |
-
- filtered_merged_posts_df: Posts merged with stats, filtered by date.
|
71 |
-
- filtered_mentions_df: Mentions filtered by date.
|
72 |
-
- date_filtered_follower_stats_df: Follower stats filtered by date (for time-series plots).
|
73 |
-
- raw_follower_stats_df: Unfiltered follower stats (for demographic plots).
|
74 |
-
- start_dt_filter: Determined start date for filtering.
|
75 |
-
- end_dt_filter: Determined end date for filtering.
|
76 |
"""
|
77 |
logging.info(f"Preparing filtered analytics data. Filter: {date_filter_option}, Custom Start: {custom_start_date}, Custom End: {custom_end_date}")
|
78 |
-
|
79 |
posts_df = token_state_value.get("bubble_posts_df", pd.DataFrame()).copy()
|
80 |
mentions_df = token_state_value.get("bubble_mentions_df", pd.DataFrame()).copy()
|
81 |
follower_stats_df = token_state_value.get("bubble_follower_stats_df", pd.DataFrame()).copy()
|
82 |
post_stats_df = token_state_value.get("bubble_post_stats_df", pd.DataFrame()).copy()
|
83 |
-
|
84 |
date_column_posts = token_state_value.get("config_date_col_posts", "published_at")
|
85 |
date_column_mentions = token_state_value.get("config_date_col_mentions", "date")
|
86 |
-
date_column_followers = token_state_value.get("config_date_col_followers", "date")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
87 |
|
88 |
# Determine date range for filtering
|
89 |
current_datetime_obj = datetime.now()
|
90 |
-
current_time_normalized = current_datetime_obj.replace(hour=0, minute=0, second=0, microsecond=0)
|
91 |
|
92 |
-
end_dt_filter = current_time_normalized
|
93 |
start_dt_filter = None
|
94 |
|
95 |
-
# --- FIX STARTS HERE ---
|
96 |
-
# The filter option strings from the UI must exactly match the strings being checked here.
|
97 |
-
# The original code checked for "Last 7 Days" but the UI sent "Ultimi 7 Giorni".
|
98 |
if date_filter_option == "Ultimi 7 Giorni":
|
99 |
-
start_dt_filter = current_time_normalized - timedelta(days=6)
|
100 |
elif date_filter_option == "Ultimi 30 Giorni":
|
101 |
-
start_dt_filter = current_time_normalized - timedelta(days=29)
|
102 |
elif date_filter_option == "Intervallo Personalizzato":
|
103 |
start_dt_filter_temp = pd.to_datetime(custom_start_date, errors='coerce')
|
104 |
-
start_dt_filter = start_dt_filter_temp.replace(hour=0, minute=0, second=0, microsecond=0) if pd.notna(start_dt_filter_temp) else None
|
105 |
|
106 |
end_dt_filter_temp = pd.to_datetime(custom_end_date, errors='coerce')
|
107 |
-
# If end date is specified, use it. Otherwise, default to today.
|
108 |
end_dt_filter = end_dt_filter_temp.replace(hour=0, minute=0, second=0, microsecond=0) if pd.notna(end_dt_filter_temp) else current_time_normalized
|
109 |
-
|
110 |
-
|
111 |
logging.info(f"Date range for filtering: Start: {start_dt_filter}, End: {end_dt_filter}")
|
112 |
-
|
113 |
# Merge posts_df and post_stats_df
|
114 |
merged_posts_df = pd.DataFrame()
|
115 |
if not posts_df.empty and not post_stats_df.empty:
|
116 |
if 'id' in posts_df.columns and 'post_id' in post_stats_df.columns:
|
117 |
merged_posts_df = pd.merge(posts_df, post_stats_df, left_on='id', right_on='post_id', how='left')
|
118 |
-
logging.info(f"Merged posts_df ({len(posts_df)} rows) and post_stats_df ({len(post_stats_df)} rows) into merged_posts_df ({len(merged_posts_df)} rows).")
|
119 |
else:
|
120 |
logging.warning("Cannot merge posts_df and post_stats_df due to missing 'id' or 'post_id' columns.")
|
121 |
merged_posts_df = posts_df
|
122 |
elif not posts_df.empty:
|
123 |
-
logging.warning("post_stats_df is empty. Proceeding with posts_df only.")
|
124 |
merged_posts_df = posts_df
|
125 |
expected_stat_cols = ['engagement', 'impressionCount', 'clickCount', 'likeCount', 'commentCount', 'shareCount']
|
126 |
for col in expected_stat_cols:
|
127 |
if col not in merged_posts_df.columns:
|
128 |
merged_posts_df[col] = pd.NA
|
129 |
-
|
130 |
-
# Filter DataFrames by date
|
131 |
-
filtered_merged_posts_data =
|
132 |
-
|
133 |
-
filtered_merged_posts_data = filter_dataframe_by_date(merged_posts_df, date_column_posts, start_dt_filter, end_dt_filter)
|
134 |
-
elif not merged_posts_df.empty:
|
135 |
-
logging.warning(f"Date column '{date_column_posts}' not found in merged_posts_df. Returning unfiltered merged posts data.")
|
136 |
-
filtered_merged_posts_data = merged_posts_df
|
137 |
|
138 |
-
filtered_mentions_data = pd.DataFrame()
|
139 |
-
if not mentions_df.empty and date_column_mentions in mentions_df.columns:
|
140 |
-
filtered_mentions_data = filter_dataframe_by_date(mentions_df, date_column_mentions, start_dt_filter, end_dt_filter)
|
141 |
-
elif not mentions_df.empty:
|
142 |
-
logging.warning(f"Date column '{date_column_mentions}' not found in mentions_df. Returning unfiltered mentions data.")
|
143 |
-
filtered_mentions_data = mentions_df
|
144 |
-
|
145 |
date_filtered_follower_stats_df = pd.DataFrame()
|
146 |
-
raw_follower_stats_df = follower_stats_df.copy()
|
147 |
-
|
148 |
-
if not follower_stats_df.empty and date_column_followers in follower_stats_df.columns:
|
149 |
date_filtered_follower_stats_df = filter_dataframe_by_date(follower_stats_df, date_column_followers, start_dt_filter, end_dt_filter)
|
150 |
-
elif not follower_stats_df.empty:
|
151 |
-
logging.warning(f"Date column '{date_column_followers}' not found in follower_stats_df. Time-series follower plots might be empty or use unfiltered data.")
|
152 |
-
date_filtered_follower_stats_df = follower_stats_df
|
153 |
|
154 |
logging.info(f"Processed - Filtered Merged Posts: {len(filtered_merged_posts_data)} rows, Filtered Mentions: {len(filtered_mentions_data)} rows, Date-Filtered Follower Stats: {len(date_filtered_follower_stats_df)} rows.")
|
155 |
|
|
|
7 |
# Configure logging for this module
|
8 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
|
9 |
|
10 |
+
|
11 |
+
# --- CORRECTED FUNCTION START (V2) ---
|
12 |
def filter_dataframe_by_date(df, date_column, start_date, end_date):
|
13 |
+
"""
|
14 |
+
Filters a DataFrame by a date column within a given date range.
|
15 |
+
|
16 |
+
This robust version correctly handles both daily ('YYYY-MM-DD') and monthly ('YYYY-MM')
|
17 |
+
date formats by using a two-pass detection system.
|
18 |
+
"""
|
19 |
if df is None or df.empty or not date_column:
|
20 |
+
logging.warning(f"Filter by date: DataFrame is None, empty, or no date_column provided.")
|
21 |
return pd.DataFrame()
|
22 |
+
|
23 |
if date_column not in df.columns:
|
24 |
logging.warning(f"Filter by date: Date column '{date_column}' not found in DataFrame columns: {df.columns.tolist()}.")
|
25 |
return pd.DataFrame()
|
26 |
+
|
27 |
+
df_copy = df.copy()
|
28 |
+
|
29 |
+
# --- NEW TWO-PASS DETECTION LOGIC ---
|
30 |
+
use_month_logic = False
|
31 |
|
32 |
+
# Pass 1: Check if all non-null values are 'YYYY-MM' strings. This is fast and specific.
|
33 |
+
valid_dates_str = df_copy[date_column].dropna()
|
34 |
+
if pd.api.types.is_string_dtype(valid_dates_str.dtype) and not valid_dates_str.empty:
|
35 |
+
# This regex ensures the entire string is just 'YYYY-MM'
|
36 |
+
if valid_dates_str.str.match(r'^\d{4}-\d{2}$').all():
|
37 |
+
use_month_logic = True
|
38 |
+
logging.info(f"Filter by date (Pass 1): Detected 'YYYY-MM' string format for column '{date_column}'.")
|
39 |
+
|
40 |
+
# Standardize column to datetime objects for filtering and for the second pass
|
41 |
try:
|
|
|
42 |
if not pd.api.types.is_datetime64_any_dtype(df_copy[date_column]):
|
43 |
df_copy[date_column] = pd.to_datetime(df_copy[date_column], errors='coerce')
|
44 |
+
|
|
|
45 |
df_copy.dropna(subset=[date_column], inplace=True)
|
46 |
+
|
47 |
if df_copy.empty:
|
48 |
logging.info(f"Filter by date: DataFrame empty after to_datetime and dropna for column '{date_column}'.")
|
49 |
return pd.DataFrame()
|
50 |
|
|
|
51 |
df_copy[date_column] = df_copy[date_column].dt.normalize()
|
52 |
|
|
|
|
|
53 |
if hasattr(df_copy[date_column].dt, 'tz') and df_copy[date_column].dt.tz is not None:
|
|
|
54 |
df_copy[date_column] = df_copy[date_column].dt.tz_convert('UTC').dt.tz_localize(None)
|
|
|
55 |
except Exception as e:
|
56 |
logging.error(f"Error processing date column '{date_column}': {e}", exc_info=True)
|
57 |
+
return pd.DataFrame()
|
58 |
|
59 |
+
# Pass 2: If not detected by string format, check if all dates are the 1st of the month.
|
60 |
+
if not use_month_logic and not df_copy.empty:
|
61 |
+
if (df_copy[date_column].dt.day == 1).all():
|
62 |
+
use_month_logic = True
|
63 |
+
logging.info(f"Filter by date (Pass 2): All dates in '{date_column}' are 1st of the month. Applying month-range filtering.")
|
64 |
+
# --- END OF NEW LOGIC ---
|
65 |
+
|
66 |
+
# Convert filter start/end dates to normalized, naive Timestamps
|
67 |
start_dt_obj = pd.to_datetime(start_date, errors='coerce').normalize() if start_date else None
|
68 |
end_dt_obj = pd.to_datetime(end_date, errors='coerce').normalize() if end_date else None
|
69 |
|
70 |
+
if not start_dt_obj and not end_dt_obj:
|
71 |
+
return df_copy
|
72 |
+
|
73 |
+
# Perform the filtering based on the detected format
|
74 |
+
if use_month_logic:
|
75 |
+
logging.info(f"Applying month-overlap filtering for column '{date_column}'.")
|
76 |
+
# For monthly data, include a row if its month overlaps with the filter range.
|
77 |
+
df_copy['end_of_month'] = df_copy[date_column] + pd.offsets.MonthEnd(1)
|
78 |
+
filter_start = start_dt_obj if start_dt_obj else pd.Timestamp.min
|
79 |
+
filter_end = end_dt_obj if end_dt_obj else pd.Timestamp.max
|
80 |
+
|
81 |
+
mask = (df_copy[date_column] <= filter_end) & (df_copy['end_of_month'] >= filter_start)
|
82 |
+
df_filtered_final = df_copy[mask].drop(columns=['end_of_month'])
|
83 |
else:
|
84 |
+
logging.info(f"Applying standard daily filtering for column '{date_column}'.")
|
85 |
+
# Standard filtering for daily ('YYYY-MM-DD') data
|
86 |
+
df_filtered_final = df_copy
|
87 |
+
if start_dt_obj:
|
88 |
+
df_filtered_final = df_filtered_final[df_filtered_final[date_column] >= start_dt_obj]
|
89 |
+
if end_dt_obj:
|
90 |
+
df_filtered_final = df_filtered_final[df_filtered_final[date_column] <= end_dt_obj]
|
91 |
+
|
92 |
if df_filtered_final.empty:
|
93 |
logging.info(f"Filter by date: DataFrame became empty after applying date range to column '{date_column}'.")
|
94 |
+
|
95 |
return df_filtered_final
|
96 |
+
# --- CORRECTED FUNCTION END (V2) ---
|
97 |
+
|
98 |
|
99 |
def prepare_filtered_analytics_data(token_state_value, date_filter_option, custom_start_date, custom_end_date):
|
100 |
"""
|
101 |
Retrieves data from token_state, determines date range, filters posts, mentions, and follower time-series data.
|
102 |
Merges posts with post stats.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
103 |
"""
|
104 |
logging.info(f"Preparing filtered analytics data. Filter: {date_filter_option}, Custom Start: {custom_start_date}, Custom End: {custom_end_date}")
|
|
|
105 |
posts_df = token_state_value.get("bubble_posts_df", pd.DataFrame()).copy()
|
106 |
mentions_df = token_state_value.get("bubble_mentions_df", pd.DataFrame()).copy()
|
107 |
follower_stats_df = token_state_value.get("bubble_follower_stats_df", pd.DataFrame()).copy()
|
108 |
post_stats_df = token_state_value.get("bubble_post_stats_df", pd.DataFrame()).copy()
|
|
|
109 |
date_column_posts = token_state_value.get("config_date_col_posts", "published_at")
|
110 |
date_column_mentions = token_state_value.get("config_date_col_mentions", "date")
|
111 |
+
date_column_followers = token_state_value.get("config_date_col_followers", "date")
|
112 |
+
|
113 |
+
# --- NEW: PRE-PROCESSING STEP FOR FOLLOWER STATS ---
|
114 |
+
# This block handles the case where date information is in the 'category_name' column.
|
115 |
+
if not follower_stats_df.empty and 'category_name' in follower_stats_df.columns:
|
116 |
+
logging.info("Pre-processing follower_stats_df: Checking 'category_name' for dates.")
|
117 |
+
# Create a series of datetime objects from 'category_name'.
|
118 |
+
# 'coerce' will turn any non-date strings into NaT (Not a Time).
|
119 |
+
category_as_dates = pd.to_datetime(follower_stats_df['category_name'], errors='coerce')
|
120 |
+
|
121 |
+
# Create a boolean mask for rows where the conversion was successful.
|
122 |
+
valid_dates_mask = category_as_dates.notna()
|
123 |
+
|
124 |
+
# If any dates were found, update the main 'date' column with them.
|
125 |
+
if valid_dates_mask.any():
|
126 |
+
logging.info(f"Found {valid_dates_mask.sum()} date-like values in 'category_name'. Consolidating them into the '{date_column_followers}' column.")
|
127 |
+
# Use .loc[] to update the 'date' column only for the relevant rows.
|
128 |
+
follower_stats_df.loc[valid_dates_mask, date_column_followers] = category_as_dates[valid_dates_mask]
|
129 |
+
# --- END OF PRE-PROCESSING STEP ---
|
130 |
|
131 |
# Determine date range for filtering
|
132 |
current_datetime_obj = datetime.now()
|
133 |
+
current_time_normalized = current_datetime_obj.replace(hour=0, minute=0, second=0, microsecond=0)
|
134 |
|
135 |
+
end_dt_filter = current_time_normalized
|
136 |
start_dt_filter = None
|
137 |
|
|
|
|
|
|
|
138 |
if date_filter_option == "Ultimi 7 Giorni":
|
139 |
+
start_dt_filter = current_time_normalized - timedelta(days=6)
|
140 |
elif date_filter_option == "Ultimi 30 Giorni":
|
141 |
+
start_dt_filter = current_time_normalized - timedelta(days=29)
|
142 |
elif date_filter_option == "Intervallo Personalizzato":
|
143 |
start_dt_filter_temp = pd.to_datetime(custom_start_date, errors='coerce')
|
144 |
+
start_dt_filter = start_dt_filter_temp.replace(hour=0, minute=0, second=0, microsecond=0) if pd.notna(start_dt_filter_temp) else None
|
145 |
|
146 |
end_dt_filter_temp = pd.to_datetime(custom_end_date, errors='coerce')
|
|
|
147 |
end_dt_filter = end_dt_filter_temp.replace(hour=0, minute=0, second=0, microsecond=0) if pd.notna(end_dt_filter_temp) else current_time_normalized
|
148 |
+
|
|
|
149 |
logging.info(f"Date range for filtering: Start: {start_dt_filter}, End: {end_dt_filter}")
|
150 |
+
|
151 |
# Merge posts_df and post_stats_df
|
152 |
merged_posts_df = pd.DataFrame()
|
153 |
if not posts_df.empty and not post_stats_df.empty:
|
154 |
if 'id' in posts_df.columns and 'post_id' in post_stats_df.columns:
|
155 |
merged_posts_df = pd.merge(posts_df, post_stats_df, left_on='id', right_on='post_id', how='left')
|
|
|
156 |
else:
|
157 |
logging.warning("Cannot merge posts_df and post_stats_df due to missing 'id' or 'post_id' columns.")
|
158 |
merged_posts_df = posts_df
|
159 |
elif not posts_df.empty:
|
|
|
160 |
merged_posts_df = posts_df
|
161 |
expected_stat_cols = ['engagement', 'impressionCount', 'clickCount', 'likeCount', 'commentCount', 'shareCount']
|
162 |
for col in expected_stat_cols:
|
163 |
if col not in merged_posts_df.columns:
|
164 |
merged_posts_df[col] = pd.NA
|
165 |
+
|
166 |
+
# Filter DataFrames by date (now using pre-processed follower_stats_df)
|
167 |
+
filtered_merged_posts_data = filter_dataframe_by_date(merged_posts_df, date_column_posts, start_dt_filter, end_dt_filter)
|
168 |
+
filtered_mentions_data = filter_dataframe_by_date(mentions_df, date_column_mentions, start_dt_filter, end_dt_filter)
|
|
|
|
|
|
|
|
|
169 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
170 |
date_filtered_follower_stats_df = pd.DataFrame()
|
171 |
+
raw_follower_stats_df = follower_stats_df.copy() # Use a copy of the *original* for raw data
|
172 |
+
if not follower_stats_df.empty:
|
|
|
173 |
date_filtered_follower_stats_df = filter_dataframe_by_date(follower_stats_df, date_column_followers, start_dt_filter, end_dt_filter)
|
|
|
|
|
|
|
174 |
|
175 |
logging.info(f"Processed - Filtered Merged Posts: {len(filtered_merged_posts_data)} rows, Filtered Mentions: {len(filtered_mentions_data)} rows, Date-Filtered Follower Stats: {len(date_filtered_follower_stats_df)} rows.")
|
176 |
|