Spaces:
Sleeping
Sleeping
| import json | |
| import requests | |
| from sessions import create_session | |
| from datetime import datetime, timezone, timedelta # Added timezone, timedelta | |
| import matplotlib.pyplot as plt # Added for plotting | |
| from error_handling import display_error | |
| import gradio as gr | |
| import traceback | |
| API_V2_BASE = 'https://api.linkedin.com/v2' | |
| API_REST_BASE = "https://api.linkedin.com/rest" | |
| def extract_follower_gains(data): | |
| """Extracts monthly follower gains from API response.""" | |
| results = [] | |
| print(f"Raw gains data received for extraction: {json.dumps(data, indent=2)}") # Debug print | |
| elements = data.get("elements", []) | |
| if not elements: | |
| print("Warning: No 'elements' found in follower statistics response.") | |
| return [] | |
| for item in elements: | |
| time_range = item.get("timeRange", {}) | |
| start_timestamp = time_range.get("start") | |
| if start_timestamp is None: | |
| print("Warning: Skipping item due to missing start timestamp.") | |
| continue | |
| # Convert timestamp to YYYY-MM format for clearer labeling | |
| # Use UTC timezone explicitly | |
| try: | |
| date_obj = datetime.fromtimestamp(start_timestamp / 1000, tz=timezone.utc) | |
| # Format as Year-Month (e.g., 2024-03) | |
| date_str = date_obj.strftime('%Y-%m') | |
| except Exception as time_e: | |
| print(f"Warning: Could not parse timestamp {start_timestamp}. Error: {time_e}. Skipping item.") | |
| continue | |
| follower_gains = item.get("followerGains", {}) | |
| # Handle potential None values from API by defaulting to 0 | |
| organic_gain = follower_gains.get("organicFollowerGain", 0) or 0 | |
| paid_gain = follower_gains.get("paidFollowerGain", 0) or 0 | |
| results.append({ | |
| "date": date_str, # Store simplified date string | |
| "organic": organic_gain, | |
| "paid": paid_gain | |
| }) | |
| print(f"Extracted follower gains (unsorted): {results}") # Debug print | |
| # Sort results by date string to ensure chronological order for plotting | |
| try: | |
| results.sort(key=lambda x: x['date']) | |
| except Exception as sort_e: | |
| print(f"Warning: Could not sort follower gains by date. Error: {sort_e}") | |
| print(f"Extracted follower gains (sorted): {results}") | |
| return results | |
| def fetch_analytics_data(comm_client_id, comm_token): | |
| """Fetches org URN, follower count, and follower gains using the Marketing token.""" | |
| print("--- Fetching Analytics Data ---") | |
| if not comm_token: | |
| raise ValueError("comm_token is missing.") | |
| token_dict = comm_token if isinstance(comm_token, dict) else {'access_token': comm_token, 'token_type': 'Bearer'} | |
| ln_mkt = create_session(comm_client_id, token=token_dict) | |
| try: | |
| # 1. Fetch Org URN and Name | |
| print("Fetching Org URN for analytics...") | |
| # This function already handles errors and raises ValueError | |
| #org_urn, org_name = fetch_org_urn(token_dict) | |
| org_urn, org_name = "urn:li:organization:19010008", "GRLS" | |
| print(f"Analytics using Org: {org_name} ({org_urn})") | |
| # 2. Fetch Follower Count (v2 API) | |
| # Endpoint requires r_organization_social permission | |
| print("Fetching follower count...") | |
| count_url = f"{API_V2_BASE}/networkSizes/{org_urn}?edgeType=CompanyFollowedByMember" | |
| print(f"Requesting follower count from: {count_url}") | |
| resp_count = ln_mkt.get(count_url) | |
| print(f"β COUNT Response Status: {resp_count.status_code}") | |
| print(f"β COUNT Response Body: {resp_count.text}") | |
| resp_count.raise_for_status() # Check for HTTP errors | |
| count_data = resp_count.json() | |
| # The follower count is in 'firstDegreeSize' | |
| follower_count = count_data.get("firstDegreeSize", 0) | |
| print(f"Follower count: {follower_count}") | |
| # 3. Fetch Follower Gains (REST API) | |
| # Endpoint requires r_organization_social permission | |
| print("Fetching follower gains...") | |
| # Calculate start date: 12 months ago, beginning of that month, UTC | |
| now = datetime.now(timezone.utc) | |
| # Go back roughly 365 days | |
| twelve_months_ago = now - timedelta(days=365) | |
| # Set to the first day of that month | |
| start_of_period = twelve_months_ago.replace(day=1, hour=0, minute=0, second=0, microsecond=0) | |
| start_ms = int(start_of_period.timestamp() * 1000) | |
| print(f"Requesting gains starting from: {start_of_period.strftime('%Y-%m-%d %H:%M:%S %Z')} ({start_ms} ms)") | |
| gains_url = ( | |
| f"{API_REST_BASE}/organizationalEntityFollowerStatistics" | |
| f"?q=organizationalEntity" | |
| f"&organizationalEntity={org_urn}" | |
| f"&timeIntervals.timeGranularityType=MONTH" | |
| f"&timeIntervals.timeRange.start={start_ms}" | |
| # No end date needed to get data up to the latest available month | |
| ) | |
| print(f"Requesting gains from: {gains_url}") | |
| resp_gains = ln_mkt.get(gains_url) | |
| print(f"β GAINS Request Headers: {resp_gains.request.headers}") | |
| print(f"β GAINS Response Status: {resp_gains.status_code}") | |
| print(f"β GAINS Response Body (first 500 chars): {resp_gains.text[:500]}") | |
| resp_gains.raise_for_status() # Check for HTTP errors | |
| gains_data = resp_gains.json() | |
| # 4. Process Gains Data using the extraction function | |
| follower_gains_list = extract_follower_gains(gains_data) | |
| # Return all fetched data | |
| return org_name, follower_count, follower_gains_list | |
| except requests.exceptions.RequestException as e: | |
| status = e.response.status_code if e.response is not None else "N/A" | |
| details = "" | |
| if e.response is not None: | |
| try: | |
| details = f" Details: {e.response.json()}" | |
| except json.JSONDecodeError: | |
| details = f" Response: {e.response.text[:200]}..." | |
| print(f"ERROR fetching analytics data (Status: {status}).{details}") | |
| # Re-raise a user-friendly error, including the original exception context | |
| raise ValueError(f"Failed to fetch analytics data from LinkedIn API (Status: {status}). Check permissions (r_organization_social) and API status.") from e | |
| except ValueError as ve: | |
| # Catch ValueErrors raised by fetch_org_urn | |
| print(f"ERROR during analytics data fetch (likely Org URN): {ve}") | |
| raise ve # Re-raise the specific error message | |
| except Exception as e: | |
| print(f"UNEXPECTED ERROR processing analytics data: {e}") | |
| tb = traceback.format_exc() | |
| print(tb) | |
| raise ValueError(f"An unexpected error occurred while fetching or processing analytics data.") from e | |
| def plot_follower_gains(follower_data): | |
| """Generates a matplotlib plot for follower gains. Returns the figure object.""" | |
| print(f"Plotting follower gains data: {follower_data}") | |
| plt.style.use('seaborn-v0_8-whitegrid') # Use a nice style | |
| if not follower_data: | |
| print("No follower data to plot.") | |
| # Create an empty plot with a message | |
| fig, ax = plt.subplots(figsize=(10, 5)) | |
| ax.text(0.5, 0.5, 'No follower gains data available for the last 12 months.', | |
| horizontalalignment='center', verticalalignment='center', | |
| transform=ax.transAxes, fontsize=12, color='grey') | |
| ax.set_title('Monthly Follower Gains (Last 12 Months)') | |
| ax.set_xlabel('Month') | |
| ax.set_ylabel('Follower Gains') | |
| # Remove ticks if there's no data | |
| ax.set_xticks([]) | |
| ax.set_yticks([]) | |
| plt.tight_layout() | |
| return fig # Return the figure object | |
| try: | |
| # Ensure data is sorted by date (should be done in extract_follower_gains, but double-check) | |
| follower_data.sort(key=lambda x: x['date']) | |
| dates = [entry['date'] for entry in follower_data] # Should be 'YYYY-MM' strings | |
| organic_gains = [entry['organic'] for entry in follower_data] | |
| paid_gains = [entry['paid'] for entry in follower_data] | |
| # Create the plot | |
| fig, ax = plt.subplots(figsize=(12, 6)) # Use fig, ax pattern | |
| ax.plot(dates, organic_gains, label='Organic Follower Gain', marker='o', linestyle='-', color='#0073b1') # LinkedIn blue | |
| ax.plot(dates, paid_gains, label='Paid Follower Gain', marker='x', linestyle='--', color='#d9534f') # Reddish color | |
| # Customize the plot | |
| ax.set_xlabel('Month (YYYY-MM)') | |
| ax.set_ylabel('Number of New Followers') | |
| ax.set_title('Monthly Follower Gains (Last 12 Months)') | |
| # Improve x-axis label readability | |
| # Show fewer labels if there are many months | |
| tick_frequency = max(1, len(dates) // 10) # Show label roughly every N months | |
| ax.set_xticks(dates[::tick_frequency]) | |
| ax.tick_params(axis='x', rotation=45, labelsize=9) # Rotate and adjust size | |
| ax.legend(title="Gain Type") | |
| ax.grid(True, linestyle='--', alpha=0.6) # Lighter grid | |
| # Add value labels on top of bars/points (optional, can get crowded) | |
| # for i, (org, paid) in enumerate(zip(organic_gains, paid_gains)): | |
| # if org > 0: ax.text(i, org, f'{org}', ha='center', va='bottom', fontsize=8) | |
| # if paid > 0: ax.text(i, paid, f'{paid}', ha='center', va='bottom', fontsize=8, color='red') | |
| plt.tight_layout() # Adjust layout to prevent labels from overlapping | |
| print("Successfully generated follower gains plot.") | |
| # Return the figure object for Gradio | |
| return fig | |
| except Exception as plot_e: | |
| print(f"ERROR generating follower gains plot: {plot_e}") | |
| tb = traceback.format_exc() | |
| print(tb) | |
| # Return an empty plot with an error message if plotting fails | |
| fig, ax = plt.subplots(figsize=(10, 5)) | |
| ax.text(0.5, 0.5, f'Error generating plot: {plot_e}', | |
| horizontalalignment='center', verticalalignment='center', | |
| transform=ax.transAxes, fontsize=12, color='red', wrap=True) | |
| ax.set_title('Follower Gains Plot Error') | |
| plt.tight_layout() | |
| return fig | |
| def fetch_and_render_analytics(comm_client_id, comm_token): | |
| """Fetches analytics data and prepares updates for Gradio UI.""" | |
| print("--- Rendering Analytics Tab ---") | |
| # Initial state for outputs | |
| count_output = gr.update(value="<p>Loading follower count...</p>", visible=True) | |
| plot_output = gr.update(value=None, visible=False) # Hide plot initially | |
| if not comm_token: | |
| print("ERROR: Marketing token missing for analytics.") | |
| error_msg = "<p style='color: red; text-align: center; font-weight: bold;'>β Error: Missing LinkedIn Marketing token. Please complete the login process first.</p>" | |
| return gr.update(value=error_msg, visible=True), gr.update(value=None, visible=False) | |
| try: | |
| # Fetch all data together | |
| org_name, follower_count, follower_gains_list = fetch_analytics_data(comm_client_id, comm_token) | |
| # Format follower count display - Nicer HTML | |
| count_display_html = f""" | |
| <div style='text-align: center; padding: 20px; background-color: #e7f3ff; border: 1px solid #bce8f1; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);'> | |
| <p style='font-size: 1.1em; color: #31708f; margin-bottom: 5px;'>Total Followers for</p> | |
| <p style='font-size: 1.4em; font-weight: bold; color: #005a9e; margin-bottom: 10px;'>{html.escape(org_name)}</p> | |
| <p style='font-size: 2.8em; font-weight: bold; color: #0073b1; margin: 0;'>{follower_count:,}</p> | |
| <p style='font-size: 0.9em; color: #777; margin-top: 5px;'>(As of latest data available)</p> | |
| </div> | |
| """ | |
| count_output = gr.update(value=count_display_html, visible=True) | |
| # Generate plot | |
| print("Generating follower gains plot...") | |
| plot_fig = plot_follower_gains(follower_gains_list) | |
| # If plot generation failed, plot_fig might contain an error message plot | |
| plot_output = gr.update(value=plot_fig, visible=True) | |
| print("Analytics data fetched and processed successfully.") | |
| return count_output, plot_output | |
| except (ValueError, requests.exceptions.RequestException) as api_ve: | |
| # Catch specific API or configuration errors from fetch_analytics_data | |
| print(f"API or VALUE ERROR during analytics fetch: {api_ve}") | |
| error_update = display_error(f"Failed to load analytics: {api_ve}", api_ve) | |
| # Show error in the count area, hide plot | |
| return gr.update(value=error_update.get('value', "<p style='color: red;'>Error loading follower count.</p>"), visible=True), gr.update(value=None, visible=False) | |
| except Exception as e: | |
| # Catch any other unexpected errors during fetch or plotting | |
| print(f"UNEXPECTED ERROR during analytics rendering: {e}") | |
| tb = traceback.format_exc() | |
| print(tb) | |
| error_update = display_error("An unexpected error occurred while loading analytics.", e) | |
| error_html = error_update.get('value', "<p style='color: red;'>An unexpected error occurred.</p>") | |
| # Ensure the error message is HTML-safe | |
| if isinstance(error_html, str) and not error_html.strip().startswith("<"): | |
| error_html = f"<pre style='color: red; white-space: pre-wrap;'>{html.escape(error_html)}</pre>" | |
| # Show error in the count area, hide plot | |
| return gr.update(value=error_html, visible=True), gr.update(value=None, visible=False) |