GuglielmoTor committed on
Commit
eaea9c5
·
verified ·
1 Parent(s): 762599c

Delete apis/Linkedin_Data_API_Calls.py

Files changed (1)
  1. apis/Linkedin_Data_API_Calls.py +0 -864
apis/Linkedin_Data_API_Calls.py DELETED
@@ -1,864 +0,0 @@
1
- import json
2
- import requests
3
- import html
4
- import time # Added for potential rate limiting if needed
5
- from datetime import datetime
6
- from collections import defaultdict
7
- from urllib.parse import quote # Added for URL encoding
8
- from transformers import pipeline
9
-
10
- from utils.sessions import create_session
11
- from utils.error_handling import display_error
12
- from data_processing.posts_categorization import batch_summarize_and_classify
13
- import logging
14
-
15
-
16
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
17
-
18
- API_V2_BASE = 'https://api.linkedin.com/v2'
19
- API_REST_BASE = "https://api.linkedin.com/rest"
20
-
21
- # Initialize sentiment pipeline (loaded once globally)
22
- sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
23
-
24
- # --- Utility Function ---
25
- def extract_text_from_mention_commentary(commentary):
26
- """
27
- Extracts clean text from a commentary string, removing potential placeholders like {mention}.
28
- """
29
- import re
30
- if not commentary:
31
- return ""
32
- return re.sub(r"{.*?}", "", commentary).strip()
33
-
34
- # --- Core Sentiment Analysis Helper ---
35
- def _get_sentiment_from_text(text_to_analyze):
36
- """
37
- Analyzes a single piece of text and returns its sentiment label and raw counts.
38
- Returns a dict: {"label": "Sentiment Label", "counts": defaultdict(int)}
39
- """
40
- sentiment_counts = defaultdict(int)
41
- dominant_sentiment_label = "Neutral 😐" # Default
42
-
43
- if not text_to_analyze or not text_to_analyze.strip():
44
- return {"label": dominant_sentiment_label, "counts": sentiment_counts}
45
-
46
- try:
47
- # Truncate to avoid issues with very long texts for the model
48
- analysis_result = sentiment_pipeline(str(text_to_analyze)[:512])
49
- label = analysis_result[0]['label'].upper()
50
-
51
- if label in ['POSITIVE', 'VERY POSITIVE']:
52
- dominant_sentiment_label = 'Positive 👍'
53
- sentiment_counts['Positive 👍'] += 1
54
- elif label in ['NEGATIVE', 'VERY NEGATIVE']:
55
- dominant_sentiment_label = 'Negative 👎'
56
- sentiment_counts['Negative 👎'] += 1
57
- elif label == 'NEUTRAL':
58
- dominant_sentiment_label = 'Neutral 😐' # Already default, but for clarity
59
- sentiment_counts['Neutral 😐'] += 1
60
- else:
61
- dominant_sentiment_label = 'Unknown' # Catch any other labels from the model
62
- sentiment_counts['Unknown'] += 1
63
-
64
- except Exception as e:
65
- # Log the error with more context if possible
66
- logging.error(f"Sentiment analysis failed for text snippet '{str(text_to_analyze)[:50]}...'. Error: {e}")
67
- sentiment_counts['Error'] += 1
68
- dominant_sentiment_label = "Error" # Indicate error in sentiment
69
-
70
- return {"label": dominant_sentiment_label, "counts": sentiment_counts}
71
-
72
- def get_post_media_category(post_content):
73
- """
74
- Determines the media category from the post's content object.
75
- Args:
76
- post_content (dict or None): The content dictionary of the post.
77
- Returns:
78
- str: The determined media category (e.g., "Video", "Article", "Document", "Image", "Multi-Image", "NONE").
79
- """
80
- if not post_content:
81
- return "NONE"
82
-
83
- # 1. Check for specific LinkedIn Video Component (from your original logic)
84
- # You might want to refine this if 'mediaCategory' within the video component is more specific
85
- if "com.linkedin.voyager.feed.render.LinkedInVideoComponent" in post_content:
86
- # video_component_data = post_content.get("com.linkedin.voyager.feed.render.LinkedInVideoComponent", {})
87
- # return video_component_data.get("mediaCategory", "Video") # Example if you want to use its specific category
88
- return "Video"
89
-
90
- # 2. Check for Article (based on your "old code" and examples)
91
- if 'article' in post_content:
92
- return "Article"
93
-
94
- # 3. Check for Multi-Image (based on your "old code")
95
- if 'multiImage' in post_content:
96
- return "Multi-Image"
97
-
98
- # 4. Check for Media (Document or Image - based on your "old code" and examples)
99
- if 'media' in post_content:
100
- media_item = post_content['media']
101
- # Heuristic from your "old code": if 'title' is present, it's likely a Document.
102
- if 'title' in media_item:
103
- # Example: "content": {"media": {"title": "...", "id": "urn:li:document:..."}}
104
- return "Document"
105
- # Else, if 'id' is present (and no title was found for Document), assume Image.
106
- elif 'id' in media_item:
107
- # Example: "content": {"media": {"altText": "", "id": "urn:li:image:..."}}
108
- return "Image"
109
-
110
- return "NONE"
111
- # --- Post Retrieval Functions ---
112
- def fetch_linkedin_posts_core(comm_client_id, community_token, org_urn, count):
113
- """
114
- Fetches raw posts, their basic statistics, and performs summarization/categorization.
115
- Does NOT fetch comments or analyze sentiment of comments here.
116
- """
117
- token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
118
- session = create_session(comm_client_id, token=token_dict)
119
- session.headers.update({
120
- "LinkedIn-Version": "202502"
121
- })
122
-
123
- posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
124
- logging.info(f"Fetching posts from URL: {posts_url}")
125
- try:
126
- resp = session.get(posts_url)
127
- resp.raise_for_status()
128
- raw_posts_api = resp.json().get("elements", [])
129
- logging.info(f"Fetched {len(raw_posts_api)} raw posts from API.")
130
- except requests.exceptions.RequestException as e:
131
- status = getattr(e.response, 'status_code', 'N/A')
132
- text = getattr(e.response, 'text', 'No response text')
133
- logging.error(f"Failed to fetch posts (Status: {status}): {e}. Response: {text}")
134
- raise ValueError(f"Failed to fetch posts (Status: {status})") from e
135
- except json.JSONDecodeError as e:
136
- logging.error(f"Failed to decode JSON from posts response: {e}. Response text: {resp.text if resp else 'No response object'}")
137
- raise ValueError("Failed to decode JSON from posts response") from e
138
-
139
- if not raw_posts_api:
140
- logging.info("No raw posts found.")
141
- return [], {}, "DefaultOrgName"
142
-
143
- post_urns_for_stats = [p["id"] for p in raw_posts_api if p.get("id")]
144
-
145
- post_texts_for_nlp = []
146
- for p in raw_posts_api:
147
- text_content = p.get("commentary") or \
148
- p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
149
- "[No text content]"
150
- post_texts_for_nlp.append({"text": text_content, "id": p.get("id")})
151
-
152
- logging.info(f"Prepared {len(post_texts_for_nlp)} posts for NLP (summarization/classification).")
153
- if 'batch_summarize_and_classify' in globals():
154
- structured_results_list = batch_summarize_and_classify(post_texts_for_nlp)
155
- else:
156
- logging.warning("batch_summarize_and_classify not found, using fallback.")
157
- structured_results_list = [{"id": p["id"], "summary": "N/A", "category": "N/A"} for p in post_texts_for_nlp]
158
-
159
- structured_results_map = {res["id"]: res for res in structured_results_list if "id" in res}
160
-
161
- stats_map = {}
162
- if post_urns_for_stats:
163
- batch_size_stats = 20
164
- for i in range(0, len(post_urns_for_stats), batch_size_stats):
165
- batch_urns = post_urns_for_stats[i:i+batch_size_stats]
166
- params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
167
-
168
- share_idx = 0 # Index for share URNs in the current batch's params
169
- ugc_idx = 0 # Index for ugcPost URNs in the current batch's params
170
-
171
- # Keep track of URNs actually added to this batch's parameters for logging
172
- urns_in_current_api_call = []
173
-
174
- for urn_str in batch_urns:
175
- if ":share:" in urn_str:
176
- params[f"shares[{share_idx}]"] = urn_str
177
- share_idx += 1
178
- urns_in_current_api_call.append(urn_str)
179
- elif ":ugcPost:" in urn_str:
180
- params[f"ugcPosts[{ugc_idx}]"] = urn_str
181
- ugc_idx += 1
182
- urns_in_current_api_call.append(urn_str)
183
- else:
184
- logging.warning(f"URN {urn_str} is not a recognized share or ugcPost type for stats. Skipping.")
185
- continue
186
-
187
- # If no valid URNs were prepared for this batch, skip the API call
188
- if not share_idx and not ugc_idx: # or check 'if not urns_in_current_api_call:'
189
- logging.info(f"Skipping API call for an empty or invalid batch of URNs (original batch segment size: {len(batch_urns)}).")
190
- continue
191
-
192
- try:
193
- # Log the URNs being sent in this specific API call
194
- logging.info(f"Fetching stats for batch of {len(urns_in_current_api_call)} URNs. First URN in call: {urns_in_current_api_call[0] if urns_in_current_api_call else 'N/A'}")
195
-
196
- # Actual API call
197
- stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
198
- stat_resp.raise_for_status() # Raises an HTTPError for bad responses (4XX or 5XX)
199
- stats_data = stat_resp.json()
200
-
201
- # --- Corrected Parsing Logic ---
202
- # LinkedIn API for batch stats often returns an "elements" list.
203
- elements_from_api = stats_data.get("elements")
204
-
205
- if isinstance(elements_from_api, list):
206
- if not elements_from_api:
207
- logging.info(f"API returned 'elements' but it's an empty list for the URNs in this call.")
208
-
209
- processed_urns_in_batch = 0
210
- for item in elements_from_api:
211
- urn_in_item = None
212
- # Determine the URN key (e.g., 'share' or 'ugcPost')
213
- if "share" in item:
214
- urn_in_item = item.get("share")
215
- elif "ugcPost" in item:
216
- urn_in_item = item.get("ugcPost")
217
- # Add other URN types if necessary, e.g., elif "article" in item: ...
218
-
219
- if urn_in_item:
220
- stats_values = item.get("totalShareStatistics", {})
221
- if stats_values: # Only add if there are actual stats
222
- stats_map[urn_in_item] = stats_values
223
- processed_urns_in_batch +=1
224
- else:
225
- # It's possible an URN is returned without stats, or with empty stats
226
- logging.debug(f"No 'totalShareStatistics' data found for URN: {urn_in_item} in API item: {item}")
227
- stats_map[urn_in_item] = {} # Store empty stats if URN was processed but had no data
228
- else:
229
- logging.warning(f"Could not extract a recognized URN key from API element: {item}")
230
- logging.info(f"Successfully processed {processed_urns_in_batch} URNs with stats from the API response for this batch. Current total stats_map size: {len(stats_map)}")
231
-
232
- elif elements_from_api is None and "results" in stats_data:
233
- # Fallback or alternative check if your API version *does* use "results"
234
- # This was your original attempt. If "elements" is consistently missing,
235
- # you might need to debug the exact structure of "results".
236
- logging.warning(f"API response does not contain 'elements' key, but 'results' key is present. Attempting to parse 'results'. Response keys: {stats_data.keys()}")
237
- results_dict = stats_data.get("results", {})
238
- if isinstance(results_dict, dict):
239
- for urn_key, stat_element_values in results_dict.items():
240
- stats_map[urn_key] = stat_element_values.get("totalShareStatistics", {})
241
- logging.info(f"Processed stats from 'results' dictionary. Current stats_map size: {len(stats_map)}")
242
- else:
243
- logging.error(f"'results' key found but is not a dictionary. Type: {type(results_dict)}")
244
-
245
- else:
246
- # Neither "elements" (as list) nor "results" (as dict) found as expected
247
- logging.error(f"API response structure not recognized. Expected 'elements' (list) or 'results' (dict). Got keys: {stats_data.keys()}. Full response sample: {str(stats_data)[:500]}")
248
-
249
- # --- End Corrected Parsing Logic ---
250
-
251
- # Check for specific errors reported by the API within the JSON response
252
- if stats_data.get("errors"):
253
- for urn_errored, error_detail in stats_data.get("errors", {}).items():
254
- logging.warning(f"API reported error for URN {urn_errored}: {error_detail.get('message', 'Unknown API error message')}")
255
-
256
- # This log might be slightly misleading if parsing failed but no exception occurred.
257
- # The more specific log after parsing 'elements' is better.
258
- # logging.info(f"Successfully processed stats response for {len(urns_in_current_api_call)} URNs. Current stats_map size: {len(stats_map)}")
259
-
260
-
261
- except requests.exceptions.HTTPError as e:
262
- # Specific handling for HTTP errors (4xx, 5xx)
263
- status_code = e.response.status_code
264
- response_text = e.response.text
265
- logging.warning(f"HTTP error fetching stats for a batch (Status: {status_code}): {e}. Params: {params}. Response: {response_text[:500]}") # Log first 500 chars of response
266
- except requests.exceptions.RequestException as e:
267
- # Catch other requests-related errors (e.g., connection issues)
268
- status_code = getattr(e.response, 'status_code', 'N/A')
269
- response_text = getattr(e.response, 'text', 'No response text')
270
- logging.warning(f"Request failed for stats batch (Status: {status_code}): {e}. Params: {params}. Response: {response_text[:500]}")
271
- except json.JSONDecodeError as e:
272
- # Handle cases where the response is not valid JSON
273
- response_text_for_json_error = stat_resp.text if 'stat_resp' in locals() and hasattr(stat_resp, 'text') else 'Response object not available or no text attribute'
274
- logging.warning(f"Failed to decode JSON from stats response: {e}. Response text: {response_text_for_json_error[:500]}") # Log first 500 chars
275
- except Exception as e:
276
- # Catch any other unexpected errors during the batch processing
277
- logging.error(f"An unexpected error occurred processing stats batch: {e}", exc_info=True)
278
-
279
-
280
- logging.info(f"Finished processing all URN batches. Final stats_map size: {len(stats_map)}")
281
-
282
- processed_raw_posts = []
283
- for p in raw_posts_api:
284
- post_id = p.get("id")
285
- if not post_id:
286
- logging.warning("Skipping raw post due to missing ID.")
287
- continue
288
-
289
- text_content = p.get("commentary") or \
290
- p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
291
- "[No text content]"
292
-
293
- timestamp = p.get("publishedAt") or p.get("createdAt") or p.get("firstPublishedAt")
294
- published_at_iso = datetime.fromtimestamp(timestamp / 1000).isoformat() if timestamp else None
295
-
296
- structured_res = structured_results_map.get(post_id, {"summary": "N/A", "category": "N/A"})
297
-
298
- processed_raw_posts.append({
299
- "id": post_id,
300
- "raw_text": text_content,
301
- "summary": structured_res["summary"],
302
- "category": structured_res["category"],
303
- "published_at_timestamp": timestamp,
304
- "published_at_iso": published_at_iso,
305
- "organization_urn": p.get("author", f"urn:li:organization:{org_urn.split(':')[-1]}"),
306
- "is_ad": 'adContext' in p,
307
- "media_category": get_post_media_category(p.get("content")),
308
- })
309
- logging.info(f"Processed {len(processed_raw_posts)} posts with core data.")
310
- return processed_raw_posts, stats_map, "DefaultOrgName"
311
-
312
-
313
- def fetch_comments(comm_client_id, community_token, post_urns, stats_map):
314
- """
315
- Fetches comments for a list of post URNs using the socialActions endpoint.
316
- Uses stats_map to potentially skip posts with 0 comments.
317
- """
318
- # Ensure community_token is in the expected dictionary format for create_session
319
- if isinstance(community_token, str):
320
- token_dict = {'access_token': community_token, 'token_type': 'Bearer'}
321
- elif isinstance(community_token, dict) and 'access_token' in community_token:
322
- token_dict = community_token
323
- else:
324
- logging.error("Invalid community_token format. Expected a string or a dict with 'access_token'.")
325
- return {urn: [] for urn in post_urns} # Return empty for all if token is bad
326
-
327
- linkedin_session = create_session(comm_client_id, token=token_dict)
328
-
329
- # Set the LinkedIn API version header
330
- # This is crucial for API compatibility.
331
- linkedin_session.headers.update({
332
- 'LinkedIn-Version': "202502" # Or your target version
333
- })
334
-
335
- all_comments_by_post = {}
336
- logging.info(f"Fetching comments for {len(post_urns)} posts.")
337
-
338
- for post_urn in post_urns:
339
- post_stats = stats_map.get(post_urn, {})
340
- # Try to get comment count from "commentSummary" first, then fallback to "commentCount"
341
- comment_summary = post_stats.get("commentSummary", {})
342
- comment_count_from_stats = comment_summary.get("totalComments", post_stats.get('commentCount', 0))
343
-
344
- if comment_count_from_stats == 0:
345
- logging.info(f"Skipping comment fetch for {post_urn} as commentCount is 0 in stats_map.")
346
- all_comments_by_post[post_urn] = []
347
- continue
348
-
349
- try:
350
- # IMPORTANT: Use the correct endpoint structure from your working code.
351
- # The post_urn goes directly into the path and should NOT be URL-encoded here.
352
- url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
353
- # If you want to add other parameters like 'count' or 'start', append them, e.g.,
354
- # url = f"{API_REST_BASE}/socialActions/{post_urn}/comments?sortOrder=CHRONOLOGICAL&count=10"
355
-
356
- logging.debug(f"Fetching comments from URL: {url} for post URN: {post_urn}")
357
- response = linkedin_session.get(url)
358
-
359
- if response.status_code == 200:
360
- elements = response.json().get('elements', [])
361
- comments_texts = []
362
- for c in elements:
363
- # Extracting comment text. Adjust if the structure is different.
364
- # The original working code stored `data.get('elements', [])`
365
- # If you need the full comment object, store 'c' instead of 'comment_text'.
366
- message_obj = c.get('message', {})
367
- if isinstance(message_obj, dict): # Ensure message is a dict before .get('text')
368
- comment_text = message_obj.get('text')
369
- if comment_text:
370
- comments_texts.append(comment_text)
371
- elif isinstance(message_obj, str): # Sometimes message might be just a string
372
- comments_texts.append(message_obj)
373
-
374
- all_comments_by_post[post_urn] = comments_texts
375
- logging.info(f"Fetched {len(comments_texts)} comments for {post_urn}.")
376
- elif response.status_code == 403:
377
- logging.warning(f"Forbidden (403) to fetch comments for {post_urn}. URL: {url}. Response: {response.text}. Check permissions or API version.")
378
- all_comments_by_post[post_urn] = [] # Or some error indicator
379
- elif response.status_code == 404:
380
- logging.warning(f"Comments not found (404) for {post_urn}. URL: {url}. Response: {response.text}")
381
- all_comments_by_post[post_urn] = []
382
- else:
383
- logging.error(f"Error fetching comments for {post_urn}. Status: {response.status_code}. URL: {url}. Response: {response.text}")
384
- all_comments_by_post[post_urn] = [] # Or some error indicator
385
- except requests.exceptions.RequestException as e:
386
- logging.error(f"RequestException fetching comments for {post_urn}: {e}")
387
- all_comments_by_post[post_urn] = []
388
- except json.JSONDecodeError as e:
389
- # Log the response text if it's available and JSON decoding fails
390
- response_text_for_log = 'N/A'
391
- if 'response' in locals() and hasattr(response, 'text'):
392
- response_text_for_log = response.text
393
- logging.error(f"JSONDecodeError fetching comments for {post_urn}. Response: {response_text_for_log}. Error: {e}")
394
- all_comments_by_post[post_urn] = []
395
- except Exception as e:
396
- # Catch any other unexpected errors
397
- logging.error(f"Unexpected error fetching comments for {post_urn}: {e}", exc_info=True) # exc_info=True for traceback
398
- all_comments_by_post[post_urn] = []
399
-
400
- return all_comments_by_post
401
-
402
- def analyze_sentiment(all_comments_data):
403
- """
404
- Analyzes sentiment for comments grouped by post_urn using the helper function.
405
- all_comments_data is a dict: {post_urn: [comment_text_1, comment_text_2,...]}
406
- Returns a dict: {post_urn: {"sentiment": "DominantOverallSentiment", "percentage": X.X, "details": {aggregated_counts}}}
407
- """
408
- results_by_post = {}
409
- logging.info(f"Analyzing aggregated sentiment for comments from {len(all_comments_data)} posts.")
410
- for post_urn, comments_list in all_comments_data.items():
411
- aggregated_sentiment_counts = defaultdict(int)
412
- total_valid_comments_for_post = 0
413
-
414
- if not comments_list:
415
- results_by_post[post_urn] = {"sentiment": "Neutral 😐", "percentage": 0.0, "details": dict(aggregated_sentiment_counts)}
416
- continue
417
-
418
- for comment_text in comments_list:
419
- if not comment_text or not comment_text.strip():
420
- continue
421
-
422
- # Use the helper for individual comment sentiment
423
- single_comment_sentiment = _get_sentiment_from_text(comment_text)
424
-
425
- # Aggregate counts
426
- for label, count in single_comment_sentiment["counts"].items():
427
- aggregated_sentiment_counts[label] += count
428
-
429
- if single_comment_sentiment["label"] != "Error": # Count valid analyses
430
- total_valid_comments_for_post +=1
431
-
432
- dominant_overall_sentiment = "Neutral 😐" # Default
433
- percentage = 0.0
434
-
435
- if total_valid_comments_for_post > 0:
436
- # Determine dominant sentiment from aggregated_sentiment_counts
437
- # Exclude 'Error' from being a dominant sentiment unless it's the only category with counts
438
- valid_sentiments = {k: v for k, v in aggregated_sentiment_counts.items() if k != 'Error' and v > 0}
439
- if not valid_sentiments:
440
- dominant_overall_sentiment = 'Error' if aggregated_sentiment_counts['Error'] > 0 else 'Neutral 😐'
441
- else:
442
- # Simple max count logic for dominance
443
- dominant_overall_sentiment = max(valid_sentiments, key=valid_sentiments.get)
444
-
445
- if dominant_overall_sentiment != 'Error':
446
- percentage = round((aggregated_sentiment_counts[dominant_overall_sentiment] / total_valid_comments_for_post) * 100, 1)
447
- else: # if dominant is 'Error' or only errors were found
448
- percentage = 0.0
449
- elif aggregated_sentiment_counts['Error'] > 0 : # No valid comments, but errors occurred
450
- dominant_overall_sentiment = 'Error'
451
-
452
-
453
- results_by_post[post_urn] = {
454
- "sentiment": dominant_overall_sentiment,
455
- "percentage": percentage,
456
- "details": dict(aggregated_sentiment_counts) # Store aggregated counts
457
- }
458
- logging.debug(f"Aggregated sentiment for post {post_urn}: {results_by_post[post_urn]}")
459
-
460
- return results_by_post
461
-
462
-
463
- def compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post):
464
- """
465
- Combines processed raw post data with their statistics and overall comment sentiment.
466
- """
467
- detailed_post_list = []
468
- logging.info(f"Compiling detailed data for {len(processed_raw_posts)} posts.")
469
- for proc_post in processed_raw_posts:
470
- post_id = proc_post["id"]
471
- stats = stats_map.get(post_id, {})
472
-
473
- likes = stats.get("likeCount", 0)
474
- comments_stat_count = stats.get("commentSummary", {}).get("totalComments", stats.get("commentCount", 0))
475
-
476
- clicks = stats.get("clickCount", 0)
477
- shares = stats.get("shareCount", 0)
478
- impressions = stats.get("impressionCount", 0)
479
- unique_impressions = stats.get("uniqueImpressionsCount", stats.get("impressionCount", 0))
480
-
481
- engagement_numerator = likes + comments_stat_count + clicks + shares
482
- engagement_rate = (engagement_numerator / impressions * 100) if impressions and impressions > 0 else 0.0
483
-
484
- sentiment_info = sentiments_per_post.get(post_id, {"sentiment": "Neutral 😐", "percentage": 0.0, "details": {}})
485
-
486
- display_text = html.escape(proc_post["raw_text"][:250]).replace("\n", "<br>") + \
487
- ("..." if len(proc_post["raw_text"]) > 250 else "")
488
-
489
- when_formatted = datetime.fromtimestamp(proc_post["published_at_timestamp"] / 1000).strftime("%Y-%m-%d %H:%M") \
490
- if proc_post["published_at_timestamp"] else "Unknown"
491
-
492
- detailed_post_list.append({
493
- "id": post_id,
494
- "when": when_formatted,
495
- "text_for_display": display_text,
496
- "raw_text": proc_post["raw_text"],
497
- "likes": likes,
498
- "comments_stat_count": comments_stat_count,
499
- "clicks": clicks,
500
- "shares": shares,
501
- "impressions": impressions,
502
- "uniqueImpressionsCount": unique_impressions,
503
- "engagement": f"{engagement_rate:.2f}%",
504
- "engagement_raw": engagement_rate,
505
- "sentiment": sentiment_info["sentiment"],
506
- "sentiment_percent": sentiment_info["percentage"],
507
- "sentiment_details": sentiment_info.get("details", {}),
508
- "summary": proc_post["summary"],
509
- "category": proc_post["category"],
510
- "organization_urn": proc_post["organization_urn"],
511
- "is_ad": proc_post["is_ad"],
512
- "media_category": proc_post.get("media_category", "NONE"),
513
- "published_at": proc_post["published_at_iso"]
514
- })
515
- logging.info(f"Compiled {len(detailed_post_list)} detailed posts.")
516
- return detailed_post_list
517
-
518
-
519
- def prepare_data_for_bubble(detailed_posts, all_actual_comments_data):
520
- """
521
- Prepares data lists for uploading to Bubble.
522
- - detailed_posts: List of comprehensively compiled post objects.
523
- - all_actual_comments_data: Dict of {post_urn: [comment_texts]} from fetch_comments.
524
- """
525
- li_posts = []
526
- li_post_stats = []
527
- li_post_comments = []
528
- logging.info("Preparing posts data for Bubble.")
529
-
530
- if not detailed_posts:
531
- logging.warning("No detailed posts to prepare for Bubble.")
532
- return [], [], []
533
-
534
- org_urn_default = detailed_posts[0]["organization_urn"] if detailed_posts else "urn:li:organization:UNKNOWN"
535
-
536
- for post_data in detailed_posts:
537
- li_posts.append({
538
- "organization_urn": post_data["organization_urn"],
539
- "id": post_data["id"],
540
- "is_ad": post_data["is_ad"],
541
- "media_type": post_data.get("media_category", "NONE"),
542
- "published_at": post_data["published_at"],
543
- "sentiment": post_data["sentiment"],
544
- "text": post_data["raw_text"],
545
- #"summary_text": post_data["summary"],
546
- "li_eb_label": post_data["category"]
547
- })
548
-
549
- li_post_stats.append({
550
- "clickCount": post_data["clicks"],
551
- "commentCount": post_data["comments_stat_count"],
552
- "engagement": post_data["engagement_raw"],
553
- "impressionCount": post_data["impressions"],
554
- "likeCount": post_data["likes"],
555
- "shareCount": post_data["shares"],
556
- "uniqueImpressionsCount": post_data["uniqueImpressionsCount"],
557
- "post_id": post_data["id"],
558
- "organization_urn": post_data["organization_urn"]
559
- })
560
-
561
- for post_urn, comments_text_list in all_actual_comments_data.items():
562
- current_post_org_urn = org_urn_default
563
- for p in detailed_posts:
564
- if p["id"] == post_urn:
565
- current_post_org_urn = p["organization_urn"]
566
- break
567
-
568
- for single_comment_text in comments_text_list:
569
- if single_comment_text and single_comment_text.strip():
570
- li_post_comments.append({
571
- "comment_text": single_comment_text,
572
- "post_id": post_urn,
573
- "organization_urn": current_post_org_urn
574
- })
575
-
576
- logging.info(f"Prepared {len(li_posts)} posts, {len(li_post_stats)} stats entries, and {len(li_post_comments)} comments for Bubble.")
577
- return li_posts, li_post_stats, li_post_comments
578
-
579
- # --- Mentions Retrieval Functions ---
580
-
581
- def fetch_linkedin_mentions_core(comm_client_id, community_token, org_urn, count=20):
582
- """
583
- Fetches raw mention notifications and the details of the posts where the organization was mentioned.
584
- Returns a list of processed mention data (internal structure).
585
- """
586
- token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
587
- session = create_session(comm_client_id, token=token_dict)
588
- session.headers.update({
589
- "X-Restli-Protocol-Version": "2.0.0",
590
- "LinkedIn-Version": "202502"
591
- })
592
-
593
- encoded_org_urn = quote(org_urn, safe='')
594
-
595
- notifications_url_base = (
596
- f"{API_REST_BASE}/organizationalEntityNotifications"
597
- f"?q=criteria"
598
- f"&actions=List(SHARE_MENTION)"
599
- f"&organizationalEntity={encoded_org_urn}"
600
- f"&count={count}"
601
- )
602
-
603
- all_notifications = []
604
- start_index = 0
605
- processed_mentions_internal = []
606
- page_count = 0
607
- max_pages = 10
608
-
609
- while page_count < max_pages:
610
- current_url = f"{notifications_url_base}&start={start_index}"
611
- logging.info(f"Fetching notifications page {page_count + 1} from URL: {current_url}")
612
- try:
613
- resp = session.get(current_url)
614
- resp.raise_for_status()
615
- data = resp.json()
616
- elements = data.get("elements", [])
617
-
618
- if not elements:
619
- logging.info(f"No more notifications found on page {page_count + 1}. Total notifications fetched: {len(all_notifications)}.")
620
- break
621
-
622
- all_notifications.extend(elements)
623
-
624
- paging = data.get("paging", {})
625
- if 'start' not in paging or 'count' not in paging or len(elements) < paging.get('count', count):
626
- logging.info(f"Last page of notifications fetched. Total notifications: {len(all_notifications)}.")
627
- break
628
-
629
- start_index = paging['start'] + paging['count']
630
- page_count += 1
631
-
632
- except requests.exceptions.RequestException as e:
633
- status = getattr(e.response, 'status_code', 'N/A')
634
- text = getattr(e.response, 'text', 'No response text')
635
- logging.error(f"Failed to fetch notifications (Status: {status}): {e}. Response: {text}")
636
- break
637
- except json.JSONDecodeError as e:
638
- logging.error(f"Failed to decode JSON from notifications response: {e}. Response: {resp.text if resp else 'No resp obj'}")
639
- break
640
- if page_count >= max_pages:
641
- logging.info(f"Reached max_pages ({max_pages}) for fetching notifications.")
642
- break
643
-
644
- if not all_notifications:
645
- logging.info("No mention notifications found after fetching.")
646
- return []
647
-
648
- mention_share_urns = list(set([
649
- n.get("generatedActivity") for n in all_notifications
650
- if n.get("action") == "SHARE_MENTION" and n.get("generatedActivity")
651
- ]))
652
-
653
- logging.info(f"Found {len(mention_share_urns)} unique share URNs from SHARE_MENTION notifications.")
654
-
655
- # for share_urn in mention_share_urns:
656
- # encoded_share_urn = quote(share_urn, safe='')
657
- # post_detail_url = f"{API_REST_BASE}/posts/{encoded_share_urn}"
658
- # logging.info(f"Fetching details for mentioned post: {post_detail_url}")
659
- # try:
660
- # post_resp = session.get(post_detail_url)
661
- # post_resp.raise_for_status()
662
- # post_data = post_resp.json()
663
-
664
- # commentary_raw = post_data.get("commentary")
665
- # if not commentary_raw and "specificContent" in post_data:
666
- # share_content = post_data.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {})
667
- # commentary_raw = share_content.get("shareCommentaryV2", {}).get("text", "")
668
-
669
- # if not commentary_raw:
670
- # logging.warning(f"No commentary found for share URN {share_urn}. Skipping.")
671
- # continue
672
-
673
- # mention_text_cleaned = extract_text_from_mention_commentary(commentary_raw)
674
- # timestamp = post_data.get("publishedAt") or post_data.get("createdAt") or post_data.get("firstPublishedAt")
675
- # published_at_iso = datetime.fromtimestamp(timestamp / 1000).isoformat() if timestamp else None
676
- # author_urn = post_data.get("author", "urn:li:unknown")
677
-
678
- # processed_mentions_internal.append({
679
- # "mention_id": f"mention_{share_urn}",
680
- # "share_urn": share_urn,
681
- # "mention_text_raw": commentary_raw,
682
- # "mention_text_cleaned": mention_text_cleaned,
683
- # "published_at_timestamp": timestamp,
684
- # "published_at_iso": published_at_iso,
685
- # "mentioned_by_author_urn": author_urn,
686
- # "organization_urn_mentioned": org_urn
687
- # })
688
- # except requests.exceptions.RequestException as e:
689
- # status = getattr(e.response, 'status_code', 'N/A')
690
- # text = getattr(e.response, 'text', 'No response text')
691
- # logging.warning(f"Failed to fetch post details for share URN {share_urn} (Status: {status}): {e}. Response: {text}")
692
- # except json.JSONDecodeError as e:
693
- # logging.warning(f"Failed to decode JSON for post details {share_urn}: {e}. Response: {post_resp.text if post_resp else 'No resp obj'}")
694
-
695
- if mention_share_urns:
696
- # Encode URNs for the batch request URL
697
- encoded_urns = [quote(urn, safe='') for urn in mention_share_urns]
698
- formatted_urns = ",".join(encoded_urns)
699
-
700
- # Construct the URL for batch fetching post details
701
- # API_REST_BASE should be the base URL like "https://api.linkedin.com/rest"
702
- batch_posts_url = f"{API_REST_BASE}/posts?ids=List({formatted_urns})"
703
- logging.info(f"Fetching details for {len(mention_share_urns)} posts in a batch: {batch_posts_url}")
704
-
705
- try:
706
- batch_resp = session.get(batch_posts_url)
707
- batch_resp.raise_for_status() # Raise an exception for HTTP errors
708
- batch_data = batch_resp.json()
709
-
710
- results = batch_data.get("results", {}) # Contains post details keyed by URN
711
- errors = batch_data.get("errors", {}) # Contains errors for specific URNs
712
- statuses = batch_data.get("statuses", {}) # Contains HTTP statuses for specific URNs
713
-
714
- # Process each share URN using the data from the batch response
715
- for share_urn in mention_share_urns:
716
- if share_urn not in results:
717
- # Log if a URN was requested but not found in the results
718
- logging.warning(
719
- f"Post details for share URN {share_urn} not found in batch response. "
720
- f"Status: {statuses.get(share_urn)}, Error: {errors.get(share_urn)}"
721
- )
722
- continue
723
-
724
- post_data = results[share_urn]
725
-
726
- # Extract commentary - try direct 'commentary' field first, then fallback
727
- commentary_raw = post_data.get("commentary")
728
- if not commentary_raw and "specificContent" in post_data:
729
- # Fallback for older structures or specific share types if 'commentary' is not top-level
730
- share_content = post_data.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {})
731
- commentary_raw = share_content.get("shareCommentaryV2", {}).get("text", "")
732
-
733
- if not commentary_raw:
734
- logging.warning(f"No commentary found for share URN {share_urn} in batch data. Skipping.")
735
- continue
736
-
737
- # Clean the commentary text (assuming this function is defined)
738
- mention_text_cleaned = extract_text_from_mention_commentary(commentary_raw)
739
-
740
- # Extract timestamp and convert to ISO format
741
- timestamp = post_data.get("publishedAt") or post_data.get("createdAt") or post_data.get("firstPublishedAt")
742
- published_at_iso = datetime.fromtimestamp(timestamp / 1000).isoformat() if timestamp else None
743
-
744
- # Extract author URN
745
- author_urn = post_data.get("author", "urn:li:unknown") # Default if author is not found
746
-
747
- # Append processed mention data
748
- processed_mentions_internal.append({
749
- "mention_id": f"mention_{share_urn}", # Create a unique ID for the mention
750
- "share_urn": share_urn,
751
- "mention_text_raw": commentary_raw,
752
- "mention_text_cleaned": mention_text_cleaned,
753
- "published_at_timestamp": timestamp,
754
- "published_at_iso": published_at_iso,
755
- "mentioned_by_author_urn": author_urn,
756
- "organization_urn_mentioned": org_urn # The URN of the organization that was mentioned
757
- })
758
-
759
- except requests.exceptions.RequestException as e:
760
- status = getattr(e.response, 'status_code', 'N/A')
761
- text = getattr(e.response, 'text', 'No response text')
762
- logging.error(f"Failed to fetch batch post details (Status: {status}): {e}. Response: {text}")
763
- except json.JSONDecodeError as e:
764
- # Log error if JSON decoding fails for the batch response
765
- logging.error(f"Failed to decode JSON from batch posts response: {e}. Response: {batch_resp.text if batch_resp else 'No resp obj'}")
766
-
767
-
768
- logging.info(f"Processed {len(processed_mentions_internal)} mentions with their post details.")
769
- return processed_mentions_internal
770
-
771
-
772
- def analyze_mentions_sentiment(processed_mentions_list):
773
- """
774
- Analyzes sentiment for the text of each processed mention using the helper function.
775
- Input: list of processed_mention dicts (internal structure from fetch_linkedin_mentions_core).
776
- Returns: a dict {mention_id: {"sentiment_label": "DominantSentiment", "percentage": 100.0, "details": {counts}}}
777
- """
778
- mention_sentiments_map = {}
779
- logging.info(f"Analyzing individual sentiment for {len(processed_mentions_list)} mentions.")
780
-
781
- for mention_data in processed_mentions_list:
782
- mention_internal_id = mention_data["mention_id"] # Internal ID from fetch_linkedin_mentions_core
783
- text_to_analyze = mention_data.get("mention_text_cleaned", "")
784
-
785
- sentiment_result = _get_sentiment_from_text(text_to_analyze)
786
-
787
- # For single text, percentage is 100% for the dominant label if not error
788
- percentage = 0.0
789
- if sentiment_result["label"] != "Error" and any(sentiment_result["counts"].values()):
790
- percentage = 100.0
791
-
792
- mention_sentiments_map[mention_internal_id] = {
793
- "sentiment_label": sentiment_result["label"], # The dominant sentiment label
794
- "percentage": percentage,
795
- "details": dict(sentiment_result["counts"]) # Raw counts for this specific mention
796
- }
797
- logging.debug(f"Individual sentiment for mention {mention_internal_id}: {mention_sentiments_map[mention_internal_id]}")
798
-
799
- return mention_sentiments_map
800
-
801
-
802
- def compile_detailed_mentions(processed_mentions_list, mention_sentiments_map):
803
- """
804
- Combines processed mention data (internal structure) with their sentiment analysis
805
- into the user-specified output format.
806
- processed_mentions_list: list of dicts from fetch_linkedin_mentions_core
807
- mention_sentiments_map: dict from analyze_mentions_sentiment, keyed by "mention_id" (internal)
808
- and contains "sentiment_label".
809
- """
810
- detailed_mentions_output = []
811
- logging.info(f"Compiling detailed data for {len(processed_mentions_list)} mentions into specified format.")
812
-
813
- for mention_core_data in processed_mentions_list:
814
- mention_internal_id = mention_core_data["mention_id"]
815
- sentiment_info = mention_sentiments_map.get(mention_internal_id, {"sentiment_label": "Neutral 😐"})
816
-
817
- date_formatted = "Unknown"
818
- if mention_core_data["published_at_timestamp"]:
819
- try:
820
- date_formatted = datetime.fromtimestamp(mention_core_data["published_at_timestamp"] / 1000).strftime("%Y-%m-%d %H:%M")
821
- except TypeError:
822
- logging.warning(f"Could not format timestamp for mention_id {mention_internal_id}")
823
-
824
- detailed_mentions_output.append({
825
- "date": date_formatted, # User-specified field name
826
- "id": mention_core_data["share_urn"], # User-specified field name (URN of the post with mention)
827
- "mention_text": mention_core_data["mention_text_cleaned"], # User-specified field name
828
- "organization_urn": mention_core_data["organization_urn_mentioned"], # User-specified field name
829
- "sentiment_label": sentiment_info["sentiment_label"] # User-specified field name
830
- })
831
- logging.info(f"Compiled {len(detailed_mentions_output)} detailed mentions with specified fields.")
832
- return detailed_mentions_output
833
-
834
-
835
- def prepare_mentions_for_bubble(compiled_detailed_mentions_list):
836
- """
837
- Prepares mention data for uploading to a Bubble table.
838
- The input `compiled_detailed_mentions_list` is already in the user-specified format:
839
- [{"date": ..., "id": ..., "mention_text": ..., "organization_urn": ..., "sentiment_label": ...}, ...]
840
- This function directly uses these fields as per user's selection for Bubble upload.
841
- """
842
- li_mentions_bubble = []
843
- logging.info(f"Preparing {len(compiled_detailed_mentions_list)} compiled mentions for Bubble upload.")
844
-
845
- if not compiled_detailed_mentions_list:
846
- return []
847
-
848
- for mention_data in compiled_detailed_mentions_list:
849
- # The mention_data dictionary already has the keys:
850
- # "date", "id", "mention_text", "organization_urn", "sentiment_label"
851
- # These are used directly for the Bubble upload list.
852
- li_mentions_bubble.append({
853
- "date": mention_data["date"],
854
- "id": mention_data["id"],
855
- "mention_text": mention_data["mention_text"],
856
- "organization_urn": mention_data["organization_urn"],
857
- "sentiment_label": mention_data["sentiment_label"]
858
- # If Bubble table has different field names, mapping would be done here.
859
- # Example: "bubble_mention_date": mention_data["date"],
860
- # For now, using direct mapping as per user's selected code for the append.
861
- })
862
-
863
- logging.info(f"Prepared {len(li_mentions_bubble)} mention entries for Bubble, using direct field names from compiled data.")
864
- return li_mentions_bubble