# -*- coding: utf-8 -*-
import gradio as gr | |
import json | |
import os | |
import logging | |
import html | |
import pandas as pd # Ensure pandas is imported | |
from datetime import datetime # Used for pd.Timestamp | |
# Import functions from your custom modules | |
from Data_Fetching_and_Rendering import fetch_and_render_dashboard | |
from analytics_fetch_and_rendering import fetch_and_render_analytics | |
from mentions_dashboard import generate_mentions_dashboard | |
from gradio_utils import get_url_user_token | |
# Updated import to include fetch_posts_from_bubble | |
from Bubble_API_Calls import ( | |
fetch_linkedin_token_from_bubble, | |
bulk_upload_to_bubble, | |
fetch_linkedin_posts_data_from_bubble | |
) | |
from Linkedin_Data_API_Calls import ( | |
fetch_linkedin_posts_core, | |
fetch_comments, | |
analyze_sentiment, | |
compile_detailed_posts, | |
prepare_data_for_bubble | |
) | |
# Logging setup: INFO level, each record prefixed with timestamp and level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Global Constants ---
# How many posts are requested when nothing usable is stored yet.
DEFAULT_INITIAL_FETCH_COUNT = 10

# Dict key that carries the post URN in post records processed from LinkedIn.
LINKEDIN_POST_URN_KEY = "id"

# Column that carries the post URN in the DataFrame fetched from Bubble.
# Change this if the Bubble 'LI_posts' table stores URNs under another name.
BUBBLE_POST_URN_COLUMN_NAME = "id"
def check_token_status(token_state):
    """Return a status line saying whether a LinkedIn token is held in state.

    The "available" text is returned only when ``token_state`` is truthy and
    its ``"token"`` entry is truthy; every other input yields the
    "not available" text.
    """
    has_token = bool(token_state) and bool(token_state.get("token"))
    if has_token:
        return "β Token available"
    return "β Token not available"
def _mask_secret(value, visible_chars=4):
    """Return a log-safe preview of *value* that shows only a short prefix."""
    text = str(value)
    if len(text) <= visible_chars:
        return "***"
    return f"{text[:visible_chars]}***"


def _fetch_linkedin_token(url_user_token):
    """Look up the LinkedIn token in Bubble; return the token dict or None."""
    logging.info(
        "Attempting to fetch LinkedIn token from Bubble with user token: "
        f"{_mask_secret(url_user_token)}"
    )
    try:
        response = fetch_linkedin_token_from_bubble(url_user_token)
    except Exception as e:
        logging.error(f"β Exception while fetching LinkedIn token from Bubble: {e}")
        return None

    if isinstance(response, dict) and "access_token" in response:
        logging.info("β LinkedIn Token successfully fetched from Bubble.")
        return response

    # Log only the shape of the response: its content may hold credential data.
    summary = list(response) if isinstance(response, dict) else type(response).__name__
    logging.warning(
        "β Failed to fetch a valid LinkedIn token from Bubble. "
        f"Response summary: {summary}"
    )
    return None


def _load_bubble_posts(org_urn):
    """Fetch stored posts for *org_urn* from Bubble; empty DataFrame on any failure."""
    if not org_urn:
        logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")
        return pd.DataFrame()

    logging.info(f"Attempting to fetch posts from Bubble for org_urn: {org_urn}")
    try:
        fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(org_urn, "LI_posts")
    except Exception as e:
        logging.error(f"β Error fetching posts from Bubble: {e}. Treating as no data.")
        return pd.DataFrame()

    if error_message:
        logging.warning(
            f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. "
            "Treating as no data."
        )
        return pd.DataFrame()
    return fetched_df if fetched_df is not None else pd.DataFrame()


