Spaces:
Running
Running
File size: 17,353 Bytes
7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 7c999dd 543fdff 7c999dd 543fdff 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 517193e 543fdff 517193e 543fdff 517193e 7c999dd 517193e 7c999dd 517193e 7c999dd 517193e 543fdff 7c999dd 517193e 7c999dd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# -*- coding: utf-8 -*-
import json
import requests
import logging
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta # For robust month arithmetic
from urllib.parse import quote
# Assuming you have a sessions.py with create_session
# If sessions.py or create_session is not found, it will raise an ImportError,
# which is appropriate for a module that depends on it.
from sessions import create_session
# Configure logging once at import time so every helper in this module emits
# timestamped, level-tagged records.
# NOTE(review): this runs as an import side effect — confirm that is intended
# when this module is imported by a larger application.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Base URL for the v2 endpoints used by the ID-to-name lookups
# (functions, seniorities, industries, geo).
API_V2_BASE = 'https://api.linkedin.com/v2'
# Base URL for the REST endpoints used by the follower-statistics calls.
API_REST_BASE = "https://api.linkedin.com/rest"
# Sent as the "LinkedIn-Version" request header by get_linkedin_follower_stats.
LINKEDIN_API_VERSION = "202502" # As per user's example for follower stats
# --- ID to Name Mapping Helper Functions ---
def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
"""
Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.
result_key_path: list of keys to navigate to the list of items (e.g., ["elements"])
name_key_path: list of keys to navigate to the name within an item (e.g., ["name", "localized", "en_US"])
"""
mapping = {}
try:
logging.debug(f"Fetching names from URL: {url} with params: {json.dumps(params)}") # Log params for clarity
response = session.get(url, params=params)
response.raise_for_status()
data = response.json()
items = data
for key in result_key_path:
if isinstance(items, dict):
items = items.get(key, [])
else:
logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
return mapping
if isinstance(items, dict):
for item_id_str, item_data in items.items():
name = item_data
for key_nav in name_key_path:
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None
break
if name:
mapping[item_id_str] = name
else:
logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
elif isinstance(items, list):
for item in items:
item_id_val = item.get(id_key)
name = item
for key_nav in name_key_path:
if isinstance(name, dict):
name = name.get(key_nav)
else:
name = None
break
if item_id_val is not None and name:
mapping[str(item_id_val)] = name
else:
logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
else:
logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")
except requests.exceptions.RequestException as e:
status_code = getattr(e.response, 'status_code', 'N/A')
error_text = getattr(e.response, 'text', str(e)) # Log the raw error text
logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
except Exception as e:
logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
return mapping
def get_functions_map(session):
    """Return an ``{id: name}`` map of all LinkedIn functions.

    The lookup is delegated to ``_fetch_linkedin_names`` against the v2
    ``/functions`` endpoint, reading items from ``elements`` and the name
    from ``name.localized.en_US``.
    """
    logging.info("Fetching all LinkedIn functions.")
    endpoint = f"{API_V2_BASE}/functions"
    # No query parameters: relies on the Accept-Language header from the session.
    return _fetch_linkedin_names(
        session,
        endpoint,
        {},
        ["elements"],
        ["name", "localized", "en_US"],
        "id",
    )
def get_seniorities_map(session):
    """Return an ``{id: name}`` map of all LinkedIn seniorities.

    The lookup is delegated to ``_fetch_linkedin_names`` against the v2
    ``/seniorities`` endpoint, reading items from ``elements`` and the name
    from ``name.localized.en_US``.
    """
    logging.info("Fetching all LinkedIn seniorities.")
    endpoint = f"{API_V2_BASE}/seniorities"
    # No query parameters: relies on the Accept-Language header from the session.
    return _fetch_linkedin_names(
        session,
        endpoint,
        {},
        ["elements"],
        ["name", "localized", "en_US"],
        "id",
    )
def get_industries_map(session, industry_urns, version="DEFAULT"):
    """Resolve industry URNs to names and return them as an ``{id: name}`` map.

    Args:
        session: HTTP session handed through to ``_fetch_linkedin_names``.
        industry_urns: Iterable of industry URN strings; falsy entries are ignored.
        version: Industry taxonomy version segment placed in the request path.

    Returns:
        dict of industry ID to name; ``{}`` when there is nothing to look up.
    """
    if not industry_urns:
        return {}

    # De-duplicate the parsed IDs, dropping URNs that yielded no usable ID.
    unique_ids = list({
        industry_id
        for industry_id in (_parse_urn_to_id(urn) for urn in industry_urns if urn)
        if industry_id
    })
    if not unique_ids:
        return {}

    # As per LinkedIn docs for BATCH_GET: ids={id1}&ids={id2}&locale.language=en&locale.country=US
    # (requests expands a list value into repeated ids=... pairs).
    query = {
        'ids': unique_ids,
        'locale.language': 'en',
        'locale.country': 'US'
    }
    endpoint = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
    logging.info(f"Fetching names for {len(unique_ids)} unique industry IDs using BATCH_GET.")
    return _fetch_linkedin_names(
        session,
        endpoint,
        query,
        ["results"],
        ["name", "localized", "en_US"],
    )
