GuglielmoTor commited on
Commit
5183403
Β·
verified Β·
1 Parent(s): c31034c

Update Data_Fetching_and_Rendering.py

Browse files
Files changed (1) hide show
  1. Data_Fetching_and_Rendering.py +76 -17
Data_Fetching_and_Rendering.py CHANGED
@@ -2,6 +2,8 @@ import json
2
  import requests
3
  import html
4
  from datetime import datetime
 
 
5
 
6
  from sessions import create_session
7
  from error_handling import display_error
@@ -9,6 +11,9 @@ from error_handling import display_error
9
  API_V2_BASE = 'https://api.linkedin.com/v2'
10
  API_REST_BASE = "https://api.linkedin.com/rest"
11
 
 
 
 
12
  def fetch_org_urn(comm_client_id, comm_token_dict):
13
  if not comm_token_dict or 'access_token' not in comm_token_dict:
14
  raise ValueError("Marketing token is missing or invalid.")
@@ -38,7 +43,7 @@ def fetch_org_urn(comm_client_id, comm_token_dict):
38
  org = elements[0]
39
  org_urn = org.get('organizationalTarget')
40
  org_name = org.get(next((k for k in org if k.endswith('organizationalTarget~')), {}), {}).get('localizedName')
41
-
42
  if not org_urn or not org_urn.startswith("urn:li:organization:"):
43
  raise ValueError("Invalid Organization URN.")
44
  if not org_name:
@@ -47,15 +52,64 @@ def fetch_org_urn(comm_client_id, comm_token_dict):
47
 
48
  return org_urn, org_name
49
 
50
- def fetch_posts_and_stats(comm_client_id, community_token, count=10):
51
- if not community_token:
52
- raise ValueError("Community token is missing.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
 
 
54
  token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
55
  session = create_session(comm_client_id, token=token_dict)
56
 
57
  org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)
58
- #org_urn, org_name = "urn:li:organization:19010008", "GRLS"
59
  posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
60
 
61
  try:
@@ -67,20 +121,17 @@ def fetch_posts_and_stats(comm_client_id, community_token, count=10):
67
  raise ValueError(f"Failed to fetch posts (Status: {status})") from e
68
 
69
  if not raw_posts:
70
- return [], org_name
71
 
72
  post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
73
- if not post_urns:
74
- return [], org_name
75
-
76
  stats_map = {}
 
77
  for i in range(0, len(post_urns), 20):
78
  batch = post_urns[i:i+20]
79
  params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
80
  for idx, urn in enumerate(batch):
81
  key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
82
  params[key] = urn
83
-
84
  try:
85
  stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
86
  stat_resp.raise_for_status()
@@ -91,6 +142,9 @@ def fetch_posts_and_stats(comm_client_id, community_token, count=10):
91
  except:
92
  continue
93
 
 
 
 
94
  posts = []
95
  for post in raw_posts:
96
  post_id = post.get("id")
@@ -102,19 +156,22 @@ def fetch_posts_and_stats(comm_client_id, community_token, count=10):
102
  text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
103
 
104
  likes = stats.get("likeCount", 0)
105
- comments = stats.get("commentCount", 0)
106
  clicks = stats.get("clickCount", 0)
107
  shares = stats.get("shareCount", 0)
108
  impressions = stats.get("impressionCount", 0)
109
- engagement = stats.get("engagement", likes + comments + clicks + shares) / impressions * 100 if impressions else 0.0
 
 
110
 
111
  posts.append({
112
  "id": post_id, "when": when, "text": text, "likes": likes,
113
- "comments": comments, "clicks": clicks, "shares": shares,
114
- "impressions": impressions, "engagement": f"{engagement:.2f}%"
 
115
  })
116
 
117
- return posts, org_name
118
 
119
  def render_post_cards(posts, org_name):
120
  safe_name = html.escape(org_name or "Your Organization")
@@ -127,14 +184,16 @@ def render_post_cards(posts, org_name):
127
  f"<div style='font-size:0.95em;margin-bottom:12px;max-height:120px;overflow:auto'>{p['text']}</div>"