def _plan_linkedin_sync(posts_df):
    """Return ``(fetch_count, button_update)`` based on the posts stored in Bubble."""
    date_column = 'published_at'
    posts_per_week = 10

    def initial_fetch(label_suffix):
        # Every fallback requests the default batch and shows an enabled button.
        label = f"π Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} {label_suffix}"
        return DEFAULT_INITIAL_FETCH_COUNT, gr.update(value=label, visible=True, interactive=True)

    if posts_df is None or posts_df.empty:
        logging.info(
            "βΉοΈ No posts found in Bubble or DataFrame is empty. "
            f"Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible."
        )
        return initial_fetch("LinkedIn Posts")

    try:
        if date_column not in posts_df.columns:
            logging.warning(
                f"Date column '{date_column}' not found in Bubble posts DataFrame. "
                f"Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts."
            )
            return initial_fetch("(Date Column Missing)")

        if posts_df[date_column].isnull().all():
            logging.warning(
                f"Date column '{date_column}' contains all null values. "
                f"Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts."
            )
            return initial_fetch("(Date Column Empty)")

        # Parse into a separate Series so the stored DataFrame is never mutated.
        parsed_dates = pd.to_datetime(posts_df[date_column], errors='coerce', utc=True)
        last_post_date_utc = parsed_dates.dropna().max()
        if pd.isna(last_post_date_utc):
            logging.warning(
                f"No valid dates found in '{date_column}' after conversion. "
                f"Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts."
            )
            return initial_fetch("(No Valid Dates)")

        # Compare whole calendar days in UTC.
        today_utc = pd.Timestamp('now', tz='UTC').normalize()
        last_post_day_utc = last_post_date_utc.normalize()
        age_in_days = (today_utc - last_post_day_utc).days
        logging.info(
            f"Last post date (UTC, normalized): {last_post_day_utc}, "
            f"Today (UTC, normalized): {today_utc}, Difference: {age_in_days} days."
        )

        if age_in_days < 7:
            logging.info(f"Data is fresh ({age_in_days} days old). No update needed now.")
            return 0, gr.update(visible=False, interactive=False)

        num_weeks = age_in_days // 7  # at least 1 because age_in_days >= 7
        fetch_count = num_weeks * posts_per_week
        logging.info(
            f"Data is {age_in_days} days old. "
            f"Update needed for {num_weeks} weeks, ~{fetch_count} posts."
        )
        button_label = f"π Update Last {num_weeks} Week(s) (~{fetch_count} Posts)"
        return fetch_count, gr.update(value=button_label, visible=True, interactive=True)
    except Exception as e:
        logging.error(
            f"Error processing dates from Bubble posts: {e}. "
            f"Defaulting to initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts."
        )
        return initial_fetch("(Date Error)")


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """Refresh the session state and decide what the sync button should offer.

    Steps, in order:
      1. Copy ``token_state`` (or start from defaults) and record ``org_urn``.
      2. Read the ``Linkedin_client_id`` environment variable.
      3. Look up the LinkedIn token in Bubble using ``url_user_token``, unless
         that value is empty or carries a "not found" / "Could not access"
         marker.
      4. Fetch the organization's stored posts from Bubble.
      5. Derive the fetch count and sync-button update from those posts.

    Secrets are never written to the log verbatim: the user token is masked
    and a failed token response is summarised by shape only.

    Args:
        url_user_token: User token taken from the page URL, or an error marker.
        org_urn: Organization URN taken from the page URL.
        token_state: Current session state dict (may be None or empty).

    Returns:
        A ``(token_status_message, new_state, button_update)`` tuple.
    """
    token_is_usable = (
        bool(url_user_token)
        and "not found" not in url_user_token
        and "Could not access" not in url_user_token
    )
    # An unusable value is an error marker, not a secret, so it is logged as is.
    token_for_log = _mask_secret(url_user_token) if token_is_usable else repr(url_user_token)
    logging.info(f"Processing token with URL user token: {token_for_log}, Org URN: '{org_urn}'")

    # Shallow copy so the caller's state object is left untouched.
    new_state = dict(token_state) if token_state else {}
    for key, default in (
        ("token", None),
        ("client_id", None),
        ("bubble_posts_df", None),
        ("fetch_count_for_api", 0),
    ):
        new_state.setdefault(key, default)
    new_state["org_urn"] = org_urn

    client_id = os.environ.get("Linkedin_client_id")
    if client_id:
        new_state["client_id"] = client_id
    else:
        logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")
        # Sentinel that downstream handlers check for explicitly.
        new_state["client_id"] = "ENV VAR MISSING"

    if token_is_usable:
        new_state["token"] = _fetch_linkedin_token(url_user_token)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    new_state["bubble_posts_df"] = _load_bubble_posts(new_state.get("org_urn"))

    fetch_count, button_update = _plan_linkedin_sync(new_state["bubble_posts_df"])
    new_state['fetch_count_for_api'] = fetch_count

    token_status_message = check_token_status(new_state)
    logging.info(
        f"Token processing complete. LinkedIn Token Status: {token_status_message}. "
        f"Button update: {button_update}. Fetch count for API: {new_state['fetch_count_for_api']}"
    )
    return token_status_message, new_state, button_update