def get_geo_map(session, geo_urns):
    """Resolve geo URNs to names and return them as an ``{id: name}`` map.

    Args:
        session: HTTP session handed through to ``_fetch_linkedin_names``.
        geo_urns: Iterable of geo URN strings; falsy entries are ignored.

    Returns:
        dict of geo ID to name (read from ``defaultLocalizedName.value``);
        ``{}`` when there is nothing to look up.
    """
    if not geo_urns:
        return {}

    # De-duplicate the parsed IDs, dropping URNs that yielded no usable ID.
    unique_ids = list({
        geo_id
        for geo_id in (_parse_urn_to_id(urn) for urn in geo_urns if urn)
        if geo_id
    })
    if not unique_ids:
        return {}

    # As per LinkedIn docs for BATCH_GET: ids=List(12345,23456)&locale=(language:en,country:US)
    ids_param_string = "List(" + ",".join(str(geo_id) for geo_id in unique_ids) + ")"
    locale_param_string = "(language:en,country:US)"  # Must be exactly this string format

    # The query string is embedded in the URL itself for this API format,
    # so the params dict given to the helper stays empty.
    encoded_ids = quote(ids_param_string)
    encoded_locale = quote(locale_param_string)
    url = f"{API_V2_BASE}/geo?ids={encoded_ids}&locale={encoded_locale}"

    logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {url}")
    return _fetch_linkedin_names(
        session,
        url,
        {},
        ["results"],
        ["defaultLocalizedName", "value"],
    )
def _parse_urn_to_id(urn_string):
"""Helper to get the last part (ID) from a URN string."""
if not isinstance(urn_string, str):
logging.debug(f"Invalid URN type: {type(urn_string)}, value: {urn_string}. Cannot parse ID.")
return None
try:
return urn_string.split(':')[-1]
except IndexError:
logging.warning(f"Could not parse ID from URN: {urn_string}")
return None
except Exception as e:
logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
return None
# --- Follower Data Fetching Functions ---
def fetch_monthly_follower_gains(session, org_urn):
    """
    Fetches monthly follower gains starting from the first day of the month 13 months ago.

    Uses parameter names as confirmed by user's working script.

    Args:
        session: Object exposing ``get(url, params=...)`` that returns a
            requests-style response.
        org_urn: Organization URN sent as ``organizationalEntity`` and echoed
            into every result row.

    Returns:
        list of dicts with keys ``category_name`` (UTC ``YYYY-MM-DD`` of the
        interval start), ``follower_count_organic``, ``follower_count_paid``,
        ``follower_count_type`` (always ``"follower_gains_monthly"``) and
        ``organization_urn``. Errors are logged and whatever was collected so
        far (possibly ``[]``) is returned; this function never raises for
        request/parse failures.
    """
    results = []
    response = None  # Lets the JSON error handler report the body without inspecting locals().

    now = datetime.now(timezone.utc)
    # Go back 13 months to ensure we capture at least 12 full previous months
    # and have a buffer, as LinkedIn might report based on full previous months.
    thirteen_months_ago = now - relativedelta(months=13)
    start_of_period = thirteen_months_ago.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    start_ms = int(start_of_period.timestamp() * 1000)

    # Parameters as per user's working script and common LinkedIn patterns for time-bound stats
    params = {
        'q': 'organizationalEntity',
        'organizationalEntity': org_urn,
        'timeIntervals.timeGranularityType': 'MONTH',
        'timeIntervals.timeRange.start': start_ms
    }
    url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
    logging.info(f"Fetching monthly follower gains from: {url} with params: {json.dumps(params)}")

    try:
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()

        # "or" fallbacks: a JSON null must behave like a missing key instead of
        # raising and discarding every remaining month.
        for item in data.get("elements") or []:
            if not isinstance(item, dict):
                logging.warning("Skipping malformed (non-object) element in monthly gains.")
                continue

            time_range = item.get("timeRange") or {}
            start_timestamp_ms = time_range.get("start")
            if start_timestamp_ms is None:
                logging.warning("Skipping item due to missing start timestamp in monthly gains.")
                continue

            date_obj = datetime.fromtimestamp(start_timestamp_ms / 1000, tz=timezone.utc)
            date_str = date_obj.strftime('%Y-%m-%d')  # First day of the month

            follower_gains = item.get("followerGains") or {}
            results.append({
                "category_name": date_str,
                "follower_count_organic": follower_gains.get("organicFollowerGain", 0),
                "follower_count_paid": follower_gains.get("paidFollowerGain", 0),
                "follower_count_type": "follower_gains_monthly",
                "organization_urn": org_urn
            })
        logging.info(f"Fetched {len(results)} monthly follower gain entries for org URN {org_urn}.")
    except json.JSONDecodeError as e:
        # Handled before RequestException: recent versions of requests raise a
        # JSON error that derives from both classes, and this message is the
        # more specific one.
        body = response.text if response is not None else 'N/A'
        logging.error(f"Error decoding JSON for monthly follower gains for {org_urn}: {e}. Response: {body}")
    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching monthly follower gains for {org_urn} (Status: {status_code}): {error_text}")
    except Exception as e:
        logging.error(f"Unexpected error fetching monthly follower gains for {org_urn}: {e}", exc_info=True)
    return results
