Spaces:
Running
Running
File size: 6,186 Bytes
9c8c059 07be99a 9c2556f ce03b42 bef41db ce03b42 57d921c f3b41b9 a96ea35 9c2556f a96ea35 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e ce03b42 9f1c65e 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f eb9a482 09f2695 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 9c2556f ce03b42 a01c074 ce03b42 a01c074 ce03b42 a01c074 ce03b42 a01c074 ce03b42 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import json
import requests
import html
from datetime import datetime
from sessions import create_session
from error_handling import display_error
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
def fetch_org_urn(comm_client_id, comm_token_dict):
if not comm_token_dict or 'access_token' not in comm_token_dict:
raise ValueError("Marketing token is missing or invalid.")
session = create_session(comm_client_id, token=comm_token_dict)
url = (
f"{API_V2_BASE}/organizationalEntityAcls"
"?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"
"&projection=(elements*(*,organizationalTarget~(id,localizedName)))"
)
try:
response = session.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
status = getattr(e.response, 'status_code', 'N/A')
try:
details = e.response.json()
except Exception:
details = str(e)
raise ValueError(f"Failed to fetch Organization details (Status: {status}): {details}") from e
elements = response.json().get('elements')
if not elements:
raise ValueError("No organizations found with ADMINISTRATOR role.")
org = elements[0]
org_urn = org.get('organizationalTarget')
org_name = org.get(next((k for k in org if k.endswith('organizationalTarget~')), {}), {}).get('localizedName')
if not org_urn or not org_urn.startswith("urn:li:organization:"):
raise ValueError("Invalid Organization URN.")
if not org_name:
org_id = org_urn.split(":")[-1]
org_name = f"Organization ({org_id})"
return org_urn, org_name
def fetch_posts_and_stats(comm_client_id, community_token, count=10):
if not community_token:
raise ValueError("Community token is missing.")
token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
session = create_session(comm_client_id, token=token_dict)
org_urn, org_name = fetch_org_urn(token_dict)
#org_urn, org_name = "urn:li:organization:19010008", "GRLS"
posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
try:
resp = session.get(posts_url)
resp.raise_for_status()
raw_posts = resp.json().get("elements", [])
except requests.exceptions.RequestException as e:
status = getattr(e.response, 'status_code', 'N/A')
raise ValueError(f"Failed to fetch posts (Status: {status})") from e
if not raw_posts:
return [], org_name
post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
if not post_urns:
return [], org_name
stats_map = {}
for i in range(0, len(post_urns), 20):
batch = post_urns[i:i+20]
params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
for idx, urn in enumerate(batch):
key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
params[key] = urn
try:
stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
stat_resp.raise_for_status()
for stat in stat_resp.json().get("elements", []):
urn = stat.get("share") or stat.get("ugcPost")
if urn:
stats_map[urn] = stat.get("totalShareStatistics", {})
except:
continue
posts = []
for post in raw_posts:
post_id = post.get("id")
stats = stats_map.get(post_id, {})
timestamp = post.get("publishedAt") or post.get("createdAt")
when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]"
text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
likes = stats.get("likeCount", 0)
comments = stats.get("commentCount", 0)
clicks = stats.get("clickCount", 0)
shares = stats.get("shareCount", 0)
impressions = stats.get("impressionCount", 0)
engagement = stats.get("engagement", likes + comments + clicks + shares) / impressions * 100 if impressions else 0.0
posts.append({
"id": post_id, "when": when, "text": text, "likes": likes,
"comments": comments, "clicks": clicks, "shares": shares,
"impressions": impressions, "engagement": f"{engagement:.2f}%"
})
return posts, org_name
def render_post_cards(posts, org_name):
safe_name = html.escape(org_name or "Your Organization")
if not posts:
return f"<h2 style='text-align:center;color:#555;'>No recent posts found for {safe_name}.</h2>"
cards = [
f"<div style='border:1px solid #ccc;border-radius:8px;padding:15px;width:280px;background:#fff;'>"
f"<div style='font-size:0.8em;color:#666;margin-bottom:8px;'>{p['when']}</div>"
f"<div style='font-size:0.95em;margin-bottom:12px;max-height:120px;overflow:auto'>{p['text']}</div>"
f"<div style='font-size:0.9em;color:#333;border-top:1px solid #eee;padding-top:10px;'>"
f"ποΈ {p['impressions']:,} | π {p['likes']:,} | π¬ {p['comments']:,} | π {p['shares']:,} | π±οΈ {p['clicks']:,}<br>"
f"<strong>π {p['engagement']}</strong></div></div>"
for p in posts
]
return f"<h2 style='text-align:center;margin-bottom:20px;'>Recent Posts for {safe_name}</h2><div style='display:flex;flex-wrap:wrap;gap:15px;justify-content:center;'>" + "".join(cards) + "</div>"
def fetch_and_render_dashboard(comm_client_id, community_token):
try:
posts, org_name = fetch_posts_and_stats(comm_client_id, community_token)
return render_post_cards(posts, org_name)
except Exception as e:
return display_error("Dashboard Error", e).get('value', '<p style="color:red;text-align:center;">β An error occurred.</p>')
|