GuglielmoTor committed · verified
Commit cf2b325 · 1 Parent(s): 15c15dc

Update linkedin_follower_stats.py

Files changed (1):
  1. linkedin_follower_stats.py +273 -178

linkedin_follower_stats.py CHANGED
@@ -1,14 +1,14 @@
-# -*- coding: utf-8 -*-
 import json
 import requests
 import logging
 from datetime import datetime, timezone, timedelta
-from urllib.parse import quote
 
 # Assuming you have a sessions.py with create_session
 # If sessions.py or create_session is not found, it will raise an ImportError,
 # which is appropriate for a module that depends on it.
-from sessions import create_session
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -21,186 +21,213 @@ LINKEDIN_API_VERSION = "202502" # As per user's example for follower stats
 def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
     """
     Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.
-    result_key_path: list of keys to navigate to the list of items (e.g., ["elements"])
-    name_key_path: list of keys to navigate to the name within an item (e.g., ["name", "localized", "en_US"])
     """
     mapping = {}
     try:
-        logging.debug(f"Fetching names from URL: {url} with params: {json.dumps(params)}") # Log params for clarity
-        response = session.get(url, params=params)
-        response.raise_for_status()
-        data = response.json()
 
         items = data
-        for key in result_key_path:
             if isinstance(items, dict):
-                items = items.get(key, [])
-            else:
-                logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
-                return mapping
 
         if isinstance(items, dict):
             for item_id_str, item_data in items.items():
                 name = item_data
-                for key_nav in name_key_path:
                     if isinstance(name, dict):
                         name = name.get(key_nav)
                     else:
-                        name = None
                         break
                 if name:
-                    mapping[item_id_str] = name
                 else:
-                    logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
         elif isinstance(items, list):
             for item in items:
                 item_id_val = item.get(id_key)
                 name = item
-                for key_nav in name_key_path:
                     if isinstance(name, dict):
                         name = name.get(key_nav)
                     else:
-                        name = None
                         break
                 if item_id_val is not None and name:
-                    mapping[str(item_id_val)] = name
                 else:
-                    logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
         else:
-            logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")
-
-    except requests.exceptions.RequestException as e:
-        status_code = getattr(e.response, 'status_code', 'N/A')
-        error_text = getattr(e.response, 'text', str(e)) # Log the raw error text
-        logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
-    except json.JSONDecodeError as e:
-        logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
     except Exception as e:
-        logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
     return mapping
 
 def get_functions_map(session):
-    """Fetches all LinkedIn functions and returns a map of {id: name}."""
     url = f"{API_V2_BASE}/functions"
-    params = {} # Relies on Accept-Language header from session
     logging.info("Fetching all LinkedIn functions.")
     return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
 
 def get_seniorities_map(session):
-    """Fetches all LinkedIn seniorities and returns a map of {id: name}."""
     url = f"{API_V2_BASE}/seniorities"
-    params = {} # Relies on Accept-Language header from session
     logging.info("Fetching all LinkedIn seniorities.")
     return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
 
 def get_industries_map(session, industry_urns, version="DEFAULT"):
-    """Fetches names for a list of industry URNs by pulling ALL industries and filtering locally."""
-    # parse and dedupe IDs
     industry_ids = [_parse_urn_to_id(urn) for urn in industry_urns or []]
     unique_ids = set(filter(None, industry_ids))
     if not unique_ids:
         return {}
 
-    # we'll page through the full list; LinkedIn defaults to 10, so bump count
     url = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
-    params = {
-        'start': 0,
-        'count': 500 # should exceed total # of industries
-    }
-
-    logging.info(f"Fetching all industries (to filter {len(unique_ids)} IDs) from {url}")
     try:
-        response = session.get(url, params=params)
-        response.raise_for_status()
-        data = response.json()
         elements = data.get('elements', [])
 
         mapping = {}
         for el in elements:
             el_id = el.get('id')
             if el_id and str(el_id) in unique_ids:
-                # drill into name.localized.en_US
-                name = el.get('name', {}) \
-                    .get('localized', {}) \
-                    .get('en_US')
                 if name:
                     mapping[str(el_id)] = name
                 else:
-                    logging.warning(f"Industry {el_id} has no en_US name field")
         return mapping
-
-    except requests.exceptions.RequestException as e:
-        status_code = getattr(e.response, 'status_code', 'N/A')
-        logging.error(f"Error fetching all industries: {status_code} {getattr(e.response, 'text', str(e))}")
         return {}
-
-
 
 def get_geo_map(session, geo_urns):
-    """Fetches names for a list of geo URNs. Returns a map {id: name}."""
     if not geo_urns: return {}
     geo_ids = [_parse_urn_to_id(urn) for urn in geo_urns if urn]
     unique_ids = list(set(filter(None, geo_ids)))
     if not unique_ids: return {}
 
-    # As per LinkedIn docs for BATCH_GET: ids=List(12345,23456)&locale=(language:en,country:US)
-    ids_param_string = "List(" + ",".join(map(str, unique_ids)) + ")"
-    locale_param_string = "(language:en,country:US)" # Must be exactly this string format
 
-    # Parameters must be passed in the URL string directly for this specific API format
-    # The `params` dict for session.get() will be empty.
-    url = f"{API_V2_BASE}/geo?ids={ids_param_string}&locale={locale_param_string}"
-    #url = f"{API_V2_BASE}/geo?ids=List({','.join(map(str, unique_ids))})&locale=(language:en,country:US)"
 
-    logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {url}")
-    return _fetch_linkedin_names(session, url, {}, ["results"], ["defaultLocalizedName", "value"])
 
 def _parse_urn_to_id(urn_string):
-    """Helper to get the last part (ID) from a URN string."""
     if not isinstance(urn_string, str):
         logging.debug(f"Invalid URN type: {type(urn_string)}, value: {urn_string}. Cannot parse ID.")
         return None
     try:
         return urn_string.split(':')[-1]
-    except IndexError:
         logging.warning(f"Could not parse ID from URN: {urn_string}")
         return None
-    except Exception as e:
         logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
         return None
 
 # --- Follower Data Fetching Functions ---
 
-def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
-    """
-    Fetches monthly follower gains for the last 12 full months.
-    The start date is set to the first day of the month, 12 months prior to the current month, at midnight UTC.
-    """
-    # now = datetime.now()
-
-    # twelve_months_ago = now - timedelta(days=365)
-    # twelve_months_ago = twelve_months_ago.replace(day=1)
-
-    # start_date = int(twelve_months_ago.timestamp() * 1000)
-
-    # # Build URL with explicit query string
-    # url = (
-    #     f"{api_rest_base}/organizationalEntityFollowerStatistics"
-    #     f"?q=organizationalEntity"
-    #     f"&organizationalEntity={org_urn}"
-    #     f"&timeIntervals.timeGranularityType=MONTH"
-    #     f"&timeIntervals.timeRange.start={start_date}"
-    #     # LinkedIn defaults the end of the timeRange to the current time if not specified.
-    # )
-    # logging.info(f"Fetching monthly follower gains from URL: {url}")
-
     now_utc = datetime.now(timezone.utc)
     start_of_reporting_period = (now_utc - timedelta(days=365)).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
     start_ms = int(start_of_reporting_period.timestamp() * 1000)
 
-    base_url = f"{api_rest_base}/organizationalEntityFollowerStatistics"
     time_intervals_value = f"(timeRange:(start:{start_ms}),timeGranularityType:MONTH)"
 
     api_params = {
@@ -214,9 +241,16 @@ def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
 
     results = []
     request_url_for_logging = "Not constructed"
-    response_obj = None # To store response for logging in broader exception blocks
 
     try:
         req = requests.Request('GET', base_url, params=api_params)
         prepared_req = session.prepare_request(req)
         request_url_for_logging = prepared_req.url
@@ -224,14 +258,14 @@ def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
         logging.info(f"Requesting monthly follower gains from URL: {request_url_for_logging}")
         logging.debug(f"Request Headers for monthly gains: {json.dumps(dict(prepared_req.headers), indent=2)}")
 
-        response_obj = session.send(prepared_req, timeout=30) # Added timeout
         response_obj.raise_for_status()
         data = response_obj.json()
 
-
         elements = data.get('elements', [])
         if not elements:
-            logging.info(f"No 'elements' found in API response for {org_urn} for start_ms {start_ms}.")
 
         for item in elements:
             time_range = item.get('timeRange', {})
@@ -240,104 +274,128 @@ def fetch_monthly_follower_gains(session, org_urn, api_rest_base):
                 logging.warning(f"Skipping item due to missing 'start' timestamp: {item}")
                 continue
 
-            # Convert timestamp (milliseconds) to YYYY-MM-DD date string in UTC
             date_obj = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
             date_str = date_obj.strftime('%Y-%m-%d')
 
             gains = item.get('followerGains', {})
-            # It's possible 'followerGains' itself is missing or None
-            if gains is None:
-                gains = {} # Ensure gains is a dict to prevent error on .get()
 
             results.append({
-                'category_name': date_str, # This is the start date of the month's data
                 'follower_count_organic': gains.get('organicFollowerGain', 0),
                 'follower_count_paid': gains.get('paidFollowerGain', 0),
                 'follower_count_type': 'follower_gains_monthly',
                 'organization_urn': org_urn
             })
-        logging.info(f"Fetched {len(results)} monthly follower entries for {org_urn} starting from {start_of_period.strftime('%Y-%m-%d')}.")
 
     except requests.exceptions.HTTPError as http_err:
-        # More specific error for HTTP errors
-        code = getattr(http_err.response, 'status_code', 'N/A')
-        text = getattr(http_err.response, 'text', str(http_err))
         logging.error(f"HTTP error fetching monthly gains for {org_urn}: {code} - {text}")
-        logging.error(f"Request URL: {request_url_for_logging}")
-    except requests.exceptions.RequestException as e:
-        # Catch other request-related errors (e.g., connection issues)
-        code = getattr(e.response, 'status_code', 'N/A') if e.response is not None else 'N/A'
-        text = getattr(e.response, 'text', str(e)) if e.response is not None else str(e)
-        logging.error(f"Error fetching monthly gains for {org_urn}: {code} - {text}")
-        logging.error(f"Request URL: {request_url_for_logging}")
     except Exception as ex:
-        # Catch any other unexpected errors (e.g., JSON parsing if response is not JSON)
-        logging.error(f"An unexpected error occurred while fetching monthly gains for {org_urn}: {str(ex)}")
-        logging.error(f"Request URL: {request_url_for_logging}")
-
     return results
 
 def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
-    """
-    Fetches current follower demographics, applying Top-N for specified categories.
-    """
     final_demographics_results = []
-    # Parameters for the main demographics call
-    params = {
-        'q': 'organizationalEntity',
-        'organizationalEntity': org_urn
-    }
-    url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
 
-    logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn} with params: {json.dumps(params)}")
 
     try:
-        response = session.get(url, params=params)
-        response.raise_for_status()
-        data = response.json()
 
         elements = data.get("elements", [])
         if not elements:
-            logging.warning(f"No elements found in follower demographics response for {org_urn}.")
             return []
 
-        stat_element = elements[0]
 
         def _get_entries_for_type(raw_items_list, type_name, id_map, id_field_name_in_item, org_urn_val):
             current_type_entries = []
             if not raw_items_list:
                 logging.debug(f"No raw items for demographic type '{type_name}' for org {org_urn_val}.")
                 return current_type_entries
-
             for item in raw_items_list:
                 category_name_val = "Unknown"
-                if type_name == "follower_association":
                     category_name_val = item.get(id_field_name_in_item, f"Unknown {id_field_name_in_item}")
                 else:
                     urn_val = item.get(id_field_name_in_item)
                     entity_id = _parse_urn_to_id(urn_val)
-                    category_name_val = id_map.get(str(entity_id), f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id if entity_id else urn_val})")
-
                 counts = item.get("followerCounts", {})
                 organic_count = counts.get("organicFollowerCount", 0)
-                paid_count = counts.get("paidFollowerCount", 0)
-
                 current_type_entries.append({
                     "category_name": category_name_val,
-                    "follower_count_organic": organic_count,
-                    "follower_count_paid": paid_count,
                     "follower_count_type": type_name,
                     "organization_urn": org_urn_val
                 })
             return current_type_entries
-
         industry_urns_to_map = [item.get("industry") for item in stat_element.get("followerCountsByIndustry", []) if item.get("industry")]
         geo_urns_to_map = [item.get("geo") for item in stat_element.get("followerCountsByGeoCountry", []) if item.get("geo")]
-
-        live_industries_map = get_industries_map(session, industry_urns_to_map)
-        live_geo_map = get_geo_map(session, geo_urns_to_map)
-
         demographic_configs = [
            {"items_key": "followerCountsBySeniority", "type_name": "follower_seniority", "id_map": seniorities_map, "id_field": "seniority", "top_n": 10},
            {"items_key": "followerCountsByFunction", "type_name": "follower_function", "id_map": functions_map, "id_field": "function", "top_n": 10},
@@ -345,74 +403,108 @@ def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map
            {"items_key": "followerCountsByGeoCountry", "type_name": "follower_geo", "id_map": live_geo_map, "id_field": "geo", "top_n": 10},
            {"items_key": "followerCountsByAssociationType", "type_name": "follower_association", "id_map": {}, "id_field": "associationType", "top_n": None}
         ]
-
         for config in demographic_configs:
             raw_items = stat_element.get(config["items_key"], [])
             processed_entries = _get_entries_for_type(raw_items, config["type_name"], config["id_map"], config["id_field"], org_urn)
-
             if config["top_n"] is not None and processed_entries:
-                for entry in processed_entries:
                     if not isinstance(entry.get("follower_count_organic"), (int, float)):
                         entry["follower_count_organic"] = 0
                 sorted_entries = sorted(processed_entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)
                 final_demographics_results.extend(sorted_entries[:config["top_n"]])
                 logging.debug(f"Added top {config['top_n']} for {config['type_name']}. Count: {len(sorted_entries[:config['top_n']])}")
-            else:
-                final_demographics_results.extend(processed_entries)
                 logging.debug(f"Added all for {config['type_name']}. Count: {len(processed_entries)}")
-
         logging.info(f"Processed follower demographics for {org_urn}. Total entries from all types: {len(final_demographics_results)}")
-
-    except requests.exceptions.RequestException as e:
-        status_code = getattr(e.response, 'status_code', 'N/A')
-        error_text = getattr(e.response, 'text', str(e))
-        logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
-    except json.JSONDecodeError as e:
-        logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
     except Exception as e:
         logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
     return final_demographics_results
 
 # --- Main Orchestration Function ---
 
 def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
-    """
-    Main function to fetch all follower statistics (monthly gains and demographics)
-    and format them for Bubble.
-    """
     if not all([comm_client_id, community_token, org_urn]):
         logging.error("Client ID, community_token, or Organization URN is missing or empty.")
         if not comm_client_id: logging.error("comm_client_id is missing.")
-        if not community_token: logging.error("community_token is missing or empty.") # This is key
         if not org_urn: logging.error("org_urn is missing.")
         return []
 
-    token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
-
     if not token_dict.get('access_token'):
         logging.error("Failed to construct a valid token_dict: 'access_token' is empty.")
         logging.debug(f"Problematic token_dict: {token_dict}")
         return []
 
-    session = None
     try:
-        session = create_session(comm_client_id, token=token_dict)
         session.headers.update({
             "X-Restli-Protocol-Version": "2.0.0",
-            "LinkedIn-Version": LINKEDIN_API_VERSION,
-            "Accept-Language": "en_US" # Explicitly set for v2 name lookups if not default in session
         })
     except Exception as e:
         logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
-        return []
 
     logging.info(f"Starting follower stats retrieval for org: {org_urn}")
 
     functions_map = get_functions_map(session)
     seniorities_map = get_seniorities_map(session)
 
-    if not functions_map: logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
-    if not seniorities_map: logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")
 
     all_follower_data = []
 
@@ -422,6 +514,9 @@ def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
     demographics = fetch_follower_demographics(session, org_urn, functions_map, seniorities_map)
     all_follower_data.extend(demographics)
 
-    logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
-    return all_follower_data
-
 import json
 import requests
 import logging
 from datetime import datetime, timezone, timedelta
+from urllib.parse import quote, urlencode
+# from dateutil.relativedelta import relativedelta # For more precise month arithmetic if needed
 
 # Assuming you have a sessions.py with create_session
 # If sessions.py or create_session is not found, it will raise an ImportError,
 # which is appropriate for a module that depends on it.
+from sessions import create_session # Make sure this file exists and is correct
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
     """
     Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.
     """
     mapping = {}
+    request_url_for_logging = url
+    response_obj = None
     try:
+        logging.debug(f"_fetch_linkedin_names: About to prepare request. Session token: {session.token}")
+        req = requests.Request('GET', url, params=params)
+        prepared_req = session.prepare_request(req)
+        request_url_for_logging = prepared_req.url
+
+        logging.debug(f"Fetching names from URL: {request_url_for_logging}")
+        logging.debug(f"Request headers for _fetch_linkedin_names: {json.dumps(dict(prepared_req.headers), indent=2)}")
+
+        response_obj = session.send(prepared_req, timeout=30)
+        response_obj.raise_for_status()
+        data = response_obj.json()
 
         items = data
+        for key in result_key_path:
             if isinstance(items, dict):
+                items = items.get(key)
+                if items is None:
+                    logging.warning(f"Key '{key}' not found in response from {request_url_for_logging} at path {result_key_path}. Response data: {json.dumps(data, indent=2)}")
+                    return mapping
+            else:
+                logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {request_url_for_logging}. Check result_key_path.")
+                return mapping
+
+        if items is None:
+            logging.warning(f"Items became None after navigating result_key_path for URL {request_url_for_logging}. Path: {result_key_path}")
+            return mapping
 
         if isinstance(items, dict):
             for item_id_str, item_data in items.items():
                 name = item_data
+                for key_nav in name_key_path:
                     if isinstance(name, dict):
                         name = name.get(key_nav)
                     else:
+                        name = None
                         break
                 if name:
+                    mapping[item_id_str] = name
                 else:
+                    logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {request_url_for_logging}")
         elif isinstance(items, list):
             for item in items:
                 item_id_val = item.get(id_key)
                 name = item
+                for key_nav in name_key_path:
                     if isinstance(name, dict):
                         name = name.get(key_nav)
                     else:
+                        name = None
                         break
                 if item_id_val is not None and name:
+                    mapping[str(item_id_val)] = name
                 else:
+                    if item_id_val is None:
+                        logging.warning(f"No ID ('{id_key}') found in item: {item} from URL {request_url_for_logging}")
+                    if name is None:
+                        logging.warning(f"No name found at path {name_key_path} for item with ID '{item_id_val}' in item: {item} from URL {request_url_for_logging}")
         else:
+            logging.warning(f"Expected list or dict of items at {result_key_path} from URL {request_url_for_logging}, got {type(items)}. Full items: {items}")
+
+    except requests.exceptions.HTTPError as http_err:
+        status_code = "N/A"; error_text = str(http_err); response_headers = {}
+        if http_err.response is not None:
+            status_code = http_err.response.status_code
+            error_text = http_err.response.text
+            response_headers = dict(http_err.response.headers)
+        logging.error(f"HTTP error in _fetch_linkedin_names from {request_url_for_logging} (Status: {status_code}): {error_text}")
+        logging.error(f"Response Headers: {json.dumps(response_headers, indent=2)}")
+    except requests.exceptions.RequestException as req_err:
+        logging.error(f"Request error in _fetch_linkedin_names from {request_url_for_logging}: {str(req_err)}")
+        if req_err.response is not None:
+            logging.error(f"Associated Response Status: {req_err.response.status_code}, Text: {req_err.response.text}, Headers: {json.dumps(dict(req_err.response.headers), indent=2)}")
+    except json.JSONDecodeError as json_err:
+        response_text = "Not available"
+        if response_obj is not None and hasattr(response_obj, 'text'): response_text = response_obj.text
+        logging.error(f"Error decoding JSON for names from {request_url_for_logging}: {json_err}. Response text: {response_text}")
     except Exception as e:
+        logging.error(f"Unexpected error in _fetch_linkedin_names from {request_url_for_logging}: {e}", exc_info=True)
+        if response_obj is not None:
+            logging.error(f"Associated Response (if any) Status: {getattr(response_obj, 'status_code', 'N/A')}, Text: {getattr(response_obj, 'text', 'N/A')}")
     return mapping
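Review note: the rewritten helper above is driven entirely by its two key-path arguments. A minimal, self-contained sketch of that navigation, run against made-up payloads rather than real API responses:

    # Hypothetical illustration of result_key_path / name_key_path navigation.
    def navigate(node, key_path):
        """Walk a list of keys into nested dicts; return None if any hop is missing."""
        for key in key_path:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    # Shaped like a /v2/functions response: elements -> [{id, name.localized.en_US}]
    sample = {"elements": [{"id": 1, "name": {"localized": {"en_US": "Accounting"}}}]}
    items = navigate(sample, ["elements"])                     # the item list
    name = navigate(items[0], ["name", "localized", "en_US"])  # "Accounting"
    print(name)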
 
 def get_functions_map(session):
     url = f"{API_V2_BASE}/functions"
+    params = {'count': 200}
     logging.info("Fetching all LinkedIn functions.")
     return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
 
 def get_seniorities_map(session):
     url = f"{API_V2_BASE}/seniorities"
+    params = {'count': 200}
     logging.info("Fetching all LinkedIn seniorities.")
     return _fetch_linkedin_names(session, url, params, ["elements"], ["name", "localized", "en_US"], "id")
 
 def get_industries_map(session, industry_urns, version="DEFAULT"):
     industry_ids = [_parse_urn_to_id(urn) for urn in industry_urns or []]
     unique_ids = set(filter(None, industry_ids))
     if not unique_ids:
         return {}
 
     url = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
+    params = { 'start': 0, 'count': 500 }
+    request_url_for_logging = url
+    response_obj = None
+    logging.info(f"Fetching all industries (to filter {len(unique_ids)} IDs)")
     try:
+        logging.debug(f"get_industries_map: About to prepare request. Session token: {session.token}")
+        req = requests.Request('GET', url, params=params)
+        prepared_req = session.prepare_request(req)
+        request_url_for_logging = prepared_req.url
+        logging.debug(f"Requesting all industries from URL: {request_url_for_logging}")
+        logging.debug(f"Request headers for get_industries_map: {json.dumps(dict(prepared_req.headers), indent=2)}")
+
+        response_obj = session.send(prepared_req, timeout=30)
+        response_obj.raise_for_status()
+        data = response_obj.json()
         elements = data.get('elements', [])
 
         mapping = {}
+        # ... (rest of the function)
         for el in elements:
             el_id = el.get('id')
             if el_id and str(el_id) in unique_ids:
+                name = el.get('name', {}).get('localized', {}).get('en_US')
                 if name:
                     mapping[str(el_id)] = name
                 else:
+                    logging.warning(f"Industry {el_id} has no en_US name field in element: {el} from URL {request_url_for_logging}")
         return mapping
+    except requests.exceptions.HTTPError as http_err:
+        status_code = "N/A"; error_text = str(http_err); response_headers = {}
+        if http_err.response is not None:
+            status_code = http_err.response.status_code
+            error_text = http_err.response.text
+            response_headers = dict(http_err.response.headers)
+        logging.error(f"HTTP error fetching all industries from {request_url_for_logging} (Status: {status_code}): {error_text}")
+        logging.error(f"Response Headers: {json.dumps(response_headers, indent=2)}")
+        return {}
+    except requests.exceptions.RequestException as req_err:
+        logging.error(f"Request error fetching all industries from {request_url_for_logging}: {str(req_err)}")
+        if req_err.response is not None:
+            logging.error(f"Associated Response Status: {req_err.response.status_code}, Text: {req_err.response.text}, Headers: {json.dumps(dict(req_err.response.headers), indent=2)}")
+        return {}
+    except json.JSONDecodeError as json_err:
+        response_text = "Not available"
+        if response_obj is not None and hasattr(response_obj, 'text'): response_text = response_obj.text
+        logging.error(f"Error decoding JSON for industries from {request_url_for_logging}: {json_err}. Response text: {response_text}")
+        return {}
+    except Exception as e:
+        logging.error(f"Unexpected error fetching all industries from {request_url_for_logging}: {e}", exc_info=True)
+        if response_obj is not None:
+            logging.error(f"Associated Response (if any) Status: {getattr(response_obj, 'status_code', 'N/A')}, Text: {getattr(response_obj, 'text', 'N/A')}")
     return {}
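Review note: get_industries_map deliberately fetches the whole taxonomy in one page (count=500) and filters locally, rather than issuing per-ID lookups. The filtering step, reduced to plain data (the elements below are invented):

    elements = [
        {"id": 1, "name": {"localized": {"en_US": "Farming"}}},
        {"id": 3, "name": {"localized": {"en_US": "Banking"}}},
    ]
    unique_ids = {"3"}  # IDs parsed from the caller's industry URNs
    mapping = {str(el["id"]): el["name"]["localized"]["en_US"]
               for el in elements if str(el.get("id")) in unique_ids}
    print(mapping)  # {'3': 'Banking'}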
 
 def get_geo_map(session, geo_urns):
     if not geo_urns: return {}
     geo_ids = [_parse_urn_to_id(urn) for urn in geo_urns if urn]
     unique_ids = list(set(filter(None, geo_ids)))
     if not unique_ids: return {}
 
+    MAX_GEO_IDS_PER_CALL = 100
+    all_geo_mappings = {}
 
+    for i in range(0, len(unique_ids), MAX_GEO_IDS_PER_CALL):
+        chunk_ids = unique_ids[i:i + MAX_GEO_IDS_PER_CALL]
+        if not chunk_ids: continue
 
+        ids_param_value = "List(" + ",".join(map(str, chunk_ids)) + ")"
+        locale_param_value = "(language:en,country:US)"
+
+        url = f"{API_V2_BASE}/geo"
+        geo_params = { 'ids': ids_param_value, 'locale': locale_param_value }
+
+        logging.info(f"Fetching names for {len(chunk_ids)} geo IDs (chunk {i//MAX_GEO_IDS_PER_CALL + 1})")
+        # _fetch_linkedin_names will log its own session.token
+        chunk_mapping = _fetch_linkedin_names(session, url, geo_params, ["results"], ["defaultLocalizedName", "value"])
+        all_geo_mappings.update(chunk_mapping)
+
+    return all_geo_mappings
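Review note: the chunking added here keeps each /geo batch lookup under MAX_GEO_IDS_PER_CALL ids and encodes every batch in the Restli 2.0 collection syntax. The arithmetic in isolation (the ids and chunk size below are made up):

    def chunks(ids, size):
        """Yield consecutive slices of at most `size` elements."""
        for i in range(0, len(ids), size):
            yield ids[i:i + size]

    ids = [str(n) for n in range(7)]
    for batch in chunks(ids, 3):
        print("List(" + ",".join(batch) + ")")
    # List(0,1,2)
    # List(3,4,5)
    # List(6)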
 
 def _parse_urn_to_id(urn_string):
     if not isinstance(urn_string, str):
         logging.debug(f"Invalid URN type: {type(urn_string)}, value: {urn_string}. Cannot parse ID.")
         return None
     try:
         return urn_string.split(':')[-1]
+    except IndexError:
         logging.warning(f"Could not parse ID from URN: {urn_string}")
         return None
+    except Exception as e:
         logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
         return None
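Review note: a quick check of the URN parsing behavior, on example URNs. Worth flagging that str.split(':')[-1] can never raise IndexError (split always returns at least one element), so the except IndexError branch kept above is effectively unreachable:

    for urn in ["urn:li:geo:103644278", "urn:li:function:12", "no-colons", ""]:
        print(repr(urn), "->", repr(urn.split(':')[-1]))
    # 'urn:li:geo:103644278' -> '103644278'
    # 'urn:li:function:12' -> '12'
    # 'no-colons' -> 'no-colons'
    # '' -> ''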
 
 # --- Follower Data Fetching Functions ---
 
+def fetch_monthly_follower_gains(session, org_urn, api_rest_base_url):
     now_utc = datetime.now(timezone.utc)
     start_of_reporting_period = (now_utc - timedelta(days=365)).replace(day=1, hour=0, minute=0, second=0, microsecond=0)
     start_ms = int(start_of_reporting_period.timestamp() * 1000)
 
+    base_url = f"{api_rest_base_url}/organizationalEntityFollowerStatistics"
     time_intervals_value = f"(timeRange:(start:{start_ms}),timeGranularityType:MONTH)"
 
     api_params = {
 
     results = []
     request_url_for_logging = "Not constructed"
+    response_obj = None
 
     try:
+        # ***** ADDED LOGGING HERE *****
+        logging.debug(f"fetch_monthly_follower_gains: About to prepare request. Session token: {session.token}")
+        if session.token and 'access_token' in session.token:
+            logging.debug(f"fetch_monthly_follower_gains: Access token (partial): {str(session.token['access_token'])[:20]}...")
+        else:
+            logging.warning("fetch_monthly_follower_gains: session.token is None or 'access_token' key is missing before prepare_request.")
+
         req = requests.Request('GET', base_url, params=api_params)
         prepared_req = session.prepare_request(req)
         request_url_for_logging = prepared_req.url
         logging.info(f"Requesting monthly follower gains from URL: {request_url_for_logging}")
         logging.debug(f"Request Headers for monthly gains: {json.dumps(dict(prepared_req.headers), indent=2)}")
 
+        response_obj = session.send(prepared_req, timeout=30)
         response_obj.raise_for_status()
         data = response_obj.json()
 
         elements = data.get('elements', [])
+        # ... (rest of the function)
         if not elements:
+            logging.info(f"No 'elements' found in monthly follower gains API response for {org_urn} (start_ms {start_ms}). Response data: {json.dumps(data, indent=2)}")
 
         for item in elements:
             time_range = item.get('timeRange', {})
                 logging.warning(f"Skipping item due to missing 'start' timestamp: {item}")
                 continue
 
             date_obj = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
             date_str = date_obj.strftime('%Y-%m-%d')
 
             gains = item.get('followerGains', {})
+            if gains is None: gains = {}
 
             results.append({
+                'category_name': date_str,
                 'follower_count_organic': gains.get('organicFollowerGain', 0),
                 'follower_count_paid': gains.get('paidFollowerGain', 0),
                 'follower_count_type': 'follower_gains_monthly',
                 'organization_urn': org_urn
             })
+        logging.info(f"Fetched {len(results)} monthly follower entries for {org_urn} starting from {start_of_reporting_period.strftime('%Y-%m-%d')}.")
 
     except requests.exceptions.HTTPError as http_err:
+        code = "N/A"; text = str(http_err); resp_headers = {}
+        if http_err.response is not None:
+            code = http_err.response.status_code
+            text = http_err.response.text
+            resp_headers = dict(http_err.response.headers)
         logging.error(f"HTTP error fetching monthly gains for {org_urn}: {code} - {text}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        logging.error(f"Response Headers: {json.dumps(resp_headers, indent=2)}")
+    except requests.exceptions.RequestException as req_err:
+        logging.error(f"RequestException fetching monthly gains for {org_urn}: {str(req_err)}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        if req_err.response is not None:
+            logging.error(f"Associated Response Status: {req_err.response.status_code}")
+            logging.error(f"Associated Response Text: {req_err.response.text}")
+            logging.error(f"Associated Response Headers: {json.dumps(dict(req_err.response.headers), indent=2)}")
+    except json.JSONDecodeError as json_err:
+        response_text = "Not available"
+        if response_obj is not None and hasattr(response_obj, 'text'):
+            response_text = response_obj.text
+        logging.error(f"Error decoding JSON for monthly follower gains for {org_urn}: {json_err}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        logging.error(f"Raw Response Text: {response_text}")
     except Exception as ex:
+        logging.error(f"An unexpected error occurred while fetching monthly gains for {org_urn}: {str(ex)}", exc_info=True)
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        if response_obj is not None:
+            logging.error(f"Response Status (if available): {getattr(response_obj, 'status_code', 'N/A')}")
+            logging.error(f"Response Text (if available): {getattr(response_obj, 'text', 'N/A')}")
+            logging.error(f"Response Headers (if available): {json.dumps(dict(getattr(response_obj, 'headers', {})), indent=2)}")
+
     return results
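Review note: the reporting window above is "first day of the month, roughly twelve months back, at midnight UTC", expressed in epoch milliseconds. The same arithmetic in isolation, with a made-up "now":

    from datetime import datetime, timezone, timedelta

    now_utc = datetime(2025, 5, 15, tzinfo=timezone.utc)  # pretend "now"
    start = (now_utc - timedelta(days=365)).replace(day=1, hour=0, minute=0,
                                                    second=0, microsecond=0)
    start_ms = int(start.timestamp() * 1000)
    print(start.isoformat(), start_ms)  # 2024-05-01T00:00:00+00:00 1714521600000

    # And the inverse, as used when labelling each month bucket:
    print(datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc).strftime('%Y-%m-%d'))
    # 2024-05-01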
 
 def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
     final_demographics_results = []
+    base_url = f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
+    params = { 'q': 'organizationalEntity', 'organizationalEntity': org_urn }
 
+    logging.info(f"Preparing to fetch follower demographics for org URN {org_urn}.")
+    logging.debug(f"API Parameters for demographics: {json.dumps(params)}")
+    request_url_for_logging = "Not constructed"
+    response_obj = None
 
     try:
+        # ***** ADDED LOGGING HERE *****
+        logging.debug(f"fetch_follower_demographics: About to prepare request. Session token: {session.token}")
+        if session.token and 'access_token' in session.token:
+            logging.debug(f"fetch_follower_demographics: Access token (partial): {str(session.token['access_token'])[:20]}...")
+        else:
+            logging.warning("fetch_follower_demographics: session.token is None or 'access_token' key is missing before prepare_request.")
+
+        req = requests.Request('GET', base_url, params=params)
+        prepared_req = session.prepare_request(req)
+        request_url_for_logging = prepared_req.url
+
+        logging.info(f"Requesting follower demographics from URL: {request_url_for_logging}")
+        logging.debug(f"Request Headers for demographics: {json.dumps(dict(prepared_req.headers), indent=2)}")
+
+        response_obj = session.send(prepared_req, timeout=30)
+        response_obj.raise_for_status()
+        data = response_obj.json()
 
         elements = data.get("elements", [])
+        # ... (rest of the function)
         if not elements:
+            logging.warning(f"No elements found in follower demographics response for {org_urn}. Response data: {json.dumps(data, indent=2)}")
             return []
 
+        stat_element = elements[0] if len(elements) > 0 else None
+        if not stat_element:
+            logging.warning(f"Elements list is empty or stat_element is None in demographics response for {org_urn}. Response data: {json.dumps(data, indent=2)}")
+            return []
 
         def _get_entries_for_type(raw_items_list, type_name, id_map, id_field_name_in_item, org_urn_val):
             current_type_entries = []
             if not raw_items_list:
                 logging.debug(f"No raw items for demographic type '{type_name}' for org {org_urn_val}.")
                 return current_type_entries
             for item in raw_items_list:
                 category_name_val = "Unknown"
+                if type_name == "follower_association":
                     category_name_val = item.get(id_field_name_in_item, f"Unknown {id_field_name_in_item}")
                 else:
                     urn_val = item.get(id_field_name_in_item)
                     entity_id = _parse_urn_to_id(urn_val)
+                    if entity_id and id_map:
+                        category_name_val = id_map.get(str(entity_id), f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id})")
+                    elif urn_val:
+                        category_name_val = f"Unmapped {type_name.split('_')[-1].capitalize()} (URN: {urn_val})"
+                    else:
+                        category_name_val = f"Missing URN for {type_name.split('_')[-1].capitalize()}"
                 counts = item.get("followerCounts", {})
                 organic_count = counts.get("organicFollowerCount", 0)
+                paid_count = counts.get("paidFollowerCount", 0)
                 current_type_entries.append({
                     "category_name": category_name_val,
+                    "follower_count_organic": organic_count,
+                    "follower_count_paid": paid_count,
                     "follower_count_type": type_name,
                     "organization_urn": org_urn_val
                 })
             return current_type_entries
         industry_urns_to_map = [item.get("industry") for item in stat_element.get("followerCountsByIndustry", []) if item.get("industry")]
         geo_urns_to_map = [item.get("geo") for item in stat_element.get("followerCountsByGeoCountry", []) if item.get("geo")]
+        live_industries_map = get_industries_map(session, list(set(industry_urns_to_map)))
+        live_geo_map = get_geo_map(session, list(set(geo_urns_to_map)))
 
         demographic_configs = [
             {"items_key": "followerCountsBySeniority", "type_name": "follower_seniority", "id_map": seniorities_map, "id_field": "seniority", "top_n": 10},
             {"items_key": "followerCountsByFunction", "type_name": "follower_function", "id_map": functions_map, "id_field": "function", "top_n": 10},
             {"items_key": "followerCountsByGeoCountry", "type_name": "follower_geo", "id_map": live_geo_map, "id_field": "geo", "top_n": 10},
             {"items_key": "followerCountsByAssociationType", "type_name": "follower_association", "id_map": {}, "id_field": "associationType", "top_n": None}
         ]
         for config in demographic_configs:
             raw_items = stat_element.get(config["items_key"], [])
             processed_entries = _get_entries_for_type(raw_items, config["type_name"], config["id_map"], config["id_field"], org_urn)
             if config["top_n"] is not None and processed_entries:
+                for entry in processed_entries:
                     if not isinstance(entry.get("follower_count_organic"), (int, float)):
+                        logging.warning(f"Invalid organic follower count for sorting in {config['type_name']}: {entry['follower_count_organic']}. Setting to 0.")
                         entry["follower_count_organic"] = 0
                 sorted_entries = sorted(processed_entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)
                 final_demographics_results.extend(sorted_entries[:config["top_n"]])
                 logging.debug(f"Added top {config['top_n']} for {config['type_name']}. Count: {len(sorted_entries[:config['top_n']])}")
+            else:
+                final_demographics_results.extend(processed_entries)
                 logging.debug(f"Added all for {config['type_name']}. Count: {len(processed_entries)}")
         logging.info(f"Processed follower demographics for {org_urn}. Total entries from all types: {len(final_demographics_results)}")
+    except requests.exceptions.HTTPError as http_err:
+        code = "N/A"; text = str(http_err); resp_headers = {}
+        if http_err.response is not None:
+            code = http_err.response.status_code
+            text = http_err.response.text
+            resp_headers = dict(http_err.response.headers)
+        logging.error(f"HTTP error fetching follower demographics for {org_urn} (Status: {code}): {text}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        logging.error(f"Response Headers: {json.dumps(resp_headers, indent=2)}")
+    except requests.exceptions.RequestException as req_err:
+        logging.error(f"RequestException fetching follower demographics for {org_urn}: {str(req_err)}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        if req_err.response is not None:
+            logging.error(f"Associated Response Status: {req_err.response.status_code}")
+            logging.error(f"Associated Response Text: {req_err.response.text}")
+            logging.error(f"Associated Response Headers: {json.dumps(dict(req_err.response.headers), indent=2)}")
+    except json.JSONDecodeError as json_err:
+        response_text = "Not available"
+        if response_obj is not None and hasattr(response_obj, 'text'):
+            response_text = response_obj.text
+        logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {json_err}")
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        logging.error(f"Raw Response Text: {response_text}")
     except Exception as e:
         logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
+        logging.error(f"Request URL was: {request_url_for_logging}")
+        if response_obj is not None:
+            logging.error(f"Response Status (if available): {getattr(response_obj, 'status_code', 'N/A')}")
+            logging.error(f"Response Text (if available): {getattr(response_obj, 'text', 'N/A')}")
+            logging.error(f"Response Headers (if available): {json.dumps(dict(getattr(response_obj, 'headers', {})), indent=2)}")
     return final_demographics_results
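Review note: the Top-N selection this function applies per demographic category, demonstrated on invented entries. Non-numeric organic counts are coerced to 0 before sorting, precisely so the sorted() call cannot raise a TypeError:

    entries = [
        {"category_name": "Engineering", "follower_count_organic": 420},
        {"category_name": "Sales", "follower_count_organic": "n/a"},  # bad value
        {"category_name": "Marketing", "follower_count_organic": 180},
    ]
    for e in entries:
        if not isinstance(e.get("follower_count_organic"), (int, float)):
            e["follower_count_organic"] = 0
    top2 = sorted(entries, key=lambda x: x.get("follower_count_organic", 0), reverse=True)[:2]
    print([e["category_name"] for e in top2])  # ['Engineering', 'Marketing']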
 
 # --- Main Orchestration Function ---
 
 def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
+    logging.info(f"--- Initiating get_linkedin_follower_stats for org: {org_urn} ---")
+    logging.debug(f"Received comm_client_id: {comm_client_id}")
+    logging.debug(f"Received community_token - Type: {type(community_token)}, IsSet: {bool(community_token)}")
+    if isinstance(community_token, str) and len(community_token) > 10:
+        logging.debug(f"Received community_token (partial): {community_token[:10]}...")
+    elif isinstance(community_token, dict):
+        logging.debug(f"Received community_token (dict): { {k: (v[:10] + '...' if k == 'access_token' and isinstance(v, str) and len(v)>10 else v) for k,v in community_token.items()} }")
+
     if not all([comm_client_id, community_token, org_urn]):
         logging.error("Client ID, community_token, or Organization URN is missing or empty.")
         if not comm_client_id: logging.error("comm_client_id is missing.")
+        if not community_token: logging.error("community_token is missing or empty.")
         if not org_urn: logging.error("org_urn is missing.")
         return []
 
+    token_dict = community_token if isinstance(community_token, dict) else {'access_token': str(community_token), 'token_type': 'Bearer'}
+
     if not token_dict.get('access_token'):
         logging.error("Failed to construct a valid token_dict: 'access_token' is empty.")
         logging.debug(f"Problematic token_dict: {token_dict}")
         return []
 
+    logging.debug(f"Constructed token_dict for session: { {k: (v[:10] + '...' if k == 'access_token' and isinstance(v, str) and len(v)>10 else v) for k,v in token_dict.items()} }")
+
+    session = None
     try:
+        session = create_session(comm_client_id, token=token_dict)
         session.headers.update({
             "X-Restli-Protocol-Version": "2.0.0",
+            "LinkedIn-Version": LINKEDIN_API_VERSION,
+            "Accept-Language": "en_US"
         })
+        logging.info(f"Session created and headers updated for org {org_urn}.")
+        # ***** ADDED LOGGING HERE *****
+        logging.debug(f"get_linkedin_follower_stats: Session token after creation and header update: {session.token}")
+        if session.token and 'access_token' in session.token:
+            logging.debug(f"get_linkedin_follower_stats: Access token in session (partial): {str(session.token['access_token'])[:20]}...")
+        else:
+            logging.warning("get_linkedin_follower_stats: session.token is None or 'access_token' key is missing after session creation.")
+
     except Exception as e:
         logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
+        return []
 
     logging.info(f"Starting follower stats retrieval for org: {org_urn}")
 
     functions_map = get_functions_map(session)
     seniorities_map = get_seniorities_map(session)
 
+    if not functions_map: logging.warning(f"Functions map is empty for org {org_urn}.")
+    if not seniorities_map: logging.warning(f"Seniorities map is empty for org {org_urn}.")
 
     all_follower_data = []
 
     demographics = fetch_follower_demographics(session, org_urn, functions_map, seniorities_map)
     all_follower_data.extend(demographics)
 
+    if not all_follower_data:
+        logging.warning(f"No follower data (gains or demographics) could be compiled for {org_urn}.")
+    else:
+        logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
+
+    return all_follower_data
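Review note: a minimal caller sketch for the updated entry point. The client id, token, and URN below are placeholders; community_token may be either a raw access-token string or a dict with an 'access_token' key, as handled above:

    from linkedin_follower_stats import get_linkedin_follower_stats

    stats = get_linkedin_follower_stats(
        comm_client_id="YOUR_CLIENT_ID",        # placeholder
        community_token="YOUR_ACCESS_TOKEN",    # str or {'access_token': ..., 'token_type': 'Bearer'}
        org_urn="urn:li:organization:123456",   # placeholder numeric org id
    )
    for row in stats:
        print(row["follower_count_type"], row["category_name"],
              row["follower_count_organic"], row["follower_count_paid"])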