import html
import json
import logging
from collections import defaultdict
from datetime import datetime

import requests
from transformers import pipeline

from sessions import create_session
from error_handling import display_error
from posts_categorization import batch_summarize_and_classify

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
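# Multilingual sentiment classifier, loaded once at import time so all calls reuse it.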
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
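    """Fetch comment texts for each post URN via the socialActions endpoint.

    Posts whose statistics report zero comments are skipped to avoid needless
    requests. Returns a dict mapping post URN -> list of comment strings.
    """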
    from requests_oauthlib import OAuth2Session
    linkedin = OAuth2Session(comm_client_id, token=token_dict)
    linkedin.headers.update({'LinkedIn-Version': "202502"})
    all_comments = {}
    for post_urn in post_urns:
        if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
            continue
        try:
            url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
            response = linkedin.get(url)
            if response.status_code == 200:
                elements = response.json().get('elements', [])
                all_comments[post_urn] = [c.get('message', {}).get('text') for c in elements if c.get('message')]
            else:
                all_comments[post_urn] = []
        except Exception:
            logging.warning(f"Failed to fetch comments for {post_urn}")
            all_comments[post_urn] = []
    return all_comments
def analyze_sentiment(comments_data):
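    """Run the sentiment pipeline over each post's comments.

    Returns a dict mapping post URN -> {"sentiment": dominant label,
    "percentage": share of comments carrying that label}.
    """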
    results = {}
    for post_urn, comments in comments_data.items():
        sentiment_counts = defaultdict(int)
        total = 0
        for comment in comments:
            if not comment:
                continue
            try:
                result = sentiment_pipeline(comment)
                label = result[0]['label'].upper()
                if label in ['POSITIVE', 'VERY POSITIVE']:
                    sentiment_counts['Positive'] += 1
                elif label in ['NEGATIVE', 'VERY NEGATIVE']:
                    sentiment_counts['Negative'] += 1
                elif label == 'NEUTRAL':
                    sentiment_counts['Neutral'] += 1
                else:
                    sentiment_counts['Unknown'] += 1
                total += 1
            except Exception:
                sentiment_counts['Error'] += 1
        dominant = max(sentiment_counts, key=sentiment_counts.get, default='Neutral')
        percentage = round((sentiment_counts[dominant] / total) * 100, 1) if total else 0.0
        results[post_urn] = {"sentiment": dominant, "percentage": percentage}
    return results
def fetch_posts_and_stats(comm_client_id, community_token, org_urn, count=10):
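    """Fetch an organization's recent posts plus share statistics, comments,
    and sentiment, then attach summaries and categories from
    batch_summarize_and_classify.

    Returns (posts, org_name, sentiments).
    """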
    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
    session = create_session(comm_client_id, token=token_dict)
    #org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)
    org_name = "GRLS"
    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts = resp.json().get("elements", [])
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e
    if not raw_posts:
        return [], org_name, {}
    post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
    stats_map = {}
    post_texts = [
        {"text": p.get("commentary") or p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "")}
        for p in raw_posts
    ]
    structured_results = batch_summarize_and_classify(post_texts)
    # Request share statistics in batches of 20 URNs per call.
    for i in range(0, len(post_urns), 20):
        batch = post_urns[i:i+20]
        params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
        for idx, urn in enumerate(batch):
            # Shares and UGC posts use different multi-value parameter names.
            key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
            params[key] = urn
        try:
            stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
            stat_resp.raise_for_status()
            for stat in stat_resp.json().get("elements", []):
                urn = stat.get("share") or stat.get("ugcPost")
                if urn:
                    stats_map[urn] = stat.get("totalShareStatistics", {})
        except requests.exceptions.RequestException:
            logging.warning(f"Failed to fetch share statistics for batch starting at {i}")
            continue
    comments = fetch_comments(comm_client_id, token_dict, post_urns, stats_map)
    sentiments = analyze_sentiment(comments)
    posts = []
    for post in raw_posts:
        post_id = post.get("id")
        stats = stats_map.get(post_id, {})
        timestamp = post.get("publishedAt") or post.get("createdAt")
        when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
        text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]"
        # Escape a 250-char preview; the ellipsis check runs against the original length.
        text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
        likes = stats.get("likeCount", 0)
        comments_count = stats.get("commentCount", 0)
        clicks = stats.get("clickCount", 0)
        shares = stats.get("shareCount", 0)
        impressions = stats.get("impressionCount", 0)
        # The stats API reports "engagement" as a ratio of interactions to
        # impressions, so convert it to a percentage directly; only derive it
        # from the raw counts when the field is missing.
        if "engagement" in stats:
            engagement = stats["engagement"] * 100
        elif impressions:
            engagement = (likes + comments_count + clicks + shares) / impressions * 100
        else:
            engagement = 0.0
        sentiment_info = sentiments.get(post_id, {"sentiment": "Neutral", "percentage": 0.0})
        posts.append({
            "id": post_id,
            "when": when,
            "text": text,
            "likes": likes,
            "comments": comments_count,
            "clicks": clicks,
            "shares": shares,
            "impressions": impressions,
            "engagement": f"{engagement:.2f}%",
            "sentiment": sentiment_info["sentiment"],
            "sentiment_percent": sentiment_info["percentage"]
        })
        logging.info(f"Appended post data for {post_id}: Likes={likes}, Comments={comments_count}, Shares={shares}, Clicks={clicks}")
    # zip assumes batch_summarize_and_classify preserves input order (one result per raw post).
    for post, structured in zip(posts, structured_results):
        post["summary"] = structured["summary"]
        post["category"] = structured["category"]
    return posts, org_name, sentiments
def prepare_data_for_bubble(posts, sentiments):
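    """Shape post data into the three lists the Bubble app expects.

    Note: this expects enriched post dicts carrying "author_urn", "is_ad",
    "media_type", "published_at", and optionally "comments_data"; the dicts
    produced by fetch_posts_and_stats above do not include those fields, so
    they are presumably added elsewhere before this function is called.
    """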
    li_posts = []
    li_post_stats = []
    li_post_comments = []
    for post in posts:
        li_posts.append({
            "author_urn": post["author_urn"],
            "id": post["id"],
            "is_ad": post["is_ad"],
            "media_type": post["media_type"],
            "published_at": post["published_at"],
            "sentiment": sentiments.get(post["id"], {}).get("sentiment", "Neutral"),
            "text": post["text"]
        })
        li_post_stats.append({
            "clickCount": post["clicks"],
            "commentCount": post["comments"],
            "engagement": post["engagement"],
            "impressionCount": post["impressions"],
            "likeCount": post["likes"],
            "shareCount": post["shares"],
            "uniqueImpressionsCount": post.get("uniqueImpressionsCount", 0),
            "post_id": post["id"]
        })
        for comment in post.get("comments_data", []):
            message = comment.get('message', {}).get('text')
            if message:
                li_post_comments.append({
                    "comment_text": message,
                    "post_id": post["id"]
                })
    return li_posts, li_post_stats, li_post_comments
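
# Minimal usage sketch (hypothetical placeholders: CLIENT_ID, TOKEN, and the
# organization URN below are not defined in this module):
#
#   posts, org_name, sentiments = fetch_posts_and_stats(
#       CLIENT_ID, TOKEN, "urn:li:organization:12345", count=10
#   )
#   li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(posts, sentiments)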