# -*- coding: utf-8 -*-
import json
import requests
import logging
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta # For robust month arithmetic
from urllib.parse import quote
# Assuming you have a sessions.py with create_session
# If sessions.py or create_session is not found, it will raise an ImportError,
# which is appropriate for a module that depends on it.
from sessions import create_session

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
LINKEDIN_API_VERSION = "202502"  # YYYYMM version header required by the versioned REST follower statistics endpoints


# --- ID to Name Mapping Helper Functions ---
def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
"""
Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.
result_key_path: list of keys to navigate to the list of items (e.g., ["elements"])
name_key_path: list of keys to navigate to the name within an item (e.g., ["name", "localized", "en_US"])
Revised: Removed locale_needed parameter; calling functions should provide locale in params if required.
"""
mapping = {}
try:
logging.debug(f"Fetching names from URL: {url} with params: {params}")
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
items = data
for key in result_key_path: # Navigate to the list/dict of items
if isinstance(items, dict):
items = items.get(key, []) # Default to empty list if key not found
else: # If items is already not a dict
logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
return mapping # Cannot proceed with this path
if isinstance(items, dict): # For batch responses like geo/industry (where keys are IDs)
for item_id_str, item_data in items.items():
name = item_data
for key_nav in name_key_path: # Navigate to the name string
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None # Path broken
break
if name:
mapping[item_id_str] = name
else:
logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
elif isinstance(items, list): # For list responses like functions/seniorities
for item in items:
item_id_val = item.get(id_key)
name = item
for key_nav in name_key_path: # Navigate to the name string
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None # Path broken
break
if item_id_val is not None and name:
mapping[str(item_id_val)] = name # Ensure ID is string for consistency
else:
logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
else:
logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e))
logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
return mapping
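

# Illustrative response shapes that _fetch_linkedin_names navigates (abbreviated,
# hypothetical payloads; real responses carry more fields):
#
#   List style (e.g. /functions, /seniorities):
#       {"elements": [{"id": 4, "name": {"localized": {"en_US": "Engineering"}}}]}
#       -> result_key_path=["elements"], name_key_path=["name", "localized", "en_US"]
#
#   Batch style keyed by ID (e.g. /geo, batch industries):
#       {"results": {"103644278": {"defaultLocalizedName": {"value": "United States"}}}}
#       -> result_key_path=["results"], name_key_path=["defaultLocalizedName", "value"]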


def get_functions_map(session):
"""Fetches all LinkedIn functions and returns a map of {id: name}."""
url = f"{API_V2_BASE}/functions"
params = {'locale': 'en_US'}
logging.info("Fetching all LinkedIn functions.")
return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")


def get_seniorities_map(session):
"""Fetches all LinkedIn seniorities and returns a map of {id: name}."""
url = f"{API_V2_BASE}/seniorities"
params = {'locale': 'en_US'}
logging.info("Fetching all LinkedIn seniorities.")
return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")


def get_industries_map(session, industry_urns, version="DEFAULT"):
"""Fetches names for a list of industry URNs. Returns a map {id: name}."""
if not industry_urns: return {}
industry_ids = [_parse_urn_to_id(urn) for urn in industry_urns if urn]
unique_ids = list(set(filter(None, industry_ids))) # Filter out None IDs from parsing
if not unique_ids: return {}
url = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
# LinkedIn API for batch industries expects ids as repeated query parameters: ids=1&ids=23
# The requests library handles lists in params by creating repeated query parameters.
params = {'ids': unique_ids, 'locale.language': 'en', 'locale.country': 'US'}
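    # e.g. unique_ids=[4, 96] (hypothetical IDs) would be sent as
    #   ?ids=4&ids=96&locale.language=en&locale.country=US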
logging.info(f"Fetching names for {len(unique_ids)} unique industry IDs.")
return _fetch_linkedin_names(session, url, params, ["results"], ["name", "localized", "en_US"])


def get_geo_map(session, geo_urns):
"""Fetches names for a list of geo URNs. Returns a map {id: name}."""
if not geo_urns: return {}
geo_ids = [_parse_urn_to_id(urn) for urn in geo_urns if urn]
unique_ids = list(set(filter(None, geo_ids)))
if not unique_ids: return {}
    # This endpoint expects the Restli 2.0.0 batch syntax ids=List(123,456) in the query string.
    ids_param_value = "List(" + ",".join(map(str, unique_ids)) + ")"  # Ensure IDs are strings
    # Build the URL by hand so the List() syntax survives intact: safe="(),"
    # keeps the Restli structural characters unescaped while encoding everything else.
    url = f"{API_V2_BASE}/geo?ids={quote(ids_param_value, safe='(),')}&locale.language=en&locale.country=US"
logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {url}")
# Params dict is empty as all params are in the URL string for this call.
return _fetch_linkedin_names(session, url, {}, ["results"], ["defaultLocalizedName", "value"])


def _parse_urn_to_id(urn_string):
"""Helper to get the last part (ID) from a URN string."""
if not isinstance(urn_string, str):
logging.warning(f"Invalid URN type: {type(urn_string)}, value: {urn_string}")
return None
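    # e.g. _parse_urn_to_id("urn:li:geo:103644278") -> "103644278" (illustrative URN)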
    try:
        # str.split always returns at least one element, so [-1] is safe even
        # without ':' in the string; a bare ID passes through unchanged.
        return urn_string.split(':')[-1]
    except Exception as e:
        logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
        return None


# --- Follower Data Fetching Functions ---
def fetch_monthly_follower_gains(session, org_urn):
"""
Fetches monthly follower gains for the last 12-13 months to ensure full coverage.
"""
results = []
now = datetime.now(timezone.utc)
# Go back 13 months to ensure we capture at least 12 full previous months
thirteen_months_ago = now - relativedelta(months=13)
start_of_period = thirteen_months_ago.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start_ms = int(start_of_period.timestamp() * 1000)
url = (
f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
f"?q=organizationalEntity"
f"&organizationalEntity={quote(org_urn)}"
f"&timeIntervals.timeGranularityType=MONTH"
f"&timeIntervals.timeRange.start={start_ms}"
)
logging.info(f"Fetching monthly follower gains from: {url}")
try:
response = session.get(url)
response.raise_for_status()
data = response.json()
for item in data.get("elements", []):
time_range = item.get("timeRange", {})
start_timestamp_ms = time_range.get("start")
if start_timestamp_ms is None:
logging.warning("Skipping item due to missing start timestamp in monthly gains.")
continue
date_obj = datetime.fromtimestamp(start_timestamp_ms / 1000, tz=timezone.utc)
date_str = date_obj.strftime('%Y-%m-%d') # First day of the month
follower_gains = item.get("followerGains", {})
organic_gain = follower_gains.get("organicFollowerGain", 0)
paid_gain = follower_gains.get("paidFollowerGain", 0)
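            # One row per month is appended; an illustrative example:
            #   {"category_name": "2024-06-01", "follower_count_organic": 42,
            #    "follower_count_paid": 3, "follower_count_type": "follower_gains_monthly",
            #    "organization_urn": "urn:li:organization:123456"}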
results.append({
"category_name": date_str,
"follower_count_organic": organic_gain,
"follower_count_paid": paid_gain,
"follower_count_type": "follower_gains_monthly",
"organization_urn": org_urn # Add org_urn for consistency
})
logging.info(f"Fetched {len(results)} monthly follower gain entries for org URN {org_urn}.")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e))
logging.error(f"Error fetching monthly follower gains for {org_urn} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for monthly follower gains for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching monthly follower gains for {org_urn}: {e}", exc_info=True)
return results


def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
"""
Fetches current follower demographics (seniority, industry, function, geo, association).
"""
results = []
url = (
f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
f"?q=organizationalEntity&organizationalEntity={quote(org_urn)}"
)
logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn}")
try:
response = session.get(url)
response.raise_for_status()
data = response.json()
elements = data.get("elements", [])
if not elements:
logging.warning(f"No elements found in follower demographics response for {org_urn}.")
return []
stat_element = elements[0] # Data is usually in the first element
# Collect URNs for batch mapping
industry_urns_to_map = [item.get("industry") for item in stat_element.get("followerCountsByIndustry", []) if item.get("industry")]
geo_urns_to_map = [item.get("geo") for item in stat_element.get("followerCountsByGeoCountry", []) if item.get("geo")]
industries_map = get_industries_map(session, industry_urns_to_map)
geo_map = get_geo_map(session, geo_urns_to_map)
# Helper to create demographic entries
def _add_demographic_entry(items_list, type_name, id_map, id_field_name, org_urn_val):
if not items_list:
logging.info(f"No items found for demographic type '{type_name}' for org {org_urn_val}.")
return
for item in items_list:
category_name_val = "Unknown"
if type_name == "follower_association": # associationType is directly the name
category_name_val = item.get("associationType", f"Unknown AssociationType")
else: # For URN-based categories
urn_val = item.get(id_field_name)
entity_id = _parse_urn_to_id(urn_val)
category_name_val = id_map.get(str(entity_id), f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id if entity_id else urn_val})")
counts = item.get("followerCounts", {})
results.append({
"category_name": category_name_val,
"follower_count_organic": counts.get("organicFollowerCount", 0),
"follower_count_paid": counts.get("paidFollowerCount", 0),
"follower_count_type": type_name,
"organization_urn": org_urn_val
})
_add_demographic_entry(stat_element.get("followerCountsByAssociationType", []), "follower_association", {}, "associationType", org_urn)
_add_demographic_entry(stat_element.get("followerCountsBySeniority", []), "follower_seniority", seniorities_map, "seniority", org_urn)
_add_demographic_entry(stat_element.get("followerCountsByFunction", []), "follower_function", functions_map, "function", org_urn)
_add_demographic_entry(stat_element.get("followerCountsByIndustry", []), "follower_industry", industries_map, "industry", org_urn)
_add_demographic_entry(stat_element.get("followerCountsByGeoCountry", []), "follower_geo", geo_map, "geo", org_urn)
logging.info(f"Processed follower demographics for {org_urn}. Total entries from this type: {len(results)}")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e))
logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
return results


# --- Main Orchestration Function ---
def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
"""
Main function to fetch all follower statistics (monthly gains and demographics)
and format them for Bubble.
"""
if not all([comm_client_id, community_token, org_urn]):
logging.error("Client ID, token, or Organization URN is missing for get_linkedin_follower_stats.")
return []
token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
session = None # Initialize session to None
try:
session = create_session(comm_client_id, token=token_dict)
session.headers.update({
"X-Restli-Protocol-Version": "2.0.0",
"LinkedIn-Version": LINKEDIN_API_VERSION
})
except Exception as e:
logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
return [] # Cannot proceed without a session
logging.info(f"Starting follower stats retrieval for org: {org_urn}")
# These maps are fetched once per call to get_linkedin_follower_stats
functions_map = get_functions_map(session)
seniorities_map = get_seniorities_map(session)
if not functions_map: logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
if not seniorities_map: logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")
all_follower_data = []
monthly_gains = fetch_monthly_follower_gains(session, org_urn)
all_follower_data.extend(monthly_gains)
demographics = fetch_follower_demographics(session, org_urn, functions_map, seniorities_map)
all_follower_data.extend(demographics)
logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
return all_follower_data
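

if __name__ == "__main__":
    # Minimal usage sketch. The client ID, token, and organization URN below are
    # hypothetical placeholders; substitute real credentials before running.
    stats = get_linkedin_follower_stats(
        comm_client_id="YOUR_CLIENT_ID",
        community_token="YOUR_ACCESS_TOKEN",
        org_urn="urn:li:organization:123456",
    )
    print(json.dumps(stats[:5], indent=2))  # Preview the first few entries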