import html
import json
import traceback
from datetime import datetime, timezone, timedelta  # Added timezone, timedelta

import requests

from sessions import create_session

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"


def _describe_error_response(response, max_text=200):
    """Return a short detail suffix for a failed HTTP response.

    Yields ``" Details: <parsed JSON>"`` when the body parses as JSON,
    ``" Response: <first max_text chars>..."`` otherwise, and ``""`` when
    there is no response object at all.
    """
    if response is None:
        return ""
    try:
        return f" Details: {response.json()}"
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError and of the
        # simplejson-backed error requests may raise from Response.json().
        return f" Response: {response.text[:max_text]}..."


def display_error(message, e=None):
    """Format an error for display in Gradio and log it to stdout.

    Args:
        message: Human-readable description of what failed.
        e: Optional exception that caused the failure. When it is a
            ``requests`` exception carrying a response, the status code and
            body are appended; otherwise its ``description`` attribute (if
            present) or ``str(e)`` is appended.

    Returns:
        The result of ``gr.update(value=<html>, visible=True)``, where the
        HTML contains the escaped error message.
    """
    full_message = f"❌ Error: {message}"

    if e is not None:
        print("--- ERROR ---")
        print(f"Message: {message}")
        print(f"Exception Type: {type(e)}")
        print(f"Exception: {e}")
        # Avoid printing traceback for simple Warnings like scope changes unless debugging deep
        if not isinstance(e, Warning):
            # Format the traceback of *this* exception rather than whatever
            # exception happens to be in flight (format_exc() yields
            # "NoneType: None" when called outside an except block).
            tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
            print(f"Traceback:\n{tb}")
        print("-------------")

        # Try to get more details from response if it's a requests error
        if isinstance(e, requests.exceptions.RequestException) and e.response is not None:
            response = e.response
            try:
                details_str = json.dumps(response.json(), indent=2)
            except ValueError:
                # Body was not JSON; fall back to the raw text.
                full_message += f"\nStatus Code: {response.status_code}\nResponse Text:\n```\n{response.text}\n```"
            else:
                full_message += f"\nStatus Code: {response.status_code}\nDetails:\n```json\n{details_str}\n```"
        elif hasattr(e, 'description'):
            # Handle OAuthLib errors which often have a description
            full_message += f"\nDetails: {getattr(e, 'description', str(e))}"
        else:
            # Display the specific warning/error message directly
            full_message += f"\nDetails: {str(e)}"
    else:
        print(f"Error: {message}")  # Log simple message

    # Simple red text is the most portable way to flag an error in Gradio output.
    # NOTE(review): the wrapper markup of this literal was missing from the
    # copy of the file under review; a red, pre-wrapped <p> is assumed here —
    # confirm against the intended markup.
    error_html = (
        "<p style='color: red; white-space: pre-wrap;'>"
        f"{html.escape(full_message)}"
        "</p>"
    )
    # NOTE(review): `gr` (presumably `gradio`) is not imported in the visible
    # part of this module — confirm it is in scope before relying on this call.
    return gr.update(value=error_html, visible=True)


def fetch_org_urn(comm_client_id, comm_token_dict):
    """Fetch the URN and name of the first organization the user administers.

    Queries the V2 ``organizationalEntityAcls`` endpoint with the Marketing
    token for APPROVED ADMINISTRATOR roles and uses the first element returned.

    Args:
        comm_client_id: Client id passed through to ``create_session``.
        comm_token_dict: Full token dictionary; must contain ``access_token``.

    Returns:
        A ``(org_urn_full, org_name)`` tuple. ``org_name`` falls back to
        ``"Organization (<id>)"`` when no ``localizedName`` is present.

    Raises:
        ValueError: If the token is missing or invalid, the request fails,
            the response is not usable JSON, no organization is returned, or
            the first element carries no valid organization URN.
    """
    print("--- Fetching Organization URN ---")
    if not comm_token_dict or 'access_token' not in comm_token_dict:
        print("ERROR: Invalid or missing Marketing token dictionary for fetching Org URN.")
        raise ValueError("Marketing token is missing or invalid.")

    ln_mkt = create_session(comm_client_id, token=comm_token_dict)

    # Fetch organizational roles directly using the V2 API
    url = (
        f"{API_V2_BASE}/organizationalEntityAcls"
        "?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED"  # Find orgs where user is ADMIN
        "&projection=(elements*(*,organizationalTarget~(id,localizedName)))"  # Get URN and name
    )
    print(f"Fetching Org URN details from: {url}")
    try:
        r = ln_mkt.get(url)
        r.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.RequestException as e:
        print("ERROR: Failed to fetch organizationalEntityAcls with Marketing token.")
        # Provide specific feedback based on status code if possible
        status = e.response.status_code if e.response is not None else "N/A"
        details = _describe_error_response(e.response)
        raise ValueError(f"Failed to fetch Organization details (Status: {status}). Check Marketing App permissions (r_organization_admin) and ensure the user is an admin of an org page.{details}") from e

    try:
        data = r.json()
    except ValueError as e:
        # A 2xx response with a non-JSON body would otherwise escape as a raw
        # decode error with no context for the caller.
        print("ERROR: organizationalEntityAcls response was not valid JSON.")
        raise ValueError("Organization details response was not valid JSON.") from e
    print(f"Org URN Response Data: {json.dumps(data, indent=2)}")

    elements = data.get('elements') if isinstance(data, dict) else None
    if not elements:
        print("WARNING: No organizations found where the user is an ADMINISTRATOR.")
        # Try fetching with MEMBER role as a fallback? Might require different scope.
        # For now, stick to ADMINISTRATOR as per scope.
        raise ValueError("No organizations found for this user where they have the ADMINISTRATOR role. Ensure the Marketing App has 'r_organization_admin' permission and the user is an admin of an organization page.")

    # Assuming the first organization found is the target
    # In a real app, you might let the user choose if they admin multiple orgs.
    org_element = elements[0]

    # Extract Full URN ('organizationalTarget' field contains the URN string)
    org_urn_full = org_element.get('organizationalTarget') if isinstance(org_element, dict) else None
    if not isinstance(org_urn_full, str) or not org_urn_full.startswith("urn:li:organization:"):
        print(f"ERROR: Could not extract valid Organization URN ('organizationalTarget') from API response element: {org_element}")
        raise ValueError("Could not extract a valid Organization URN from the API response.")

    # Extract Name (from the projected 'organizationalTarget~' field)
    org_name = None
    # The key might be exactly 'organizationalTarget~' or something similar depending on projection syntax variations
    org_target_details_key = next((k for k in org_element if k.endswith('organizationalTarget~')), None)
    if org_target_details_key and isinstance(org_element.get(org_target_details_key), dict):
        org_name = org_element[org_target_details_key].get('localizedName')

    if not org_name:
        # Fallback name using the ID part of the URN
        org_id = org_urn_full.split(':')[-1]
        org_name = f"Organization ({org_id})"
        print(f"WARN: Could not find localizedName, using fallback: {org_name}")

    print(f"Found Org: {org_name} ({org_urn_full})")
    return org_urn_full, org_name


