GuglielmoTor committed on
Commit
9c2556f
·
verified ·
1 Parent(s): d399f5c

Update Data_Fetching_and_Rendering.py

Browse files
Files changed (1) hide show
  1. Data_Fetching_and_Rendering.py +210 -0
Data_Fetching_and_Rendering.py CHANGED
@@ -1,8 +1,10 @@
1
  import json
2
  import requests
3
  from sessions import create_session
 
4
 
5
  API_V2_BASE = 'https://api.linkedin.com/v2'
 
6
 
7
  def fetch_org_urn(comm_client_id, comm_token_dict):
8
  """
@@ -75,3 +77,211 @@ def fetch_org_urn(comm_client_id, comm_token_dict):
75
 
76
  print(f"Found Org: {org_name} ({org_urn_full})")
77
  return org_urn_full, org_name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import requests
3
  from sessions import create_session
4
+ import html
5
 
6
  API_V2_BASE = 'https://api.linkedin.com/v2'
7
+ API_REST_BASE = "https://api.linkedin.com/rest"
8
 
9
  def fetch_org_urn(comm_client_id, comm_token_dict):
10
  """
 
77
 
78
  print(f"Found Org: {org_name} ({org_urn_full})")
79
  return org_urn_full, org_name
80
+
81
+
82
+
83
+
84
def _redact_headers(headers):
    """Return a plain-dict copy of *headers* with the Authorization value masked.

    Request headers are printed for debugging; the bearer token must never
    end up in logs.
    """
    return {
        name: "<redacted>" if str(name).lower() == "authorization" else value
        for name, value in headers.items()
    }


def _describe_request_error(exc):
    """Return ``(status, details)`` describing a failed ``requests`` call.

    ``status`` is the HTTP status code, or ``"N/A"`` when the exception
    carries no response. ``details`` is a log-ready suffix (possibly empty).
    """
    response = getattr(exc, "response", None)
    # Compare against None explicitly: a Response is falsy for 4xx/5xx.
    if response is None:
        return "N/A", ""
    try:
        details = f" Details: {response.json()}"
    except ValueError:
        # ValueError covers both json.JSONDecodeError and the simplejson
        # variant that requests raises when simplejson is installed.
        details = f" Response: {response.text[:200]}..."
    return response.status_code, details


def _build_stats_params(org_urn, batch):
    """Build the query parameters for one statistics request.

    Share URNs go into ``shares[i]`` and ugcPost URNs into ``ugcPosts[i]``,
    each with its own running index. URNs of any other type are skipped
    with a warning.
    """
    params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
    share_idx, ugc_idx = 0, 0
    for urn in batch:
        if ':share:' in urn:
            params[f'shares[{share_idx}]'] = urn
            share_idx += 1
        elif ':ugcPost:' in urn:
            params[f'ugcPosts[{ugc_idx}]'] = urn
            ugc_idx += 1
        else:
            print(f"WARN: Skipping unknown URN type for stats: {urn}")
    return params


def _extract_post_text(post):
    """Return the best available text for *post*.

    Fields are tried in priority order: ``commentary``, then
    ``specificContent`` share commentary, top-level ``commentaryV2``,
    article title, the older ``content.text.text`` field, and finally a
    fixed placeholder. ``or {}`` guards keep a JSON ``null`` in any
    intermediate field from raising ``AttributeError``.
    """
    commentary = post.get("commentary")
    if commentary:
        return commentary

    specific_content = post.get("specificContent") or {}
    share_content = specific_content.get("com.linkedin.ugc.ShareContent") or {}
    share_text = (share_content.get("shareCommentaryV2") or {}).get("text")
    if share_text:
        return share_text

    top_level_text = (post.get("commentaryV2") or {}).get("text")
    if top_level_text:
        return top_level_text

    article_content = specific_content.get("com.linkedin.ugc.ArticleContent") or {}
    article_title = article_content.get("title")
    if article_title:
        return f"Article: {article_title}"

    legacy_text = ((post.get("content") or {}).get("text") or {}).get("text")
    if legacy_text:
        return legacy_text

    return "[Media post or share without text]"


def _summarize_stats(stats):
    """Return the counters and a formatted engagement rate for one post.

    Missing or null counters are treated as 0. The ``engagement`` value from
    the statistics is used when it parses as a number (it is multiplied by
    100 for display); otherwise the rate is computed as
    (likes + comments + clicks + shares) / impressions.
    """
    impressions = stats.get("impressionCount", 0) or 0
    likes = stats.get("likeCount", 0) or 0
    comments = stats.get("commentCount", 0) or 0
    clicks = stats.get("clickCount", 0) or 0
    shares = stats.get("shareCount", 0) or 0

    interactions = likes + comments + clicks + shares
    manual_rate = (interactions / impressions * 100) if impressions > 0 else 0.0
    engagement_str = f"{manual_rate:.2f}%"

    engagement_api = stats.get('engagement')
    if engagement_api is not None:
        try:
            engagement_str = f"{float(engagement_api) * 100:.2f}%"
        except (ValueError, TypeError):
            pass  # Keep the manually computed rate.

    return {
        "likes": likes, "comments": comments, "impressions": impressions,
        "clicks": clicks, "shares": shares,
        "engagement": engagement_str,
    }