def guarded_fetch_posts(token_state):
    """Fetch new LinkedIn posts, enrich them, and upload them to Bubble.

    The function refuses to run without a LinkedIn token, an organization
    URN, or a client ID. It also stops early when the stored fetch count is
    zero, missing, or not a positive number. Otherwise it fetches posts,
    drops those already stored in Bubble (matched by URN) and those without
    a URN, fetches comments, runs sentiment analysis, and uploads the result
    to the ``LI_posts``, ``LI_post_stats`` and ``LI_post_comments`` tables.

    Args:
        token_state: Session state dict with the keys ``token``,
            ``client_id``, ``org_urn``, ``fetch_count_for_api`` and
            ``bubble_posts_df``.

    Returns:
        An HTML snippet describing the outcome. Errors are reported in the
        returned HTML rather than raised.
    """
    logging.info("Starting guarded_fetch_posts process.")
    if not token_state or not token_state.get("token"):
        logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
        return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>"

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    bubble_posts_df = token_state.get("bubble_posts_df")  # posts already stored in Bubble

    if not org_urn:
        logging.error("Organization URN (org_urn) not found in token_state.")
        return "<p style='color:red; text-align:center;'>β Configuration error: Organization URN missing.</p>"
    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Client ID not found or missing in token_state.")
        return "<p style='color:red; text-align:center;'>β Configuration error: LinkedIn Client ID missing.</p>"

    # A missing or malformed count must not reach the API as ``count=None``;
    # it is treated like 0, i.e. nothing to fetch.
    try:
        fetch_count_value = int(token_state.get('fetch_count_for_api') or 0)
    except (TypeError, ValueError):
        fetch_count_value = 0
    if fetch_count_value <= 0:
        logging.info("Data is fresh. No new posts fetched based on date check.")
        return "<p style='color:green; text-align:center;'>β Data is already up-to-date. No new posts fetched.</p>"

    try:
        logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count: {fetch_count_value}")
        processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(
            client_id, token_dict, org_urn, count=fetch_count_value
        )
        if not processed_raw_posts:
            logging.info("No posts retrieved from LinkedIn API.")
            return "<p style='color:orange; text-align:center;'>βΉοΈ No new LinkedIn posts found to process.</p>"

        # --- Filter out posts already in Bubble ---
        existing_post_urns = set()
        if (
            bubble_posts_df is not None
            and not bubble_posts_df.empty
            and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df.columns
        ):
            existing_post_urns = set(bubble_posts_df[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))
            logging.info(f"Found {len(existing_post_urns)} existing post URNs in Bubble data.")
        else:
            logging.info("No existing posts found in Bubble data or URN column missing; all fetched posts will be considered new.")

        # Posts without a URN cannot be de-duplicated against Bubble, so they
        # would be uploaded again on every sync; skip them here so the same
        # post set feeds the comment, sentiment and upload steps.
        new_raw_posts = []
        skipped_without_urn = 0
        for post in processed_raw_posts:
            post_urn = post.get(LINKEDIN_POST_URN_KEY)
            if not post_urn:
                skipped_without_urn += 1
            elif str(post_urn) not in existing_post_urns:
                new_raw_posts.append(post)
        if skipped_without_urn:
            logging.warning(f"Skipped {skipped_without_urn} fetched post(s) that have no URN.")

        if not new_raw_posts:
            logging.info("All fetched LinkedIn posts are already present in Bubble. No new posts to add.")
            return "<p style='color:green; text-align:center;'>β All fetched posts already exist in Bubble. Data is up-to-date.</p>"

        logging.info(f"Identified {len(new_raw_posts)} new posts to process after filtering against Bubble data.")
        post_urns_to_process = [post[LINKEDIN_POST_URN_KEY] for post in new_raw_posts]

        logging.info("Step 2: Fetching comments for new posts via LinkedIn API.")
        # NOTE(review): stats_map still covers every fetched post, not only the
        # new ones; this assumes the helpers tolerate the extra entries.
        all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)

        logging.info("Step 3: Analyzing sentiment for new posts.")
        sentiments_per_post = analyze_sentiment(all_comments_data)

        logging.info("Step 4: Compiling detailed data for new posts.")
        detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)

        logging.info("Step 5: Preparing data for Bubble (only new posts).")
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)

        logging.info(f"Step 6: Uploading {len(li_posts)} new posts and their related data to Bubble.")
        if li_posts:
            bulk_upload_to_bubble(li_posts, "LI_posts")
            if li_post_stats:
                bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
            if li_post_comments:
                bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
            result_sentence = f"Successfully uploaded {len(li_posts)} new post(s) to Bubble."
        else:
            logging.info("No new posts to upload after final preparation for Bubble.")
            result_sentence = "No new posts to upload after detailed processing."

        is_initial_fetch = fetch_count_value == DEFAULT_INITIAL_FETCH_COUNT and not existing_post_urns
        final_message_verb = "Initial data fetch" if is_initial_fetch else "Data update"
        logging.info(f"Successfully completed: {final_message_verb}. {result_sentence}")
        return f"<p style='color:green; text-align:center;'>β {final_message_verb} complete. {result_sentence}</p>"
    except ValueError as ve:
        logging.error(f"ValueError during LinkedIn data processing: {ve}")
        # Escape the message: it is embedded in HTML shown to the user.
        return f"<p style='color:red; text-align:center;'>β Error: {html.escape(str(ve))}</p>"
    except Exception:
        logging.exception("An unexpected error occurred in guarded_fetch_posts.")
        return "<p style='color:red; text-align:center;'>β An unexpected error occurred. Please check logs.</p>"