def _build_demographic_entries(raw_items_list, type_name, id_map, id_field_name_in_item, org_urn_val):
    """Convert one raw ``followerCountsBy*`` list into normalized entry dicts."""
    entries = []
    if not raw_items_list:
        logging.debug(f"No raw items for demographic type '{type_name}' for org {org_urn_val}.")
        return entries

    for item in raw_items_list:
        if not isinstance(item, dict):
            # One malformed element must not abort the whole demographics run.
            logging.warning(f"Skipping non-dict item for demographic type '{type_name}' for org {org_urn_val}.")
            continue

        if type_name == "follower_association":
            # Association types are used verbatim; there is no ID-to-name lookup.
            category_name_val = item.get(id_field_name_in_item, f"Unknown {id_field_name_in_item}")
        else:
            urn_val = item.get(id_field_name_in_item)
            entity_id = _parse_urn_to_id(urn_val)
            category_name_val = id_map.get(
                str(entity_id),
                f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id if entity_id else urn_val})",
            )

        # "or {}": a JSON null for followerCounts is treated like a missing key.
        counts = item.get("followerCounts") or {}
        entries.append({
            "category_name": category_name_val,
            "follower_count_organic": counts.get("organicFollowerCount", 0),
            "follower_count_paid": counts.get("paidFollowerCount", 0),
            "follower_count_type": type_name,
            "organization_urn": org_urn_val
        })
    return entries