128
  f"<div style='font-size:0.9em;color:#333;border-top:1px solid #eee;padding-top:10px;'>"
129
  f"πŸ‘οΈ {p['impressions']:,} | πŸ‘ {p['likes']:,} | πŸ’¬ {p['comments']:,} | πŸ”— {p['shares']:,} | πŸ–±οΈ {p['clicks']:,}<br>"
130
- f"<strong>πŸ“ˆ {p['engagement']}</strong></div></div>"
 
 
131
  for p in posts
132
  ]
133
  return f"<h2 style='text-align:center;margin-bottom:20px;'>Recent Posts for {safe_name}</h2><div style='display:flex;flex-wrap:wrap;gap:15px;justify-content:center;'>" + "".join(cards) + "</div>"
134
 
135
  def fetch_and_render_dashboard(comm_client_id, community_token):
136
  try:
137
- posts, org_name = fetch_posts_and_stats(comm_client_id, community_token)
138
  return render_post_cards(posts, org_name)
139
  except Exception as e:
140
  return display_error("Dashboard Error", e).get('value', '<p style="color:red;text-align:center;">❌ An error occurred.</p>')
 
2
  import requests
3
  import html
4
  from datetime import datetime
5
+ from collections import defaultdict
6
+ from transformers import pipeline
7
 
8
  from sessions import create_session
9
  from error_handling import display_error
 
11
  API_V2_BASE = 'https://api.linkedin.com/v2'
12
  API_REST_BASE = "https://api.linkedin.com/rest"
13
 
14
+ # Load sentiment model
15
+ sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
16
+
17
  def fetch_org_urn(comm_client_id, comm_token_dict):
18
  if not comm_token_dict or 'access_token' not in comm_token_dict:
19
  raise ValueError("Marketing token is missing or invalid.")
 
43
  org = elements[0]
44
  org_urn = org.get('organizationalTarget')
45
  org_name = org.get(next((k for k in org if k.endswith('organizationalTarget~')), {}), {}).get('localizedName')
46
+
47
  if not org_urn or not org_urn.startswith("urn:li:organization:"):
48
  raise ValueError("Invalid Organization URN.")
49
  if not org_name:
 
52
 
53
  return org_urn, org_name
54
 
55
+ def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
56
+ from requests_oauthlib import OAuth2Session
57
+ linkedin = OAuth2Session(comm_client_id, token=token_dict)
58
+ linkedin.headers.update({'LinkedIn-Version': "202502"})
59
+
60
+ all_comments = {}
61
+ for post_urn in post_urns:
62
+ if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
63
+ continue
64
+
65
+ try:
66
+ url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
67
+ response = linkedin.get(url)
68
+ if response.status_code == 200:
69
+ elements = response.json().get('elements', [])
70
+ all_comments[post_urn] = [c.get('message', {}).get('text') for c in elements if c.get('message')]
71
+ else:
72
+ all_comments[post_urn] = []
73
+ except Exception:
74
+ all_comments[post_urn] = []
75
+
76
+ return all_comments
77
+
78
+ def analyze_sentiment(comments_data):
79
+ results = {}
80
+ for post_urn, comments in comments_data.items():
81
+ sentiment_counts = defaultdict(int)
82
+ total = 0
83
+
84
+ for comment in comments:
85
+ if not comment:
86
+ continue
87
+ try:
88
+ result = sentiment_pipeline(comment)
89
+ label = result[0]['label'].upper()
90
+ if label in ['POSITIVE', 'VERY POSITIVE']:
91
+ sentiment_counts['Positive πŸ‘'] += 1
92
+ elif label in ['NEGATIVE', 'VERY NEGATIVE']:
93
+ sentiment_counts['Negative πŸ‘Ž'] += 1
94
+ elif label == 'NEUTRAL':
95
+ sentiment_counts['Neutral 😐'] += 1
96
+ else:
97
+ sentiment_counts['Unknown'] += 1
98
+ total += 1
99
+ except:
100
+ sentiment_counts['Error'] += 1
101
+
102
+ dominant = max(sentiment_counts, key=sentiment_counts.get, default='Neutral 😐')
103
+ percentage = round((sentiment_counts[dominant] / total) * 100, 1) if total else 0.0
104
+ results[post_urn] = {"sentiment": dominant, "percentage": percentage}
105
+
106
+ return results
107
 
