"""LinkedIn organization analytics helpers.

Fetches an organization's posts, share statistics, comments and mentions from
the LinkedIn REST API, runs sentiment analysis and summarization/classification
over the text, and shapes the results into record lists for upload to Bubble.
"""
import html
import json
import logging
import re
import time  # noqa: F401  (reserved for potential rate limiting; currently unused)
from collections import defaultdict
from datetime import datetime
from urllib.parse import quote

import requests
from transformers import pipeline

from sessions import create_session
from error_handling import display_error  # noqa: F401  (currently unused in this module)
from posts_categorization import batch_summarize_and_classify

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
# Value of the "LinkedIn-Version" header sent on every REST call in this module.
LINKEDIN_API_VERSION = "202502"

# Display labels emitted by the sentiment helpers; stored/uploaded verbatim.
_SENTIMENT_POSITIVE = 'Positive 👍'
_SENTIMENT_NEGATIVE = 'Negative 👎'
_SENTIMENT_NEUTRAL = 'Neutral 😐'
_SENTIMENT_UNKNOWN = 'Unknown'
_SENTIMENT_ERROR = 'Error'

# Upper-cased model labels -> display label. Anything else maps to "Unknown".
_MODEL_LABEL_TO_SENTIMENT = {
    'POSITIVE': _SENTIMENT_POSITIVE,
    'VERY POSITIVE': _SENTIMENT_POSITIVE,
    'NEGATIVE': _SENTIMENT_NEGATIVE,
    'VERY NEGATIVE': _SENTIMENT_NEGATIVE,
    'NEUTRAL': _SENTIMENT_NEUTRAL,
}

_SENTIMENT_MAX_CHARS = 512       # characters (not model tokens) passed to the pipeline
_DISPLAY_TEXT_MAX_CHARS = 250    # length of the HTML preview built for each post
_STATS_BATCH_SIZE = 20           # post URNs per share-statistics request
_NO_TEXT_PLACEHOLDER = "[No text content]"

# Matches "{...}" placeholders such as "{mention}" in mention commentary.
_PLACEHOLDER_RE = re.compile(r"\{.*?\}")

# Initialize sentiment pipeline (loaded once globally)
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")


# --- Utility Functions ---

def extract_text_from_mention_commentary(commentary):
    """Return *commentary* with every ``{...}`` placeholder removed and surrounding whitespace stripped.

    Returns an empty string for empty or None input.
    """
    if not commentary:
        return ""
    return _PLACEHOLDER_RE.sub("", commentary).strip()


def _token_dict_from(community_token):
    """Return *community_token* in the dict form passed to ``create_session``.

    Dicts are returned unchanged; any other value is treated as a bare access
    token and wrapped as a Bearer token.
    """
    if isinstance(community_token, dict):
        return community_token
    return {'access_token': community_token, 'token_type': 'Bearer'}


def _extract_post_text(post, default=""):
    """Return the text of a post payload.

    Prefers the REST ``commentary`` field, then the legacy UGC path
    ``specificContent -> com.linkedin.ugc.ShareContent -> shareCommentaryV2 -> text``,
    and finally *default* when neither yields non-empty text.
    """
    commentary = post.get("commentary")
    if commentary:
        return commentary
    share_content = (post.get("specificContent") or {}).get("com.linkedin.ugc.ShareContent") or {}
    legacy_text = (share_content.get("shareCommentaryV2") or {}).get("text", "")
    return legacy_text or default


def _epoch_millis_to_iso(timestamp):
    """Convert an epoch-milliseconds timestamp to a naive local-time ISO string; None if falsy."""
    if not timestamp:
        return None
    return datetime.fromtimestamp(timestamp / 1000).isoformat()


def _comment_count_from_stats(stats):
    """Return a post's comment count from its share-statistics dict.

    Uses ``commentSummary.totalComments`` when present, else ``commentCount``, else 0.
    """
    comment_summary = stats.get("commentSummary") or {}
    return comment_summary.get("totalComments", stats.get("commentCount", 0))


# --- Core Sentiment Analysis Helper ---

