Spaces:
Running
Running
File size: 20,398 Bytes
7f592b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# sync_logic.py
"""
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics.
Fetches data from LinkedIn APIs and uploads to Bubble.
"""
import pandas as pd
import logging
import html
# Assuming Bubble_API_Calls contains bulk_upload_to_bubble
from Bubble_API_Calls import bulk_upload_to_bubble
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions
from Linkedin_Data_API_Calls import (
fetch_linkedin_posts_core,
fetch_comments,
analyze_sentiment, # For post comments
compile_detailed_posts,
prepare_data_for_bubble, # For posts, stats, comments
fetch_linkedin_mentions_core,
analyze_mentions_sentiment, # For individual mentions
compile_detailed_mentions, # Compiles to user-specified format
prepare_mentions_for_bubble # Prepares user-specified format for Bubble
)
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats
from linkedin_follower_stats import get_linkedin_follower_stats
# Assuming config.py contains all necessary constants
from config import (
LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT,
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN,
LINKEDIN_CLIENT_ID_ENV_VAR
)
def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api):
    """Fetch recent LinkedIn posts and upload the ones not yet cached from Bubble.

    Args:
        token_state: Mutable session dict. Reads ``client_id``, ``token``,
            ``org_urn`` and ``bubble_posts_df``. After a successful post
            upload, ``bubble_posts_df`` is replaced with the cached rows plus
            the newly uploaded ones.
        fetch_count_for_posts_api: Passed as ``count`` to
            ``fetch_linkedin_posts_core``.

    Returns:
        tuple: ``(status_message, token_state)``. Exceptions raised while
        fetching, processing or uploading are logged and reported through the
        status message instead of propagating.
    """
    logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.")
    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    # The key can be present but hold None; .get()'s default does not cover
    # that case, and calling .copy() on None would raise outside the try block.
    cached_posts_df = token_state.get("bubble_posts_df")
    if cached_posts_df is None:
        cached_posts_df = pd.DataFrame()
    bubble_posts_df_orig = cached_posts_df.copy()
    posts_sync_message = ""

    try:
        processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api)
        if not processed_raw_posts:
            posts_sync_message = "Posts: None found via API. "
            logging.info("Posts sync: No raw posts returned from API.")
            return posts_sync_message, token_state

        # URNs already cached from Bubble, compared as strings.
        existing_post_urns = set()
        if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns:
            existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))

        new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns]
        if not new_raw_posts:
            posts_sync_message = "Posts: All fetched already in Bubble. "
            logging.info("Posts sync: All fetched posts were already found in Bubble.")
            return posts_sync_message, token_state

        logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.")
        # Posts without a URN are skipped for the comment fetch.
        post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)]
        all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
        sentiments_per_post = analyze_sentiment(all_comments_data)
        detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)

        if li_posts:
            bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME)
            updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True)
            # De-duplicate only when the URN column exists: a KeyError here
            # would skip the stats/comments uploads below even though the
            # posts themselves were already uploaded.
            if BUBBLE_POST_URN_COLUMN_NAME in updated_posts_df.columns:
                updated_posts_df = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last')
            token_state["bubble_posts_df"] = updated_posts_df
            logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.")

            # Stats and comments are uploaded only alongside new posts.
            if li_post_stats:
                bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.")
            if li_post_comments:
                bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME)
                logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.")
            posts_sync_message = f"Posts: Synced {len(li_posts)} new. "
        else:
            posts_sync_message = "Posts: No new ones to upload after processing. "
            logging.info("Posts sync: No new posts were prepared for Bubble upload.")
    except ValueError as ve:
        # The ValueError text is HTML-escaped before going into the message.
        posts_sync_message = f"Posts Error: {html.escape(str(ve))}. "
        logging.error(f"Posts sync: ValueError: {ve}", exc_info=True)
    except Exception as e:
        logging.exception("Posts sync: Unexpected error during processing.")
        posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). "
    return posts_sync_message, token_state
