Spaces:
Running
Running
Create linkedin_follower_stats.py
Browse files- linkedin_follower_stats.py +320 -0
linkedin_follower_stats.py
ADDED
@@ -0,0 +1,320 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
# -- coding: utf-8 --
|
2 |
+
import json
|
3 |
+
import requests
|
4 |
+
import logging
|
5 |
+
from datetime import datetime, timezone
|
6 |
+
from dateutil.relativedelta import relativedelta # For robust month arithmetic
|
7 |
+
from urllib.parse import quote
|
8 |
+
|
9 |
+
# Assuming you have a sessions.py with create_session
|
10 |
+
# If sessions.py or create_session is not found, it will raise an ImportError,
|
11 |
+
# which is appropriate for a module that depends on it.
|
12 |
+
from sessions import create_session
|
13 |
+
|
14 |
+
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
15 |
+
|
16 |
+
API_V2_BASE = 'https://api.linkedin.com/v2'
|
17 |
+
API_REST_BASE = "https://api.linkedin.com/rest"
|
18 |
+
LINKEDIN_API_VERSION = "202502" # As per user's example for follower stats
|
19 |
+
|
20 |
+
# --- ID to Name Mapping Helper Functions ---
|
21 |
+
|
22 |
+
def _fetch_linkedin_names(session, url, params, result_key_path, name_key_path, id_key="id"):
    """
    Generic helper to fetch and map IDs to names from a LinkedIn API endpoint.

    Args:
        session: Authenticated requests-compatible session (only ``session.get`` is used).
        url: Endpoint URL to query.
        params: Query parameters passed straight through to ``session.get``.
        result_key_path: List of keys to navigate to the collection of items
            (e.g. ``["elements"]`` or ``["results"]``).
        name_key_path: List of keys to navigate to the name within one item
            (e.g. ``["name", "localized", "en_US"]``).
        id_key: Key holding the ID when items arrive as a list (default ``"id"``).

    Returns:
        dict mapping string IDs to display names. Returns whatever was mapped
        so far (possibly empty) on any error; errors are logged, never raised.

    Note:
        Calling functions should provide locale in ``params`` if required.
    """

    def _dig(container, key_path):
        # Walk key_path through nested dicts; return None if the path breaks.
        value = container
        for key_nav in key_path:
            if not isinstance(value, dict):
                return None
            value = value.get(key_nav)
        return value

    mapping = {}
    try:
        logging.debug(f"Fetching names from URL: {url} with params: {params}")
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()

        # Navigate down to the collection of items.
        items = data
        for key in result_key_path:
            if isinstance(items, dict):
                items = items.get(key, [])  # Default to empty list if key not found
            else:
                logging.warning(f"Expected dict to get key '{key}' but got {type(items)} at path {result_key_path} for URL {url}. Check result_key_path.")
                return mapping  # Cannot proceed with this path

        if isinstance(items, dict):
            # Batch responses (e.g. geo/industry) where the keys are the IDs.
            for item_id_str, item_data in items.items():
                name = _dig(item_data, name_key_path)
                if name:
                    mapping[item_id_str] = name
                else:
                    logging.warning(f"No name found for ID {item_id_str} at path {name_key_path} in item: {item_data} from URL {url}")
        elif isinstance(items, list):
            # List responses (e.g. functions/seniorities) carrying an explicit ID field.
            for item in items:
                item_id_val = item.get(id_key)
                name = _dig(item, name_key_path)
                if item_id_val is not None and name:
                    mapping[str(item_id_val)] = name  # Ensure ID is string for consistency
                else:
                    logging.warning(f"No ID ('{id_key}') or name found at path {name_key_path} in item: {item} from URL {url}")
        else:
            logging.warning(f"Expected list or dict of items at {result_key_path} from URL {url}, got {type(items)}")

    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching names from {url} (Status: {status_code}): {error_text}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON for names from {url}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
    except Exception as e:
        logging.error(f"Unexpected error fetching names from {url}: {e}", exc_info=True)
    return mapping
