"""Fetch recent LinkedIn organization posts with statistics and render them as HTML."""

import html
import json
import logging
from datetime import datetime

import requests

from error_handling import display_error
from sessions import create_session

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"

# Number of post URNs sent per statistics request.
_STATS_BATCH_SIZE = 20
# Maximum number of characters of post text shown on a card.
_MAX_TEXT_LENGTH = 250

_logger = logging.getLogger(__name__)

# NOTE(review): the HTML markup inside this module's string literals was
# missing from the reviewed copy (the literals contained raw line breaks where
# tags used to be). Minimal, unstyled tags (<div>, <p>, <h2>, <br>) have been
# reconstructed below -- confirm them against the intended design.


def fetch_org_urn(comm_client_id, comm_token_dict):
    """Return the URN and display name of the first organization the user administers.

    Args:
        comm_client_id: Client id forwarded to ``create_session``.
        comm_token_dict: Token mapping; must contain an ``'access_token'`` key.

    Returns:
        A ``(org_urn, org_name)`` tuple. When the response carries no
        localized name, ``org_name`` falls back to ``"Organization (<id>)"``.

    Raises:
        ValueError: If the token is missing or invalid, the request fails, no
            organization is returned, or the returned URN is not an
            organization URN.
    """
    if not comm_token_dict or 'access_token' not in comm_token_dict:
        raise ValueError("Marketing token is missing or invalid.")

    session = create_session(comm_client_id, token=comm_token_dict)
    url = (
        f"{API_V2_BASE}/organizationalEntityAcls"
        "?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"
        "&projection=(elements*(*,organizationalTarget~(id,localizedName)))"
    )

    try:
        response = session.get(url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        # Prefer the structured error body; fall back to the exception text
        # when there is no response or its body is not JSON.
        details = str(e)
        if e.response is not None:
            try:
                details = e.response.json()
            except ValueError:
                pass
        raise ValueError(
            f"Failed to fetch Organization details (Status: {status}): {details}"
        ) from e

    elements = response.json().get('elements')
    if not elements:
        raise ValueError("No organizations found with ADMINISTRATOR role.")

    org = elements[0]
    org_urn = org.get('organizationalTarget')

    # The projection adds a decorated key ending in 'organizationalTarget~'.
    # Look it up with a None default: the previous `{}` default was used as a
    # dict key and raised TypeError (unhashable) whenever the key was absent.
    decorated_key = next(
        (k for k in org if k.endswith('organizationalTarget~')), None
    )
    decorated = org.get(decorated_key) if decorated_key is not None else None
    org_name = (decorated or {}).get('localizedName')

    if not isinstance(org_urn, str) or not org_urn.startswith("urn:li:organization:"):
        raise ValueError("Invalid Organization URN.")

    if not org_name:
        org_id = org_urn.split(":")[-1]
        org_name = f"Organization ({org_id})"

    return org_urn, org_name


def _fetch_share_stats(session, org_urn, post_urns):
    """Return a mapping of post URN -> ``totalShareStatistics`` dict.

    Statistics are best-effort: a failing batch is logged and skipped so the
    posts can still be shown (with zeroed counters).
    """
    stats_map = {}
    for start in range(0, len(post_urns), _STATS_BATCH_SIZE):
        batch = post_urns[start:start + _STATS_BATCH_SIZE]
        params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
        for idx, urn in enumerate(batch):
            key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
            params[key] = urn

        try:
            stat_resp = session.get(
                f"{API_REST_BASE}/organizationalEntityShareStatistics",
                params=params,
            )
            stat_resp.raise_for_status()
            for stat in stat_resp.json().get("elements", []):
                urn = stat.get("share") or stat.get("ugcPost")
                if urn:
                    stats_map[urn] = stat.get("totalShareStatistics", {})
        except Exception:
            # Deliberately broad (but no longer a bare `except:`, which also
            # swallowed KeyboardInterrupt/SystemExit): any failure in one
            # batch must not prevent the remaining posts from rendering.
            _logger.warning(
                "Failed to fetch share statistics for a batch of %d posts",
                len(batch),
                exc_info=True,
            )
            continue
    return stats_map


def _summarize_post(post, stats):
    """Build the display dict for one raw post and its statistics dict."""
    timestamp = post.get("publishedAt") or post.get("createdAt")
    # Timestamps are divided by 1000 before conversion (treated as milliseconds).
    when = (
        datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M")
        if timestamp
        else "Unknown"
    )

    raw_text = (
        post.get("commentary")
        or post.get("specificContent", {})
        .get("com.linkedin.ugc.ShareContent", {})
        .get("shareCommentaryV2", {})
        .get("text")
        or "[No text]"
    )
    # Truncate before escaping so an HTML entity is never cut in half.
    text = html.escape(raw_text[:_MAX_TEXT_LENGTH]).replace("\n", "<br>")
    if len(raw_text) > _MAX_TEXT_LENGTH:
        text += "..."

    likes = stats.get("likeCount", 0)
    comments = stats.get("commentCount", 0)
    clicks = stats.get("clickCount", 0)
    shares = stats.get("shareCount", 0)
    impressions = stats.get("impressionCount", 0)

    # NOTE(review): if the API's "engagement" field is already a ratio of
    # interactions over impressions, dividing it by impressions again is
    # wrong -- confirm against the share-statistics schema. Behavior is kept
    # as-is here.
    if impressions:
        interactions = stats.get("engagement", likes + comments + clicks + shares)
        engagement = interactions / impressions * 100
    else:
        engagement = 0.0

    return {
        "id": post.get("id"),
        "when": when,
        "text": text,
        "likes": likes,
        "comments": comments,
        "clicks": clicks,
        "shares": shares,
        "impressions": impressions,
        "engagement": f"{engagement:.2f}%",
    }


def fetch_posts_and_stats(comm_client_id, community_token, count=10):
    """Fetch the organization's recent posts together with their statistics.

    Args:
        comm_client_id: Client id forwarded to ``create_session``.
        community_token: Either a token dict or a bare access-token string
            (wrapped into a ``Bearer`` token dict).
        count: Number of posts requested from the posts endpoint.

    Returns:
        A ``(posts, org_name)`` tuple where ``posts`` is a list of display
        dicts (``id``, ``when``, ``text``, ``likes``, ``comments``,
        ``clicks``, ``shares``, ``impressions``, ``engagement``). ``text`` is
        already HTML-escaped.

    Raises:
        ValueError: If the token is missing, the organization lookup fails,
            or the posts request fails.
    """
    if not community_token:
        raise ValueError("Community token is missing.")

    if isinstance(community_token, dict):
        token_dict = community_token
    else:
        token_dict = {'access_token': community_token, 'token_type': 'Bearer'}

    session = create_session(comm_client_id, token=token_dict)
    # fetch_org_urn takes (client_id, token); the client id used to be omitted
    # here, which raised TypeError on every call.
    org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)

    posts_url = (
        f"{API_REST_BASE}/posts?author={org_urn}&q=author"
        f"&count={count}&sortBy=LAST_MODIFIED"
    )
    try:
        resp = session.get(posts_url)
        resp.raise_for_status()
        raw_posts = resp.json().get("elements", [])
    except requests.exceptions.RequestException as e:
        status = getattr(e.response, 'status_code', 'N/A')
        raise ValueError(f"Failed to fetch posts (Status: {status})") from e

    if not raw_posts:
        return [], org_name

    # Only share and ugcPost URNs are kept for the statistics lookup; posts
    # without an id are skipped instead of raising KeyError.
    post_urns = [
        urn
        for urn in ((p.get("id") or "") for p in raw_posts)
        if ":share:" in urn or ":ugcPost:" in urn
    ]
    if not post_urns:
        return [], org_name

    stats_map = _fetch_share_stats(session, org_urn, post_urns)
    posts = [
        _summarize_post(post, stats_map.get(post.get("id"), {}))
        for post in raw_posts
    ]
    return posts, org_name