def _get_sentiment_from_text(text_to_analyze):
    """Classify a single piece of text with ``sentiment_pipeline``.

    Returns:
        dict: ``{"label": <display label>, "counts": defaultdict(int)}``.

    Behaviour by case:
        - Blank input: the neutral label with empty ``counts``.
        - Successful classification: ``counts`` holds a single 1 for the
          returned label.
        - Model label outside the known positive/negative/neutral set:
          ``"Unknown"``.
        - Pipeline failure: logged, and the result is ``"Error"`` with
          ``counts["Error"] == 1``.
    """
    sentiment_counts = defaultdict(int)
    if not text_to_analyze or not text_to_analyze.strip():
        return {"label": _SENTIMENT_NEUTRAL, "counts": sentiment_counts}

    try:
        # Truncate by characters to avoid issues with very long texts for the model.
        analysis_result = sentiment_pipeline(str(text_to_analyze)[:_SENTIMENT_MAX_CHARS])
        model_label = analysis_result[0]['label'].upper()
    except Exception as e:
        logging.error(f"Sentiment analysis failed for text snippet '{str(text_to_analyze)[:50]}...'. Error: {e}")
        sentiment_counts[_SENTIMENT_ERROR] += 1
        return {"label": _SENTIMENT_ERROR, "counts": sentiment_counts}

    label = _MODEL_LABEL_TO_SENTIMENT.get(model_label, _SENTIMENT_UNKNOWN)
    sentiment_counts[label] += 1
    return {"label": label, "counts": sentiment_counts}


def get_post_media_category(post_content):
    """Determine the media category of a post from its ``content`` object.

    Args:
        post_content (dict or None): The ``content`` dict of a post payload.

    Returns:
        str: One of "Video", "Article", "Multi-Image", "Document", "Image",
        or "NONE". Checks are applied in that order; the first match wins.
    """
    if not post_content:
        return "NONE"
    if "com.linkedin.voyager.feed.render.LinkedInVideoComponent" in post_content:
        return "Video"
    if 'article' in post_content:
        return "Article"
    if 'multiImage' in post_content:
        return "Multi-Image"
    media_item = post_content.get('media')
    if isinstance(media_item, dict):
        # Heuristic: a media item carrying a 'title' is treated as a document
        # (e.g. id "urn:li:document:..."); otherwise one with an 'id' is an
        # image (e.g. id "urn:li:image:...").
        if 'title' in media_item:
            return "Document"
        if 'id' in media_item:
            return "Image"
    return "NONE"


# --- Post Retrieval Functions ---

def _build_stats_params(org_urn, batch_urns):
    """Build the query params for one organizationalEntityShareStatistics request.

    Share URNs go into ``shares[i]`` and ugcPost URNs into ``ugcPosts[i]``.
    Other URN types are skipped with a warning.

    Returns:
        tuple: ``(params, urns_in_call)``, where ``urns_in_call`` lists the
        accepted URNs in input order.
    """
    params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
    share_idx = 0
    ugc_idx = 0
    urns_in_call = []
    for urn_str in batch_urns:
        if ":share:" in urn_str:
            params[f"shares[{share_idx}]"] = urn_str
            share_idx += 1
        elif ":ugcPost:" in urn_str:
            params[f"ugcPosts[{ugc_idx}]"] = urn_str
            ugc_idx += 1
        else:
            logging.warning(f"URN {urn_str} is not a recognized share or ugcPost type for stats. Skipping.")
            continue
        urns_in_call.append(urn_str)
    return params, urns_in_call


def _merge_stats_response(stats_data, stats_map):
    """Copy per-URN ``totalShareStatistics`` from one stats response into *stats_map*.

    Accepts either an ``elements`` list (each item keyed by ``share`` or
    ``ugcPost``) or, as a fallback, a ``results`` dict keyed by URN. URNs
    returned without statistics are stored with an empty dict. Per-URN
    ``errors`` reported by the API are logged.
    """
    elements_from_api = stats_data.get("elements")
    if isinstance(elements_from_api, list):
        if not elements_from_api:
            logging.info("API returned 'elements' but it's an empty list for the URNs in this call.")
        processed_urns_in_batch = 0
        for item in elements_from_api:
            if "share" in item:
                urn_in_item = item.get("share")
            elif "ugcPost" in item:
                urn_in_item = item.get("ugcPost")
            else:
                urn_in_item = None
            if not urn_in_item:
                logging.warning(f"Could not extract a recognized URN key from API element: {item}")
                continue
            stats_values = item.get("totalShareStatistics", {})
            if stats_values:
                stats_map[urn_in_item] = stats_values
                processed_urns_in_batch += 1
            else:
                # The URN came back without statistics; record it with an empty dict.
                logging.debug(f"No 'totalShareStatistics' data found for URN: {urn_in_item} in API item: {item}")
                stats_map[urn_in_item] = {}
        logging.info(
            f"Successfully processed {processed_urns_in_batch} URNs with stats from the API response for this batch. "
            f"Current total stats_map size: {len(stats_map)}"
        )
    elif elements_from_api is None and "results" in stats_data:
        logging.warning(
            f"API response does not contain 'elements' key, but 'results' key is present. "
            f"Attempting to parse 'results'. Response keys: {stats_data.keys()}"
        )
        results_dict = stats_data.get("results", {})
        if isinstance(results_dict, dict):
            for urn_key, stat_element_values in results_dict.items():
                stats_map[urn_key] = stat_element_values.get("totalShareStatistics", {})
            logging.info(f"Processed stats from 'results' dictionary. Current stats_map size: {len(stats_map)}")
        else:
            logging.error(f"'results' key found but is not a dictionary. Type: {type(results_dict)}")
    else:
        logging.error(
            f"API response structure not recognized. Expected 'elements' (list) or 'results' (dict). "
            f"Got keys: {stats_data.keys()}. Full response sample: {str(stats_data)[:500]}"
        )

    api_errors = stats_data.get("errors")
    if isinstance(api_errors, dict):
        for urn_errored, error_detail in api_errors.items():
            message = (
                error_detail.get('message', 'Unknown API error message')
                if isinstance(error_detail, dict) else error_detail
            )
            logging.warning(f"API reported error for URN {urn_errored}: {message}")
    elif api_errors:
        logging.warning(f"API reported errors: {api_errors}")


