import html
import json
import logging
from collections import defaultdict
from datetime import datetime

import requests
from transformers import pipeline

from sessions import create_session
from error_handling import display_error
from posts_categorization import batch_summarize_and_classify

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
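# The v2 base serves the organization ACL lookup; the versioned REST base serves
# posts, share statistics, and comments. The REST endpoints expect a
# LinkedIn-Version header ("202502" is set explicitly in fetch_comments below;
# create_session is assumed to do the equivalent for its sessions).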
# Load sentiment model
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
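# Note: this model emits five labels ("Very Negative" through "Very Positive");
# analyze_sentiment below collapses them into Positive/Negative/Neutral buckets.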
def fetch_org_urn(comm_client_id, comm_token_dict):
    """Return (org_urn, org_name) for the first organization this token administers."""
    if not comm_token_dict or 'access_token' not in comm_token_dict:
        raise ValueError("Marketing token is missing or invalid.")
    session = create_session(comm_client_id, token=comm_token_dict)
    url = (
        f"{API_V2_BASE}/organizationalEntityAcls"
        "?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"
        "&projection=(elements*(*,organizationalTarget~(id,localizedName)))"
    )
    try:
        response = session.get(url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        try:
            details = e.response.json()
        except Exception:
            details = str(e)
        raise ValueError(f"Failed to fetch Organization details (Status: {status}): {details}") from e
    elements = response.json().get('elements')
    if not elements:
        raise ValueError("No organizations found with ADMINISTRATOR role.")
    org = elements[0]
    org_urn = org.get('organizationalTarget')
    # The localized name comes back under the projection-decorated key 'organizationalTarget~'.
    decorated_key = next((k for k in org if k.endswith('organizationalTarget~')), None)
    org_name = org.get(decorated_key, {}).get('localizedName') if decorated_key else None
    if not org_urn or not org_urn.startswith("urn:li:organization:"):
        raise ValueError("Invalid Organization URN.")
    if not org_name:
        org_id = org_urn.split(":")[-1]
        org_name = f"Organization ({org_id})"
    return org_urn, org_name
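# Usage sketch (hypothetical client id and token; real values come from your
# LinkedIn OAuth flow):
#   token = {"access_token": "...", "token_type": "Bearer"}
#   org_urn, org_name = fetch_org_urn("your-client-id", token)
#   # -> ("urn:li:organization:12345", "Acme Corp")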
def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
    """Return {post_urn: [comment texts]} for posts whose stats show comments."""
    from requests_oauthlib import OAuth2Session
    linkedin = OAuth2Session(comm_client_id, token=token_dict)
    # Versioned REST endpoints require the LinkedIn-Version header.
    linkedin.headers.update({'LinkedIn-Version': "202502"})
    all_comments = {}
    for post_urn in post_urns:
        # Skip the API call when the share statistics report zero comments.
        if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
            continue
        try:
            url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
            response = linkedin.get(url)
            if response.status_code == 200:
                elements = response.json().get('elements', [])
                all_comments[post_urn] = [c.get('message', {}).get('text') for c in elements if c.get('message')]
            else:
                all_comments[post_urn] = []
        except Exception:
            logging.exception(f"Failed to fetch comments for {post_urn}")
            all_comments[post_urn] = []
    return all_comments
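# Note: only the first page of comments is read. For fuller coverage the
# endpoint should accept Rest.li paging parameters (an assumption worth
# verifying against this API version), e.g.:
#   response = linkedin.get(url, params={"start": 0, "count": 50})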
def analyze_sentiment(comments_data):
    """Classify each post's comments and return the dominant sentiment per post."""
    results = {}
    for post_urn, comments in comments_data.items():
        sentiment_counts = defaultdict(int)
        total = 0
        for comment in comments:
            if not comment:
                continue
            try:
                result = sentiment_pipeline(comment)
                label = result[0]['label'].upper()
                # Collapse the model's five classes into three display buckets.
                if label in ['POSITIVE', 'VERY POSITIVE']:
                    sentiment_counts['Positive 👍'] += 1
                elif label in ['NEGATIVE', 'VERY NEGATIVE']:
                    sentiment_counts['Negative 👎'] += 1
                elif label == 'NEUTRAL':
                    sentiment_counts['Neutral 😐'] += 1
                else:
                    sentiment_counts['Unknown'] += 1
                total += 1
            except Exception:
                sentiment_counts['Error'] += 1
        dominant = max(sentiment_counts, key=sentiment_counts.get, default='Neutral 😐')
        percentage = round((sentiment_counts[dominant] / total) * 100, 1) if total else 0.0
        results[post_urn] = {"sentiment": dominant, "percentage": percentage}
    return results
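# Example return shape (URN and numbers illustrative):
#   {"urn:li:share:123": {"sentiment": "Positive 👍", "percentage": 66.7}}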
def fetch_posts_and_stats(comm_client_id, community_token, count=10):
    """Fetch recent organization posts plus share statistics and comment sentiment."""
    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
    session = create_session(comm_client_id, token=token_dict)
    org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)
    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts = resp.json().get("elements", [])
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e
    if not raw_posts:
        return [], org_name, {}
    post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
    stats_map = {}
    post_texts = [{"text": p.get("commentary") or p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "")} for p in raw_posts]
    structured_results = batch_summarize_and_classify(post_texts)
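    # batch_summarize_and_classify is expected to return one dict per post with
    # "summary" and "category" keys; they are merged into the post dicts below.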
    # Share statistics are requested in batches of up to 20 URNs.
    for i in range(0, len(post_urns), 20):
        batch = post_urns[i:i+20]
        params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
        # shares[] and ugcPosts[] are separate indexed lists, so each needs its own counter.
        share_idx = ugc_idx = 0
        for urn in batch:
            if ":share:" in urn:
                params[f"shares[{share_idx}]"] = urn
                share_idx += 1
            else:
                params[f"ugcPosts[{ugc_idx}]"] = urn
                ugc_idx += 1
        try:
            stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
            stat_resp.raise_for_status()
            for stat in stat_resp.json().get("elements", []):
                urn = stat.get("share") or stat.get("ugcPost")
                if urn:
                    stats_map[urn] = stat.get("totalShareStatistics", {})
        except requests.exceptions.RequestException:
            logging.warning(f"Failed to fetch share statistics for batch starting at {i}")
            continue
    comments = fetch_comments(comm_client_id, token_dict, post_urns, stats_map)
    sentiments = analyze_sentiment(comments)
    posts = []
    for post in raw_posts:
        post_id = post.get("id")
        stats = stats_map.get(post_id, {})
        timestamp = post.get("publishedAt") or post.get("createdAt")
        when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
        text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]"
        # Truncate, escape, and convert newlines for HTML card rendering.
        text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
        likes = stats.get("likeCount", 0)
        comments_count = stats.get("commentCount", 0)
        clicks = stats.get("clickCount", 0)
        shares = stats.get("shareCount", 0)
        impressions = stats.get("impressionCount", 0)
        # The API's 'engagement' field is already a ratio; otherwise derive one from the raw counts.
        if stats.get("engagement") is not None:
            engagement = stats["engagement"] * 100
        elif impressions:
            engagement = (likes + comments_count + clicks + shares) / impressions * 100
        else:
            engagement = 0.0
        sentiment_info = sentiments.get(post_id, {"sentiment": "Neutral 😐", "percentage": 0.0})
        posts.append({
            "id": post_id, "when": when, "text": text, "likes": likes,
            "comments": comments_count, "clicks": clicks, "shares": shares,
            "impressions": impressions, "engagement": f"{engagement:.2f}%",
            "sentiment": sentiment_info["sentiment"], "sentiment_percent": sentiment_info["percentage"]
        })
        logging.info(f"Appended post data for {post_id}: Likes={likes}, Comments={comments_count}, Shares={shares}, Clicks={clicks}")
    # Merge the batch summaries and categories back into the post dicts.
    for post, structured in zip(posts, structured_results):
        post["summary"] = structured["summary"]
        post["category"] = structured["category"]
    return posts, org_name, sentiments
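# Shape of each element in the returned posts list (values illustrative):
#   {"id": "urn:li:share:...", "when": "2025-01-31 12:00", "text": "...",
#    "likes": 10, "comments": 2, "clicks": 5, "shares": 1, "impressions": 500,
#    "engagement": "3.60%", "sentiment": "Positive 👍", "sentiment_percent": 66.7,
#    "summary": "...", "category": "..."}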
def render_post_cards(posts, org_name):
    safe_name = html.escape(org_name or "Your Organization")
    if not posts:
        # No posts were fetched; render a minimal error card instead of an empty grid.
        return (f'<div class="post-cards"><h3>{safe_name}</h3>'
                '<p>❌ An error occurred. No posts to display.</p></div>')
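if __name__ == "__main__":
    # Hypothetical smoke test: the environment variable names are assumptions,
    # not part of the module; supply credentials from your own OAuth flow.
    import os
    client_id = os.environ["LINKEDIN_CLIENT_ID"]
    token = {"access_token": os.environ["LINKEDIN_ACCESS_TOKEN"], "token_type": "Bearer"}
    posts, org_name, _ = fetch_posts_and_stats(client_id, token, count=5)
    for p in posts:
        print(p["when"], p["sentiment"], p["engagement"], p["text"][:60])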