def guarded_fetch_dashboard(token_state):
    """Return a short dashboard summary for the posts held in session state.

    Without a LinkedIn token an access-denied string is returned. Otherwise
    the result is an HTML paragraph that either reports how many rows the
    ``bubble_posts_df`` DataFrame has or says that no posts are loaded yet.
    """
    if not (token_state and token_state.get("token")):
        return "β Access denied. No token available for dashboard."

    posts_df = token_state.get("bubble_posts_df")
    if posts_df is None or posts_df.empty:
        return "<p style='text-align: center; color: #555;'>No posts loaded from Bubble yet for the dashboard.</p>"

    post_count = len(posts_df)
    return f"<p style='text-align: center;'>Dashboard would show {post_count} posts from Bubble.</p>"
def guarded_fetch_analytics(token_state):
    """Run the analytics fetch only when a LinkedIn token is present.

    Without a token the result is an 8-tuple: an access-denied message
    followed by seven ``None`` placeholders. With a token the call is
    delegated to ``fetch_and_render_analytics(client_id, token)`` and its
    return value is passed through unchanged.
    """
    state = token_state or {}
    linkedin_token = state.get("token")
    if not linkedin_token:
        denial_message = "β Access denied. No token available for analytics."
        return (denial_message,) + (None,) * 7
    return fetch_and_render_analytics(state.get("client_id"), linkedin_token)
def run_mentions_and_load(token_state):
    """Build the mentions dashboard only when a LinkedIn token is present.

    Without a token the result is ``(access-denied message, None)``. With a
    token the call is delegated to
    ``generate_mentions_dashboard(client_id, token)`` and its return value is
    passed through unchanged.
    """
    state = token_state or {}
    linkedin_token = state.get("token")
    if not linkedin_token:
        denial_message = "β Access denied. No token available for mentions."
        return (denial_message, None)
    return generate_mentions_dashboard(state.get("client_id"), linkedin_token)