def _fetch_stats_batch(session, params, urns_in_call, stats_map):
    """Request statistics for one batch of URNs and merge them into *stats_map*.

    Best effort: any failure is logged and the batch is skipped, so
    statistics from other batches are still returned.
    """
    stat_resp = None
    try:
        logging.info(f"Fetching stats for batch of {len(urns_in_call)} URNs. First URN in call: {urns_in_call[0]}")
        stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
        stat_resp.raise_for_status()
        _merge_stats_response(stat_resp.json(), stats_map)
    except requests.exceptions.HTTPError as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        response_text = getattr(e.response, 'text', '') or ''
        logging.warning(
            f"HTTP error fetching stats for a batch (Status: {status_code}): {e}. "
            f"Params: {params}. Response: {response_text[:500]}"
        )
    except json.JSONDecodeError as e:
        # Must precede RequestException. Since requests 2.27, Response.json()
        # raises a JSONDecodeError that also derives from RequestException.
        response_text = stat_resp.text if stat_resp is not None else 'Response object not available or no text attribute'
        logging.warning(f"Failed to decode JSON from stats response: {e}. Response text: {response_text[:500]}")
    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        response_text = getattr(e.response, 'text', 'No response text') or ''
        logging.warning(
            f"Request failed for stats batch (Status: {status_code}): {e}. "
            f"Params: {params}. Response: {response_text[:500]}"
        )
    except Exception as e:
        logging.error(f"An unexpected error occurred processing stats batch: {e}", exc_info=True)


def _fetch_share_statistics(session, org_urn, post_urns, batch_size=_STATS_BATCH_SIZE):
    """Fetch share statistics for *post_urns* in batches.

    Returns:
        dict: Maps post URN to its statistics dict.
    """
    stats_map = {}
    for start in range(0, len(post_urns), batch_size):
        batch_urns = post_urns[start:start + batch_size]
        params, urns_in_call = _build_stats_params(org_urn, batch_urns)
        if not urns_in_call:
            logging.info(
                f"Skipping API call for an empty or invalid batch of URNs "
                f"(original batch segment size: {len(batch_urns)})."
            )
            continue
        _fetch_stats_batch(session, params, urns_in_call, stats_map)
    logging.info(f"Finished processing all URN batches. Final stats_map size: {len(stats_map)}")
    return stats_map


