Spaces:
Running
Running
import json | |
import requests | |
from datetime import datetime, timezone, timedelta | |
import matplotlib.pyplot as plt | |
import gradio as gr | |
import traceback | |
import html | |
from sessions import create_session | |
from error_handling import display_error | |
from Data_Fetching_and_Rendering import fetch_posts_and_stats | |
# Root of the LinkedIn v2 API (used below for the networkSizes follower count).
API_V2_BASE = "https://api.linkedin.com/v2"
# Root of the LinkedIn REST API (used below for organizationalEntityFollowerStatistics).
API_REST_BASE = "https://api.linkedin.com/rest"
def extract_follower_gains(data):
    """Convert a follower-statistics payload into sorted monthly gain records.

    Args:
        data: Decoded JSON mapping. Its ``"elements"`` list holds items with a
            ``timeRange.start`` timestamp in epoch milliseconds and a
            ``followerGains`` mapping.

    Returns:
        A list of ``{"date": "YYYY-MM", "organic": ..., "paid": ...}`` dicts
        sorted by ``date`` (UTC month of the start timestamp). Items whose
        start timestamp is missing or cannot be converted are skipped, and
        missing or null gain values are reported as 0.
    """
    results = []
    for item in data.get("elements") or []:
        # ``or {}`` also covers an explicit JSON null, which
        # ``.get(key, {})`` would hand back as None and crash on.
        start_timestamp = (item.get("timeRange") or {}).get("start")
        if not start_timestamp:
            continue
        try:
            date_str = datetime.fromtimestamp(
                start_timestamp / 1000, tz=timezone.utc
            ).strftime('%Y-%m')
        except (TypeError, ValueError, OverflowError, OSError):
            # Non-numeric or out-of-range timestamp: drop just this item.
            continue
        gains = item.get("followerGains") or {}
        results.append({
            "date": date_str,
            "organic": gains.get("organicFollowerGain", 0) or 0,
            "paid": gains.get("paidFollowerGain", 0) or 0,
        })
    return sorted(results, key=lambda entry: entry['date'])
def fetch_analytics_data(client_id, token,
                         org_urn="urn:li:organization:19010008",
                         org_name="GRLS"):
    """Fetch the follower count and monthly follower gains for an organization.

    Args:
        client_id: OAuth client id passed through to ``create_session``.
        token: Either a token dict or a bare access-token string; a bare
            string is wrapped as a Bearer token.
        org_urn: Organization URN to query. Defaults to the organization
            this module was originally hard-wired to.
        org_name: Display name returned alongside the numbers.

    Returns:
        ``(org_name, follower_count, gains)`` where ``gains`` is the list
        produced by ``extract_follower_gains``.

    Raises:
        ValueError: If ``token`` is empty, if a request fails or returns an
            HTTP error status, or on any other unexpected failure. The
            original exception is chained as ``__cause__``.
    """
    if not token:
        raise ValueError("comm_token is missing.")
    token_dict = token if isinstance(token, dict) else {'access_token': token, 'token_type': 'Bearer'}
    session = create_session(client_id, token=token_dict)
    try:
        count_url = f"{API_V2_BASE}/networkSizes/{org_urn}?edgeType=CompanyFollowedByMember"
        count_response = session.get(count_url)
        # Without this check an HTTP error body would be parsed as if it
        # were data and silently reported as 0 followers.
        count_response.raise_for_status()
        follower_count = count_response.json().get("firstDegreeSize", 0)

        # Query window: from the first day of the month, 365 days back (UTC).
        start = datetime.now(timezone.utc) - timedelta(days=365)
        start = start.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        start_ms = int(start.timestamp() * 1000)
        gains_url = (
            f"{API_REST_BASE}/organizationalEntityFollowerStatistics"
            f"?q=organizationalEntity&organizationalEntity={org_urn}"
            f"&timeIntervals.timeGranularityType=MONTH"
            f"&timeIntervals.timeRange.start={start_ms}"
        )
        gains_response = session.get(gains_url)
        gains_response.raise_for_status()
        gains = extract_follower_gains(gains_response.json())
        return org_name, follower_count, gains
    except requests.exceptions.RequestException as e:
        # ``e.response`` is None for connection-level failures.
        status = getattr(e.response, 'status_code', 'N/A')
        msg = f"Failed to fetch LinkedIn analytics (Status: {status})."
        raise ValueError(msg) from e
    except Exception as e:
        raise ValueError("Unexpected error during LinkedIn analytics fetch.") from e