84 |
+
|
85 |
+
def get_functions_map(session):
    """Return a {id: name} map of every LinkedIn job function."""
    logging.info("Fetching all LinkedIn functions.")
    return _fetch_linkedin_names(
        session,
        f"{API_V2_BASE}/functions",
        {'locale': 'en_US'},
        ["elements"],
        ["name", "localized", "en_US"],
        "id",
    )
91 |
+
|
92 |
+
def get_seniorities_map(session):
    """Return a {id: name} map of every LinkedIn seniority level."""
    logging.info("Fetching all LinkedIn seniorities.")
    return _fetch_linkedin_names(
        session,
        f"{API_V2_BASE}/seniorities",
        {'locale': 'en_US'},
        ["elements"],
        ["name", "localized", "en_US"],
        "id",
    )
98 |
+
|
99 |
+
def get_industries_map(session, industry_urns, version="DEFAULT"):
    """Resolve a list of industry URNs to display names; returns {id: name}."""
    if not industry_urns:
        return {}
    # Parse URNs down to bare IDs, dropping blanks and anything unparsable.
    parsed_ids = [_parse_urn_to_id(u) for u in industry_urns if u]
    unique_ids = list(set(filter(None, parsed_ids)))
    if not unique_ids:
        return {}

    endpoint = f"{API_V2_BASE}/industryTaxonomyVersions/{version}/industries"
    # requests expands a list value into repeated query params (ids=1&ids=23),
    # which is the form this batch endpoint expects.
    query = {'ids': unique_ids, 'locale.language': 'en', 'locale.country': 'US'}
    logging.info(f"Fetching names for {len(unique_ids)} unique industry IDs.")
    return _fetch_linkedin_names(session, endpoint, query, ["results"], ["name", "localized", "en_US"])
112 |
+
|
113 |
+
|
114 |
+
def get_geo_map(session, geo_urns):
    """Resolve a list of geo URNs to localized names; returns {id: name}."""
    if not geo_urns:
        return {}
    parsed_ids = [_parse_urn_to_id(u) for u in geo_urns if u]
    unique_ids = list(set(filter(None, parsed_ids)))
    if not unique_ids:
        return {}

    # This endpoint expects the Restli batch form ids=List(123,456), so the
    # query string is assembled by hand rather than via a params dict.
    # NOTE(review): quote() percent-encodes the parens/commas of the List()
    # literal — presumably the server accepts that; confirm against the API.
    ids_literal = "List(" + ",".join(map(str, unique_ids)) + ")"
    batch_url = (
        f"{API_V2_BASE}/geo?ids={quote(ids_literal)}"
        f"&locale.language=en&locale.country=US"
    )
    logging.info(f"Fetching names for {len(unique_ids)} unique geo IDs using URL: {batch_url}")
    # Empty params dict: everything is already embedded in the URL above.
    return _fetch_linkedin_names(session, batch_url, {}, ["results"], ["defaultLocalizedName", "value"])