def fetch_linkedin_posts_core(comm_client_id, community_token, org_urn, count):
    """Fetch an organization's posts, their share statistics, and NLP summaries/categories.

    Comments are NOT fetched and no sentiment analysis happens here.

    Args:
        comm_client_id: Client id passed to ``create_session``.
        community_token: Access-token string, or a token dict used as-is.
        org_urn (str): Organization URN used as the post author filter.
        count (int): Number of posts requested from the API.

    Returns:
        tuple: ``(processed_posts, stats_map, "DefaultOrgName")``.
            - ``processed_posts``: one dict per post with keys ``id``,
              ``raw_text``, ``summary``, ``category``,
              ``published_at_timestamp``, ``published_at_iso``,
              ``organization_urn``, ``is_ad`` and ``media_category``.
            - ``stats_map``: maps post URN to its statistics dict.
            - When no posts are returned, the result is
              ``([], {}, "DefaultOrgName")``.

    Raises:
        ValueError: If the posts request fails or its body is not valid JSON.
    """
    session = create_session(comm_client_id, token=_token_dict_from(community_token))
    session.headers.update({"LinkedIn-Version": LINKEDIN_API_VERSION})

    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
    logging.info(f"Fetching posts from URL: {posts_url}")
    resp = None
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts_api = resp.json().get("elements", [])
    except json.JSONDecodeError as e:
        # Listed first: requests' JSONDecodeError also derives from RequestException.
        response_text = resp.text if resp is not None else 'No response object'
        logging.error(f"Failed to decode JSON from posts response: {e}. Response text: {response_text}")
        raise ValueError("Failed to decode JSON from posts response") from e
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        text = getattr(e.response, 'text', 'No response text')
        logging.error(f"Failed to fetch posts (Status: {status}): {e}. Response: {text}")
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e
    logging.info(f"Fetched {len(raw_posts_api)} raw posts from API.")

    if not raw_posts_api:
        logging.info("No raw posts found.")
        return [], {}, "DefaultOrgName"

    post_texts_for_nlp = [
        {"text": _extract_post_text(p, _NO_TEXT_PLACEHOLDER), "id": p.get("id")}
        for p in raw_posts_api
    ]
    logging.info(f"Prepared {len(post_texts_for_nlp)} posts for NLP (summarization/classification).")
    structured_results_list = batch_summarize_and_classify(post_texts_for_nlp)
    structured_results_map = {res["id"]: res for res in structured_results_list if "id" in res}

    post_urns_for_stats = [p["id"] for p in raw_posts_api if p.get("id")]
    stats_map = _fetch_share_statistics(session, org_urn, post_urns_for_stats)

    processed_raw_posts = []
    for p in raw_posts_api:
        post_id = p.get("id")
        if not post_id:
            logging.warning("Skipping raw post due to missing ID.")
            continue
        timestamp = p.get("publishedAt") or p.get("createdAt") or p.get("firstPublishedAt")
        structured_res = structured_results_map.get(post_id, {})
        processed_raw_posts.append({
            "id": post_id,
            "raw_text": _extract_post_text(p, _NO_TEXT_PLACEHOLDER),
            "summary": structured_res.get("summary", "N/A"),
            "category": structured_res.get("category", "N/A"),
            "published_at_timestamp": timestamp,
            "published_at_iso": _epoch_millis_to_iso(timestamp),
            "organization_urn": p.get("author", f"urn:li:organization:{org_urn.split(':')[-1]}"),
            "is_ad": 'adContext' in p,
            "media_category": get_post_media_category(p.get("content")),
        })

    logging.info(f"Processed {len(processed_raw_posts)} posts with core data.")
    return processed_raw_posts, stats_map, "DefaultOrgName"


def _comment_texts_from_elements(elements):
    """Extract comment texts from socialActions comment elements.

    ``message`` may be a dict with a ``text`` key (empty texts skipped) or a
    plain string (appended as-is).
    """
    comments_texts = []
    for comment in elements:
        message_obj = comment.get('message', {})
        if isinstance(message_obj, dict):
            comment_text = message_obj.get('text')
            if comment_text:
                comments_texts.append(comment_text)
        elif isinstance(message_obj, str):
            comments_texts.append(message_obj)
    return comments_texts


def _fetch_post_comments(session, post_urn):
    """Fetch the comment texts of one post.

    Returns:
        list: The comment texts, or ``[]`` on any failure (which is logged).
    """
    # The URN is placed in the path without URL-encoding, matching the request
    # format this code was built against.
    url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
    logging.debug(f"Fetching comments from URL: {url} for post URN: {post_urn}")
    response = None
    try:
        response = session.get(url)
        if response.status_code == 200:
            comments_texts = _comment_texts_from_elements(response.json().get('elements', []))
            logging.info(f"Fetched {len(comments_texts)} comments for {post_urn}.")
            return comments_texts
        if response.status_code == 403:
            logging.warning(
                f"Forbidden (403) to fetch comments for {post_urn}. URL: {url}. "
                f"Response: {response.text}. Check permissions or API version."
            )
        elif response.status_code == 404:
            logging.warning(f"Comments not found (404) for {post_urn}. URL: {url}. Response: {response.text}")
        else:
            logging.error(
                f"Error fetching comments for {post_urn}. Status: {response.status_code}. "
                f"URL: {url}. Response: {response.text}"
            )
    except json.JSONDecodeError as e:
        # Listed first: requests' JSONDecodeError also derives from RequestException.
        response_text_for_log = response.text if response is not None else 'N/A'
        logging.error(f"JSONDecodeError fetching comments for {post_urn}. Response: {response_text_for_log}. Error: {e}")
    except requests.exceptions.RequestException as e:
        logging.error(f"RequestException fetching comments for {post_urn}: {e}")
    except Exception as e:
        logging.error(f"Unexpected error fetching comments for {post_urn}: {e}", exc_info=True)
    return []


