LinkedinMonitor / app.py
GuglielmoTor's picture
Update app.py
67742c4 verified
raw
history blame
19.5 kB
# -- coding: utf-8 --
import gradio as gr
import json
import os
import logging
import html
import pandas as pd # Ensure pandas is imported
from datetime import datetime # Used for pd.Timestamp
# Import functions from your custom modules
from Data_Fetching_and_Rendering import fetch_and_render_dashboard
from analytics_fetch_and_rendering import fetch_and_render_analytics
from mentions_dashboard import generate_mentions_dashboard
from gradio_utils import get_url_user_token
# Updated import to include fetch_posts_from_bubble
from Bubble_API_Calls import (
fetch_linkedin_token_from_bubble,
bulk_upload_to_bubble,
fetch_linkedin_posts_data_from_bubble
)
from Linkedin_Data_API_Calls import (
fetch_linkedin_posts_core,
fetch_comments,
analyze_sentiment,
compile_detailed_posts,
prepare_data_for_bubble
)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def check_token_status(token_state):
"""Checks the status of the LinkedIn token."""
return "βœ… Token available" if token_state and token_state.get("token") else "❌ Token not available"
def process_and_store_bubble_token(url_user_token, org_urn, token_state):
"""
Processes user token, fetches LinkedIn token, fetches Bubble posts,
and determines if an initial fetch or update is needed for LinkedIn posts.
Updates token state and UI for the sync button.
"""
logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")
new_state = token_state.copy() if token_state else {
"token": None, "client_id": None, "org_urn": None,
"bubble_posts_df": None, "fetch_count_for_api": 0
}
new_state.update({"org_urn": org_urn, "bubble_posts_df": new_state.get("bubble_posts_df"), "fetch_count_for_api": new_state.get("fetch_count_for_api", 0)})
# Default button update: hidden and non-interactive
button_update = gr.update(visible=False, interactive=False, value="πŸ”„ Sync LinkedIn Posts")
client_id = os.environ.get("Linkedin_client_id")
if not client_id:
logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")
new_state["client_id"] = "ENV VAR MISSING"
else:
new_state["client_id"] = client_id
if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
try:
parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
new_state["token"] = parsed_linkedin_token
logging.info("βœ… LinkedIn Token successfully fetched from Bubble.")
else:
new_state["token"] = None
logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
except Exception as e:
new_state["token"] = None
logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}")
else:
new_state["token"] = None
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
# Fetch posts from Bubble
current_org_urn = new_state.get("org_urn")
bubble_posts_df = None
if current_org_urn:
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
try:
fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
if error_message:
logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
else:
bubble_posts_df = fetched_df
new_state["bubble_posts_df"] = bubble_posts_df
except Exception as e:
logging.error(f"❌ Error fetching posts from Bubble: {e}. Treating as no data.")
new_state["bubble_posts_df"] = None # Ensure it's None on error
else:
logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")
# Logic for determining fetch/update based on bubble_posts_df
# DATE_COLUMN_NAME is now 'published_at' and contains ISO datetime strings.
DATE_COLUMN_NAME = 'published_at'
DEFAULT_INITIAL_FETCH_COUNT = 100 # Standard number of posts for initial fetch
if new_state["bubble_posts_df"] is None or new_state["bubble_posts_df"].empty:
logging.info(f"ℹ️ No posts found in Bubble or DataFrame is empty. Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} LinkedIn Posts", visible=True, interactive=True)
else:
try:
df_for_date_check = new_state["bubble_posts_df"].copy() # Use a copy to avoid SettingWithCopyWarning
if DATE_COLUMN_NAME not in df_for_date_check.columns:
logging.warning(f"Date column '{DATE_COLUMN_NAME}' not found in Bubble posts DataFrame. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Missing)", visible=True, interactive=True)
elif df_for_date_check[DATE_COLUMN_NAME].isnull().all():
logging.warning(f"Date column '{DATE_COLUMN_NAME}' contains all null values. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Empty)", visible=True, interactive=True)
else:
# Convert ISO datetime strings to datetime objects
df_for_date_check[DATE_COLUMN_NAME] = pd.to_datetime(df_for_date_check[DATE_COLUMN_NAME], errors='coerce', utc=True)
last_post_date_utc = df_for_date_check[DATE_COLUMN_NAME].dropna().max()
if pd.isna(last_post_date_utc):
logging.warning(f"No valid dates found in '{DATE_COLUMN_NAME}' after conversion. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (No Valid Dates)", visible=True, interactive=True)
else:
today_utc = pd.Timestamp('now', tz='UTC').normalize()
last_post_date_utc_normalized = last_post_date_utc.normalize()
time_difference_days = (today_utc - last_post_date_utc_normalized).days
logging.info(f"Last post date (UTC, normalized): {last_post_date_utc_normalized}, Today (UTC, normalized): {today_utc}, Difference: {time_difference_days} days.")
if time_difference_days >= 7:
num_weeks = max(1, time_difference_days // 7)
fetch_count = num_weeks * 10
new_state['fetch_count_for_api'] = fetch_count
button_label = f"πŸ”„ Update Last {num_weeks} Week(s) (~{fetch_count} Posts)"
logging.info(f"Data is {time_difference_days} days old. Update needed for {num_weeks} weeks, ~{fetch_count} posts.")
button_update = gr.update(value=button_label, visible=True, interactive=True)
else:
logging.info(f"Data is fresh ({time_difference_days} days old). No update needed now.")
new_state['fetch_count_for_api'] = 0
button_update = gr.update(visible=False, interactive=False)
except Exception as e:
logging.error(f"Error processing dates from Bubble posts: {e}. Defaulting to initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
button_update = gr.update(value=f"πŸ”„ Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Error)", visible=True, interactive=True)
token_status_message = check_token_status(new_state)
logging.info(f"Token processing complete. LinkedIn Token Status: {token_status_message}. Button update: {button_update}. Fetch count for API: {new_state['fetch_count_for_api']}")
return token_status_message, new_state, button_update
def guarded_fetch_posts(token_state):
"""
Fetches LinkedIn posts based on 'fetch_count_for_api' in token_state,
analyzes them, and uploads to Bubble.
"""
logging.info("Starting guarded_fetch_posts process.")
if not token_state or not token_state.get("token"):
logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
return "<p style='color:red; text-align:center;'>❌ Access denied. LinkedIn token not available.</p>"
client_id = token_state.get("client_id")
token_dict = token_state.get("token")
org_urn = token_state.get('org_urn')
fetch_count_value = token_state.get('fetch_count_for_api')
if not org_urn:
logging.error("Organization URN (org_urn) not found in token_state for guarded_fetch_posts.")
return "<p style='color:red; text-align:center;'>❌ Configuration error: Organization URN missing.</p>"
if not client_id or client_id == "ENV VAR MISSING":
logging.error("Client ID not found or missing in token_state for guarded_fetch_posts.")
return "<p style='color:red; text-align:center;'>❌ Configuration error: LinkedIn Client ID missing.</p>"
if fetch_count_value == 0:
logging.info("guarded_fetch_posts called, but fetch_count_for_api is 0. Data is fresh.")
return "<p style='color:green; text-align:center;'>βœ… Data is already up-to-date. No new posts fetched.</p>"
if fetch_count_value is None: # Should ideally not happen with new logic, but as a safeguard
logging.warning("fetch_count_for_api is None in guarded_fetch_posts. This might indicate an issue. Defaulting to fetching a standard amount if your API supports it or all.")
# Depending on your API, None might mean fetch all or a default.
# If your API requires a specific count for "all", you might need to adjust here or in fetch_linkedin_posts_core.
try:
logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count parameter for API: {fetch_count_value}")
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_value)
if not processed_raw_posts:
logging.info("No posts found to process via LinkedIn API after step 1.")
return "<p style='color:orange; text-align:center;'>ℹ️ No new LinkedIn posts found to process at this time.</p>"
post_urns = [post["id"] for post in processed_raw_posts if post.get("id")]
logging.info(f"Extracted {len(post_urns)} post URNs for further processing.")
logging.info("Step 2: Fetching comments via LinkedIn API.")
all_comments_data = fetch_comments(client_id, token_dict, post_urns, stats_map)
logging.info("Step 3: Analyzing sentiment.")
sentiments_per_post = analyze_sentiment(all_comments_data)
logging.info("Step 4: Compiling detailed posts.")
detailed_posts = compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post)
logging.info("Step 5: Preparing data for Bubble.")
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_posts, all_comments_data)
logging.info("Step 6: Uploading data to Bubble.")
bulk_upload_to_bubble(li_posts, "LI_posts")
bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
action_performed = f"Initial data fetch (~{fetch_count_value} posts)" if fetch_count_value == DEFAULT_INITIAL_FETCH_COUNT else f"Data update (target: ~{fetch_count_value} posts)"
logging.info(f"Successfully completed: {action_performed}. Uploaded posts and comments to Bubble.")
return f"<p style='color:green; text-align:center;'>βœ… {action_performed} complete. Posts and comments from LinkedIn uploaded to Bubble.</p>"
except ValueError as ve:
logging.error(f"ValueError during LinkedIn data processing: {ve}")
return f"<p style='color:red; text-align:center;'>❌ Error: {html.escape(str(ve))}</p>"
except Exception as e:
logging.exception("An unexpected error occurred in guarded_fetch_posts.")
return "<p style='color:red; text-align:center;'>❌ An unexpected error occurred. Please check logs.</p>"
def guarded_fetch_dashboard(token_state):
if not token_state or not token_state.get("token"):
return "❌ Access denied. No token available for dashboard."
if token_state.get("bubble_posts_df") is not None and not token_state["bubble_posts_df"].empty:
return f"<p style='text-align: center;'>Dashboard would show {len(token_state['bubble_posts_df'])} posts from Bubble.</p>"
else:
return "<p style='text-align: center; color: #555;'>No posts loaded from Bubble yet for the dashboard.</p>"
def guarded_fetch_analytics(token_state):
if not token_state or not token_state.get("token"):
return ("❌ Access denied. No token available for analytics.",
None, None, None, None, None, None, None)
return fetch_and_render_analytics(token_state.get("client_id"), token_state.get("token"))
def run_mentions_and_load(token_state):
if not token_state or not token_state.get("token"):
return ("❌ Access denied. No token available for mentions.", None)
return generate_mentions_dashboard(token_state.get("client_id"), token_state.get("token"))
# --- Gradio UI Blocks ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
title="LinkedIn Post Viewer & Analytics") as app:
# Define DEFAULT_INITIAL_FETCH_COUNT here if needed by guarded_fetch_posts for its messages,
# or ensure it's passed/accessible if logic depends on it there.
# For now, it's only used within process_and_store_bubble_token.
DEFAULT_INITIAL_FETCH_COUNT = 100
token_state = gr.State(value={
"token": None,
"client_id": None,
"org_urn": None,
"bubble_posts_df": None,
"fetch_count_for_api": 0
})
gr.Markdown("# πŸš€ LinkedIn Organization Post Viewer & Analytics")
gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.")
url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False)
status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False)
app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display])
with gr.Tabs():
with gr.TabItem("1️⃣ Dashboard & Sync"):
gr.Markdown("System checks for existing data in Bubble. The button below will activate if new posts need to be fetched or updated from LinkedIn.")
sync_posts_to_bubble_btn = gr.Button(
value="πŸ”„ Sync LinkedIn Posts",
variant="primary",
visible=False,
interactive=False
)
dashboard_html_output = gr.HTML(
"<p style='text-align: center; color: #555;'>System initializing... "
"Checking for existing data in Bubble and LinkedIn token.</p>"
)
org_urn_display.change(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
url_user_token_display.change(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
sync_posts_to_bubble_btn.click(
fn=guarded_fetch_posts,
inputs=[token_state],
outputs=[dashboard_html_output]
).then(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_posts_to_bubble_btn]
)
with gr.TabItem("2️⃣ Analytics"):
gr.Markdown("View follower count and monthly gains for your organization (requires LinkedIn token).")
fetch_analytics_btn = gr.Button("πŸ“ˆ Fetch Follower Analytics", variant="primary")
follower_count = gr.Markdown("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
with gr.Row():
follower_plot, growth_plot = gr.Plot(), gr.Plot()
with gr.Row():
eng_rate_plot = gr.Plot()
with gr.Row():
interaction_plot = gr.Plot()
with gr.Row():
eb_plot = gr.Plot()
with gr.Row():
mentions_vol_plot, mentions_sentiment_plot = gr.Plot(), gr.Plot()
fetch_analytics_btn.click(
fn=guarded_fetch_analytics,
inputs=[token_state],
outputs=[follower_count, follower_plot, growth_plot, eng_rate_plot,
interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot]
)
with gr.TabItem("3️⃣ Mentions"):
gr.Markdown("Analyze sentiment of recent posts that mention your organization (requires LinkedIn token).")
fetch_mentions_btn = gr.Button("🧠 Fetch Mentions & Sentiment", variant="primary")
mentions_html = gr.HTML("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
mentions_plot = gr.Plot()
fetch_mentions_btn.click(
fn=run_mentions_and_load,
inputs=[token_state],
outputs=[mentions_html, mentions_plot]
)
app.load(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
gr.Timer(15.0).tick(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
if __name__ == "__main__":
if not os.environ.get("Linkedin_client_id"):
logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.")
app.launch(server_name="0.0.0.0", server_port=7860, share=True)