def plot_follower_gains(data):
    """Plot organic and paid follower gains per month as two line series.

    ``data`` is a list of dicts with ``date``, ``organic`` and ``paid`` keys.
    Returns the Matplotlib figure. When ``data`` is empty, a placeholder
    figure containing only a message is returned instead.

    Note: this applies the ``seaborn-v0_8-whitegrid`` style globally.
    """
    plt.style.use('seaborn-v0_8-whitegrid')

    if not data:
        figure, axes = plt.subplots(figsize=(10, 5))
        axes.text(
            0.5, 0.5, 'No follower gains data.',
            ha='center', va='center', transform=axes.transAxes,
        )
        axes.set_title('Monthly Follower Gains')
        axes.set_xticks([])
        axes.set_yticks([])
        return figure

    months = [entry['date'] for entry in data]
    organic_counts = [entry['organic'] for entry in data]
    paid_counts = [entry['paid'] for entry in data]

    figure, axes = plt.subplots(figsize=(12, 6))
    axes.plot(months, organic_counts, label='Organic', marker='o', color='#0073b1')
    axes.plot(months, paid_counts, label='Paid', marker='x', linestyle='--', color='#d9534f')
    axes.set(title='Monthly Follower Gains', xlabel='Month', ylabel='New Followers')
    axes.tick_params(axis='x', rotation=45)
    axes.legend()
    plt.tight_layout()
    return figure
def plot_growth_rate(data, total):
    """Plot the month-over-month follower growth rate in percent.

    ``data`` is the list of monthly gain dicts (``date``, ``organic``,
    ``paid``) in chronological order and ``total`` is the current follower
    count. End-of-month totals are reconstructed by walking backwards from
    ``total``, and each month's rate is its change relative to the previous
    month's total. The first month has no predecessor, so it is not plotted.
    Returns the Matplotlib figure, or a placeholder figure when ``data`` is
    empty.
    """
    if not data:
        figure, axes = plt.subplots(figsize=(10, 5))
        axes.text(
            0.5, 0.5, 'No data for growth rate.',
            ha='center', va='center', transform=axes.transAxes,
        )
        axes.set_title('Growth Rate (%)')
        axes.set_xticks([])
        axes.set_yticks([])
        return figure

    months = [entry['date'] for entry in data]
    monthly_gains = [entry['organic'] + entry['paid'] for entry in data]

    # Walk from the newest month to the oldest, peeling off each month's
    # gain, then flip the list back into chronological order.
    totals = []
    running = total
    for gain in reversed(monthly_gains):
        totals.append(running)
        running -= gain
    totals.reverse()

    growth = []
    for previous, current in zip(totals, totals[1:]):
        if previous:
            growth.append((current - previous) / previous * 100)
        else:
            # A zero baseline would divide by zero; report no growth.
            growth.append(0)

    figure, axes = plt.subplots(figsize=(12, 6))
    axes.plot(months[1:], growth, label='Growth Rate (%)', marker='o', color='green')
    axes.set(title='Monthly Growth Rate (%)', xlabel='Month', ylabel='Growth %')
    axes.tick_params(axis='x', rotation=45)
    axes.legend()
    plt.tight_layout()
    return figure