def sync_linkedin_mentions(token_state):
    """Fetch LinkedIn mentions and upload the ones not yet cached from Bubble.

    The API is called only when the cached mentions table is empty, has no
    usable date column, or its most recent mention is at least 7 days old
    (compared on UTC calendar days).

    Args:
        token_state: Session dict (may be ``None``). Reads ``token``,
            ``client_id``, ``org_urn`` and ``bubble_mentions_df``. After a
            successful upload, ``bubble_mentions_df`` is replaced with the
            cached rows plus the newly uploaded ones.

    Returns:
        tuple: ``(status_message, token_state)``. Exceptions raised while
        fetching, processing or uploading are logged and reported through the
        status message instead of propagating.
    """
    logging.info("Starting LinkedIn mentions sync process.")
    if not token_state or not token_state.get("token"):
        logging.error("Mentions sync: Access denied. No LinkedIn token.")
        return "Mentions: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    # Validate configuration before touching the cached DataFrame so a bad
    # config is always reported as such.
    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).")
        return "Mentions: Config error. ", token_state

    # The key can be present but hold None; treat that like "nothing cached".
    # Work on a copy; token_state is only updated after a successful upload.
    cached_mentions_df = token_state.get("bubble_mentions_df")
    bubble_mentions_df_orig = pd.DataFrame() if cached_mentions_df is None else cached_mentions_df.copy()

    fetch_count_for_mentions_api = 0
    mentions_sync_is_needed_now = False
    if bubble_mentions_df_orig.empty:
        mentions_sync_is_needed_now = True
        fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
        logging.info("Mentions sync needed: Bubble DF empty. Fetching initial count.")
    elif BUBBLE_MENTIONS_DATE_COLUMN_NAME not in bubble_mentions_df_orig.columns or \
            bubble_mentions_df_orig[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all():
        mentions_sync_is_needed_now = True
        fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT
        logging.info(f"Mentions sync needed: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null. Fetching initial count.")
    else:
        # Parse into a standalone Series; the cached frame is left unmodified.
        # Unparseable values become NaT and are ignored by dropna().
        mention_dates_utc = pd.to_datetime(bubble_mentions_df_orig[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_mention_date_utc = mention_dates_utc.dropna().max()
        if pd.isna(last_mention_date_utc) or \
                (pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7:
            mentions_sync_is_needed_now = True
            fetch_count_for_mentions_api = DEFAULT_MENTIONS_UPDATE_FETCH_COUNT
            logging.info(f"Mentions sync needed: Last mention date {last_mention_date_utc} is old or invalid. Fetching update count.")

    if not mentions_sync_is_needed_now:
        logging.info("Mentions data is fresh based on current check. No API fetch needed for mentions.")
        return "Mentions: Up-to-date. ", token_state

    logging.info(f"Mentions sync proceeding. Fetch count: {fetch_count_for_mentions_api}")
    try:
        processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api)
        if not processed_raw_mentions:
            logging.info("Mentions sync: No new mentions found via API.")
            return "Mentions: None found via API. ", token_state

        # IDs already cached from Bubble, compared as strings.
        existing_mention_ids = set()
        if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns:
            existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str))

        sentiments_map = analyze_mentions_sentiment(processed_raw_mentions)
        all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map)

        # NOTE(review): compiled mentions are matched on their "id" key against
        # the Bubble ID column values - confirm both hold the same identifier.
        new_compiled_mentions_to_upload = [
            m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids
        ]
        if not new_compiled_mentions_to_upload:
            logging.info("Mentions sync: All fetched mentions are already in Bubble.")
            return "Mentions: All fetched already in Bubble. ", token_state

        bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload)
        if not bubble_ready_mentions:
            logging.info("Mentions sync: No new mentions were prepared for Bubble upload.")
            return "Mentions: No new ones to upload. ", token_state

        bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME)
        logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.")
        updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True)
        # De-duplicate only when the ID column exists: a KeyError here would
        # report a failure even though the upload already succeeded.
        if BUBBLE_MENTIONS_ID_COLUMN_NAME in updated_mentions_df.columns:
            updated_mentions_df = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last')
        token_state["bubble_mentions_df"] = updated_mentions_df
        return f"Mentions: Synced {len(bubble_ready_mentions)} new. ", token_state
    except ValueError as ve:
        logging.error(f"ValueError during mentions sync: {ve}", exc_info=True)
        # The ValueError text is HTML-escaped before going into the message.
        return f"Mentions Error: {html.escape(str(ve))}. ", token_state
    except Exception as e:
        logging.exception("Unexpected error in sync_linkedin_mentions.")
        return f"Mentions: Unexpected error ({type(e).__name__}). ", token_state
_MONTHLY_GAINS_TYPE = 'follower_gains_monthly'


