import json
import requests
import html
from datetime import datetime
from collections import defaultdict
from transformers import pipeline
from sessions import create_session
from error_handling import display_error
from posts_categorization import batch_summarize_and_classify
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"

# Load the multilingual sentiment model once at import time so every call reuses it.
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
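
# For reference, the pipeline returns a list with one dict per input. The label
# set is assumed to be this model's five-level scale, Very Negative .. Very
# Positive; the score below is illustrative, not real output:
#
#   sentiment_pipeline("Great launch!")
#   # -> [{'label': 'Very Positive', 'score': 0.98}]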
def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
    """Fetch comment texts for each post URN whose share statistics report comments."""
    from requests_oauthlib import OAuth2Session  # imported lazily to keep module import light
    linkedin = OAuth2Session(comm_client_id, token=token_dict)
    linkedin.headers.update({'LinkedIn-Version': "202502"})
    all_comments = {}
    for post_urn in post_urns:
        # Skip the API call entirely when the stats show zero comments.
        if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
            continue
        try:
            url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
            response = linkedin.get(url)
            if response.status_code == 200:
                elements = response.json().get('elements', [])
                all_comments[post_urn] = [c.get('message', {}).get('text') for c in elements if c.get('message')]
            else:
                all_comments[post_urn] = []
        except Exception:
            logging.exception(f"Failed to fetch comments for {post_urn}")
            all_comments[post_urn] = []
    return all_comments
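
# Illustrative call, assuming placeholder credentials and one post with two
# comments (none of these values are real):
#
#   comments = fetch_comments(
#       "your-client-id",
#       {"access_token": "REPLACE_ME", "token_type": "Bearer"},
#       ["urn:li:share:123"],
#       {"urn:li:share:123": {"commentCount": 2}},
#   )
#   # -> {"urn:li:share:123": ["Nice post!", "Congrats on the launch"]}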
def analyze_sentiment(comments_data):
    """Classify each post's comments and return the dominant sentiment per post."""
    results = {}
    for post_urn, comments in comments_data.items():
        sentiment_counts = defaultdict(int)
        total = 0
        for comment in comments:
            if not comment:
                continue
            try:
                result = sentiment_pipeline(comment)
                label = result[0]['label'].upper()
                # Fold the model's five-level scale into three buckets.
                if label in ['POSITIVE', 'VERY POSITIVE']:
                    sentiment_counts['Positive'] += 1
                elif label in ['NEGATIVE', 'VERY NEGATIVE']:
                    sentiment_counts['Negative'] += 1
                elif label == 'NEUTRAL':
                    sentiment_counts['Neutral'] += 1
                else:
                    sentiment_counts['Unknown'] += 1
                total += 1
            except Exception:
                sentiment_counts['Error'] += 1
        dominant = max(sentiment_counts, key=sentiment_counts.get, default='Neutral')
        percentage = round((sentiment_counts[dominant] / total) * 100, 1) if total else 0.0
        results[post_urn] = {"sentiment": dominant, "percentage": percentage}
    return results
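
# The returned mapping, keyed by post URN (values illustrative):
#
#   {"urn:li:share:123": {"sentiment": "Positive", "percentage": 66.7}}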
def fetch_posts_and_stats(comm_client_id, community_token, org_urn, count=10):
    """Fetch recent organization posts plus their share statistics, comments, and sentiment."""
    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
    session = create_session(comm_client_id, token=token_dict)
    #org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)
    org_name = "GRLS"
    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts = resp.json().get("elements", [])
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e
    if not raw_posts:
        return [], org_name, {}
    post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
    stats_map = {}
    post_texts = [{"text": p.get("commentary") or p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "")} for p in raw_posts]
    structured_results = batch_summarize_and_classify(post_texts)
    # Query the share-statistics endpoint in batches of 20 URNs.
    for i in range(0, len(post_urns), 20):
        batch = post_urns[i:i+20]
        params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
        for idx, urn in enumerate(batch):
            key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
            params[key] = urn
        try:
            stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
            stat_resp.raise_for_status()
            for stat in stat_resp.json().get("elements", []):
                urn = stat.get("share") or stat.get("ugcPost")
                if urn:
                    stats_map[urn] = stat.get("totalShareStatistics", {})
        except requests.exceptions.RequestException:
            logging.exception("Failed to fetch share statistics for batch starting at %d", i)
            continue
    comments = fetch_comments(comm_client_id, token_dict, post_urns, stats_map)
    sentiments = analyze_sentiment(comments)
    posts = []
    for post in raw_posts:
        post_id = post.get("id")
        stats = stats_map.get(post_id, {})
        timestamp = post.get("publishedAt") or post.get("createdAt")
        when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
        text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]"
        text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
        likes = stats.get("likeCount", 0)
        comments_count = stats.get("commentCount", 0)
        clicks = stats.get("clickCount", 0)
        shares = stats.get("shareCount", 0)
        impressions = stats.get("impressionCount", 0)
        # LinkedIn reports "engagement" as a ratio of interactions to impressions,
        # so use it directly when present; otherwise compute the ratio ourselves.
        if "engagement" in stats:
            engagement = stats["engagement"] * 100
        elif impressions:
            engagement = (likes + comments_count + clicks + shares) / impressions * 100
        else:
            engagement = 0.0
        sentiment_info = sentiments.get(post_id, {"sentiment": "Neutral", "percentage": 0.0})
        posts.append({
            "id": post_id,
            "when": when,
            "text": text,
            "likes": likes,
            "comments": comments_count,
            "clicks": clicks,
            "shares": shares,
            "impressions": impressions,
            "engagement": f"{engagement:.2f}%",
            "sentiment": sentiment_info["sentiment"],
            "sentiment_percent": sentiment_info["percentage"]
        })
        logging.info(f"Appended post data for {post_id}: Likes={likes}, Comments={comments_count}, Shares={shares}, Clicks={clicks}")
    for post, structured in zip(posts, structured_results):
        post["summary"] = structured["summary"]
        post["category"] = structured["category"]
    return posts, org_name, sentiments
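
# Shape of each post dict returned above (values illustrative):
#
#   {"id": "urn:li:share:123", "when": "2025-03-01 09:30", "text": "...",
#    "likes": 12, "comments": 3, "clicks": 7, "shares": 2,
#    "impressions": 500, "engagement": "4.80%",
#    "sentiment": "Positive", "sentiment_percent": 66.7,
#    "summary": "...", "category": "..."}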
def prepare_data_for_bubble(posts, sentiments):
    """Flatten post data into the three record lists the Bubble app expects."""
    li_posts = []
    li_post_stats = []
    li_post_comments = []
    for post in posts:
        # Fields such as author_urn, is_ad, media_type, and published_at are not
        # set by fetch_posts_and_stats() and are expected to be added upstream;
        # default to None/False so missing enrichment does not raise a KeyError.
        li_posts.append({
            "author_urn": post.get("author_urn"),
            "id": post["id"],
            "is_ad": post.get("is_ad", False),
            "media_type": post.get("media_type"),
            "published_at": post.get("published_at"),
            "sentiment": sentiments.get(post["id"], {}).get("sentiment", "Neutral"),
            "text": post["text"]
        })
        li_post_stats.append({
            "clickCount": post["clicks"],
            "commentCount": post["comments"],
            "engagement": post["engagement"],
            "impressionCount": post["impressions"],
            "likeCount": post["likes"],
            "shareCount": post["shares"],
            "uniqueImpressionsCount": post.get("uniqueImpressionsCount", 0),
            "post_id": post["id"]
        })
        # comments_data, when present, holds raw comment objects from the API
        # (each with a nested message.text), not the plain strings returned by
        # fetch_comments().
        for comment in post.get("comments_data", []):
            message = comment.get('message', {}).get('text')
            if message:
                li_post_comments.append({
                    "comment_text": message,
                    "post_id": post["id"]
                })
    return li_posts, li_post_stats, li_post_comments
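
# Minimal end-to-end sketch. The client id, token, and organization URN are
# placeholders that must come from your own LinkedIn OAuth flow; this is a
# usage illustration, not production wiring.
if __name__ == "__main__":
    demo_token = {"access_token": "REPLACE_ME", "token_type": "Bearer"}
    posts, org_name, sentiments = fetch_posts_and_stats(
        "your-client-id", demo_token, "urn:li:organization:000000", count=5
    )
    li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(posts, sentiments)
    logging.info("Prepared %d posts, %d stat rows, %d comments for %s",
                 len(li_posts), len(li_post_stats), len(li_post_comments), org_name)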