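"""
LinkedIn post analytics pipeline.

Fetches an organization's posts and share statistics from the LinkedIn REST API,
summarizes and categorizes each post, fetches and sentiment-scores the comments,
and prepares the combined records for upload to Bubble.
"""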
import json
import requests
import html
from datetime import datetime
from collections import defaultdict
from transformers import pipeline
from sessions import create_session
from error_handling import display_error
from posts_categorization import batch_summarize_and_classify
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
API_V2_BASE = 'https://api.linkedin.com/v2'
API_REST_BASE = "https://api.linkedin.com/rest"  # Versioned REST API base; these endpoints expect a LinkedIn-Version header
# Sentiment pipeline is loaded once at module import; initialization (and the first model download) can be slow.
sentiment_pipeline = pipeline("text-classification", model="tabularisai/multilingual-sentiment-analysis")
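# analyze_sentiment() below maps this model's labels (e.g. POSITIVE, VERY POSITIVE,
# NEGATIVE, VERY NEGATIVE, NEUTRAL) onto three buckets, so swapping the model may
# require updating that mapping as well.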
def fetch_linkedin_posts_core(comm_client_id, community_token, org_urn, count=20):
"""
Fetches raw posts, their basic statistics, and performs summarization/categorization.
Does NOT fetch comments or analyze sentiment.
"""
token_dict = community_token if isinstance(community_token, dict) else {'access_token': community_token, 'token_type': 'Bearer'}
session = create_session(comm_client_id, token=token_dict)
org_name = "GRLS" # Placeholder or fetch if necessary
posts_url = f"{API_REST_BASE}/posts?author={org_urn}&q=author&count={count}&sortBy=LAST_MODIFIED"
logging.info(f"Fetching posts from URL: {posts_url}")
try:
resp = session.get(posts_url)
resp.raise_for_status()
raw_posts_api = resp.json().get("elements", [])
logging.info(f"Fetched {len(raw_posts_api)} raw posts from API.")
except requests.exceptions.RequestException as e:
status = getattr(e.response, 'status_code', 'N/A')
logging.error(f"Failed to fetch posts (Status: {status}): {e}")
raise ValueError(f"Failed to fetch posts (Status: {status})") from e
if not raw_posts_api:
logging.info("No raw posts found.")
return [], {}, org_name
# Filter for valid post types if necessary, e.g., shares or ugcPosts
# post_urns_for_stats = [p["id"] for p in raw_posts_api if ":share:" in p["id"] or ":ugcPost:" in p["id"]]
post_urns_for_stats = [p["id"] for p in raw_posts_api if p.get("id")]
# Prepare texts for summarization/classification
post_texts_for_nlp = []
for p in raw_posts_api:
text_content = p.get("commentary") or \
p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
"[No text content]"
post_texts_for_nlp.append({"text": text_content, "id": p.get("id")})
logging.info(f"Prepared {len(post_texts_for_nlp)} posts for NLP.")
structured_results_list = batch_summarize_and_classify(post_texts_for_nlp)
# Create a dictionary for easy lookup of structured results by post ID
structured_results_map = {res["id"]: res for res in structured_results_list if "id" in res}
# Fetch statistics
stats_map = {}
if post_urns_for_stats:
for i in range(0, len(post_urns_for_stats), 20): # LinkedIn API often has batch limits
batch_urns = post_urns_for_stats[i:i+20]
params = {'q': 'organizationalEntity', 'organizationalEntity': org_urn}
for idx, urn_str in enumerate(batch_urns):
# Determine if it's a share or ugcPost based on URN structure (simplified)
key_prefix = "shares" if ":share:" in urn_str else "ugcPosts"
params[f"{key_prefix}[{idx}]"] = urn_str
try:
logging.info(f"Fetching stats for batch starting with URN: {batch_urns[0]}")
stat_resp = session.get(f"{API_REST_BASE}/organizationalEntityShareStatistics", params=params)
stat_resp.raise_for_status()
for stat_element in stat_resp.json().get("elements", []):
urn = stat_element.get("share") or stat_element.get("ugcPost")
if urn:
stats_map[urn] = stat_element.get("totalShareStatistics", {})
logging.info(f"Successfully fetched stats for {len(batch_urns)} URNs. Current stats_map size: {len(stats_map)}")
except requests.exceptions.RequestException as e:
logging.warning(f"Failed to fetch stats for a batch: {e}. Response: {e.response.text if e.response else 'No response'}")
# Continue to next batch, some stats might be missing
except json.JSONDecodeError as e:
logging.warning(f"Failed to decode JSON from stats response: {e}. Response: {stat_resp.text if stat_resp else 'No response text'}")
processed_raw_posts = []
for p in raw_posts_api:
post_id = p.get("id")
if not post_id:
logging.warning("Skipping raw post due to missing ID.")
continue
text_content = p.get("commentary") or \
p.get("specificContent", {}).get("com.linkedin.ugc.ShareContent", {}).get("shareCommentaryV2", {}).get("text", "") or \
"[No text content]"
timestamp = p.get("publishedAt") or p.get("createdAt")
published_at_iso = datetime.fromtimestamp(timestamp / 1000).isoformat() if timestamp else None
structured_res = structured_results_map.get(post_id, {"summary": "N/A", "category": "N/A"})
processed_raw_posts.append({
"id": post_id,
"raw_text": text_content,
"summary": structured_res["summary"],
"category": structured_res["category"],
"published_at_timestamp": timestamp,
"published_at_iso": published_at_iso,
# These are placeholders for actual fields from LinkedIn API response. Verify field names.
"author_urn": p.get("author", "urn:li:unknown"), # e.g., "urn:li:person:xxxx" or "urn:li:organization:xxxx"
"is_ad": p.get("isSponsored", False), # LinkedIn might use a different field like 'sponsored' or 'promoted'
"media_type": p.get("mediaCategory", "NONE") # e.g., ARTICLE, IMAGE, VIDEO, NONE
})
logging.info(f"Processed {len(processed_raw_posts)} posts with core data.")
return processed_raw_posts, stats_map, org_name
def fetch_comments(comm_client_id, token_dict, post_urns, stats_map):
"""
Fetches comments for a list of post URNs.
Uses stats_map to potentially skip posts with 0 comments.
"""
from requests_oauthlib import OAuth2Session # Imported here because only this function needs OAuth2Session
linkedin_session = OAuth2Session(comm_client_id, token=token_dict)
# LinkedIn-Version values change regularly (YYYYMM format); keep this header aligned
# with the version documented for the socialActions endpoints, and avoid versions
# that have not shipped yet.
linkedin_session.headers.update({'LinkedIn-Version': "202405", 'X-Restli-Protocol-Version': '2.0.0'})
all_comments_by_post = {}
logging.info(f"Fetching comments for {len(post_urns)} posts.")
for post_urn in post_urns:
# Optimization: if stats show 0 comments, skip API call for this post's comments
if stats_map.get(post_urn, {}).get('commentCount', 0) == 0:
logging.info(f"Skipping comment fetch for {post_urn} as commentCount is 0 in stats_map.")
all_comments_by_post[post_urn] = []
continue
try:
# Comments are exposed under /socialActions/{urn}/comments (and also via
# /commentsV2?q=entity&entity={urn}); the socialActions form accepts share and ugcPost URNs.
url = f"{API_REST_BASE}/socialActions/{post_urn}/comments"
logging.debug(f"Fetching comments from URL: {url} for post URN: {post_urn}")
response = linkedin_session.get(url)
if response.status_code == 200:
elements = response.json().get('elements', [])
comments_texts = [
c.get('message', {}).get('text')
for c in elements
if c.get('message') and c.get('message', {}).get('text')
]
all_comments_by_post[post_urn] = comments_texts
logging.info(f"Fetched {len(comments_texts)} comments for {post_urn}.")
elif response.status_code == 403: # Forbidden, often permissions or versioning
logging.warning(f"Forbidden (403) to fetch comments for {post_urn}. URL: {url}. Response: {response.text}")
all_comments_by_post[post_urn] = []
elif response.status_code == 404: # Not found
logging.warning(f"Comments not found (404) for {post_urn}. URL: {url}. Response: {response.text}")
all_comments_by_post[post_urn] = []
else:
logging.error(f"Error fetching comments for {post_urn}. Status: {response.status_code}. Response: {response.text}")
all_comments_by_post[post_urn] = []
except requests.exceptions.RequestException as e:
logging.error(f"RequestException fetching comments for {post_urn}: {e}")
all_comments_by_post[post_urn] = []
except Exception as e: # Catch any other unexpected errors
logging.error(f"Unexpected error fetching comments for {post_urn}: {e}")
all_comments_by_post[post_urn] = []
return all_comments_by_post
def analyze_sentiment(all_comments_data):
"""
Analyzes sentiment for comments grouped by post_urn.
all_comments_data is a dict: {post_urn: [comment_text_1, comment_text_2,...]}
Returns a dict: {post_urn: {"sentiment": "DominantSentiment", "percentage": X.X}}
"""
results_by_post = {}
logging.info(f"Analyzing sentiment for comments from {len(all_comments_data)} posts.")
for post_urn, comments_list in all_comments_data.items():
sentiment_counts = defaultdict(int)
total_valid_comments_for_post = 0
if not comments_list:
results_by_post[post_urn] = {"sentiment": "Neutral π", "percentage": 0.0, "details": sentiment_counts}
continue
for comment_text in comments_list:
if not comment_text or not comment_text.strip(): # Skip empty comments
continue
try:
# The pipeline expects a string or list of strings.
# Ensure comment_text is a string.
analysis_result = sentiment_pipeline(str(comment_text))
label = analysis_result[0]['label'].upper()
if label in ['POSITIVE', 'VERY POSITIVE']:
sentiment_counts['Positive'] += 1
elif label in ['NEGATIVE', 'VERY NEGATIVE']:
sentiment_counts['Negative'] += 1
elif label == 'NEUTRAL':
sentiment_counts['Neutral'] += 1
else: # Other labels from the model
sentiment_counts['Unknown'] += 1
total_valid_comments_for_post += 1
except Exception as e:
logging.error(f"Sentiment analysis failed for comment under {post_urn}: '{comment_text[:50]}...'. Error: {e}")
sentiment_counts['Error'] += 1
if total_valid_comments_for_post > 0:
dominant_sentiment = max(sentiment_counts, key=sentiment_counts.get, default='Neutral')
percentage = round((sentiment_counts[dominant_sentiment] / total_valid_comments_for_post) * 100, 1)
else: # No valid comments to analyze
dominant_sentiment = 'Neutral'
percentage = 0.0
if sentiment_counts['Error'] > 0: # If there were only errors
dominant_sentiment = 'Error'
results_by_post[post_urn] = {
"sentiment": dominant_sentiment,
"percentage": percentage,
"details": dict(sentiment_counts) # Store counts for more detailed reporting if needed
}
logging.debug(f"Sentiment for {post_urn}: {results_by_post[post_urn]}")
return results_by_post
def compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post):
"""
Combines processed raw post data with their statistics and overall sentiment.
"""
detailed_post_list = []
logging.info(f"Compiling detailed data for {len(processed_raw_posts)} posts.")
for proc_post in processed_raw_posts:
post_id = proc_post["id"]
stats = stats_map.get(post_id, {})
likes = stats.get("likeCount", 0)
# Prefer commentSummary.totalComments when present, falling back to commentCount;
# default to 0 so the engagement calculation below never sees None.
comments_stat_count = stats.get("commentSummary", {}).get("totalComments", stats.get("commentCount", 0)) or 0
clicks = stats.get("clickCount", 0)
shares = stats.get("shareCount", 0)
impressions = stats.get("impressionCount", 0)
unique_impressions = stats.get("uniqueImpressionsCount", 0) # Ensure this field is in API response
# Calculate engagement: (likes + comments + clicks + shares) / impressions
# Ensure impressions is not zero to avoid DivisionByZeroError
engagement_numerator = likes + comments_stat_count + clicks + shares
engagement_rate = (engagement_numerator / impressions * 100) if impressions else 0.0
sentiment_info = sentiments_per_post.get(post_id, {"sentiment": "Neutral", "percentage": 0.0})
# Format text for display (escaped and truncated)
display_text = html.escape(proc_post["raw_text"][:250]).replace("\n", "<br>") + \
("..." if len(proc_post["raw_text"]) > 250 else "")
when_formatted = datetime.fromtimestamp(proc_post["published_at_timestamp"] / 1000).strftime("%Y-%m-%d %H:%M") \
if proc_post["published_at_timestamp"] else "Unknown"
detailed_post_list.append({
"id": post_id,
"when": when_formatted,
"text_for_display": display_text, # Shortened, escaped text
"raw_text": proc_post["raw_text"], # Full original text
"likes": likes,
"comments_stat_count": comments_stat_count, # Count from post statistics
"clicks": clicks,
"shares": shares,
"impressions": impressions,
"uniqueImpressionsCount": unique_impressions,
"engagement": f"{engagement_rate:.2f}%", # Formatted string
"engagement_raw": engagement_rate, # Raw float for potential calculations
"sentiment": sentiment_info["sentiment"],
"sentiment_percent": sentiment_info["percentage"],
"sentiment_details": sentiment_info.get("details", {}), # Detailed counts
"summary": proc_post["summary"],
"category": proc_post["category"],
"author_urn": proc_post["author_urn"],
"is_ad": proc_post["is_ad"],
"media_type": proc_post["media_type"],
"published_at": proc_post["published_at_iso"] # ISO format datetime string
})
logging.info(f"Compiled {len(detailed_post_list)} detailed posts.")
return detailed_post_list
def prepare_data_for_bubble(detailed_posts, all_actual_comments_data):
"""
Prepares data lists for uploading to Bubble.
- detailed_posts: List of comprehensively compiled post objects.
- all_actual_comments_data: Dict of {post_urn: [comment_texts]} from fetch_comments.
"""
li_posts = []
li_post_stats = []
li_post_comments = [] # For individual comments
logging.info("Preparing data for Bubble.")
# Guard against an empty post list so indexing cannot raise.
org_urn = detailed_posts[0]["author_urn"] if detailed_posts else None
for post_data in detailed_posts:
# Data for LI_post table in Bubble
li_posts.append({
"author_urn": post_data["author_urn"],
"id": post_data["id"], # Post URN
"is_ad": post_data["is_ad"],
"media_type": post_data["media_type"],
"published_at": post_data["published_at"], # ISO datetime string
"sentiment": post_data["sentiment"], # Overall sentiment of the post based on its comments
"text": post_data["raw_text"], # Storing the full raw text
#"summary_text": post_data["summary"],
"li_eb_label": post_data["category"]
# Add any other fields from post_data needed for LI_post table
})
# Data for LI_post_stats table in Bubble
li_post_stats.append({
"clickCount": post_data["clicks"],
"commentCount": post_data["comments_stat_count"], # From post's own stats
"engagement": post_data["engagement"], # Formatted string e.g., "12.34%"
"impressionCount": post_data["impressions"],
"likeCount": post_data["likes"],
"shareCount": post_data["shares"],
"uniqueImpressionsCount": post_data["uniqueImpressionsCount"],
"post_id": post_data["id"], # Foreign key to LI_post
"organization_urn": org_urn
})
# Data for LI_post_comments table in Bubble (individual comments)
# This iterates through the actual comments fetched, not just the count.
for post_urn, comments_text_list in all_actual_comments_data.items():
for single_comment_text in comments_text_list:
if single_comment_text and single_comment_text.strip(): # Ensure comment text is not empty
li_post_comments.append({
"comment_text": single_comment_text,
"post_id": post_urn, # Foreign key to LI_post
"organization_urn": org_urn
# Could add sentiment per comment here if analyzed at that granularity
})
logging.info(f"Prepared {len(li_posts)} posts, {len(li_post_stats)} stats entries, and {len(li_post_comments)} comments for Bubble.")
return li_posts, li_post_stats, li_post_comments
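# --- Example usage (illustrative sketch) ---
# A minimal end-to-end run of the pipeline above. The client id, token, and
# organization URN below are placeholders; replace them with real credentials
# before running. Network calls and model inference happen inside the functions.
if __name__ == "__main__":
    COMM_CLIENT_ID = "YOUR_CLIENT_ID"          # placeholder
    COMMUNITY_TOKEN = "YOUR_ACCESS_TOKEN"      # placeholder
    ORG_URN = "urn:li:organization:000000"     # placeholder

    try:
        # 1. Posts, share statistics, and summaries/categories
        processed_posts, stats_map, org_name = fetch_linkedin_posts_core(
            COMM_CLIENT_ID, COMMUNITY_TOKEN, ORG_URN, count=20
        )

        # 2. Comments per post (posts whose stats report zero comments are skipped)
        token_dict = {"access_token": COMMUNITY_TOKEN, "token_type": "Bearer"}
        post_urns = [p["id"] for p in processed_posts]
        comments_by_post = fetch_comments(COMM_CLIENT_ID, token_dict, post_urns, stats_map)

        # 3. Per-post sentiment derived from the fetched comments
        sentiments = analyze_sentiment(comments_by_post)

        # 4. Merge posts, statistics, and sentiment into display-ready records
        detailed_posts = compile_detailed_posts(processed_posts, stats_map, sentiments)

        # 5. Split into the three lists expected by the Bubble tables
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(
            detailed_posts, comments_by_post
        )
        logging.info(f"Pipeline finished: {len(li_posts)} posts ready for upload.")
    except ValueError as e:
        logging.error(f"Pipeline aborted: {e}")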