def compute_monthly_avg_engagement_rate(posts):
    """Compute the engagement rate (%) per calendar month.

    Args:
        posts: Iterable of post dicts. Each needs a ``"when"`` string that
            starts with ``YYYY-MM`` and may carry numeric ``likes``,
            ``comments``, ``shares``, ``clicks`` and ``impressions``.
            Missing or null metrics count as 0.

    Returns:
        A list of ``{"month": "YYYY-MM", "engagement_rate": float}`` dicts
        sorted by month. The rate is total engagement divided by total
        impressions for the month, times 100, rounded to two decimals; it is
        0 when the month has no impressions. Posts with a missing ``"when"``
        or a non-numeric metric are skipped entirely.
    """
    from collections import defaultdict

    if not posts:
        return []

    engagement_keys = ("likes", "comments", "shares", "clicks")
    monthly_data = defaultdict(lambda: {"engagement_sum": 0, "impression_total": 0})

    for post in posts:
        # Read and validate every field before touching the accumulator so
        # a bad value can never leave a month half-updated.
        try:
            month = post["when"][:7]  # Format: YYYY-MM
            counts = [post.get(key) or 0 for key in engagement_keys]
            impressions = post.get("impressions") or 0
        except (KeyError, TypeError, AttributeError):
            continue
        if not isinstance(month, str):
            continue
        if not all(isinstance(value, (int, float)) for value in (*counts, impressions)):
            continue

        bucket = monthly_data[month]
        bucket["engagement_sum"] += sum(counts)
        bucket["impression_total"] += impressions

    results = []
    for month in sorted(monthly_data):
        totals = monthly_data[month]
        impression_total = totals["impression_total"]
        # engagement / (posts * average impressions) reduces to
        # engagement / total impressions.
        rate = totals["engagement_sum"] / impression_total * 100 if impression_total else 0
        results.append({"month": month, "engagement_rate": round(rate, 2)})
    return results
def plot_avg_engagement_rate(data):
    """Plot the monthly engagement rate (%) as a single line series.

    ``data`` is a list of dicts with ``month`` and ``engagement_rate`` keys.
    Returns the Matplotlib figure, or a placeholder figure containing only a
    message when ``data`` is empty.
    """
    import matplotlib.pyplot as plt

    if not data:
        figure, axes = plt.subplots(figsize=(10, 5))
        axes.text(
            0.5, 0.5, 'No engagement data.',
            ha='center', va='center', transform=axes.transAxes,
        )
        axes.set_title('Average Post Engagement Rate (%)')
        axes.set_xticks([])
        axes.set_yticks([])
        return figure

    month_labels = [entry["month"] for entry in data]
    rate_values = [entry["engagement_rate"] for entry in data]

    figure, axes = plt.subplots(figsize=(12, 6))
    axes.plot(month_labels, rate_values, label="Engagement Rate (%)", marker="s", color="#ff7f0e")
    axes.set(title="Average Post Engagement Rate (%)", xlabel="Month", ylabel="Engagement Rate %")
    axes.tick_params(axis='x', rotation=45)
    axes.legend()
    plt.tight_layout()
    return figure
def fetch_and_render_analytics(client_id, token):
    """Fetch LinkedIn analytics and build the Gradio updates that show them.

    Args:
        client_id: OAuth client id forwarded to the fetch helpers.
        token: Access token (dict or string) forwarded to the fetch helpers.

    Returns:
        Always a 4-tuple of ``gr.update`` objects, in this order: follower
        count HTML, follower-gains plot, growth-rate plot, engagement-rate
        plot. When the token is missing or loading fails, the first element
        carries an error message and the three plots are hidden.
    """
    hidden = gr.update(value=None, visible=False)
    if not token:
        error = "<p style='color:red;'>❌ Missing token. Please log in.</p>"
        # Same arity as the success path, so the wired outputs line up.
        return gr.update(value=error, visible=True), hidden, hidden, hidden
    try:
        name, count, gains = fetch_analytics_data(client_id, token)
        posts, _org_name, _sentiments = fetch_posts_and_stats(client_id, token, count=30)
        engagement_data = compute_monthly_avg_engagement_rate(posts)
        count_html = f"""
<div style='text-align:center; padding:20px; background:#e7f3ff; border:1px solid #bce8f1; border-radius:8px;'>
<p style='font-size:1.1em; color:#31708f;'>Total Followers for</p>
<p style='font-size:1.4em; font-weight:bold; color:#005a9e;'>{html.escape(name)}</p>
<p style='font-size:2.8em; font-weight:bold; color:#0073b1;'>{count:,}</p>
<p style='font-size:0.9em; color:#777;'>(As of latest data)</p>
</div>
"""
        return (
            gr.update(value=count_html, visible=True),
            gr.update(value=plot_follower_gains(gains), visible=True),
            gr.update(value=plot_growth_rate(gains, count), visible=True),
            gr.update(value=plot_avg_engagement_rate(engagement_data), visible=True),
        )
    except Exception as e:
        error = display_error("Analytics load failed.", e).get('value', "<p style='color:red;'>Error loading data.</p>")
        return gr.update(value=error, visible=True), hidden, hidden, hidden