def _follower_stats_sync_is_needed(stats_df):
    """Return True when the cached follower stats are missing, unusable or stale."""
    if stats_df.empty:
        logging.info("Follower stats sync needed: Bubble DF is empty.")
        return True
    if FOLLOWER_STATS_TYPE_COLUMN not in stats_df.columns:
        # Without the type column the rows cannot be split into monthly gains
        # and demographics; indexing it would raise KeyError, so force a sync.
        logging.info(f"Follower stats sync needed: Type column '{FOLLOWER_STATS_TYPE_COLUMN}' missing from Bubble DF.")
        return True

    sync_needed = False
    is_monthly = stats_df[FOLLOWER_STATS_TYPE_COLUMN] == _MONTHLY_GAINS_TYPE
    monthly_gains_df = stats_df[is_monthly]
    if monthly_gains_df.empty or FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df.columns:
        sync_needed = True
        logging.info("Follower stats sync needed: Monthly gains data missing or date column absent.")
    else:
        # For monthly-gain rows the category column is parsed as a date;
        # unparseable values become NaT and are ignored by dropna().
        gain_dates = pd.to_datetime(monthly_gains_df[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize()
        last_gain_date = gain_dates.dropna().max()
        if pd.isna(last_gain_date):
            sync_needed = True
            logging.info("Follower stats sync needed: No valid dates in monthly gains after conversion for check.")
        else:
            # Make the timestamp UTC-aware so it can be compared with "now".
            if last_gain_date.tzinfo is None or last_gain_date.tzinfo.utcoffset(last_gain_date) is None:
                last_gain_date = last_gain_date.tz_localize('UTC')
            else:
                last_gain_date = last_gain_date.tz_convert('UTC')
            start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1)
            if last_gain_date < start_of_current_month:
                sync_needed = True
                logging.info(f"Follower stats sync needed: Last gain date {last_gain_date} is old or invalid.")

    if stats_df[~is_monthly].empty:
        sync_needed = True
        logging.info("Follower stats sync needed: Demographic data (non-monthly) is missing.")
    return sync_needed


def _select_follower_stats_to_upload(api_follower_stats, stats_df):
    """Return the API entries that are new or changed relative to the cached rows.

    Monthly gains come first, followed by demographic entries, each group in
    API order.
    """
    if not stats_df.empty and FOLLOWER_STATS_TYPE_COLUMN in stats_df.columns:
        is_monthly = stats_df[FOLLOWER_STATS_TYPE_COLUMN] == _MONTHLY_GAINS_TYPE
        bubble_monthly_df = stats_df[is_monthly]
        bubble_demographics_df = stats_df[~is_monthly]
    else:
        # Nothing usable is cached: every API entry counts as new.
        bubble_monthly_df = pd.DataFrame()
        bubble_demographics_df = pd.DataFrame()

    # Monthly gains: new when their category (date) string is not cached yet.
    existing_monthly_gain_dates = set()
    if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns:
        existing_monthly_gain_dates = set(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN].astype(str).unique())
    new_stats_to_upload = [
        stat for stat in api_follower_stats
        if stat.get(FOLLOWER_STATS_TYPE_COLUMN) == _MONTHLY_GAINS_TYPE
        and str(stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) not in existing_monthly_gain_dates
    ]

    # Demographics: new when the (org, type, category) key is not cached or
    # its (organic, paid) counts differ from the cached ones.
    key_columns = [FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN]
    count_columns = [FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN]
    existing_demographics_map = {}
    if not bubble_demographics_df.empty and \
            all(col in bubble_demographics_df.columns for col in key_columns + count_columns):
        cached_columns = (bubble_demographics_df[col] for col in key_columns + count_columns)
        for org, stat_type, category, organic, paid in zip(*cached_columns):
            existing_demographics_map[(str(org), str(stat_type), str(category))] = (organic, paid)

    for demo_stat in api_follower_stats:
        if demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN) == _MONTHLY_GAINS_TYPE:
            continue
        key = (
            str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)),
            str(demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN)),
            str(demo_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)),
        )
        api_counts = (
            demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0),
            demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0),
        )
        if key not in existing_demographics_map or existing_demographics_map[key] != api_counts:
            new_stats_to_upload.append(demo_stat)
    return new_stats_to_upload