def fetch_posts_and_stats(comm_client_id, community_token, count=10):
    """Fetches posts using Marketing token and stats using Marketing token."""
    print("--- Fetching Posts and Stats ---")

    if not community_token:
        print("WARN: Community token missing, but not currently used for post/stat fetching.")
        raise ValueError("Community token is missing.")  # Don't raise if not needed

    # Ensure tokens are in the correct format (dict)
    comm_token_dict
= community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'} # Process if needed later ln_comm = create_session(comm_client_id, token=comm_token_dict) # Keep session available if needed # 1) Get Org URN (using Marketing token) #org_urn, org_name = fetch_org_urn(comm_token_dict) # Reuses the function org_urn, org_name = "urn:li:organization:19010008", "GRLS" # 2) Fetch latest posts (using Marketing Token via REST API) # Endpoint requires r_organization_social permission posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED" print(f"Attempting to fetch posts from: {posts_url} using Marketing token") try: resp_posts = ln_comm.get(posts_url) print(f"→ POSTS Request Headers: {resp_posts.request.headers}") print(f"→ POSTS Response Status: {resp_posts.status_code}") # Limit printing large response bodies print(f"→ POSTS Response Body (first 500 chars): {resp_posts.text[:500]}") resp_posts.raise_for_status() print("Fetched posts using Marketing token.") except requests.exceptions.RequestException as e: status = e.response.status_code if e.response is not None else "N/A" details = "" if e.response is not None: try: details = f" Details: {e.response.json()}" except json.JSONDecodeError: details = f" Response: {e.response.text[:200]}..." print(f"ERROR: Fetching posts failed with Marketing token (Status: {status}).{details}") raise ValueError(f"Failed to fetch posts using Marketing token (Status: {status}). 
Check permissions (r_organization_social).") from e raw_posts_data = resp_posts.json() raw_posts = raw_posts_data.get("elements", []) print(f"Fetched {len(raw_posts)} raw posts.") if not raw_posts: return [], org_name # Return empty list and org name if no posts # 3) Extract Post URNs (shares or ugcPosts) post_urns = [p.get("id") for p in raw_posts if p.get("id") and (":share:" in p.get("id") or ":ugcPost:" in p.get("id"))] if not post_urns: print("WARN: No post URNs (share or ugcPost) found in the fetched posts.") return [], org_name print(f"Post URNs to fetch stats for: {post_urns}") # 4) Fetch stats (using Comm session via REST API) # Endpoint requires r_organization_social permission stats_map = {} batch_size = 20 # API likely has a limit on number of URNs per request urn_batches = [post_urns[i:i + batch_size] for i in range(0, len(post_urns), batch_size)] for batch in urn_batches: if not batch: continue stats_url = f"{API_REST_BASE}/organizationalEntityShareStatistics" # Parameters need to be structured correctly: q=organizationalEntity, organizationalEntity=orgURN, shares[0]=shareURN1, ugcPosts[0]=ugcURN1 etc. 
params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn} share_idx, ugc_idx = 0, 0 for urn in batch: if ':share:' in urn: params[f'shares[{share_idx}]'] = urn share_idx += 1 elif ':ugcPost:' in urn: params[f'ugcPosts[{ugc_idx}]'] = urn ugc_idx += 1 else: print(f"WARN: Skipping unknown URN type for stats: {urn}") if share_idx == 0 and ugc_idx == 0: print("WARN: Skipping stats fetch for batch as no valid share/ugcPost URNs found.") continue print(f"Fetching stats for batch from: {stats_url} with {len(params)-2} URNs using Marketing token") try: resp_stats = ln_comm.get(stats_url, params=params) print(f"→ STATS Request URL: {resp_stats.request.url}") # Log the exact URL called print(f"→ STATS Request Headers: {resp_stats.request.headers}") print(f"→ STATS Response Status: {resp_stats.status_code}") print(f"→ STATS Response Body (first 500 chars): {resp_stats.text[:500]}") resp_stats.raise_for_status() stats_data = resp_stats.json().get("elements", []) print(f"Received {len(stats_data)} stats elements for this batch.") # Map stats back to their URNs for elem in stats_data: # Key in response is 'share' or 'ugcPost' containing the URN urn_key = elem.get('share') or elem.get('ugcPost') if urn_key: # Store the whole 'totalShareStatistics' object stats_map[urn_key] = elem.get('totalShareStatistics', {}) else: print(f"WARN: Stats element missing 'share' or 'ugcPost' key: {elem}") except requests.exceptions.RequestException as e: status = e.response.status_code if e.response is not None else "N/A" details = "" if e.response is not None: try: details = f" Details: {e.response.json()}" except json.JSONDecodeError: details = f" Response: {e.response.text[:200]}..." 
print(f"ERROR fetching stats batch using Marketing token (Status: {status}).{details}") print("WARN: Skipping stats for this batch due to error.") # Optionally raise an error here if stats are critical, or continue with partial data # raise ValueError(f"Failed to fetch stats batch (Status: {status}).") from e print(f"Fetched stats for {len(stats_map)} posts in total.") # 5) Assemble combined post data combined_posts = [] for post in raw_posts: post_id = post.get("id") if not post_id: continue stats = stats_map.get(post_id, {}) # Get stats dict, default to empty if not found published_ts = post.get("publishedAt") created_ts = post.get("createdAt") # Prefer publishedAt, fallback to createdAt timestamp = published_ts or created_ts when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M") if timestamp else "Unknown Date" # --- Text Extraction Logic --- text = "" # Priority: REST API 'commentary' field seems most reliable for simple text posts commentary_rest = post.get("commentary") if commentary_rest: text = commentary_rest else: # Fallback to V2 style fields if REST commentary is missing # Check specificContent first (for shares with commentary) specific_content = post.get("specificContent", {}) share_content = specific_content.get("com.linkedin.ugc.ShareContent", {}) share_commentary_v2 = share_content.get("shareCommentaryV2", {}).get("text") if share_commentary_v2: text = share_commentary_v2 else: # Check top-level commentaryV2 (less common?) 
commentary_v2 = post.get("commentaryV2", {}).get("text") if commentary_v2: text = commentary_v2 else: # Check for article titles if it's an article share article_content = specific_content.get("com.linkedin.ugc.ArticleContent", {}) article_title = article_content.get("title") if article_title: text = f"Article: {article_title}" else: # Check older 'content' field (might be deprecated) content_text = post.get("content", {}).get("text", {}).get("text") if content_text: text = content_text else: # Final fallback text = "[Media post or share without text]" # Escape and truncate text for HTML display display_text = html.escape(text[:250]).replace("\n", "❌ Error: Missing LinkedIn Marketing token. Please complete the login process on the 'Login' tab.
" try: print("Fetching posts and stats for dashboard...") # Pass only the necessary token posts_data, org_name = fetch_posts_and_stats(comm_client_id, community_token) # community_token kept for signature consistency print(f"Rendering {len(posts_data)} posts for {org_name}.") if not org_name: org_name = "[Organization Name Not Found]" # Handle case where org name wasn't fetched return render_post_cards(posts_data, org_name) except ValueError as ve: # Catch specific errors like missing orgs or token issues print(f"VALUE ERROR during dashboard fetch: {ve}") # Use display_error to format the message for HTML/Markdown error_update = display_error(f"Configuration or API Error: {ve}", ve) return error_update.get('value', "❌ A configuration or API error occurred.
") except requests.exceptions.RequestException as re: print(f"HTTP ERROR during dashboard fetch: {re}") status_code = re.response.status_code if re.response else "N/A" error_update = display_error(f"API Request Failed (Status: {status_code}). Check permissions/scopes or API status.", re) return error_update.get('value', f"❌ API Error: {status_code}. Check console logs.
") except Exception as e: print(f"UNEXPECTED ERROR during dashboard fetch: {e}") error_update = display_error("Failed to fetch or render dashboard data.", e) error_html = error_update.get('value', "❌ An unexpected error occurred. Check console logs.
") # Ensure the error message is HTML-safe if isinstance(error_html, str) and not error_html.strip().startswith("<"): error_html = f"{html.escape(error_html)}" return error_html