Spaces:
Restarting
Restarting
# sync_logic.py | |
""" | |
Handles the logic for syncing LinkedIn data: posts, mentions, and follower statistics. | |
Fetches data from LinkedIn APIs and uploads to Bubble. | |
""" | |
import pandas as pd | |
import logging | |
import html | |
# Assuming Bubble_API_Calls contains bulk_upload_to_bubble | |
from Bubble_API_Calls import bulk_upload_to_bubble | |
# Assuming Linkedin_Data_API_Calls contains all necessary LinkedIn data fetching and processing functions | |
from Linkedin_Data_API_Calls import ( | |
fetch_linkedin_posts_core, | |
fetch_comments, | |
analyze_sentiment, # For post comments | |
compile_detailed_posts, | |
prepare_data_for_bubble, # For posts, stats, comments | |
fetch_linkedin_mentions_core, | |
analyze_mentions_sentiment, # For individual mentions | |
compile_detailed_mentions, # Compiles to user-specified format | |
prepare_mentions_for_bubble # Prepares user-specified format for Bubble | |
) | |
# Assuming linkedin_follower_stats.py contains get_linkedin_follower_stats | |
from linkedin_follower_stats import get_linkedin_follower_stats | |
# Assuming config.py contains all necessary constants | |
from config import ( | |
LINKEDIN_POST_URN_KEY, BUBBLE_POST_URN_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME, | |
BUBBLE_POST_STATS_TABLE_NAME, BUBBLE_POST_COMMENTS_TABLE_NAME, | |
BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_ID_COLUMN_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME, | |
DEFAULT_MENTIONS_INITIAL_FETCH_COUNT, DEFAULT_MENTIONS_UPDATE_FETCH_COUNT, | |
BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN, | |
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, FOLLOWER_STATS_PAID_COLUMN, | |
LINKEDIN_CLIENT_ID_ENV_VAR | |
) | |
def _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api): | |
"""Internal logic for syncing LinkedIn posts.""" | |
logging.info(f"Posts sync: Starting fetch for {fetch_count_for_posts_api} posts.") | |
client_id = token_state.get("client_id") | |
token_dict = token_state.get("token") | |
org_urn = token_state.get('org_urn') | |
bubble_posts_df_orig = token_state.get("bubble_posts_df", pd.DataFrame()).copy() | |
posts_sync_message = "" | |
try: | |
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_for_posts_api) | |
if not processed_raw_posts: | |
posts_sync_message = "Posts: None found via API. " | |
logging.info("Posts sync: No raw posts returned from API.") | |
return posts_sync_message, token_state | |
existing_post_urns = set() | |
if not bubble_posts_df_orig.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df_orig.columns: | |
existing_post_urns = set(bubble_posts_df_orig[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str)) | |
new_raw_posts = [p for p in processed_raw_posts if str(p.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns] | |
if not new_raw_posts: | |
posts_sync_message = "Posts: All fetched already in Bubble. " | |
logging.info("Posts sync: All fetched posts were already found in Bubble.") | |
return posts_sync_message, token_state | |
logging.info(f"Posts sync: Processing {len(new_raw_posts)} new raw posts.") | |
post_urns_to_process = [p[LINKEDIN_POST_URN_KEY] for p in new_raw_posts if p.get(LINKEDIN_POST_URN_KEY)] | |
all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map) | |
sentiments_per_post = analyze_sentiment(all_comments_data) # Assumes analysis of comments for posts | |
detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post) | |
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data) | |
if li_posts: | |
bulk_upload_to_bubble(li_posts, BUBBLE_POSTS_TABLE_NAME) | |
updated_posts_df = pd.concat([bubble_posts_df_orig, pd.DataFrame(li_posts)], ignore_index=True) | |
token_state["bubble_posts_df"] = updated_posts_df.drop_duplicates(subset=[BUBBLE_POST_URN_COLUMN_NAME], keep='last') | |
logging.info(f"Posts sync: Uploaded {len(li_posts)} new posts to Bubble.") | |
if li_post_stats: | |
bulk_upload_to_bubble(li_post_stats, BUBBLE_POST_STATS_TABLE_NAME) | |
logging.info(f"Posts sync: Uploaded {len(li_post_stats)} post_stats entries.") | |
if li_post_comments: | |
bulk_upload_to_bubble(li_post_comments, BUBBLE_POST_COMMENTS_TABLE_NAME) | |
logging.info(f"Posts sync: Uploaded {len(li_post_comments)} post_comments entries.") | |
posts_sync_message = f"Posts: Synced {len(li_posts)} new. " | |
else: | |
posts_sync_message = "Posts: No new ones to upload after processing. " | |
logging.info("Posts sync: No new posts were prepared for Bubble upload.") | |
except ValueError as ve: | |
posts_sync_message = f"Posts Error: {html.escape(str(ve))}. " | |
logging.error(f"Posts sync: ValueError: {ve}", exc_info=True) | |
except Exception as e: | |
logging.exception("Posts sync: Unexpected error during processing.") | |
posts_sync_message = f"Posts: Unexpected error ({type(e).__name__}). " | |
return posts_sync_message, token_state | |
def sync_linkedin_mentions(token_state): | |
"""Fetches new LinkedIn mentions and uploads them to Bubble.""" | |
logging.info("Starting LinkedIn mentions sync process.") | |
if not token_state or not token_state.get("token"): | |
logging.error("Mentions sync: Access denied. No LinkedIn token.") | |
return "Mentions: No token. ", token_state | |
client_id = token_state.get("client_id") | |
token_dict = token_state.get("token") | |
org_urn = token_state.get('org_urn') | |
# Work with a copy, original df in token_state will be updated at the end | |
bubble_mentions_df_orig = token_state.get("bubble_mentions_df", pd.DataFrame()).copy() | |
if not org_urn or not client_id or client_id == "ENV VAR MISSING": | |
logging.error("Mentions sync: Configuration error (Org URN or Client ID missing).") | |
return "Mentions: Config error. ", token_state | |
fetch_count_for_mentions_api = 0 | |
mentions_sync_is_needed_now = False | |
if bubble_mentions_df_orig.empty: | |
mentions_sync_is_needed_now = True | |
fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT | |
logging.info("Mentions sync needed: Bubble DF empty. Fetching initial count.") | |
else: | |
if BUBBLE_MENTIONS_DATE_COLUMN_NAME not in bubble_mentions_df_orig.columns or \ | |
bubble_mentions_df_orig[BUBBLE_MENTIONS_DATE_COLUMN_NAME].isnull().all(): | |
mentions_sync_is_needed_now = True | |
fetch_count_for_mentions_api = DEFAULT_MENTIONS_INITIAL_FETCH_COUNT | |
logging.info(f"Mentions sync needed: Date column '{BUBBLE_MENTIONS_DATE_COLUMN_NAME}' missing or all null. Fetching initial count.") | |
else: | |
# Use a copy for date checks to avoid SettingWithCopyWarning if any modification were made | |
mentions_df_check = bubble_mentions_df_orig.copy() | |
mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME] = pd.to_datetime(mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME], errors='coerce', utc=True) | |
last_mention_date_utc = mentions_df_check[BUBBLE_MENTIONS_DATE_COLUMN_NAME].dropna().max() | |
if pd.isna(last_mention_date_utc) or \ | |
(pd.Timestamp('now', tz='UTC').normalize() - last_mention_date_utc.normalize()).days >= 7: | |
mentions_sync_is_needed_now = True | |
fetch_count_for_mentions_api = DEFAULT_MENTIONS_UPDATE_FETCH_COUNT | |
logging.info(f"Mentions sync needed: Last mention date {last_mention_date_utc} is old or invalid. Fetching update count.") | |
if not mentions_sync_is_needed_now: | |
logging.info("Mentions data is fresh based on current check. No API fetch needed for mentions.") | |
return "Mentions: Up-to-date. ", token_state | |
logging.info(f"Mentions sync proceeding. Fetch count: {fetch_count_for_mentions_api}") | |
try: | |
processed_raw_mentions = fetch_linkedin_mentions_core(client_id, token_dict, org_urn, count=fetch_count_for_mentions_api) | |
if not processed_raw_mentions: | |
logging.info("Mentions sync: No new mentions found via API.") | |
return "Mentions: None found via API. ", token_state | |
existing_mention_ids = set() | |
if not bubble_mentions_df_orig.empty and BUBBLE_MENTIONS_ID_COLUMN_NAME in bubble_mentions_df_orig.columns: | |
existing_mention_ids = set(bubble_mentions_df_orig[BUBBLE_MENTIONS_ID_COLUMN_NAME].dropna().astype(str)) | |
sentiments_map = analyze_mentions_sentiment(processed_raw_mentions) | |
all_compiled_mentions = compile_detailed_mentions(processed_raw_mentions, sentiments_map) | |
new_compiled_mentions_to_upload = [ | |
m for m in all_compiled_mentions if str(m.get("id")) not in existing_mention_ids | |
] | |
if not new_compiled_mentions_to_upload: | |
logging.info("Mentions sync: All fetched mentions are already in Bubble.") | |
return "Mentions: All fetched already in Bubble. ", token_state | |
bubble_ready_mentions = prepare_mentions_for_bubble(new_compiled_mentions_to_upload) | |
if bubble_ready_mentions: | |
bulk_upload_to_bubble(bubble_ready_mentions, BUBBLE_MENTIONS_TABLE_NAME) | |
logging.info(f"Successfully uploaded {len(bubble_ready_mentions)} new mentions to Bubble.") | |
updated_mentions_df = pd.concat([bubble_mentions_df_orig, pd.DataFrame(bubble_ready_mentions)], ignore_index=True) | |
token_state["bubble_mentions_df"] = updated_mentions_df.drop_duplicates(subset=[BUBBLE_MENTIONS_ID_COLUMN_NAME], keep='last') | |
return f"Mentions: Synced {len(bubble_ready_mentions)} new. ", token_state | |
else: | |
logging.info("Mentions sync: No new mentions were prepared for Bubble upload.") | |
return "Mentions: No new ones to upload. ", token_state | |
except ValueError as ve: | |
logging.error(f"ValueError during mentions sync: {ve}", exc_info=True) | |
return f"Mentions Error: {html.escape(str(ve))}. ", token_state | |
except Exception as e: | |
logging.exception("Unexpected error in sync_linkedin_mentions.") | |
return f"Mentions: Unexpected error ({type(e).__name__}). ", token_state | |
def sync_linkedin_follower_stats(token_state): | |
"""Fetches new LinkedIn follower statistics and uploads them to Bubble.""" | |
logging.info("Starting LinkedIn follower stats sync process.") | |
if not token_state or not token_state.get("token"): | |
logging.error("Follower Stats sync: Access denied. No LinkedIn token.") | |
return "Follower Stats: No token. ", token_state | |
client_id = token_state.get("client_id") | |
token_dict = token_state.get("token") | |
org_urn = token_state.get('org_urn') | |
bubble_follower_stats_df_orig = token_state.get("bubble_follower_stats_df", pd.DataFrame()).copy() | |
if not org_urn or not client_id or client_id == "ENV VAR MISSING": | |
logging.error("Follower Stats sync: Configuration error (Org URN or Client ID missing).") | |
return "Follower Stats: Config error. ", token_state | |
follower_stats_sync_is_needed_now = False | |
if bubble_follower_stats_df_orig.empty: | |
follower_stats_sync_is_needed_now = True | |
logging.info("Follower stats sync needed: Bubble DF is empty.") | |
else: | |
monthly_gains_df_check = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].copy() | |
if monthly_gains_df_check.empty or FOLLOWER_STATS_CATEGORY_COLUMN not in monthly_gains_df_check.columns: | |
follower_stats_sync_is_needed_now = True | |
logging.info("Follower stats sync needed: Monthly gains data missing or date column absent.") | |
else: | |
monthly_gains_df_check.loc[:, FOLLOWER_STATS_CATEGORY_COLUMN] = pd.to_datetime(monthly_gains_df_check[FOLLOWER_STATS_CATEGORY_COLUMN], errors='coerce').dt.normalize() | |
last_gain_date = monthly_gains_df_check[FOLLOWER_STATS_CATEGORY_COLUMN].dropna().max() | |
if pd.isna(last_gain_date): | |
follower_stats_sync_is_needed_now = True | |
logging.info("Follower stats sync needed: No valid dates in monthly gains after conversion for check.") | |
else: | |
if last_gain_date.tzinfo is None or last_gain_date.tzinfo.utcoffset(last_gain_date) is None: | |
last_gain_date = last_gain_date.tz_localize('UTC') | |
else: | |
last_gain_date = last_gain_date.tz_convert('UTC') | |
start_of_current_month = pd.Timestamp('now', tz='UTC').normalize().replace(day=1) | |
if last_gain_date < start_of_current_month: | |
follower_stats_sync_is_needed_now = True | |
logging.info(f"Follower stats sync needed: Last gain date {last_gain_date} is old or invalid.") | |
if bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty: | |
follower_stats_sync_is_needed_now = True | |
logging.info("Follower stats sync needed: Demographic data (non-monthly) is missing.") | |
if not follower_stats_sync_is_needed_now: | |
logging.info("Follower stats data is fresh based on current check. No API fetch needed.") | |
return "Follower Stats: Data up-to-date. ", token_state | |
logging.info(f"Follower stats sync proceeding for org_urn: {org_urn}") | |
try: | |
api_follower_stats = get_linkedin_follower_stats(client_id, token_dict, org_urn) | |
if not api_follower_stats: | |
logging.info(f"Follower Stats sync: No stats found via API for org {org_urn}.") | |
return "Follower Stats: None found via API. ", token_state | |
new_stats_to_upload = [] | |
api_monthly_gains = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) == 'follower_gains_monthly'] | |
existing_monthly_gain_dates = set() | |
if not bubble_follower_stats_df_orig.empty: | |
bubble_monthly_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'] | |
if FOLLOWER_STATS_CATEGORY_COLUMN in bubble_monthly_df.columns: | |
existing_monthly_gain_dates = set(bubble_monthly_df[FOLLOWER_STATS_CATEGORY_COLUMN].astype(str).unique()) | |
for gain_stat in api_monthly_gains: | |
if str(gain_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) not in existing_monthly_gain_dates: | |
new_stats_to_upload.append(gain_stat) | |
api_demographics = [s for s in api_follower_stats if s.get(FOLLOWER_STATS_TYPE_COLUMN) != 'follower_gains_monthly'] | |
existing_demographics_map = {} | |
if not bubble_follower_stats_df_orig.empty: | |
bubble_demographics_df = bubble_follower_stats_df_orig[bubble_follower_stats_df_orig[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'] | |
if not bubble_demographics_df.empty and \ | |
all(col in bubble_demographics_df.columns for col in [ | |
FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, | |
FOLLOWER_STATS_CATEGORY_COLUMN, FOLLOWER_STATS_ORGANIC_COLUMN, | |
FOLLOWER_STATS_PAID_COLUMN | |
]): | |
for _, row in bubble_demographics_df.iterrows(): | |
key = ( | |
str(row[FOLLOWER_STATS_ORG_URN_COLUMN]), | |
str(row[FOLLOWER_STATS_TYPE_COLUMN]), | |
str(row[FOLLOWER_STATS_CATEGORY_COLUMN]) | |
) | |
existing_demographics_map[key] = ( | |
row[FOLLOWER_STATS_ORGANIC_COLUMN], | |
row[FOLLOWER_STATS_PAID_COLUMN] | |
) | |
for demo_stat in api_demographics: | |
key = ( | |
str(demo_stat.get(FOLLOWER_STATS_ORG_URN_COLUMN)), | |
str(demo_stat.get(FOLLOWER_STATS_TYPE_COLUMN)), | |
str(demo_stat.get(FOLLOWER_STATS_CATEGORY_COLUMN)) | |
) | |
api_counts = ( | |
demo_stat.get(FOLLOWER_STATS_ORGANIC_COLUMN, 0), | |
demo_stat.get(FOLLOWER_STATS_PAID_COLUMN, 0) | |
) | |
if key not in existing_demographics_map or existing_demographics_map[key] != api_counts: | |
new_stats_to_upload.append(demo_stat) | |
if not new_stats_to_upload: | |
logging.info(f"Follower Stats sync: Data for org {org_urn} is up-to-date or no changes found.") | |
return "Follower Stats: Data up-to-date or no changes. ", token_state | |
bulk_upload_to_bubble(new_stats_to_upload, BUBBLE_FOLLOWER_STATS_TABLE_NAME) | |
logging.info(f"Successfully uploaded {len(new_stats_to_upload)} follower stat entries to Bubble for org {org_urn}.") | |
temp_df = pd.concat([bubble_follower_stats_df_orig, pd.DataFrame(new_stats_to_upload)], ignore_index=True) | |
monthly_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] == 'follower_gains_monthly'].drop_duplicates( | |
subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], | |
keep='last' | |
) | |
demographics_part = temp_df[temp_df[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].drop_duplicates( | |
subset=[FOLLOWER_STATS_ORG_URN_COLUMN, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN], | |
keep='last' | |
) | |
token_state["bubble_follower_stats_df"] = pd.concat([monthly_part, demographics_part], ignore_index=True) | |
return f"Follower Stats: Synced {len(new_stats_to_upload)} entries. ", token_state | |
except ValueError as ve: | |
logging.error(f"ValueError during follower stats sync for {org_urn}: {ve}", exc_info=True) | |
return f"Follower Stats Error: {html.escape(str(ve))}. ", token_state | |
except Exception as e: | |
logging.exception(f"Unexpected error in sync_linkedin_follower_stats for {org_urn}.") | |
return f"Follower Stats: Unexpected error ({type(e).__name__}). ", token_state | |
def sync_all_linkedin_data_orchestrator(token_state): | |
"""Orchestrates the syncing of all LinkedIn data types (Posts, Mentions, Follower Stats).""" | |
logging.info("Starting sync_all_linkedin_data_orchestrator process.") | |
if not token_state or not token_state.get("token"): | |
logging.error("Sync All: Access denied. LinkedIn token not available.") | |
return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>", token_state | |
org_urn = token_state.get('org_urn') | |
client_id = token_state.get("client_id") # Client ID should be in token_state from process_and_store_bubble_token | |
posts_sync_message = "" | |
mentions_sync_message = "" | |
follower_stats_sync_message = "" | |
if not org_urn: | |
logging.error("Sync All: Org URN missing in token_state.") | |
return "<p style='color:red;'>β Config error: Org URN missing.</p>", token_state | |
if not client_id or client_id == "ENV VAR MISSING": # Check client_id from token_state | |
logging.error("Sync All: Client ID missing or not set in token_state.") | |
return "<p style='color:red;'>β Config error: Client ID missing.</p>", token_state | |
# --- Sync Posts --- | |
fetch_count_for_posts_api = token_state.get('fetch_count_for_api', 0) | |
if fetch_count_for_posts_api == 0: | |
posts_sync_message = "Posts: Already up-to-date. " | |
logging.info("Posts sync: Skipped as fetch_count_for_posts_api is 0.") | |
else: | |
posts_sync_message, token_state = _sync_linkedin_posts_internal(token_state, fetch_count_for_posts_api) | |
# --- Sync Mentions --- | |
mentions_sync_message, token_state = sync_linkedin_mentions(token_state) | |
# --- Sync Follower Stats --- | |
follower_stats_sync_message, token_state = sync_linkedin_follower_stats(token_state) | |
logging.info(f"Sync process complete. Messages: Posts: [{posts_sync_message.strip()}], Mentions: [{mentions_sync_message.strip()}], Follower Stats: [{follower_stats_sync_message.strip()}]") | |
final_message = f"<p style='color:green; text-align:center;'>β Sync Attempted. {posts_sync_message} {mentions_sync_message} {follower_stats_sync_message}</p>" | |
return final_message, token_state | |