Spaces:
Running
Running
File size: 20,079 Bytes
7c999dd 0c8b711 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd b0d28b8 7c999dd 543fdff b0d28b8 543fdff b0d28b8 7c999dd 543fdff b205514 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd c737aea 7c999dd c737aea 7c999dd b205514 0c8b711 b205514 fec5ffe b205514 fec5ffe 7c999dd fec5ffe b0d28b8 fec5ffe 7c999dd fec5ffe c737aea 36403b7 c737aea b0d28b8 c737aea 7c999dd c737aea b0d28b8 c737aea 7c999dd c737aea b0d28b8 7c999dd c737aea 7c999dd c737aea 7c999dd 517193e 7c999dd 517193e 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 517193e 543fdff 517193e 543fdff 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 7c999dd 517193e 7c999dd 19d591e 7c999dd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
# -- coding: utf-8 --
import json
import requests
import logging
from datetime import datetime, timezone, timedelta
from urllib.parse import quote
# Assuming you have a sessions.py with create_session
# If sessions.py or create_session is not found, it will raise an ImportError,
# which is appropriate for a module that depends on it.
from sessions import create_session
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"
LINKEDIN_API_VERSION = "202502" # As per user's example for follower stats
# --- ID to Name Mapping Helper Functions ---
def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
    """
    Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.

    Handles both response shapes seen in this module: a dict keyed by ID
    (BATCH_GET "results") and a list of items carrying an ID field
    (collection "elements").

    Args:
        session: Authenticated requests-style session used for the GET call.
        url: Endpoint URL (may already embed query parameters).
        params: Extra query-parameter dict passed to session.get.
        result_key_path: list of keys to navigate to the list of items (e.g., ["elements"])
        name_key_path: list of keys to navigate to the name within an item (e.g., ["name", "localized", "en_US"])
        id_key: Key holding the item ID when items arrive as a list of dicts.

    Returns:
        dict mapping str(ID) -> name. Empty (or partial) on error; all
        errors are logged, never raised.
    """
    mapping = {}
    try:
        logging.debug(f"Fetching names from URL: {url} with params: {json.dumps(params)}") # Log params for clarity
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        # Walk result_key_path down to the container that holds the items.
        items = data
        for key in result_key_path:
            if isinstance(items, dict):
                items = items.get(key, [])
            else:
                logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
                return mapping
        if isinstance(items, dict):
            # BATCH_GET shape: {id_string: item_data, ...}
            for item_id_str, item_data in items.items():
                # Drill name_key_path into the item; bail to None on shape mismatch.
                name = item_data
                for key_nav in name_key_path:
                    if isinstance(name, dict):
                        name = name.get(key_nav)
                    else:
                        name = None
                        break
                if name:
                    mapping[item_id_str] = name
                else:
                    logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
        elif isinstance(items, list):
            # Collection shape: [{id_key: ..., <name path>: ...}, ...]
            for item in items:
                item_id_val = item.get(id_key)
                name = item
                for key_nav in name_key_path:
                    if isinstance(name, dict):
                        name = name.get(key_nav)
                    else:
                        name = None
                        break
                if item_id_val is not None and name:
                    mapping[str(item_id_val)] = name
                else:
                    logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
        else:
            logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")
    except requests.exceptions.RequestException as e:
        # e.response may be None for connection-level failures, hence getattr.
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e)) # Log the raw error text
        logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
    except Exception as e:
        logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
    return mapping
def get_functions_map(session):
    """Retrieve every LinkedIn job function and return it as an {id: name} map."""
    logging.info("Fetching all LinkedIn functions.")
    endpoint = f"{API_V2_BASE}/functions"
    # No query parameters: locale resolution relies on the session's
    # Accept-Language header.
    return _fetch_linkedin_names(
        session, endpoint, {}, ["elements"], ["name", "localized", "en_US"], "id"
    )
def get_seniorities_map(session):
    """Retrieve every LinkedIn seniority level and return it as an {id: name} map."""
    logging.info("Fetching all LinkedIn seniorities.")
    endpoint = f"{API_V2_BASE}/seniorities"
    # No query parameters: locale resolution relies on the session's
    # Accept-Language header.
    return _fetch_linkedin_names(
        session, endpoint, {}, ["elements"], ["name", "localized", "en_US"], "id"
    )