129 |
+
|
130 |
+
|
131 |
+
def _parse_urn_to_id(urn_string):
|
132 |
+
"""Helper to get the last part (ID) from a URN string."""
|
133 |
+
if not isinstance(urn_string, str):
|
134 |
+
logging.warning(f"Invalid URN type: {type(urn_string)}, value: {urn_string}")
|
135 |
+
return None
|
136 |
+
try:
|
137 |
+
return urn_string.split(':')[-1]
|
138 |
+
except IndexError: # Handle cases where split doesn't yield enough parts
|
139 |
+
logging.warning(f"Could not parse ID from URN: {urn_string}")
|
140 |
+
return None
|
141 |
+
except Exception as e:
|
142 |
+
logging.error(f"Unexpected error parsing URN {urn_string}: {e}")
|
143 |
+
return None
|
144 |
+
|
145 |
+
# --- Follower Data Fetching Functions ---
|
146 |
+
|
147 |
+
def fetch_monthly_follower_gains(session, org_urn):
    """
    Fetch monthly follower gains for roughly the last year.

    Queries back 13 months so at least 12 full previous months are covered.
    Returns a list of flat dicts (one per month) with organic/paid gains;
    returns whatever was collected so far on error (errors are logged).
    """
    gains = []
    now_utc = datetime.now(timezone.utc)
    # Anchor the window at the first instant of the month 13 months back.
    window_start = (now_utc - relativedelta(months=13)).replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    start_ms = int(window_start.timestamp() * 1000)

    url = (
        f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
        f"?q=organizationalEntity"
        f"&organizationalEntity={quote(org_urn)}"
        f"&timeIntervals.timeGranularityType=MONTH"
        f"&timeIntervals.timeRange.start={start_ms}"
    )
    logging.info(f"Fetching monthly follower gains from: {url}")

    try:
        response = session.get(url)
        response.raise_for_status()
        payload = response.json()

        for element in payload.get("elements", []):
            start_ts_ms = element.get("timeRange", {}).get("start")
            if start_ts_ms is None:
                logging.warning("Skipping item due to missing start timestamp in monthly gains.")
                continue

            # First day of the month, formatted YYYY-MM-DD.
            month_start = datetime.fromtimestamp(start_ts_ms / 1000, tz=timezone.utc)
            gain_counts = element.get("followerGains", {})
            gains.append({
                "category_name": month_start.strftime('%Y-%m-%d'),
                "follower_count_organic": gain_counts.get("organicFollowerGain", 0),
                "follower_count_paid": gain_counts.get("paidFollowerGain", 0),
                "follower_count_type": "follower_gains_monthly",
                "organization_urn": org_urn,  # Add org_urn for consistency
            })
        logging.info(f"Fetched {len(gains)} monthly follower gain entries for org URN {org_urn}.")
    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching monthly follower gains for {org_urn} (Status: {status_code}): {error_text}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON for monthly follower gains for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
    except Exception as e:
        logging.error(f"Unexpected error fetching monthly follower gains for {org_urn}: {e}", exc_info=True)
    return gains