# --- Gradio UI Blocks ---
# Layout and event wiring for the whole app. Components are created in the
# order they appear on the page.
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
    title="LinkedIn Post Viewer & Analytics",
) as app:
    # Per-session state shared by every handler below.
    initial_state = {
        "token": None,
        "client_id": None,
        "org_urn": None,
        "bubble_posts_df": pd.DataFrame(),  # start with an empty DataFrame
        "fetch_count_for_api": 0,
    }
    token_state = gr.State(value=initial_state)

    gr.Markdown("# π LinkedIn Organization Post Viewer & Analytics")
    gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.")

    # Hidden textboxes carry the URL parameters; the status box is visible.
    url_user_token_display = gr.Textbox(
        label="User Token (from URL - Hidden)", interactive=False, visible=False
    )
    status_box = gr.Textbox(
        label="Overall LinkedIn Token Status", interactive=False, value="Initializing..."
    )
    org_urn_display = gr.Textbox(
        label="Organization URN (from URL - Hidden)", interactive=False, visible=False
    )

    # On page load, fill the two hidden textboxes.
    app.load(
        fn=get_url_user_token,
        inputs=None,
        outputs=[url_user_token_display, org_urn_display],
    )

    with gr.Tabs():
        with gr.TabItem("1οΈβ£ Dashboard & Sync"):
            gr.Markdown("System checks for existing data in Bubble. The button below will activate if new posts need to be fetched or updated from LinkedIn.")
            sync_posts_to_bubble_btn = gr.Button(
                value="π Sync LinkedIn Posts",
                variant="primary",
                visible=False,
                interactive=False,
            )
            initial_dashboard_html = (
                "<p style='text-align: center; color: #555;'>System initializing... "
                "Checking for existing data in Bubble and LinkedIn token.</p>"
            )
            dashboard_html_output = gr.HTML(initial_dashboard_html)

            # Re-run the token/state processing whenever either hidden textbox
            # changes (organization URN first, then user token).
            for url_param_box in (org_urn_display, url_user_token_display):
                url_param_box.change(
                    fn=process_and_store_bubble_token,
                    inputs=[url_user_token_display, org_urn_display, token_state],
                    outputs=[status_box, token_state, sync_posts_to_bubble_btn],
                )

            # Sync, then refresh state and button from what is now in Bubble.
            sync_click = sync_posts_to_bubble_btn.click(
                fn=guarded_fetch_posts,
                inputs=[token_state],
                outputs=[dashboard_html_output],
            )
            sync_click.then(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_posts_to_bubble_btn],
            )

        with gr.TabItem("2οΈβ£ Analytics"):
            gr.Markdown("View follower count and monthly gains for your organization (requires LinkedIn token).")
            fetch_analytics_btn = gr.Button("π Fetch Follower Analytics", variant="primary")
            follower_count = gr.Markdown("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
            with gr.Row():
                follower_plot = gr.Plot()
                growth_plot = gr.Plot()
            with gr.Row():
                eng_rate_plot = gr.Plot()
            with gr.Row():
                interaction_plot = gr.Plot()
            with gr.Row():
                eb_plot = gr.Plot()
            with gr.Row():
                mentions_vol_plot = gr.Plot()
                mentions_sentiment_plot = gr.Plot()

            # Eight outputs, matching the eight values the handler returns.
            analytics_outputs = [
                follower_count,
                follower_plot,
                growth_plot,
                eng_rate_plot,
                interaction_plot,
                eb_plot,
                mentions_vol_plot,
                mentions_sentiment_plot,
            ]
            fetch_analytics_btn.click(
                fn=guarded_fetch_analytics,
                inputs=[token_state],
                outputs=analytics_outputs,
            )

        with gr.TabItem("3οΈβ£ Mentions"):
            gr.Markdown("Analyze sentiment of recent posts that mention your organization (requires LinkedIn token).")
            fetch_mentions_btn = gr.Button("π§  Fetch Mentions & Sentiment", variant="primary")
            mentions_html = gr.HTML("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
            mentions_plot = gr.Plot()
            fetch_mentions_btn.click(
                fn=run_mentions_and_load,
                inputs=[token_state],
                outputs=[mentions_html, mentions_plot],
            )

    # Show the token status on load and refresh it every 15 seconds.
    app.load(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
    status_timer = gr.Timer(15.0)
    status_timer.tick(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
# Script entry point: warn about a missing client ID, then start the server.
if __name__ == "__main__":
    client_id_configured = bool(os.environ.get("Linkedin_client_id"))
    if not client_id_configured:
        logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.")
    # Listen on all interfaces, port 7860, with a public share link requested.
    app.launch(server_name="0.0.0.0", server_port=7860, share=True)