def get_industries_map(session, industry_urns, version="DEFAULT"):
    """Fetches names for a list of industry URNs by pulling ALL industries and filtering locally."""
    # Parse URNs to bare IDs and deduplicate, dropping unparseable entries.
    wanted_ids = {parsed for parsed in (_parse_urn_to_id(u) for u in (industry_urns or [])) if parsed}
    if not wanted_ids:
        return {}
    url = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
    # LinkedIn pages this collection at 10 by default; 500 comfortably
    # exceeds the total number of industries, so one request suffices.
    params = {
        'start': 0,
        'count': 500
    }
    logging.info(f"Fetching all industries (to filter {len(wanted_ids)} IDs) from {url}")
    try:
        resp = session.get(url, params=params)
        resp.raise_for_status()
        payload = resp.json()
        name_by_id = {}
        for element in payload.get('elements', []):
            element_id = element.get('id')
            if not element_id or str(element_id) not in wanted_ids:
                continue
            # Names live under name.localized.en_US in the taxonomy payload.
            localized_name = element.get('name', {}).get('localized', {}).get('en_US')
            if localized_name:
                name_by_id[str(element_id)] = localized_name
            else:
                logging.warning(f"Industry {element_id} has no en_US name field")
        return name_by_id
    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        logging.error(f"Error fetching all industries: {status_code} – {getattr(e.response, 'text', str(e))}")
        return {}
def get_geo_map(session, geo_urns):
    """Resolve geo URNs to display names via the /geo BATCH_GET endpoint; returns {id: name}."""
    if not geo_urns:
        return {}
    # Parse URNs to IDs, drop failures, deduplicate.
    unique_ids = list({gid for gid in (_parse_urn_to_id(u) for u in geo_urns if u) if gid})
    if not unique_ids:
        return {}
    # BATCH_GET requires the Rest.li literal syntax, e.g.
    # ids=List(12345,23456)&locale=(language:en,country:US).
    # It must be embedded in the raw URL; passing it via the `params` dict
    # would percent-encode the parentheses and break the request.
    joined_ids = ",".join(str(i) for i in unique_ids)
    url = f"{API_V2_BASE}/geo?ids=List({joined_ids})&locale=(language:en,country:US)"
    logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {url}")
    return _fetch_linkedin_names(session, url, {}, ["results"], ["defaultLocalizedName", "value"])