def fetch_comments(comm_client_id, community_token, post_urns, stats_map):
    """Fetch comment texts for each post URN using the socialActions endpoint.

    Posts whose comment count in *stats_map* is 0 (including posts missing
    from *stats_map*) are skipped without an API call.

    Args:
        comm_client_id: Client id passed to ``create_session``.
        community_token: Access-token string, or a dict containing ``access_token``.
        post_urns (list[str]): Post URNs to fetch comments for.
        stats_map (dict): Post URN -> statistics dict, as returned by
            ``fetch_linkedin_posts_core``.

    Returns:
        dict: ``{post_urn: [comment_text, ...]}`` for every URN in
        *post_urns*. An invalid token format yields an empty list for every URN.
    """
    if isinstance(community_token, str):
        token_dict = {'access_token': community_token, 'token_type': 'Bearer'}
    elif isinstance(community_token, dict) and 'access_token' in community_token:
        token_dict = community_token
    else:
        logging.error("Invalid community_token format. Expected a string or a dict with 'access_token'.")
        return {urn: [] for urn in post_urns}

    linkedin_session = create_session(comm_client_id, token=token_dict)
    linkedin_session.headers.update({'LinkedIn-Version': LINKEDIN_API_VERSION})

    all_comments_by_post = {}
    logging.info(f"Fetching comments for {len(post_urns)} posts.")
    for post_urn in post_urns:
        if _comment_count_from_stats(stats_map.get(post_urn) or {}) == 0:
            logging.info(f"Skipping comment fetch for {post_urn} as commentCount is 0 in stats_map.")
            all_comments_by_post[post_urn] = []
            continue
        all_comments_by_post[post_urn] = _fetch_post_comments(linkedin_session, post_urn)
    return all_comments_by_post


def _dominant_sentiment(counts, total_valid):
    """Pick the dominant label from aggregated *counts*.

    Returns:
        tuple: ``(label, percentage)``. ``percentage`` is the share (0-100,
        one decimal) of the *total_valid* non-error analyses that got the
        dominant label.

    Details:
        - "Error" never wins while any other label has a count.
        - With no valid analyses, the result is "Error" if errors occurred,
          otherwise Neutral, with 0.0.
        - Ties go to the first label inserted into *counts*.
    """
    error_count = counts.get(_SENTIMENT_ERROR, 0)  # .get: don't insert a spurious key
    valid_sentiments = {k: v for k, v in counts.items() if k != _SENTIMENT_ERROR and v > 0}
    if total_valid <= 0 or not valid_sentiments:
        return (_SENTIMENT_ERROR if error_count > 0 else _SENTIMENT_NEUTRAL), 0.0
    dominant = max(valid_sentiments, key=valid_sentiments.get)
    return dominant, round(counts[dominant] / total_valid * 100, 1)


def analyze_sentiment(all_comments_data):
    """Aggregate comment sentiment per post.

    Args:
        all_comments_data (dict): ``{post_urn: [comment_text, ...]}``.

    Returns:
        dict: ``{post_urn: {"sentiment": label, "percentage": float, "details": {label: count}}}``.
        Blank comments are ignored. Posts with no comments get the neutral
        label, 0.0 and empty details.
    """
    results_by_post = {}
    logging.info(f"Analyzing aggregated sentiment for comments from {len(all_comments_data)} posts.")
    for post_urn, comments_list in all_comments_data.items():
        aggregated_counts = defaultdict(int)
        total_valid_comments = 0
        for comment_text in comments_list or []:
            if not comment_text or not comment_text.strip():
                continue
            single = _get_sentiment_from_text(comment_text)
            for label, count in single["counts"].items():
                aggregated_counts[label] += count
            if single["label"] != _SENTIMENT_ERROR:
                total_valid_comments += 1

        dominant, percentage = _dominant_sentiment(aggregated_counts, total_valid_comments)
        results_by_post[post_urn] = {
            "sentiment": dominant,
            "percentage": percentage,
            "details": dict(aggregated_counts),
        }
        logging.debug(f"Aggregated sentiment for post {post_urn}: {results_by_post[post_urn]}")
    return results_by_post


def compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post):
    """Combine processed posts with their statistics and aggregated comment sentiment.

    Engagement rate is ``(likes + comments + clicks + shares) / impressions * 100``,
    or 0.0 when there are no impressions.

    Returns:
        list[dict]: One display-ready record per post. ``text_for_display`` is
        an HTML-escaped preview of at most 250 characters, with newlines
        turned into ``<br>``.
    """
    detailed_post_list = []
    logging.info(f"Compiling detailed data for {len(processed_raw_posts)} posts.")
    for proc_post in processed_raw_posts:
        post_id = proc_post["id"]
        stats = stats_map.get(post_id) or {}
        likes = stats.get("likeCount", 0)
        comments_stat_count = _comment_count_from_stats(stats)
        clicks = stats.get("clickCount", 0)
        shares = stats.get("shareCount", 0)
        impressions = stats.get("impressionCount", 0)
        unique_impressions = stats.get("uniqueImpressionsCount", impressions)

        engagement_numerator = likes + comments_stat_count + clicks + shares
        engagement_rate = (engagement_numerator / impressions * 100) if impressions and impressions > 0 else 0.0

        sentiment_info = sentiments_per_post.get(
            post_id, {"sentiment": _SENTIMENT_NEUTRAL, "percentage": 0.0, "details": {}}
        )

        raw_text = proc_post["raw_text"]
        # Escape first, then convert newlines to <br> so line breaks survive HTML rendering.
        display_text = html.escape(raw_text[:_DISPLAY_TEXT_MAX_CHARS]).replace("\n", "<br>")
        if len(raw_text) > _DISPLAY_TEXT_MAX_CHARS:
            display_text += "..."

        timestamp = proc_post["published_at_timestamp"]
        when_formatted = (
            datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
        )

        detailed_post_list.append({
            "id": post_id,
            "when": when_formatted,
            "text_for_display": display_text,
            "raw_text": raw_text,
            "likes": likes,
            "comments_stat_count": comments_stat_count,
            "clicks": clicks,
            "shares": shares,
            "impressions": impressions,
            "uniqueImpressionsCount": unique_impressions,
            "engagement": f"{engagement_rate:.2f}%",
            "engagement_raw": engagement_rate,
            "sentiment": sentiment_info["sentiment"],
            "sentiment_percent": sentiment_info["percentage"],
            "sentiment_details": sentiment_info.get("details", {}),
            "summary": proc_post["summary"],
            "category": proc_post["category"],
            "organization_urn": proc_post["organization_urn"],
            "is_ad": proc_post["is_ad"],
            "media_category": proc_post.get("media_category", "NONE"),
            "published_at": proc_post["published_at_iso"],
        })
    logging.info(f"Compiled {len(detailed_post_list)} detailed posts.")
    return detailed_post_list


def prepare_data_for_bubble(detailed_posts, all_actual_comments_data):
    """Prepare post, statistics and comment record lists for uploading to Bubble.

    Args:
        detailed_posts (list[dict]): Output of ``compile_detailed_posts``.
        all_actual_comments_data (dict): ``{post_urn: [comment_text, ...]}``
            from ``fetch_comments``.

    Returns:
        tuple: ``(li_posts, li_post_stats, li_post_comments)``.
            - Returns three empty lists when *detailed_posts* is empty.
            - Blank comments are dropped.
            - A comment whose post is not in *detailed_posts* is tagged with
              the first post's organization URN.
    """
    logging.info("Preparing posts data for Bubble.")
    if not detailed_posts:
        logging.warning("No detailed posts to prepare for Bubble.")
        return [], [], []

    li_posts = []
    li_post_stats = []
    for post_data in detailed_posts:
        # The post summary (post_data["summary"]) is intentionally not uploaded.
        li_posts.append({
            "organization_urn": post_data["organization_urn"],
            "id": post_data["id"],
            "is_ad": post_data["is_ad"],
            "media_type": post_data.get("media_category", "NONE"),
            "published_at": post_data["published_at"],
            "sentiment": post_data["sentiment"],
            "text": post_data["raw_text"],
            "li_eb_label": post_data["category"],
        })
        li_post_stats.append({
            "clickCount": post_data["clicks"],
            "commentCount": post_data["comments_stat_count"],
            "engagement": post_data["engagement_raw"],
            "impressionCount": post_data["impressions"],
            "likeCount": post_data["likes"],
            "shareCount": post_data["shares"],
            "uniqueImpressionsCount": post_data["uniqueImpressionsCount"],
            "post_id": post_data["id"],
            "organization_urn": post_data["organization_urn"],
        })

    org_urn_default = detailed_posts[0]["organization_urn"]
    # Built once for O(1) lookups; setdefault keeps the first post for duplicate ids.
    org_urn_by_post_id = {}
    for post_data in detailed_posts:
        org_urn_by_post_id.setdefault(post_data["id"], post_data["organization_urn"])

    li_post_comments = []
    for post_urn, comments_text_list in all_actual_comments_data.items():
        current_post_org_urn = org_urn_by_post_id.get(post_urn, org_urn_default)
        for single_comment_text in comments_text_list:
            if single_comment_text and single_comment_text.strip():
                li_post_comments.append({
                    "comment_text": single_comment_text,
                    "post_id": post_urn,
                    "organization_urn": current_post_org_urn,
                })

    logging.info(
        f"Prepared {len(li_posts)} posts, {len(li_post_stats)} stats entries, "
        f"and {len(li_post_comments)} comments for Bubble."
    )
    return li_posts, li_post_stats, li_post_comments


# --- Mentions Retrieval Functions ---

