import json import requests import html from datetime import datetime from collections import defaultdict from transformers import pipeline from sessions import create_session from error_handling import display_error from posts_categorization import batch_summarize_and_classify import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') API_V2_BASE = 'https://api.linkedin.com/v2' API_REST_BASE = "https://api.linkedin.com/rest" sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis") def fetch_comments(comm_client_id, token_dict, post_urns, stats_map): from requests_oauthlib import OAuth2Session linkedin = OAuth2Session(comm_client_id, token=token_dict) linkedin.headers.update({'LinkedIn-Version': "202502"}) all_comments = {} for post_urn in post_urns: if stats_map.get(post_urn, {}).get('commentCount', 0) == 0: continue try: url = f"{API_REST_BASE}/socialActions/{post_urn}/comments" response = linkedin.get(url) if response.status_code == 200: elements = response.json().get('elements', []) all_comments[post_urn] = [c.get('message', {}).get('text') for c in elements if c.get('message')] else: all_comments[post_urn] = [] except Exception: all_comments[post_urn] = [] return all_comments def analyze_sentiment(comments_data): results = {} for post_urn, comments in comments_data.items(): sentiment_counts = defaultdict(int) total = 0 for comment in comments: if not comment: continue try: result = sentiment_pipeline(comment) label = result[0]['label'].upper() if label in ['POSITIVE', 'VERY POSITIVE']: sentiment_counts['Positive 👍'] += 1 elif label in ['NEGATIVE', 'VERY NEGATIVE']: sentiment_counts['Negative 👎'] += 1 elif label == 'NEUTRAL': sentiment_counts['Neutral 😐'] += 1 else: sentiment_counts['Unknown'] += 1 total += 1 except: sentiment_counts['Error'] += 1 dominant = max(sentiment_counts, key=sentiment_counts.get, default='Neutral 😐') percentage = round((sentiment_counts[dominant] / total) * 100, 1) if total else 0.0 results[post_urn] = {"sentiment": dominant, "percentage": percentage} return results def fetch_posts_and_stats(comm_client_id, community_token, org_urn, count=10): token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'} session = create_session(comm_client_id, token=token_dict) #org_urn, org_name = fetch_org_urn(comm_client_id, token_dict) org_name = "GRLS" posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED" try: resp = session.get(posts_url) resp.raise_for_status() raw_posts = resp.json().get("elements", []) except requests.exceptions.RequestException as e: status = getattr(e.response, 'status_code', 'N/A') raise ValueError(f"Failed to fetch posts (Status: {status})") from e if not raw_posts: return [], org_name, {} post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]] stats_map = {} post_texts = [{"text": p.get("commentary") or p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "")} for p in raw_posts] structured_results = batch_summarize_and_classify(post_texts) for i in range(0, len(post_urns), 20): batch = post_urns[i:i+20] params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn} for idx, urn in enumerate(batch): key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]" params[key] = urn try: stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params) stat_resp.raise_for_status() for stat in stat_resp.json().get("elements", []): urn = stat.get("share") or stat.get("ugcPost") if urn: stats_map[urn] = stat.get("totalShareStatistics", {}) except: continue comments = fetch_comments(comm_client_id, token_dict, post_urns, stats_map) sentiments = analyze_sentiment(comments) posts = [] for post in raw_posts: post_id = post.get("id") stats = stats_map.get(post_id, {}) timestamp = post.get("publishedAt") or post.get("createdAt") when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown" text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]" text = html.escape(text[:250]).replace("\n", "
") + ("..." if len(text) > 250 else "") likes = stats.get("likeCount", 0) comments_count = stats.get("commentCount", 0) clicks = stats.get("clickCount", 0) shares = stats.get("shareCount", 0) impressions = stats.get("impressionCount", 0) engagement = stats.get("engagement", likes + comments_count + clicks + shares) / impressions * 100 if impressions else 0.0 sentiment_info = sentiments.get(post_id, {"sentiment": "Neutral 😐", "percentage": 0.0}) posts.append({ "id": post_id, "when": when, "text": text, "likes": likes, "comments": comments_count, "clicks": clicks, "shares": shares, "impressions": impressions, "engagement": f"{engagement:.2f}%", "sentiment": sentiment_info["sentiment"], "sentiment_percent": sentiment_info["percentage"] }) logging.info(f"Appended post data for {post_id}: Likes={likes}, Comments={comments_count}, Shares={shares}, Clicks={clicks}") for post, structured in zip(posts, structured_results): post["summary"] = structured["summary"] post["category"] = structured["category"] return posts, org_name, sentiments def prepare_data_for_bubble(posts, sentiments): li_posts = [] li_post_stats = [] li_post_comments = [] for post in posts: li_posts.append({ "author_urn": post["author_urn"], "id": post["id"], "is_ad": post["is_ad"], "media_type": post["media_type"], "published_at": post["published_at"], "sentiment": sentiments.get(post["id"], {}).get("sentiment", "Neutral"), "text": post["text"] }) li_post_stats.append({ "clickCount": post["clicks"], "commentCount": post["comments"], "engagement": post["engagement"], "impressionCount": post["impressions"], "likeCount": post["likes"], "shareCount": post["shares"], "uniqueImpressionsCount": post.get("uniqueImpressionsCount", 0), "post_id": post["id"] }) for comment in post.get("comments_data", []): message = comment.get('message', {}).get('text') if message: li_post_comments.append({ "comment_text": message, "post_id": post["id"] }) return li_posts, li_post_stats, li_post_comments