def _parse_urn_to_id(urn_string):
"""Helper to get the last part (ID) from a URN string."""
if not isinstance(urn_string, str):
logging.debug(f"Invalid URN type: {type(urn_string)}, value: {urn_string}. Cannot parse ID.")
return None
try:
return urn_string.split(':')[-1]
except IndexError:
logging.warning(f"Could not parse ID from URN: {urn_string}")
return None
except Exception as e:
logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
return None
# --- Follower Data Fetching Functions ---
def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
    """
    Fetches monthly follower gains for the last 12 full months.

    The start date is set to the first day of the month, 12 months prior to
    the current month, at midnight UTC. LinkedIn defaults the end of the
    timeRange to the current time when it is not specified.

    Args:
        session: Authenticated requests.Session with LinkedIn REST headers.
        org_urn: Organization URN, e.g. "urn:li:organization:12345".
        api_rest_base: Base URL of the LinkedIn REST API.

    Returns:
        List of dicts with keys category_name (month start date, YYYY-MM-DD),
        follower_count_organic, follower_count_paid, follower_count_type,
        organization_urn. Empty list on error (errors are logged, not raised).
    """
    now_utc = datetime.now(timezone.utc)
    start_of_reporting_period = (now_utc - timedelta(days=365)).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    start_ms = int(start_of_reporting_period.timestamp() * 1000)
    # BUG FIX: was `api_rest_base_url`, an undefined name (the parameter is
    # `api_rest_base`), which raised NameError on every call.
    base_url = f"{api_rest_base}/organizationalEntityFollowerStatistics"
    # Rest.li structured-value syntax for the timeIntervals parameter.
    time_intervals_value = f"(timeRange:(start:{start_ms}),timeGranularityType:MONTH)"
    api_params = {
        "q": "organizationalEntity",
        "organizationalEntity": org_urn,
        "timeIntervals": time_intervals_value
    }
    logging.info(f"Preparing to fetch monthly follower gains for {org_urn}.")
    logging.debug(f"API Parameters for monthly gains: {json.dumps(api_params)}")
    results = []
    # Kept up to date so exception handlers can log the actual URL requested.
    request_url_for_logging = "Not constructed"
    response_obj = None  # To store response for logging in broader exception blocks
    try:
        # Prepare explicitly so the fully-resolved URL is available for logging.
        req = requests.Request('GET', base_url, params=api_params)
        prepared_req = session.prepare_request(req)
        request_url_for_logging = prepared_req.url
        logging.info(f"Requesting monthly follower gains from URL: {request_url_for_logging}")
        logging.debug(f"Request Headers for monthly gains: {json.dumps(dict(prepared_req.headers), indent=2)}")
        response_obj = session.send(prepared_req, timeout=30)
        response_obj.raise_for_status()
        data = response_obj.json()
        elements = data.get('elements', [])
        if not elements:
            logging.info(f"No 'elements' found in API response for {org_urn} for start_ms {start_ms}.")
        for item in elements:
            time_range = item.get('timeRange', {})
            ts = time_range.get('start')
            if ts is None:
                logging.warning(f"Skipping item due to missing 'start' timestamp: {item}")
                continue
            # Convert timestamp (milliseconds) to YYYY-MM-DD date string in UTC.
            date_obj = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
            date_str = date_obj.strftime('%Y-%m-%d')
            # 'followerGains' itself may be missing or explicitly null.
            gains = item.get('followerGains', {})
            if gains is None:
                gains = {}
            results.append({
                'category_name': date_str,  # Start date of the month's data
                'follower_count_organic': gains.get('organicFollowerGain', 0),
                'follower_count_paid': gains.get('paidFollowerGain', 0),
                'follower_count_type': 'follower_gains_monthly',
                'organization_urn': org_urn
            })
        # BUG FIX: was `start_of_period`, an undefined name -> NameError after
        # a successful fetch.
        logging.info(f"Fetched {len(results)} monthly follower entries for {org_urn} starting from {start_of_reporting_period.strftime('%Y-%m-%d')}.")
    except requests.exceptions.HTTPError as http_err:
        # More specific handling for HTTP status errors.
        code = getattr(http_err.response, 'status_code', 'N/A')
        text = getattr(http_err.response, 'text', str(http_err))
        logging.error(f"HTTP error fetching monthly gains for {org_urn}: {code} - {text}")
        # BUG FIX: handlers previously logged undefined `url` (only defined in
        # commented-out code), raising NameError and masking the real error.
        logging.error(f"Request URL: {request_url_for_logging}")
    except requests.exceptions.RequestException as e:
        # Other request-level failures (connection errors have e.response=None).
        code = getattr(e.response, 'status_code', 'N/A') if e.response is not None else 'N/A'
        text = getattr(e.response, 'text', str(e)) if e.response is not None else str(e)
        logging.error(f"Error fetching monthly gains for {org_urn}: {code} - {text}")
        logging.error(f"Request URL: {request_url_for_logging}")
    except Exception as ex:
        # Unexpected errors (e.g. JSON parsing of a non-JSON response).
        logging.error(f"An unexpected error occurred while fetching monthly gains for {org_urn}: {str(ex)}")
        logging.error(f"Request URL: {request_url_for_logging}")
    return results