def _fetch_mention_notifications(session, notifications_url_base, count, max_pages=10):
    """Page through mention notifications and return every element fetched.

    Paging stops at the first of:
        - an empty page;
        - a page the response's paging info marks as the last;
        - *max_pages* pages;
        - a request or JSON error (which is logged).
    """
    all_notifications = []
    start_index = 0
    page_count = 0
    while page_count < max_pages:
        current_url = f"{notifications_url_base}&start={start_index}"
        logging.info(f"Fetching notifications page {page_count + 1} from URL: {current_url}")
        resp = None
        try:
            resp = session.get(current_url)
            resp.raise_for_status()
            data = resp.json()
        except json.JSONDecodeError as e:
            # Listed first: requests' JSONDecodeError also derives from RequestException.
            response_text = resp.text if resp is not None else 'No resp obj'
            logging.error(f"Failed to decode JSON from notifications response: {e}. Response: {response_text}")
            break
        except requests.exceptions.RequestException as e:
            status = getattr(e.response, 'status_code', 'N/A')
            text = getattr(e.response, 'text', 'No response text')
            logging.error(f"Failed to fetch notifications (Status: {status}): {e}. Response: {text}")
            break

        elements = data.get("elements", [])
        if not elements:
            logging.info(
                f"No more notifications found on page {page_count + 1}. "
                f"Total notifications fetched: {len(all_notifications)}."
            )
            break
        all_notifications.extend(elements)

        paging = data.get("paging") or {}
        if 'start' not in paging or 'count' not in paging or len(elements) < paging.get('count', count):
            logging.info(f"Last page of notifications fetched. Total notifications: {len(all_notifications)}.")
            break
        start_index = paging['start'] + paging['count']
        page_count += 1
        if page_count >= max_pages:
            logging.info(f"Reached max_pages ({max_pages}) for fetching notifications.")
            break
    return all_notifications


def _fetch_mentioned_post(session, share_urn, org_urn):
    """Fetch the post behind one SHARE_MENTION and build its internal mention record.

    Returns:
        dict or None: The mention record, or None (after logging) when the
        request fails, the body is not JSON, or the post has no text.
    """
    encoded_share_urn = quote(share_urn, safe='')
    post_detail_url = f"{API_REST_BASE}/posts/{encoded_share_urn}"
    logging.info(f"Fetching details for mentioned post: {post_detail_url}")
    post_resp = None
    try:
        post_resp = session.get(post_detail_url)
        post_resp.raise_for_status()
        post_data = post_resp.json()
    except json.JSONDecodeError as e:
        # Listed first: requests' JSONDecodeError also derives from RequestException.
        response_text = post_resp.text if post_resp is not None else 'No resp obj'
        logging.warning(f"Failed to decode JSON for post details {share_urn}: {e}. Response: {response_text}")
        return None
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        text = getattr(e.response, 'text', 'No response text')
        logging.warning(f"Failed to fetch post details for share URN {share_urn} (Status: {status}): {e}. Response: {text}")
        return None

    commentary_raw = _extract_post_text(post_data)
    if not commentary_raw:
        logging.warning(f"No commentary found for share URN {share_urn}. Skipping.")
        return None

    timestamp = post_data.get("publishedAt") or post_data.get("createdAt") or post_data.get("firstPublishedAt")
    return {
        "mention_id": f"mention_{share_urn}",
        "share_urn": share_urn,
        "mention_text_raw": commentary_raw,
        "mention_text_cleaned": extract_text_from_mention_commentary(commentary_raw),
        "published_at_timestamp": timestamp,
        "published_at_iso": _epoch_millis_to_iso(timestamp),
        "mentioned_by_author_urn": post_data.get("author", "urn:li:unknown"),
        "organization_urn_mentioned": org_urn,
    }


def fetch_linkedin_mentions_core(comm_client_id, community_token, org_urn, count=20):
    """Fetch SHARE_MENTION notifications for an organization and the posts they point to.

    Args:
        comm_client_id: Client id passed to ``create_session``.
        community_token: Access-token string, or a token dict used as-is.
        org_urn (str): Organization URN whose mentions are fetched.
        count (int): Page size for the notifications request.

    Returns:
        list[dict]: One internal mention record per unique mentioned post that
        has text, in first-seen notification order. Returns an empty list
        when no notifications are found.
    """
    session = create_session(comm_client_id, token=_token_dict_from(community_token))
    session.headers.update({
        "X-Restli-Protocol-Version": "2.0.0",
        "LinkedIn-Version": LINKEDIN_API_VERSION,
    })

    encoded_org_urn = quote(org_urn, safe='')
    notifications_url_base = (
        f"{API_REST_BASE}/organizationalEntityNotifications"
        f"?q=criteria"
        f"&actions=List(SHARE_MENTION)"
        f"&organizationalEntity={encoded_org_urn}"
        f"&count={count}"
    )

    all_notifications = _fetch_mention_notifications(session, notifications_url_base, count)
    if not all_notifications:
        logging.info("No mention notifications found after fetching.")
        return []

    # dict.fromkeys de-duplicates while keeping first-seen order (a set would
    # make the processing order depend on hash randomisation).
    mention_share_urns = list(dict.fromkeys(
        n.get("generatedActivity")
        for n in all_notifications
        if n.get("action") == "SHARE_MENTION" and n.get("generatedActivity")
    ))
    logging.info(f"Found {len(mention_share_urns)} unique share URNs from SHARE_MENTION notifications.")

    processed_mentions_internal = []
    for share_urn in mention_share_urns:
        mention = _fetch_mentioned_post(session, share_urn, org_urn)
        if mention is not None:
            processed_mentions_internal.append(mention)

    logging.info(f"Processed {len(processed_mentions_internal)} mentions with their post details.")
    return processed_mentions_internal


