Spaces:
Running
Running
from sessions import create_session | |
def fetch_org_urn(comm_client_id, comm_token_dict): | |
""" | |
Fetches the user's administrated organization URN and name using the Marketing token. | |
Expects comm_token_dict to be the full token dictionary. | |
Raises ValueError on failure. | |
""" | |
print("--- Fetching Organization URN ---") | |
if not comm_token_dict or 'access_token' not in comm_token_dict: | |
print("ERROR: Invalid or missing Marketing token dictionary for fetching Org URN.") | |
raise ValueError("Marketing token is missing or invalid.") | |
ln_mkt = create_session(comm_client_id, token=comm_token_dict) | |
# Fetch organizational roles directly using the V2 API | |
url = ( | |
f"{API_V2_BASE}/organizationalEntityAcls" | |
"?q=roleAssignee&role=ADMINISTRATOR&state=APPROVED" # Find orgs where user is ADMIN | |
"&projection=(elements*(*,organizationalTarget~(id,localizedName)))" # Get URN and name | |
) | |
print(f"Fetching Org URN details from: {url}") | |
try: | |
r = ln_mkt.get(url) | |
r.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
except requests.exceptions.RequestException as e: | |
print(f"ERROR: Failed to fetch organizationalEntityAcls with Marketing token.") | |
# Provide specific feedback based on status code if possible | |
status = e.response.status_code if e.response is not None else "N/A" | |
details = "" | |
if e.response is not None: | |
try: | |
details = f" Details: {e.response.json()}" | |
except json.JSONDecodeError: | |
details = f" Response: {e.response.text[:200]}..." # Show partial text | |
raise ValueError(f"Failed to fetch Organization details (Status: {status}). Check Marketing App permissions (r_organization_admin) and ensure the user is an admin of an org page.{details}") from e | |
data = r.json() | |
print(f"Org URN Response Data: {json.dumps(data, indent=2)}") | |
elements = data.get('elements') | |
if not elements: | |
print("WARNING: No organizations found where the user is an ADMINISTRATOR.") | |
# Try fetching with MEMBER role as a fallback? Might require different scope. | |
# For now, stick to ADMINISTRATOR as per scope. | |
raise ValueError("No organizations found for this user where they have the ADMINISTRATOR role. Ensure the Marketing App has 'r_organization_admin' permission and the user is an admin of an organization page.") | |
# Assuming the first organization found is the target | |
# In a real app, you might let the user choose if they admin multiple orgs. | |
org_element = elements[0] | |
# Extract Full URN ('organizationalTarget' field contains the URN string) | |
org_urn_full = org_element.get('organizationalTarget') | |
if not org_urn_full or not isinstance(org_urn_full, str) or not org_urn_full.startswith("urn:li:organization:"): | |
print(f"ERROR: Could not extract valid Organization URN ('organizationalTarget') from API response element: {org_element}") | |
raise ValueError("Could not extract a valid Organization URN from the API response.") | |
# Extract Name (from the projected 'organizationalTarget~' field) | |
org_name = None | |
# The key might be exactly 'organizationalTarget~' or something similar depending on projection syntax variations | |
org_target_details_key = next((k for k in org_element if k.endswith('organizationalTarget~')), None) | |
if org_target_details_key and isinstance(org_element.get(org_target_details_key), dict): | |
org_name = org_element[org_target_details_key].get('localizedName') | |
if not org_name: | |
# Fallback name using the ID part of the URN | |
org_id = org_urn_full.split(':')[-1] | |
org_name = f"Organization ({org_id})" | |
print(f"WARN: Could not find localizedName, using fallback: {org_name}") | |
print(f"Found Org: {org_name} ({org_urn_full})") | |
return org_urn_full, org_name | |