def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
    """
    Fetches current follower demographics, applying Top-N for specified categories.

    Only the first element of the response is read. Seniority, function,
    industry and geo breakdowns are each truncated to the 10 entries with the
    highest organic follower count; association-type entries are all kept.
    Industry and geo names are resolved with extra lookups through
    ``get_industries_map`` / ``get_geo_map``.

    Args:
        session: Object exposing ``get(url, params=...)`` that returns a
            requests-style response.
        org_urn: Organization URN sent as ``organizationalEntity`` and echoed
            into every result row.
        functions_map: ``{id: name}`` map for function IDs (``None`` is treated as empty).
        seniorities_map: ``{id: name}`` map for seniority IDs (``None`` is treated as empty).

    Returns:
        list of dicts with keys ``category_name``, ``follower_count_organic``,
        ``follower_count_paid``, ``follower_count_type`` and
        ``organization_urn``. Errors are logged and whatever was collected so
        far (possibly ``[]``) is returned.
    """
    final_demographics_results = []
    response = None  # Lets the JSON error handler report the body without inspecting locals().

    # Parameters for the main demographics call
    params = {
        'q': 'organizationalEntity',
        'organizationalEntity': org_urn
    }
    url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
    logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn} with params: {json.dumps(params)}")

    try:
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()

        elements = data.get("elements") or []
        if not elements:
            logging.warning(f"No elements found in follower demographics response for {org_urn}.")
            return []
        stat_element = elements[0]

        # Collect the URNs whose names must be resolved with live lookups.
        industry_urns_to_map = [
            item.get("industry")
            for item in stat_element.get("followerCountsByIndustry") or []
            if item.get("industry")
        ]
        geo_urns_to_map = [
            item.get("geo")
            for item in stat_element.get("followerCountsByGeoCountry") or []
            if item.get("geo")
        ]
        live_industries_map = get_industries_map(session, industry_urns_to_map)
        live_geo_map = get_geo_map(session, geo_urns_to_map)

        demographic_configs = [
            {"items_key": "followerCountsBySeniority", "type_name": "follower_seniority", "id_map": seniorities_map or {}, "id_field": "seniority", "top_n": 10},
            {"items_key": "followerCountsByFunction", "type_name": "follower_function", "id_map": functions_map or {}, "id_field": "function", "top_n": 10},
            {"items_key": "followerCountsByIndustry", "type_name": "follower_industry", "id_map": live_industries_map, "id_field": "industry", "top_n": 10},
            {"items_key": "followerCountsByGeoCountry", "type_name": "follower_geo", "id_map": live_geo_map, "id_field": "geo", "top_n": 10},
            {"items_key": "followerCountsByAssociationType", "type_name": "follower_association", "id_map": {}, "id_field": "associationType", "top_n": None}
        ]

        for config in demographic_configs:
            raw_items = stat_element.get(config["items_key"]) or []
            processed_entries = _build_demographic_entries(
                raw_items, config["type_name"], config["id_map"], config["id_field"], org_urn
            )

            if config["top_n"] is not None and processed_entries:
                # Non-numeric organic counts are coerced to 0 so the sort cannot fail.
                for entry in processed_entries:
                    if not isinstance(entry.get("follower_count_organic"), (int, float)):
                        entry["follower_count_organic"] = 0
                sorted_entries = sorted(processed_entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)
                top_entries = sorted_entries[:config["top_n"]]
                final_demographics_results.extend(top_entries)
                logging.debug(f"Added top {config['top_n']} for {config['type_name']}. Count: {len(top_entries)}")
            else:
                final_demographics_results.extend(processed_entries)
                logging.debug(f"Added all for {config['type_name']}. Count: {len(processed_entries)}")

        logging.info(f"Processed follower demographics for {org_urn}. Total entries from all types: {len(final_demographics_results)}")
    except json.JSONDecodeError as e:
        # Handled before RequestException: recent versions of requests raise a
        # JSON error that derives from both classes, and this message is the
        # more specific one.
        body = response.text if response is not None else 'N/A'
        logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {body}")
    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
    except Exception as e:
        logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
    return final_demographics_results
# --- Main Orchestration Function ---
def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
    """
    Main function to fetch all follower statistics (monthly gains and demographics)
    and format them for Bubble.

    Args:
        comm_client_id: Client ID passed to ``create_session``.
        community_token: Either a token dict, or a bare access-token value that
            is wrapped as ``{'access_token': ..., 'token_type': 'Bearer'}``.
        org_urn: Organization URN to fetch statistics for.

    Returns:
        list of entry dicts: monthly gain rows first, then demographic rows.
        Returns ``[]`` when an argument is missing or the session cannot be
        created or configured.
    """
    if not all([comm_client_id, community_token, org_urn]):
        logging.error("Client ID, token, or Organization URN is missing for get_linkedin_follower_stats.")
        return []

    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}

    session = None
    try:
        try:
            session = create_session(comm_client_id, token=token_dict)
            session.headers.update({
                "X-Restli-Protocol-Version": "2.0.0",
                "LinkedIn-Version": LINKEDIN_API_VERSION,
                "Accept-Language": "en_US" # Explicitly set for v2 name lookups if not default in session
            })
        except Exception as e:
            logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
            return []

        logging.info(f"Starting follower stats retrieval for org: {org_urn}")

        # Static lookup tables are fetched once and reused for the demographics call.
        functions_map = get_functions_map(session)
        seniorities_map = get_seniorities_map(session)
        if not functions_map:
            logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
        if not seniorities_map:
            logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")

        all_follower_data = []
        all_follower_data.extend(fetch_monthly_follower_gains(session, org_urn))
        all_follower_data.extend(
            fetch_follower_demographics(session, org_urn, functions_map, seniorities_map)
        )

        logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
        return all_follower_data
    finally:
        # Release the session's pooled connections once all calls are done.
        # NOTE(review): assumes create_session returns a fresh session per call
        # — confirm it is not a shared/cached instance. The getattr guard keeps
        # this a no-op for session objects that have no close().
        close_session = getattr(session, "close", None)
        if callable(close_session):
            try:
                close_session()
            except Exception as e:
                logging.debug(f"Ignoring error while closing session for org {org_urn}: {e}")
|