def analyze_mentions_sentiment(processed_mentions_list):
    """Classify the cleaned text of each mention individually.

    Args:
        processed_mentions_list (list[dict]): Output of ``fetch_linkedin_mentions_core``.

    Returns:
        dict: ``{mention_id: {"sentiment_label": label, "percentage": float, "details": {label: count}}}``.
        ``percentage`` is 100.0 when a non-error label with a non-zero count
        was produced, else 0.0.
    """
    mention_sentiments_map = {}
    logging.info(f"Analyzing individual sentiment for {len(processed_mentions_list)} mentions.")
    for mention_data in processed_mentions_list:
        mention_internal_id = mention_data["mention_id"]
        sentiment_result = _get_sentiment_from_text(mention_data.get("mention_text_cleaned", ""))
        counts = sentiment_result["counts"]
        has_valid_label = sentiment_result["label"] != _SENTIMENT_ERROR and any(counts.values())
        mention_sentiments_map[mention_internal_id] = {
            "sentiment_label": sentiment_result["label"],
            "percentage": 100.0 if has_valid_label else 0.0,
            "details": dict(counts),
        }
        logging.debug(f"Individual sentiment for mention {mention_internal_id}: {mention_sentiments_map[mention_internal_id]}")
    return mention_sentiments_map


def compile_detailed_mentions(processed_mentions_list, mention_sentiments_map):
    """Combine internal mention records with their sentiment into the output format.

    Args:
        processed_mentions_list (list[dict]): Output of ``fetch_linkedin_mentions_core``.
        mention_sentiments_map (dict): Output of ``analyze_mentions_sentiment``,
            keyed by internal ``mention_id``.

    Returns:
        list[dict]: Records with keys ``date`` ("%Y-%m-%d %H:%M" or "Unknown"),
        ``id`` (URN of the post containing the mention), ``mention_text``,
        ``organization_urn`` and ``sentiment_label``. Mentions without a
        sentiment entry default to the neutral label.
    """
    detailed_mentions_output = []
    logging.info(f"Compiling detailed data for {len(processed_mentions_list)} mentions into specified format.")
    for mention_core_data in processed_mentions_list:
        mention_internal_id = mention_core_data["mention_id"]
        sentiment_info = mention_sentiments_map.get(mention_internal_id, {"sentiment_label": _SENTIMENT_NEUTRAL})

        date_formatted = "Unknown"
        timestamp = mention_core_data["published_at_timestamp"]
        if timestamp:
            try:
                date_formatted = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M")
            except (TypeError, ValueError, OverflowError, OSError):
                # Non-numeric or out-of-range timestamps leave the date as "Unknown".
                logging.warning(f"Could not format timestamp for mention_id {mention_internal_id}")

        detailed_mentions_output.append({
            "date": date_formatted,
            "id": mention_core_data["share_urn"],
            "mention_text": mention_core_data["mention_text_cleaned"],
            "organization_urn": mention_core_data["organization_urn_mentioned"],
            "sentiment_label": sentiment_info["sentiment_label"],
        })
    logging.info(f"Compiled {len(detailed_mentions_output)} detailed mentions with specified fields.")
    return detailed_mentions_output


def prepare_mentions_for_bubble(compiled_detailed_mentions_list):
    """Prepare compiled mentions for uploading to a Bubble table.

    The input records (from ``compile_detailed_mentions``) already use the
    upload field names, so each record is copied field-for-field. If the
    Bubble table's field names ever differ, map them here.

    Returns:
        list[dict]: Records with keys ``date``, ``id``, ``mention_text``,
        ``organization_urn`` and ``sentiment_label``.
    """
    logging.info(f"Preparing {len(compiled_detailed_mentions_list)} compiled mentions for Bubble upload.")
    if not compiled_detailed_mentions_list:
        return []

    li_mentions_bubble = [
        {
            "date": mention_data["date"],
            "id": mention_data["id"],
            "mention_text": mention_data["mention_text"],
            "organization_urn": mention_data["organization_urn"],
            "sentiment_label": mention_data["sentiment_label"],
        }
        for mention_data in compiled_detailed_mentions_list
    ]
    logging.info(f"Prepared {len(li_mentions_bubble)} mention entries for Bubble, using direct field names from compiled data.")
    return li_mentions_bubble