def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
    """
    Fetches current follower demographics, applying Top-N for specified categories.

    Args:
        session: Authenticated session with LinkedIn REST headers set.
        org_urn: Organization URN whose follower breakdown is requested.
        functions_map: Pre-fetched {function_id: name} map for URN resolution.
        seniorities_map: Pre-fetched {seniority_id: name} map for URN resolution.

    Returns:
        List of dicts (category_name, follower_count_organic,
        follower_count_paid, follower_count_type, organization_urn).
        Partial or empty list on error; errors are logged, never raised.
    """
    final_demographics_results = []
    # Parameters for the main demographics call
    params = {
        'q': 'organizationalEntity',
        'organizationalEntity': org_urn
    }
    url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
    logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn} with params: {json.dumps(params)}")
    try:
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        elements = data.get("elements", [])
        if not elements:
            logging.warning(f"No elements found in follower demographics response for {org_urn}.")
            return []
        # Only the first statistics element is used for this organization.
        stat_element = elements[0]
        def _get_entries_for_type(raw_items_list, type_name, id_map, id_field_name_in_item, org_urn_val):
            # Flatten one demographic bucket list into result dicts, resolving
            # IDs/URNs to display names through id_map where applicable.
            current_type_entries = []
            if not raw_items_list:
                logging.debug(f"No raw items for demographic type '{type_name}' for org {org_urn_val}.")
                return current_type_entries
            for item in raw_items_list:
                category_name_val = "Unknown"
                if type_name == "follower_association":
                    # Association types are plain strings, not URNs — use as-is.
                    category_name_val = item.get(id_field_name_in_item, f"Unknown {id_field_name_in_item}")
                else:
                    # Other buckets carry a URN; strip it to an ID and map to a name.
                    urn_val = item.get(id_field_name_in_item)
                    entity_id = _parse_urn_to_id(urn_val)
                    category_name_val = id_map.get(str(entity_id), f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id if entity_id else urn_val})")
                counts = item.get("followerCounts", {})
                organic_count = counts.get("organicFollowerCount", 0)
                paid_count = counts.get("paidFollowerCount", 0)
                current_type_entries.append({
                    "category_name": category_name_val,
                    "follower_count_organic": organic_count,
                    "follower_count_paid": paid_count,
                    "follower_count_type": type_name,
                    "organization_urn": org_urn_val
                })
            return current_type_entries
        # Industry and geo names are looked up live from the URNs actually
        # present in this response (functions/seniorities arrive pre-fetched).
        industry_urns_to_map = [item.get("industry") for item in stat_element.get("followerCountsByIndustry", []) if item.get("industry")]
        geo_urns_to_map = [item.get("geo") for item in stat_element.get("followerCountsByGeoCountry", []) if item.get("geo")]
        live_industries_map = get_industries_map(session, industry_urns_to_map)
        live_geo_map = get_geo_map(session, geo_urns_to_map)
        # top_n=None means keep every entry (no truncation for associations).
        demographic_configs = [
            {"items_key": "followerCountsBySeniority", "type_name": "follower_seniority", "id_map": seniorities_map, "id_field": "seniority", "top_n": 10},
            {"items_key": "followerCountsByFunction", "type_name": "follower_function", "id_map": functions_map, "id_field": "function", "top_n": 10},
            {"items_key": "followerCountsByIndustry", "type_name": "follower_industry", "id_map": live_industries_map, "id_field": "industry", "top_n": 10},
            {"items_key": "followerCountsByGeoCountry", "type_name": "follower_geo", "id_map": live_geo_map, "id_field": "geo", "top_n": 10},
            {"items_key": "followerCountsByAssociationType", "type_name": "follower_association", "id_map": {}, "id_field": "associationType", "top_n": None}
        ]
        for config in demographic_configs:
            raw_items = stat_element.get(config["items_key"], [])
            processed_entries = _get_entries_for_type(raw_items, config["type_name"], config["id_map"], config["id_field"], org_urn)
            if config["top_n"] is not None and processed_entries:
                # Coerce non-numeric organic counts to 0 so sorting cannot fail.
                for entry in processed_entries:
                    if not isinstance(entry.get("follower_count_organic"), (int, float)):
                        entry["follower_count_organic"] = 0
                # Keep only the top_n entries by organic follower count.
                sorted_entries = sorted(processed_entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)
                final_demographics_results.extend(sorted_entries[:config["top_n"]])
                logging.debug(f"Added top {config['top_n']} for {config['type_name']}. Count: {len(sorted_entries[:config['top_n']])}")
            else:
                final_demographics_results.extend(processed_entries)
                logging.debug(f"Added all for {config['type_name']}. Count: {len(processed_entries)}")
        logging.info(f"Processed follower demographics for {org_urn}. Total entries from all types: {len(final_demographics_results)}")
    except requests.exceptions.RequestException as e:
        # e.response may be None for connection-level failures, hence getattr.
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
    except Exception as e:
        logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
    return final_demographics_results
# --- Main Orchestration Function ---
def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
    """
    Main function to fetch all follower statistics (monthly gains and demographics)
    and format them for Bubble.
    """
    # Guard clause: every credential and the org URN must be present.
    if not all([comm_client_id, community_token, org_urn]):
        logging.error("Client ID, token, or Organization URN is missing for get_linkedin_follower_stats.")
        return []
    # Accept either a ready-made token dict or a bare access-token string.
    token_payload = (
        community_token
        if isinstance(community_token, dict)
        else {'access_token': community_token, 'token_type': 'Bearer'}
    )
    try:
        session = create_session(comm_client_id, token=token_payload)
        session.headers.update({
            "X-Restli-Protocol-Version": "2.0.0",
            "LinkedIn-Version": LINKEDIN_API_VERSION,
            "Accept-Language": "en_US"  # Explicit locale so v2 name lookups resolve
        })
    except Exception as e:
        logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
        return []
    logging.info(f"Starting follower stats retrieval for org: {org_urn}")
    # Pre-fetch the static ID->name maps used by the demographics step.
    functions_map = get_functions_map(session)
    seniorities_map = get_seniorities_map(session)
    if not functions_map:
        logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
    if not seniorities_map:
        logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")
    collected = []
    collected.extend(fetch_monthly_follower_gains(session, org_urn, API_REST_BASE))
    collected.extend(fetch_follower_demographics(session, org_urn, functions_map, seniorities_map))
    logging.info(f"Successfully compiled {len(collected)} total follower stat entries for {org_urn}.")
    return collected
|