LinkedinMonitor / Data_Fetching_and_Rendering.py
GuglielmoTor's picture
Update Data_Fetching_and_Rendering.py
eb9a482 verified
raw
history blame
6.19 kB
import json
import requests
import html
from datetime import datetime
from sessions import create_session
from error_handling import display_error
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
def fetch_org_urn(comm_client_id, comm_token_dict):
if not comm_token_dict or 'access_token' not in comm_token_dict:
raise ValueError("Marketing token is missing or invalid.")
session = create_session(comm_client_id, token=comm_token_dict)
url = (
f"{API_V2_BASE}/organizationalEntityAcls"
"?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"
"&projection=(elements*(*,organizationalTarget~(id,localizedName)))"
)
try:
response = session.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
status = getattr(e.response, 'status_code', 'N/A')
try:
details = e.response.json()
except Exception:
details = str(e)
raise ValueError(f"Failed to fetch Organization details (Status: {status}): {details}") from e
elements = response.json().get('elements')
if not elements:
raise ValueError("No organizations found with ADMINISTRATOR role.")
org = elements[0]
org_urn = org.get('organizationalTarget')
org_name = org.get(next((k for k in org if k.endswith('organizationalTarget~')), {}), {}).get('localizedName')
if not org_urn or not org_urn.startswith("urn:li:organization:"):
raise ValueError("Invalid Organization URN.")
if not org_name:
org_id = org_urn.split(":")[-1]
org_name = f"Organization ({org_id})"
return org_urn, org_name
def fetch_posts_and_stats(comm_client_id, community_token, count=10):
if not community_token:
raise ValueError("Community token is missing.")
token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
session = create_session(comm_client_id, token=token_dict)
org_urn, org_name = fetch_org_urn(token_dict)
#org_urn, org_name = "urn:li:organization:19010008", "GRLS"
posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
try:
resp = session.get(posts_url)
resp.raise_for_status()
raw_posts = resp.json().get("elements", [])
except requests.exceptions.RequestException as e:
status = getattr(e.response, 'status_code', 'N/A')
raise ValueError(f"Failed to fetch posts (Status: {status})") from e
if not raw_posts:
return [], org_name
post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
if not post_urns:
return [], org_name
stats_map = {}
for i in range(0, len(post_urns), 20):
batch = post_urns[i:i+20]
params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
for idx, urn in enumerate(batch):
key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
params[key] = urn
try:
stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
stat_resp.raise_for_status()
for stat in stat_resp.json().get("elements", []):
urn = stat.get("share") or stat.get("ugcPost")
if urn:
stats_map[urn] = stat.get("totalShareStatistics", {})
except:
continue
posts = []
for post in raw_posts:
post_id = post.get("id")
stats = stats_map.get(post_id, {})
timestamp = post.get("publishedAt") or post.get("createdAt")
when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown"
text = post.get("commentary") or post.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text") or "[No text]"
text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
likes = stats.get("likeCount", 0)
comments = stats.get("commentCount", 0)
clicks = stats.get("clickCount", 0)
shares = stats.get("shareCount", 0)
impressions = stats.get("impressionCount", 0)
engagement = stats.get("engagement", likes + comments + clicks + shares) / impressions * 100 if impressions else 0.0
posts.append({
"id": post_id, "when": when, "text": text, "likes": likes,
"comments": comments, "clicks": clicks, "shares": shares,
"impressions": impressions, "engagement": f"{engagement:.2f}%"
})
return posts, org_name
def render_post_cards(posts, org_name):
safe_name = html.escape(org_name or "Your Organization")
if not posts:
return f"<h2 style='text-align:center;color:#555;'>No recent posts found for {safe_name}.</h2>"
cards = [
f"<div style='border:1px solid #ccc;border-radius:8px;padding:15px;width:280px;background:#fff;'>"
f"<div style='font-size:0.8em;color:#666;margin-bottom:8px;'>{p['when']}</div>"
f"<div style='font-size:0.95em;margin-bottom:12px;max-height:120px;overflow:auto'>{p['text']}</div>"
f"<div style='font-size:0.9em;color:#333;border-top:1px solid #eee;padding-top:10px;'>"
f"πŸ‘οΈ {p['impressions']:,} | πŸ‘ {p['likes']:,} | πŸ’¬ {p['comments']:,} | πŸ”— {p['shares']:,} | πŸ–±οΈ {p['clicks']:,}<br>"
f"<strong>πŸ“ˆ {p['engagement']}</strong></div></div>"
for p in posts
]
return f"<h2 style='text-align:center;margin-bottom:20px;'>Recent Posts for {safe_name}</h2><div style='display:flex;flex-wrap:wrap;gap:15px;justify-content:center;'>" + "".join(cards) + "</div>"
def fetch_and_render_dashboard(comm_client_id, community_token):
try:
posts, org_name = fetch_posts_and_stats(comm_client_id, community_token)
return render_post_cards(posts, org_name)
except Exception as e:
return display_error("Dashboard Error", e).get('value', '<p style="color:red;text-align:center;">❌ An error occurred.</p>')