203 |
+
|
204 |
+
|
205 |
+
def fetch_follower_demographics(session, org_urn, functions_map, seniorities_map):
    """
    Fetch the current follower demographic breakdowns for an organization.

    Covers association type, seniority, function, industry and geo country.
    Industry/geo URNs are resolved to names via two batch lookups; function
    and seniority names come from the pre-fetched maps passed in.
    Returns a list of flat dicts; empty list on error (errors are logged).
    """
    results = []
    url = (
        f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
        f"?q=organizationalEntity&organizationalEntity={quote(org_urn)}"
    )
    logging.info(f"Fetching follower demographics from: {url} for org URN {org_urn}")

    try:
        response = session.get(url)
        response.raise_for_status()
        payload = response.json()

        elements = payload.get("elements", [])
        if not elements:
            logging.warning(f"No elements found in follower demographics response for {org_urn}.")
            return []

        stats = elements[0]  # Data is usually in the first element

        # Collect URNs up front so names are resolved in two batch calls.
        industry_urns_to_map = [row.get("industry") for row in stats.get("followerCountsByIndustry", []) if row.get("industry")]
        geo_urns_to_map = [row.get("geo") for row in stats.get("followerCountsByGeoCountry", []) if row.get("geo")]
        industries_map = get_industries_map(session, industry_urns_to_map)
        geo_map = get_geo_map(session, geo_urns_to_map)

        def _append_entries(rows, type_name, id_map, id_field_name, org_urn_val):
            # Translate one demographic bucket list into flat result rows.
            if not rows:
                logging.info(f"No items found for demographic type '{type_name}' for org {org_urn_val}.")
                return
            for row in rows:
                if type_name == "follower_association":
                    # associationType is already a readable value, no URN lookup needed.
                    category = row.get("associationType", f"Unknown AssociationType")
                else:
                    urn_val = row.get(id_field_name)
                    entity_id = _parse_urn_to_id(urn_val)
                    category = id_map.get(str(entity_id), f"Unknown {type_name.split('_')[-1].capitalize()} (ID: {entity_id if entity_id else urn_val})")
                counts = row.get("followerCounts", {})
                results.append({
                    "category_name": category,
                    "follower_count_organic": counts.get("organicFollowerCount", 0),
                    "follower_count_paid": counts.get("paidFollowerCount", 0),
                    "follower_count_type": type_name,
                    "organization_urn": org_urn_val,
                })

        _append_entries(stats.get("followerCountsByAssociationType", []), "follower_association", {}, "associationType", org_urn)
        _append_entries(stats.get("followerCountsBySeniority", []), "follower_seniority", seniorities_map, "seniority", org_urn)
        _append_entries(stats.get("followerCountsByFunction", []), "follower_function", functions_map, "function", org_urn)
        _append_entries(stats.get("followerCountsByIndustry", []), "follower_industry", industries_map, "industry", org_urn)
        _append_entries(stats.get("followerCountsByGeoCountry", []), "follower_geo", geo_map, "geo", org_urn)

        logging.info(f"Processed follower demographics for {org_urn}. Total entries from this type: {len(results)}")

    except requests.exceptions.RequestException as e:
        status_code = getattr(e.response, 'status_code', 'N/A')
        error_text = getattr(e.response, 'text', str(e))
        logging.error(f"Error fetching follower demographics for {org_urn} (Status: {status_code}): {error_text}")
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON for follower demographics for {org_urn}: {e}. Response: {response.text if 'response' in locals() else 'N/A'}")
    except Exception as e:
        logging.error(f"Unexpected error fetching follower demographics for {org_urn}: {e}", exc_info=True)
    return results
276 |
+
|
277 |
+
# --- Main Orchestration Function ---
|
278 |
+
|
279 |
+
def get_linkedin_follower_stats(comm_client_id, community_token, org_urn):
    """
    Top-level orchestrator: fetch monthly follower gains plus current follower
    demographics for one organization and return them as a single flat list
    formatted for Bubble. Returns [] when credentials are missing or the
    session cannot be created.
    """
    if not all([comm_client_id, community_token, org_urn]):
        logging.error("Client ID, token, or Organization URN is missing for get_linkedin_follower_stats.")
        return []

    # Accept either a ready-made token dict or a bare access-token string.
    if isinstance(community_token, dict):
        token_dict = community_token
    else:
        token_dict = {'access_token': community_token, 'token_type': 'Bearer'}

    try:
        session = create_session(comm_client_id, token=token_dict)
        session.headers.update({
            "X-Restli-Protocol-Version": "2.0.0",
            "LinkedIn-Version": LINKEDIN_API_VERSION,
        })
    except Exception as e:
        logging.error(f"Failed to create session or update headers for org {org_urn}: {e}", exc_info=True)
        return []  # Cannot proceed without a session

    logging.info(f"Starting follower stats retrieval for org: {org_urn}")

    # ID->name maps are fetched once per call and shared by the demographics pass.
    functions_map = get_functions_map(session)
    seniorities_map = get_seniorities_map(session)
    if not functions_map:
        logging.warning(f"Functions map is empty for org {org_urn}. Function names might not be resolved.")
    if not seniorities_map:
        logging.warning(f"Seniorities map is empty for org {org_urn}. Seniority names might not be resolved.")

    all_follower_data = []
    all_follower_data.extend(fetch_monthly_follower_gains(session, org_urn))
    all_follower_data.extend(fetch_follower_demographics(session, org_urn, functions_map, seniorities_map))

    logging.info(f"Successfully compiled {len(all_follower_data)} total follower stat entries for {org_urn}.")
    return all_follower_data
320 |
+
|