import json
import requests
import html
from datetime import datetime
from collections import defaultdict
from transformers import pipeline

from sessions import create_session
from error_handling import display_error
from posts_categorization import batch_summarize_and_classify
import logging


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"

# Initialize the sentiment pipeline once at module import; the model load is expensive, so avoid re-creating it per call.
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
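# Illustrative call (output shape depends on the model; this one returns labels such as
# "Very Positive" / "Neutral" / "Very Negative", which analyze_sentiment() upper-cases):
#   sentiment_pipeline("Great post!")  ->  [{"label": "Very Positive", "score": 0.98}]  # score value is illustrative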

def fetch_linkedin_posts_core(comm_client_id, community_token, org_urn, count=20):
    """
    Fetches raw posts, their basic statistics, and performs summarization/categorization.
    Does NOT fetch comments or analyze sentiment.
    """
    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
    session = create_session(comm_client_id, token=token_dict)
    org_name = "GRLS" # Placeholder or fetch if necessary

    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
    logging.info(f"Fetching posts from URL: {posts_url}")
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts_api = resp.json().get("elements", [])
        logging.info(f"Fetched {len(raw_posts_api)} raw posts from API.")
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        logging.error(f"Failed to fetch posts (Status: {status}): {e}")
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e

    if not raw_posts_api:
        logging.info("No raw posts found.")
        return [], {}, org_name

    # Filter for valid post types if necessary, e.g., shares or ugcPosts
    # post_urns_for_stats = [p["id"] for p in raw_posts_api if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
    post_urns_for_stats = [p["id"] for p in raw_posts_api if p.get("id")]


    # Prepare texts for summarization/classification
    post_texts_for_nlp = []
    for p in raw_posts_api:
        text_content = p.get("commentary") or \
                       p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
                       "[No text content]"
        post_texts_for_nlp.append({"text": text_content, "id": p.get("id")})

    logging.info(f"Prepared {len(post_texts_for_nlp)} posts for NLP.")
    structured_results_list = batch_summarize_and_classify(post_texts_for_nlp)
    # Create a dictionary for easy lookup of structured results by post ID
    structured_results_map = {res["id"]: res for res in structured_results_list if "id" in res}


    # Fetch statistics
    stats_map = {}
    if post_urns_for_stats:
        for i in range(0, len(post_urns_for_stats), 20):  # LinkedIn APIs commonly limit batch sizes
            batch_urns = post_urns_for_stats[i:i+20]
            params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
            # shares[...] and ugcPosts[...] are separate indexed parameter lists, so each
            # needs its own contiguous zero-based counter within the batch.
            share_idx, ugc_idx = 0, 0
            for urn_str in batch_urns:
                if ":share:" in urn_str:
                    params[f"shares[{share_idx}]"] = urn_str
                    share_idx += 1
                else:
                    params[f"ugcPosts[{ugc_idx}]"] = urn_str
                    ugc_idx += 1
            
            try:
                logging.info(f"Fetching stats for batch starting with URN: {batch_urns[0]}")
                stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
                stat_resp.raise_for_status()
                for stat_element in stat_resp.json().get("elements", []):
                    urn = stat_element.get("share") or stat_element.get("ugcPost")
                    if urn:
                        stats_map[urn] = stat_element.get("totalShareStatistics", {})
                logging.info(f"Successfully fetched stats for {len(batch_urns)} URNs. Current stats_map size: {len(stats_map)}")
            except requests.exceptions.RequestException as e:
                logging.warning(f"Failed to fetch stats for a batch: {e}. Response: {e.response.text if e.response else 'No response'}")
                # Continue to next batch, some stats might be missing
            except json.JSONDecodeError as e:
                logging.warning(f"Failed to decode JSON from stats response: {e}. Response: {stat_resp.text if stat_resp else 'No response text'}")


    processed_raw_posts = []
    for p in raw_posts_api:
        post_id = p.get("id")
        if not post_id:
            logging.warning("Skipping raw post due to missing ID.")
            continue

        text_content = p.get("commentary") or \
                       p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
                       "[No text content]"
        
        timestamp = p.get("publishedAt") or p.get("createdAt")
        published_at_iso = datetime.fromtimestamp(timestamp / 1000).isoformat() if timestamp else None
        
        structured_res = structured_results_map.get(post_id, {"summary": "N/A", "category": "N/A"})

        processed_raw_posts.append({
            "id": post_id,
            "raw_text": text_content,
            "summary": structured_res["summary"],
            "category": structured_res["category"],
            "published_at_timestamp": timestamp,
            "published_at_iso": published_at_iso,
            # These are placeholders for actual fields from LinkedIn API response. Verify field names.
            "author_urn": p.get("author", "urn:li:unknown"), # e.g., "urn:li:person:xxxx" or "urn:li:organization:xxxx"
            "is_ad": p.get("isSponsored", False), # LinkedIn might use a different field like 'sponsored' or 'promoted'
            "media_type": p.get("mediaCategory", "NONE") # e.g., ARTICLE, IMAGE, VIDEO, NONE
        })
    logging.info(f"Processed {len(processed_raw_posts)} posts with core data.")
    return processed_raw_posts, stats_map, org_name


def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
    """
    Fetches comments for a list of post URNs.
    Uses stats_map to potentially skip posts with 0 comments.
    """
    from requests_oauthlib import OAuth2Session # Keep import here if OAuth2Session is specific to this
    
    linkedin_session = OAuth2Session(comm_client_id, token=token_dict)
    # LinkedIn-Version is a dated API version string; keep it aligned with the versions
    # currently supported per the LinkedIn API documentation.
    linkedin_session.headers.update({'LinkedIn-Version': "202405", 'X-Restli-Protocol-Version': '2.0.0'})
    
    all_comments_by_post = {}
    logging.info(f"Fetching comments for {len(post_urns)} posts.")

    for post_urn in post_urns:
        # Optimization: if stats show 0 comments, skip API call for this post's comments
        if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
            logging.info(f"Skipping comment fetch for {post_urn} as commentCount is 0 in stats_map.")
            all_comments_by_post[post_urn] = []
            continue
        
        try:
            # Comments are exposed under /socialActions/{activityUrn}/comments
            # (an alternative endpoint is /commentsV2?q=entity&entity={activityUrn}).
            url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
            logging.debug(f"Fetching comments from URL: {url} for post URN: {post_urn}")
            response = linkedin_session.get(url)
            
            if response.status_code == 200:
                elements = response.json().get('elements', [])
                comments_texts = [
                    c.get('message', {}).get('text') 
                    for c in elements 
                    if c.get('message') and c.get('message', {}).get('text')
                ]
                all_comments_by_post[post_urn] = comments_texts
                logging.info(f"Fetched {len(comments_texts)} comments for {post_urn}.")
            elif response.status_code == 403:  # Forbidden, often permissions or versioning
                logging.warning(f"Forbidden (403) to fetch comments for {post_urn}. URL: {url}. Response: {response.text}")
                all_comments_by_post[post_urn] = []
            elif response.status_code == 404:  # Not found
                logging.warning(f"Comments not found (404) for {post_urn}. URL: {url}. Response: {response.text}")
                all_comments_by_post[post_urn] = []
            else:
                logging.error(f"Error fetching comments for {post_urn}. Status: {response.status_code}. Response: {response.text}")
                all_comments_by_post[post_urn] = []
        except requests.exceptions.RequestException as e:
            logging.error(f"RequestException fetching comments for {post_urn}: {e}")
            all_comments_by_post[post_urn] = []
        except Exception as e: # Catch any other unexpected errors
            logging.error(f"Unexpected error fetching comments for {post_urn}: {e}")
            all_comments_by_post[post_urn] = []
            
    return all_comments_by_post

def analyze_sentiment(all_comments_data):
    """
    Analyzes sentiment for comments grouped by post_urn.
    all_comments_data is a dict: {post_urn: [comment_text_1, comment_text_2,...]}
    Returns a dict: {post_urn: {"sentiment": "DominantSentiment", "percentage": X.X}}
    """
    results_by_post = {}
    logging.info(f"Analyzing sentiment for comments from {len(all_comments_data)} posts.")
    for post_urn, comments_list in all_comments_data.items():
        sentiment_counts = defaultdict(int)
        total_valid_comments_for_post = 0
        
        if not comments_list:
            results_by_post[post_urn] = {"sentiment": "Neutral 😐", "percentage": 0.0, "details": sentiment_counts}
            continue

        for comment_text in comments_list:
            if not comment_text or not comment_text.strip(): # Skip empty comments
                continue
            try:
                # The pipeline expects a string or list of strings.
                # Ensure comment_text is a string.
                analysis_result = sentiment_pipeline(str(comment_text))
                label = analysis_result[0]['label'].upper()
                
                if label in ['POSITIVE', 'VERY POSITIVE']:
                    sentiment_counts['Positive 👍'] += 1
                elif label in ['NEGATIVE', 'VERY NEGATIVE']:
                    sentiment_counts['Negative 👎'] += 1
                elif label == 'NEUTRAL':
                    sentiment_counts['Neutral 😐'] += 1
                else: # Other labels from the model
                    sentiment_counts['Unknown'] += 1
                total_valid_comments_for_post += 1
            except Exception as e:
                logging.error(f"Sentiment analysis failed for comment under {post_urn}: '{comment_text[:50]}...'. Error: {e}")
                sentiment_counts['Error'] += 1
        
        if total_valid_comments_for_post > 0:
            dominant_sentiment = max(sentiment_counts, key=sentiment_counts.get, default='Neutral 😐')
            percentage = round((sentiment_counts[dominant_sentiment] / total_valid_comments_for_post) * 100, 1)
        else: # No valid comments to analyze
            dominant_sentiment = 'Neutral 😐'
            percentage = 0.0
            if sentiment_counts['Error'] > 0:  # If there were only errors
                dominant_sentiment = 'Error'

        results_by_post[post_urn] = {
            "sentiment": dominant_sentiment, 
            "percentage": percentage,
            "details": dict(sentiment_counts) # Store counts for more detailed reporting if needed
        }
        logging.debug(f"Sentiment for {post_urn}: {results_by_post[post_urn]}")
        
    return results_by_post


def compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post):
    """
    Combines processed raw post data with their statistics and overall sentiment.
    """
    detailed_post_list = []
    logging.info(f"Compiling detailed data for {len(processed_raw_posts)} posts.")
    for proc_post in processed_raw_posts:
        post_id = proc_post["id"]
        stats = stats_map.get(post_id, {})

        likes = stats.get("likeCount", 0)
        # Use 'commentSummary' from stats for comment count if available, else 'commentCount'
        # LinkedIn sometimes has commentSummary.totalComments
        comments_stat_count = stats.get("commentSummary", {}).get("totalComments", 0) if "commentSummary" in stats else stats.get("commentCount", 0)
        
        clicks = stats.get("clickCount", 0)
        shares = stats.get("shareCount", 0)
        impressions = stats.get("impressionCount", 0)
        unique_impressions = stats.get("uniqueImpressionsCount", 0) # Ensure this field is in API response

        # Calculate engagement: (likes + comments + clicks + shares) / impressions
        # Ensure impressions is not zero to avoid DivisionByZeroError
        engagement_numerator = likes + comments_stat_count + clicks + shares
        engagement_rate = (engagement_numerator / impressions * 100) if impressions else 0.0
        
        sentiment_info = sentiments_per_post.get(post_id, {"sentiment": "Neutral 😐", "percentage": 0.0})
        
        # Format text for display (escaped and truncated)
        display_text = html.escape(proc_post["raw_text"][:250]).replace("\n", "<br>") + \
                       ("..." if len(proc_post["raw_text"]) > 250 else "")
        
        when_formatted = datetime.fromtimestamp(proc_post["published_at_timestamp"] / 1000).strftime("%Y-%m-%d %H:%M") \
            if proc_post["published_at_timestamp"] else "Unknown"

        detailed_post_list.append({
            "id": post_id,
            "when": when_formatted,
            "text_for_display": display_text, # Shortened, escaped text
            "raw_text": proc_post["raw_text"], # Full original text
            "likes": likes,
            "comments_stat_count": comments_stat_count, # Count from post statistics
            "clicks": clicks,
            "shares": shares,
            "impressions": impressions,
            "uniqueImpressionsCount": unique_impressions,
            "engagement": f"{engagement_rate:.2f}%", # Formatted string
            "engagement_raw": engagement_rate, # Raw float for potential calculations
            "sentiment": sentiment_info["sentiment"],
            "sentiment_percent": sentiment_info["percentage"],
            "sentiment_details": sentiment_info.get("details", {}), # Detailed counts
            "summary": proc_post["summary"],
            "category": proc_post["category"],
            "author_urn": proc_post["author_urn"],
            "is_ad": proc_post["is_ad"],
            "media_type": proc_post["media_type"],
            "published_at": proc_post["published_at_iso"] # ISO format datetime string
        })
    logging.info(f"Compiled {len(detailed_post_list)} detailed posts.")
    return detailed_post_list


def prepare_data_for_bubble(detailed_posts, all_actual_comments_data):
    """
    Prepares data lists for uploading to Bubble.
    - detailed_posts: List of comprehensively compiled post objects.
    - all_actual_comments_data: Dict of {post_urn: [comment_texts]} from fetch_comments.
    """
    li_posts = []
    li_post_stats = []
    li_post_comments = [] # For individual comments
    logging.info("Preparing data for Bubble.")
    org_urn = detailed_posts[0]["author_urn"]
    for post_data in detailed_posts:
        # Data for LI_post table in Bubble
        li_posts.append({
            "author_urn": post_data["author_urn"],
            "id": post_data["id"], # Post URN
            "is_ad": post_data["is_ad"],
            "media_type": post_data["media_type"],
            "published_at": post_data["published_at"], # ISO datetime string
            "sentiment": post_data["sentiment"], # Overall sentiment of the post based on its comments
            "text": post_data["raw_text"], # Storing the full raw text
            #"summary_text": post_data["summary"],
            "li_eb_label": post_data["category"]
            # Add any other fields from post_data needed for LI_post table
        })

        # Data for LI_post_stats table in Bubble
        li_post_stats.append({
            "clickCount": post_data["clicks"],
            "commentCount": post_data["comments_stat_count"], # From post's own stats
            "engagement": post_data["engagement"], # Formatted string e.g., "12.34%"
            "impressionCount": post_data["impressions"],
            "likeCount": post_data["likes"],
            "shareCount": post_data["shares"],
            "uniqueImpressionsCount": post_data["uniqueImpressionsCount"],
            "post_id": post_data["id"], # Foreign key to LI_post
            "organization_urn": org_urn
        })

    # Data for LI_post_comments table in Bubble (individual comments)
    # This iterates through the actual comments fetched, not just the count.
    for post_urn, comments_text_list in all_actual_comments_data.items():
        for single_comment_text in comments_text_list:
            if single_comment_text and single_comment_text.strip(): # Ensure comment text is not empty
                li_post_comments.append({
                    "comment_text": single_comment_text,
                    "post_id": post_urn, # Foreign key to LI_post
                    "organization_urn": org_urn
                    # Could add sentiment per comment here if analyzed at that granularity
                })
    
    logging.info(f"Prepared {len(li_posts)} posts, {len(li_post_stats)} stats entries, and {len(li_post_comments)} comments for Bubble.")
    return li_posts, li_post_stats, li_post_comments
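

# --- Illustrative usage sketch ---
# A minimal example of how the functions above chain together, assuming you have a
# LinkedIn client ID, an OAuth token, and an organization URN. All credential values
# below are hypothetical placeholders.
if __name__ == "__main__":
    client_id = "YOUR_CLIENT_ID"  # hypothetical placeholder
    token = {"access_token": "YOUR_ACCESS_TOKEN", "token_type": "Bearer"}  # hypothetical placeholder
    org_urn = "urn:li:organization:000000"  # hypothetical placeholder

    posts, stats_map, org_name = fetch_linkedin_posts_core(client_id, token, org_urn, count=10)
    post_urns = [p["id"] for p in posts]
    comments_by_post = fetch_comments(client_id, token, post_urns, stats_map)
    sentiments = analyze_sentiment(comments_by_post)
    detailed_posts = compile_detailed_posts(posts, stats_map, sentiments)
    li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_posts, comments_by_post)
    logging.info(f"Prepared {len(li_posts)} posts for Bubble upload ({org_name}).")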