108
+ def fetch_posts_and_stats(comm_client_id, community_token, count=10):
109
  token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
110
  session = create_session(comm_client_id, token=token_dict)
111
 
112
  org_urn, org_name = fetch_org_urn(comm_client_id, token_dict)
 
113
  posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
114
 
115
  try:
 
121
  raise ValueError(f"Failed to fetch posts (Status: {status})") from e
122
 
123
  if not raw_posts:
124
+ return [], org_name, {}
125
 
126
  post_urns = [p["id"] for p in raw_posts if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
 
 
 
127
  stats_map = {}
128
+
129
  for i in range(0, len(post_urns), 20):
130
  batch = post_urns[i:i+20]
131
  params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
132
  for idx, urn in enumerate(batch):
133
  key = f"shares[{idx}]" if ":share:" in urn else f"ugcPosts[{idx}]"
134
  params[key] = urn
 
135
  try:
136
  stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
137
  stat_resp.raise_for_status()
 
142
  except:
143
  continue
144
 
145
+ comments = fetch_comments(comm_client_id, token_dict, post_urns, stats_map)
146
+ sentiments = analyze_sentiment(comments)
147
+
148
  posts = []
149
  for post in raw_posts:
150
  post_id = post.get("id")
 
156
  text = html.escape(text[:250]).replace("\n", "<br>") + ("..." if len(text) > 250 else "")
157
 
158
  likes = stats.get("likeCount", 0)
159
+ comments_count = stats.get("commentCount", 0)
160
  clicks = stats.get("clickCount", 0)
161
  shares = stats.get("shareCount", 0)
162
  impressions = stats.get("impressionCount", 0)
163
+ engagement = stats.get("engagement", likes + comments_count + clicks + shares) / impressions * 100 if impressions else 0.0
164
+
165
+ sentiment_info = sentiments.get(post_id, {"sentiment": "Neutral 😐", "percentage": 0.0})
166
 
167
  posts.append({
168
  "id": post_id, "when": when, "text": text, "likes": likes,
169
+ "comments": comments_count, "clicks": clicks, "shares": shares,
170
+ "impressions": impressions, "engagement": f"{engagement:.2f}%",
171
+ "sentiment": sentiment_info["sentiment"], "sentiment_percent": sentiment_info["percentage"]
172
  })
173
 
174
+ return posts, org_name, sentiments
175
 
176
  def render_post_cards(posts, org_name):
177
  safe_name = html.escape(org_name or "Your Organization")
 
184
  f"<div style='font-size:0.95em;margin-bottom:12px;max-height:120px;overflow:auto'>{p['text']}</div>"
185
  f"<div style='font-size:0.9em;color:#333;border-top:1px solid #eee;padding-top:10px;'>"
186
  f"πŸ‘οΈ {p['impressions']:,} | πŸ‘ {p['likes']:,} | πŸ’¬ {p['comments']:,} | πŸ”— {p['shares']:,} | πŸ–±οΈ {p['clicks']:,}<br>"
187
+ f"<strong>πŸ“ˆ {p['engagement']}</strong><br>"
188
+ f"<span style='color:#444;'>🧠 Sentiment: <strong>{p['sentiment']}</strong> ({p['sentiment_percent']}%)</span>"
189
+ f"</div></div>"
190
  for p in posts
191
  ]
192
  return f"<h2 style='text-align:center;margin-bottom:20px;'>Recent Posts for {safe_name}</h2><div style='display:flex;flex-wrap:wrap;gap:15px;justify-content:center;'>" + "".join(cards) + "</div>"
193
 
194
  def fetch_and_render_dashboard(comm_client_id, community_token):
195
  try:
196
+ posts, org_name, _ = fetch_posts_and_stats(comm_client_id, community_token)
197
  return render_post_cards(posts, org_name)
198
  except Exception as e:
199
  return display_error("Dashboard Error", e).get('value', '<p style="color:red;text-align:center;">❌ An error occurred.</p>')