def sync_linkedin_follower_stats(token_state):
    """Fetch LinkedIn follower statistics and upload new or changed entries to Bubble.

    The API is called only when the cached stats are empty, lack monthly-gain
    or demographic rows, or the latest monthly gain predates the start of the
    current UTC month.

    Args:
        token_state: Session dict (may be ``None``). Reads ``token``,
            ``client_id``, ``org_urn`` and ``bubble_follower_stats_df``. After
            a successful upload, ``bubble_follower_stats_df`` is replaced with
            the de-duplicated union of cached and uploaded rows.

    Returns:
        tuple: ``(status_message, token_state)``. Exceptions raised while
        fetching, comparing or uploading are logged and reported through the
        status message instead of propagating.
    """
    logging.info("Starting LinkedIn follower stats sync process.")
    if not token_state or not token_state.get("token"):
        logging.error("Follower Stats sync: Access denied. No LinkedIn token.")
        return "Follower Stats: No token. ", token_state

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')

    # Validate configuration before touching the cached DataFrame so a bad
    # config is always reported as such.
    if not org_urn or not client_id or client_id == "ENV VAR MISSING":
        logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).")
        return "Follower Stats: Config error. ", token_state

    # The key can be present but hold None; treat that like "nothing cached".
    cached_stats_df = token_state.get("bubble_follower_stats_df")
    bubble_follower_stats_df_orig = pd.DataFrame() if cached_stats_df is None else cached_stats_df.copy()

    if not _follower_stats_sync_is_needed(bubble_follower_stats_df_orig):
        logging.info("Follower stats data is fresh based on current check. No API fetch needed.")
        return "Follower Stats: Data up-to-date. ", token_state

    logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}")
    try:
        api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn)
        if not api_follower_stats:
            logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.")
            return "Follower Stats: None found via API. ", token_state

        new_stats_to_upload = _select_follower_stats_to_upload(api_follower_stats, bubble_follower_stats_df_orig)
        if not new_stats_to_upload:
            logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found.")
            return "Follower Stats: Data up-to-date or no changes. ", token_state

        bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME)
        logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.")

        # Rebuild the cache: keep the latest row per org/category for monthly
        # gains and per org/type/category for demographics.
        temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True)
        monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == _MONTHLY_GAINS_TYPE].drop_duplicates(
            subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
            keep='last'
        )
        demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != _MONTHLY_GAINS_TYPE].drop_duplicates(
            subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN],
            keep='last'
        )
        token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True)
        return f"Follower Stats: Synced {len(new_stats_to_upload)} entries. ", token_state
    except ValueError as ve:
        logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True)
        # The ValueError text is HTML-escaped before going into the message.
        return f"Follower Stats Error: {html.escape(str(ve))}. ", token_state
    except Exception as e:
        logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.")
        return f"Follower Stats: Unexpected error ({type(e).__name__}). ", token_state
def sync_all_linkedin_data_orchestrator(token_state):
    """Run the posts, mentions and follower-stats syncs in sequence.

    Args:
        token_state: Session dict (may be ``None``). Must hold a truthy
            ``token``, an ``org_urn`` and a ``client_id`` other than
            ``"ENV VAR MISSING"``. ``fetch_count_for_api`` (default 0) sets
            how many posts to fetch; 0 skips the posts sync.

    Returns:
        tuple: ``(html_message, token_state)``. ``html_message`` is a ``<p>``
        element: red when the token or configuration is missing, otherwise
        green with the three per-sync status messages concatenated.
    """
    logging.info("Starting sync_all_linkedin_data_orchestrator process.")
    if not token_state or not token_state.get("token"):
        logging.error("Sync All: Access denied. LinkedIn token not available.")
        return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>", token_state

    org_urn = token_state.get('org_urn')
    client_id = token_state.get("client_id")

    if not org_urn:
        logging.error("Sync All: Org URN missing in token_state.")
        return "<p style='color:red;'>❌ Config error: Org URN missing.</p>", token_state
    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Sync All: Client ID missing or not set in token_state.")
        return "<p style='color:red;'>❌ Config error: Client ID missing.</p>", token_state

    # --- Sync Posts ---
    # A fetch count of 0 means there is nothing to request for posts.
    fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0)
    if fetch_count_for_posts_api == 0:
        posts_sync_message = "Posts: Already up-to-date. "
        logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0.")
    else:
        posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api)

    # --- Sync Mentions ---
    mentions_sync_message, token_state = sync_linkedin_mentions(token_state)

    # --- Sync Follower Stats ---
    follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state)

    logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]")
    # The green banner only says a sync was attempted; each part reports its
    # own outcome (including errors) in its message.
    final_message = f"<p style='color:green; text-align:center;'>✅ Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>"
    return final_message, token_state
|