def fetch_posts_and_stats(comm_client_id, community_token, count=10):
    """Fetch an organization's latest posts and their share statistics.

    Args:
        comm_client_id: Client id passed to ``create_session``.
        community_token: Access token, either a token dict or a bare
            access-token string (wrapped as a Bearer token dict).
        count: Number of posts requested from the posts endpoint.

    Returns:
        A tuple ``(combined_posts, org_name)``. ``combined_posts`` is a list
        of dicts with keys ``id``, ``when``, ``text`` (HTML-escaped, at most
        250 characters plus ``...``), ``likes``, ``comments``,
        ``impressions``, ``clicks``, ``shares`` and ``engagement``. The list
        is empty when no posts, or no share/ugcPost URNs, are returned.

    Raises:
        ValueError: If ``community_token`` is missing or the posts request
            fails. A failed statistics batch is only logged; the affected
            posts are returned with zeroed counters.
    """
    # Function-scope import: `datetime` is not among the module imports
    # visible at the top of this file.
    from datetime import datetime

    print("--- Fetching Posts and Stats ---")

    if not community_token:
        print("ERROR: Community token missing; it is required to fetch posts and stats.")
        raise ValueError("Community token is missing.")

    # Normalize the token to the dict form expected by create_session.
    if isinstance(community_token, dict):
        comm_token_dict = community_token
    else:
        comm_token_dict = {'access_token': community_token, 'token_type': 'Bearer'}

    ln_comm = create_session(comm_client_id, token=comm_token_dict)

    # 1) Organization URN.
    # NOTE(review): hard-coded instead of calling fetch_org_urn(); confirm
    # this is intentional before reusing for another organization.
    org_urn, org_name = "urn:li:organization:19010008", "GRLS"

    # 2) Fetch latest posts via the REST API.
    # Presumably requires the r_organization_social permission - TODO confirm.
    posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"

    print(f"Attempting to fetch posts from: {posts_url} using Marketing token")
    try:
        resp_posts = ln_comm.get(posts_url)
        print(f"→ POSTS Request Headers: {_redact_headers(resp_posts.request.headers)}")
        print(f"→ POSTS Response Status: {resp_posts.status_code}")
        # Limit printing large response bodies.
        print(f"→ POSTS Response Body (first 500 chars): {resp_posts.text[:500]}")
        resp_posts.raise_for_status()
        print("Fetched posts using Marketing token.")
    except requests.exceptions.RequestException as e:
        status, details = _describe_request_error(e)
        print(f"ERROR: Fetching posts failed with Marketing token (Status: {status}).{details}")
        raise ValueError(f"Failed to fetch posts using Marketing token (Status: {status}). Check permissions (r_organization_social).") from e

    raw_posts = resp_posts.json().get("elements", [])
    print(f"Fetched {len(raw_posts)} raw posts.")

    if not raw_posts:
        return [], org_name

    # 3) Keep only share / ugcPost URNs; stats are requested for these.
    post_urns = [
        p.get("id") for p in raw_posts
        if p.get("id") and (":share:" in p.get("id") or ":ugcPost:" in p.get("id"))
    ]
    if not post_urns:
        print("WARN: No post URNs (share or ugcPost) found in the fetched posts.")
        return [], org_name

    print(f"Post URNs to fetch stats for: {post_urns}")

    # 4) Fetch stats in batches to keep each request's URN list short.
    stats_map = {}
    batch_size = 20
    stats_url = f"{API_REST_BASE}/organizationalEntityShareStatistics"

    for start in range(0, len(post_urns), batch_size):
        batch = post_urns[start:start + batch_size]
        params = _build_stats_params(org_urn, batch)
        urn_count = len(params) - 2  # Exclude the two fixed query keys.

        if urn_count == 0:
            print("WARN: Skipping stats fetch for batch as no valid share/ugcPost URNs found.")
            continue

        print(f"Fetching stats for batch from: {stats_url} with {urn_count} URNs using Marketing token")
        try:
            resp_stats = ln_comm.get(stats_url, params=params)
            print(f"→ STATS Request URL: {resp_stats.request.url}")
            print(f"→ STATS Request Headers: {_redact_headers(resp_stats.request.headers)}")
            print(f"→ STATS Response Status: {resp_stats.status_code}")
            print(f"→ STATS Response Body (first 500 chars): {resp_stats.text[:500]}")
            resp_stats.raise_for_status()
            stats_data = resp_stats.json().get("elements", [])
            print(f"Received {len(stats_data)} stats elements for this batch.")

            # Map each stats element back to the URN it describes.
            for elem in stats_data:
                urn_key = elem.get('share') or elem.get('ugcPost')
                if urn_key:
                    stats_map[urn_key] = elem.get('totalShareStatistics') or {}
                else:
                    print(f"WARN: Stats element missing 'share' or 'ugcPost' key: {elem}")
        except requests.exceptions.RequestException as e:
            # Best effort: a failed batch leaves its posts without stats.
            status, details = _describe_request_error(e)
            print(f"ERROR fetching stats batch using Marketing token (Status: {status}).{details}")
            print("WARN: Skipping stats for this batch due to error.")

    print(f"Fetched stats for {len(stats_map)} posts in total.")

    # 5) Assemble combined post data.
    combined_posts = []
    for post in raw_posts:
        post_id = post.get("id")
        if not post_id:
            continue

        # Prefer publishedAt, fall back to createdAt; the value is divided
        # by 1000 before conversion and formatted in local time.
        timestamp = post.get("publishedAt") or post.get("createdAt")
        if timestamp:
            when = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d %H:%M")
        else:
            when = "Unknown Date"

        text = _extract_post_text(post)
        # Truncate first, then escape, so an HTML entity is never cut in half.
        display_text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")

        entry = {"id": post_id, "when": when, "text": display_text}
        entry.update(_summarize_stats(stats_map.get(post_id, {})))
        combined_posts.append(entry)

    return combined_posts, org_name