def render_post_cards(posts, org_name):
    """Render the post display dicts as an HTML fragment.

    Args:
        posts: List of display dicts as produced by ``fetch_posts_and_stats``.
            ``text`` is inserted as-is, so it must already be HTML-escaped;
            the counters must be numeric (they are formatted with ``:,``).
        org_name: Organization name; escaped here. Falls back to
            ``"Your Organization"`` when empty.

    Returns:
        An HTML string: a "no posts" message when ``posts`` is empty,
        otherwise a heading followed by one card per post.
    """
    safe_name = html.escape(org_name or "Your Organization")
    if not posts:
        return f"<p>No recent posts found for {safe_name}.</p>"

    cards = [
        f"<div>"
        f"<div>{p['when']}</div>"
        f"<div>{p['text']}</div>"
        f"<div>"
        f"👁️ {p['impressions']:,} | 👍 {p['likes']:,} | 💬 {p['comments']:,} | 🔗 {p['shares']:,} | 🖱️ {p['clicks']:,}<br>"
        f"📈 {p['engagement']}"
        f"</div></div>"
        for p in posts
    ]
    return f"<div><h2>Recent Posts for {safe_name}</h2>" + "".join(cards) + "</div>"


def fetch_and_render_dashboard(comm_client_id, community_token):
    """Fetch posts and return the rendered dashboard HTML.

    Any exception is passed to ``display_error``; the ``'value'`` entry of its
    result is returned, or a generic error message when that entry is absent.
    """
    try:
        posts, org_name = fetch_posts_and_stats(comm_client_id, community_token)
        return render_post_cards(posts, org_name)
    except Exception as e:
        # Top-level UI boundary: always return displayable HTML, never raise.
        return display_error("Dashboard Error", e).get(
            'value', '<p>❌ An error occurred.</p>'
        )