LinkedinMonitor / app.py
GuglielmoTor's picture
Update app.py
b5c877f verified
raw
history blame
20.7 kB
# -- coding: utf-8 --
import gradio as gr
import json
import os
import logging
import html
import pandas as pd # Ensure pandas is imported
from datetime import datetime # Used for pd.Timestamp
# Import functions from your custom modules
from Data_Fetching_and_Rendering import fetch_and_render_dashboard
from analytics_fetch_and_rendering import fetch_and_render_analytics
from mentions_dashboard import generate_mentions_dashboard
from gradio_utils import get_url_user_token
# Updated import to include fetch_posts_from_bubble
from Bubble_API_Calls import (
fetch_linkedin_token_from_bubble,
bulk_upload_to_bubble,
fetch_linkedin_posts_data_from_bubble
)
from Linkedin_Data_API_Calls import (
fetch_linkedin_posts_core,
fetch_comments,
analyze_sentiment,
compile_detailed_posts,
prepare_data_for_bubble
)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Global Constants ---
# Standard number of posts for initial fetch
DEFAULT_INITIAL_FETCH_COUNT = 10
# Key for post URN in data processed from LinkedIn (e.g., in detailed_posts)
LINKEDIN_POST_URN_KEY = 'id'
# Column name for post URN in the DataFrame fetched from Bubble (bubble_posts_df)
BUBBLE_POST_URN_COLUMN_NAME = 'id' # Adjust if your Bubble 'LI_posts' table uses a different column name for URNs
def check_token_status(token_state):
"""Checks the status of the LinkedIn token."""
return "βœ… Token available" if token_state and token_state.get("token") else "❌ Token not available"
def process_and_store_bubble_token(url_user_token, org_urn, token_state):
"""
Processes user token, fetches LinkedIn token, fetches Bubble posts,
and determines if an initial fetch or update is needed for LinkedIn posts.
Updates token state and UI for the sync button.
"""
logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")
new_state = token_state.copy() if token_state else {
"token": None, "client_id": None, "org_urn": None,
"bubble_posts_df": None, "fetch_count_for_api": 0
}
new_state.update({"org_urn": org_urn, "bubble_posts_df": new_state.get("bubble_posts_df"), "fetch_count_for_api": new_state.get("fetch_count_for_api", 0)})
button_update = gr.update(visible=False, interactive=False, value="πŸ”„ Sync LinkedIn Posts")
client_id = os.environ.get("Linkedin_client_id")
if not client_id:
logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")
new_state["client_id"] = "ENV VAR MISSING"
else:
new_state["client_id"] = client_id
if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
try:
parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
new_state["token"] = parsed_linkedin_token
logging.info("βœ… LinkedIn Token successfully fetched from Bubble.")
else:
new_state["token"] = None
logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
except Exception as e:
new_state["token"] = None
logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}")
else:
new_state["token"] = None
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
current_org_urn = new_state.get("org_urn")
if current_org_urn:
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
try:
fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
if error_message:
logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
new_state["bubble_posts_df"] = pd.DataFrame() # Ensure it's an empty DataFrame
else:
new_state["bubble_posts_df"] = fetched_df if fetched_df is not None else pd.DataFrame()
except Exception as e:
logging.error(f"❌ Error fetching posts from Bubble: {e}. Treating as no data.")
new_state["bubble_posts_df"] = pd.DataFrame()
else:
logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")
new_state["bubble_posts_df"] = pd.DataFrame()
DATE_COLUMN_NAME = 'published_at'
if new_state["bubble_posts_df"] is None or new_state["bubble_posts_df"].empty:
logging.info(f"ℹ️ No posts found in Bubble or DataFrame is empty. Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} LinkedIn Posts", visible=True, interactive=True)
else:
try:
df_for_date_check = new_state["bubble_posts_df"].copy()
if DATE_COLUMN_NAME not in df_for_date_check.columns:
logging.warning(f"Date column '{DATE_COLUMN_NAME}' not found in Bubble posts DataFrame. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Missing)", visible=True, interactive=True)
elif df_for_date_check[DATE_COLUMN_NAME].isnull().all():
logging.warning(f"Date column '{DATE_COLUMN_NAME}' contains all null values. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Empty)", visible=True, interactive=True)
else:
df_for_date_check[DATE_COLUMN_NAME] = pd.to_datetime(df_for_date_check[DATE_COLUMN_NAME], errors='coerce', utc=True)
last_post_date_utc = df_for_date_check[DATE_COLUMN_NAME].dropna().max()
if pd.isna(last_post_date_utc):
logging.warning(f"No valid dates found in '{DATE_COLUMN_NAME}' after conversion. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (No Valid Dates)", visible=True, interactive=True)
else:
today_utc = pd.Timestamp('now', tz='UTC').normalize()
last_post_date_utc_normalized = last_post_date_utc.normalize()
time_difference_days = (today_utc - last_post_date_utc_normalized).days
logging.info(f"Last post date (UTC, normalized): {last_post_date_utc_normalized}, Today (UTC, normalized): {today_utc}, Difference: {time_difference_days} days.")
if time_difference_days >= 7:
num_weeks = max(1, time_difference_days // 7)
fetch_count = num_weeks * 10
new_state['fetch_count_for_api'] = fetch_count
button_label = f"πŸ”„ Update Last {num_weeks} Week(s) (~{fetch_count} Posts)"
logging.info(f"Data is {time_difference_days} days old. Update needed for {num_weeks} weeks, ~{fetch_count} posts.")
button_update = gr.update(value=button_label, visible=True, interactive=True)
else:
logging.info(f"Data is fresh ({time_difference_days} days old). No update needed now.")
new_state['fetch_count_for_api'] = 0
button_update = gr.update(visible=False, interactive=False)
except Exception as e:
logging.error(f"Error processing dates from Bubble posts: {e}. Defaulting to initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Error)", visible=True, interactive=True)
token_status_message = check_token_status(new_state)
logging.info(f"Token processing complete. LinkedIn Token Status: {token_status_message}. Button update: {button_update}. Fetch count for API: {new_state['fetch_count_for_api']}")
return token_status_message, new_state, button_update
def guarded_fetch_posts(token_state):
logging.info("Starting guarded_fetch_posts process.")
if not token_state or not token_state.get("token"):
logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>"
client_id = token_state.get("client_id")
token_dict = token_state.get("token")
org_urn = token_state.get('org_urn')
fetch_count_value = token_state.get('fetch_count_for_api')
bubble_posts_df = token_state.get("bubble_posts_df") # Get existing posts
if not org_urn:
logging.error("Organization URN (org_urn) not found in token_state.")
return "<p style='color:red; text-align:center;'>❌ Configuration error: Organization URN missing.</p>"
if not client_id or client_id == "ENV VAR MISSING":
logging.error("Client ID not found or missing in token_state.")
return "<p style='color:red; text-align:center;'>❌ Configuration error: LinkedIn Client ID missing.</p>"
if fetch_count_value == 0:
logging.info("Data is fresh. No new posts fetched based on date check.")
return "<p style='color:green; text-align:center;'>βœ… Data is already up-to-date. No new posts fetched.</p>"
try:
logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count: {fetch_count_value}")
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_value)
if not processed_raw_posts:
logging.info("No posts retrieved from LinkedIn API.")
return "<p style='color:orange; text-align:center;'>ℹ️ No new LinkedIn posts found to process.</p>"
# --- Filter out posts already in Bubble ---
existing_post_urns = set()
if bubble_posts_df is not None and not bubble_posts_df.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df.columns:
existing_post_urns = set(bubble_posts_df[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))
logging.info(f"Found {len(existing_post_urns)} existing post URNs in Bubble data.")
else:
logging.info("No existing posts found in Bubble data or URN column missing; all fetched posts will be considered new.")
# Filter processed_raw_posts before compiling detailed_posts
new_raw_posts = [
post for post in processed_raw_posts
if str(post.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns
]
if not new_raw_posts:
logging.info("All fetched LinkedIn posts are already present in Bubble. No new posts to add.")
return "<p style='color:green; text-align:center;'>βœ… All fetched posts already exist in Bubble. Data is up-to-date.</p>"
logging.info(f"Identified {len(new_raw_posts)} new posts to process after filtering against Bubble data.")
# Continue processing only with new_raw_posts
post_urns_to_process = [post[LINKEDIN_POST_URN_KEY] for post in new_raw_posts if post.get(LINKEDIN_POST_URN_KEY)]
logging.info("Step 2: Fetching comments for new posts via LinkedIn API.")
# Adjust stats_map if it's keyed by URNs; ensure it's relevant for new_raw_posts
# For simplicity, assuming fetch_comments and subsequent steps can handle potentially fewer URNs
all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
logging.info("Step 3: Analyzing sentiment for new posts.")
sentiments_per_post = analyze_sentiment(all_comments_data) # Assumes all_comments_data is now for new posts
logging.info("Step 4: Compiling detailed data for new posts.")
# Pass new_raw_posts to compile_detailed_posts
detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
logging.info("Step 5: Preparing data for Bubble (only new posts).")
# Pass detailed_new_posts to prepare_data_for_bubble
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
logging.info(f"Step 6: Uploading {len(li_posts)} new posts and their related data to Bubble.")
if li_posts: # Ensure there's actually something to upload
bulk_upload_to_bubble(li_posts, "LI_posts")
if li_post_stats:
bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
if li_post_comments:
bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
action_message = f"uploaded {len(li_posts)} new post(s)"
else:
action_message = "found no new posts to upload after detailed processing"
logging.info("No new posts to upload after final preparation for Bubble.")
final_message_verb = "Initial data fetch" if fetch_count_value == DEFAULT_INITIAL_FETCH_COUNT and not existing_post_urns else "Data update"
logging.info(f"Successfully completed: {final_message_verb}. {action_message} to Bubble.")
return f"<p style='color:green; text-align:center;'>βœ… {final_message_verb} complete. Successfully {action_message} to Bubble.</p>"
except ValueError as ve:
logging.error(f"ValueError during LinkedIn data processing: {ve}")
return f"<p style='color:red; text-align:center;'>❌ Error: {html.escape(str(ve))}</p>"
except Exception as e:
logging.exception("An unexpected error occurred in guarded_fetch_posts.")
return "<p style='color:red; text-align:center;'>❌ An unexpected error occurred. Please check logs.</p>"
def guarded_fetch_dashboard(token_state):
if not token_state or not token_state.get("token"):
return "❌ Access denied. No token available for dashboard."
if token_state.get("bubble_posts_df") is not None and not token_state["bubble_posts_df"].empty:
return f"<p style='text-align: center;'>Dashboard would show {len(token_state['bubble_posts_df'])} posts from Bubble.</p>"
else:
return "<p style='text-align: center; color: #555;'>No posts loaded from Bubble yet for the dashboard.</p>"
def guarded_fetch_analytics(token_state):
if not token_state or not token_state.get("token"):
return ("❌ Access denied. No token available for analytics.",
None, None, None, None, None, None, None)
return fetch_and_render_analytics(token_state.get("client_id"), token_state.get("token"))
def run_mentions_and_load(token_state):
if not token_state or not token_state.get("token"):
return ("❌ Access denied. No token available for mentions.", None)
return generate_mentions_dashboard(token_state.get("client_id"), token_state.get("token"))
# --- Gradio UI Blocks ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
title="LinkedIn Post Viewer & Analytics") as app:
token_state = gr.State(value={
"token": None,
"client_id": None,
"org_urn": None,
"bubble_posts_df": pd.DataFrame(), # Initialize with empty DataFrame
"fetch_count_for_api": 0
})
gr.Markdown("# πŸš€ LinkedIn Organization Post Viewer & Analytics")
gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.")
url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False)
status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False)
app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display])
with gr.Tabs():
with gr.TabItem("1️⃣ Dashboard & Sync"):
gr.Markdown("System checks for existing data in Bubble. The button below will activate if new posts need to be fetched or updated from LinkedIn.")
sync_posts_to_bubble_btn = gr.Button(
value="πŸ”„ Sync LinkedIn Posts",
variant="primary",
visible=False,
interactive=False
)
dashboard_html_output = gr.HTML(
"<p style='text-align: center; color: #555;'>System initializing... "
"Checking for existing data in Bubble and LinkedIn token.</p>"
)
org_urn_display.change(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
url_user_token_display.change(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
sync_posts_to_bubble_btn.click(
fn=guarded_fetch_posts,
inputs=[token_state],
outputs=[dashboard_html_output]
).then(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
with gr.TabItem("2️⃣ Analytics"):
gr.Markdown("View follower count and monthly gains for your organization (requires LinkedIn token).")
fetch_analytics_btn = gr.Button("πŸ“ˆ Fetch Follower Analytics", variant="primary")
follower_count = gr.Markdown("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
with gr.Row():
follower_plot, growth_plot = gr.Plot(), gr.Plot()
with gr.Row():
eng_rate_plot = gr.Plot()
with gr.Row():
interaction_plot = gr.Plot()
with gr.Row():
eb_plot = gr.Plot()
with gr.Row():
mentions_vol_plot, mentions_sentiment_plot = gr.Plot(), gr.Plot()
fetch_analytics_btn.click(
fn=guarded_fetch_analytics,
inputs=[token_state],
outputs=[follower_count, follower_plot, growth_plot, eng_rate_plot,
interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot]
)
with gr.TabItem("3️⃣ Mentions"):
gr.Markdown("Analyze sentiment of recent posts that mention your organization (requires LinkedIn token).")
fetch_mentions_btn = gr.Button("🧠 Fetch Mentions & Sentiment", variant="primary")
mentions_html = gr.HTML("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
mentions_plot = gr.Plot()
fetch_mentions_btn.click(
fn=run_mentions_and_load,
inputs=[token_state],
outputs=[mentions_html, mentions_plot]
)
app.load(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
gr.Timer(15.0).tick(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
if __name__ == "__main__":
if not os.environ.get("Linkedin_client_id"):
logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.")
app.launch(server_name="0.0.0.0", server_port=7860, share=True)