Spaces:
Running
Running
import gradio as gr | |
import json | |
import os | |
import logging | |
import html | |
import pandas as pd | |
# Import functions from your custom modules | |
from Data_Fetching_and_Rendering import fetch_and_render_dashboard | |
from analytics_fetch_and_rendering import fetch_and_render_analytics | |
from mentions_dashboard import generate_mentions_dashboard | |
from gradio_utils import get_url_user_token | |
# Updated import to include fetch_posts_from_bubble | |
from Bubble_API_Calls import ( | |
fetch_linkedin_token_from_bubble, | |
bulk_upload_to_bubble, | |
fetch_posts_from_bubble # Added new function | |
) | |
from Linkedin_Data_API_Calls import ( | |
fetch_linkedin_posts_core, | |
fetch_comments, | |
analyze_sentiment, | |
compile_detailed_posts, | |
prepare_data_for_bubble | |
) | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
def check_token_status(token_state): | |
"""Checks the status of the LinkedIn token.""" | |
return "β Token available" if token_state and token_state.get("token") else "β Token not available" | |
def process_and_store_bubble_token(url_user_token, org_urn, token_state): | |
""" | |
Processes the user token from the URL, fetches LinkedIn token from Bubble, | |
fetches initial posts from Bubble, and updates the token state and UI accordingly. | |
""" | |
logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'") | |
# Initialize or copy existing state, adding bubble_posts_df | |
new_state = token_state.copy() if token_state else {"token": None, "client_id": None, "org_urn": None, "bubble_posts_df": None} | |
new_state.update({"token": None, "org_urn": org_urn, "bubble_posts_df": None}) # Ensure bubble_posts_df is reset/initialized | |
# Default button state: invisible and non-interactive | |
button_update = gr.Button( | |
value="π Fetch, Analyze & Store Posts to Bubble", | |
variant="primary", | |
visible=False, | |
interactive=False | |
) | |
client_id = os.environ.get("Linkedin_client_id") | |
if not client_id: | |
logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.") | |
new_state["client_id"] = "ENV VAR MISSING" | |
# Even if client_id is missing, we might still be able to fetch from Bubble if org_urn is present | |
# and then decide button visibility. | |
else: | |
new_state["client_id"] = client_id | |
# Attempt to fetch LinkedIn token from Bubble (related to LinkedIn API access) | |
if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token: | |
logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}") | |
parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token) | |
if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token: | |
new_state["token"] = parsed_linkedin_token | |
logging.info("β LinkedIn Token successfully fetched from Bubble.") | |
else: | |
logging.warning("β Failed to fetch a valid LinkedIn token from Bubble.") | |
else: | |
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.") | |
# Fetch posts from Bubble using org_urn, regardless of LinkedIn token status for this specific fetch | |
current_org_urn = new_state.get("org_urn") | |
if current_org_urn: | |
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}") | |
try: | |
# Assuming fetch_posts_from_bubble returns a Pandas DataFrame or None | |
df_bubble_posts = fetch_posts_from_bubble(current_org_urn) | |
new_state["bubble_posts_df"] = df_bubble_posts | |
if df_bubble_posts is not None and not df_bubble_posts.empty: | |
logging.info(f"β Successfully fetched {len(df_bubble_posts)} posts from Bubble. Sync button will be enabled.") | |
button_update = gr.Button( | |
value="π Fetch, Analyze & Store Posts to Bubble", | |
variant="primary", | |
visible=True, | |
interactive=True | |
) | |
else: | |
logging.info("βΉοΈ No posts found in Bubble for this organization or DataFrame is empty. Sync button will remain hidden.") | |
except Exception as e: | |
logging.error(f"β Error fetching posts from Bubble: {e}") | |
# Keep button hidden on error | |
else: | |
logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.") | |
token_status_message = check_token_status(new_state) | |
logging.info(f"Token processing complete. Status: {token_status_message}. Button visible: {button_update.visible}") | |
return token_status_message, new_state, button_update | |
def guarded_fetch_posts(token_state): | |
""" | |
Fetches LinkedIn posts, analyzes them, and uploads to Bubble. | |
This function is guarded by token availability. | |
""" | |
logging.info("Starting guarded_fetch_posts process.") | |
if not token_state or not token_state.get("token"): | |
logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.") | |
return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>" | |
client_id = token_state.get("client_id") | |
token_dict = token_state.get("token") | |
org_urn = token_state.get('org_urn') | |
if not org_urn: | |
logging.error("Organization URN (org_urn) not found in token_state for guarded_fetch_posts.") | |
return "<p style='color:red; text-align:center;'>β Configuration error: Organization URN missing.</p>" | |
if not client_id or client_id == "ENV VAR MISSING": | |
logging.error("Client ID not found or missing in token_state for guarded_fetch_posts.") | |
return "<p style='color:red; text-align:center;'>β Configuration error: LinkedIn Client ID missing.</p>" | |
try: | |
logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}") | |
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn) | |
if not processed_raw_posts: | |
logging.info("No posts found to process after step 1.") | |
return "<p style='color:orange; text-align:center;'>βΉοΈ No new LinkedIn posts found to process.</p>" | |
post_urns = [post["id"] for post in processed_raw_posts if post.get("id")] | |
logging.info(f"Extracted {len(post_urns)} post URNs for further processing.") | |
logging.info("Step 2: Fetching comments.") | |
all_comments_data = fetch_comments(client_id, token_dict, post_urns, stats_map) | |
logging.info("Step 3: Analyzing sentiment.") | |
sentiments_per_post = analyze_sentiment(all_comments_data) | |
logging.info("Step 4: Compiling detailed posts.") | |
detailed_posts = compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post) | |
logging.info("Step 5: Preparing data for Bubble.") | |
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_posts, all_comments_data) | |
logging.info("Step 6: Uploading data to Bubble.") | |
bulk_upload_to_bubble(li_posts, "LI_posts") | |
bulk_upload_to_bubble(li_post_stats, "LI_post_stats") | |
bulk_upload_to_bubble(li_post_comments, "LI_post_comments") | |
logging.info("Successfully fetched and uploaded posts and comments to Bubble.") | |
return "<p style='color:green; text-align:center;'>β Posts and comments uploaded to Bubble.</p>" | |
except ValueError as ve: | |
logging.error(f"ValueError during LinkedIn data processing: {ve}") | |
return f"<p style='color:red; text-align:center;'>β Error: {html.escape(str(ve))}</p>" | |
except Exception as e: | |
logging.exception("An unexpected error occurred in guarded_fetch_posts.") | |
return "<p style='color:red; text-align:center;'>β An unexpected error occurred. Please check logs.</p>" | |
def guarded_fetch_dashboard(token_state): | |
"""Fetches and renders the dashboard if token is available.""" | |
if not token_state or not token_state.get("token"): | |
return "β Access denied. No token available for dashboard." | |
# This function is not used in the current UI structure for the first tab's main content | |
# but kept for potential future use or if it's called elsewhere. | |
# The first tab's content is now primarily the button and its output. | |
# If you intend to display a dashboard here *after* fetching, this would need integration. | |
# For now, returning a placeholder or status. | |
# return fetch_and_render_dashboard(token_state.get("client_id"), token_state.get("token")) | |
return "<p style='text-align: center; color: #555;'>Dashboard content would load here if implemented.</p>" | |
def guarded_fetch_analytics(token_state): | |
"""Fetches and renders analytics if token is available.""" | |
if not token_state or not token_state.get("token"): | |
return ("β Access denied. No token available for analytics.", | |
None, None, None, None, None, None, None) | |
return fetch_and_render_analytics(token_state.get("client_id"), token_state.get("token")) | |
def run_mentions_and_load(token_state): | |
"""Generates mentions dashboard if token is available.""" | |
if not token_state or not token_state.get("token"): | |
return ("β Access denied. No token available for mentions.", None) | |
return generate_mentions_dashboard(token_state.get("client_id"), token_state.get("token")) | |
# --- Gradio UI Blocks --- | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), | |
title="LinkedIn Post Viewer & Analytics") as app: | |
# Initialize state with the new field for Bubble DataFrame | |
token_state = gr.State(value={"token": None, "client_id": None, "org_urn": None, "bubble_posts_df": None}) | |
gr.Markdown("# π LinkedIn Organization Post Viewer & Analytics") | |
gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.") | |
url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False) | |
status_box = gr.Textbox(label="Overall Token Status", interactive=False, value="Initializing...") # Initial status | |
org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False) # Renamed for clarity | |
# Load user token and org URN from URL parameters | |
app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display]) | |
with gr.Tabs(): | |
with gr.TabItem("1οΈβ£ Dashboard & Sync"): | |
gr.Markdown("View your organization's recent posts and their engagement statistics. " | |
"Fetch new posts from LinkedIn, analyze, and store them in Bubble.") | |
# Button is initially not visible and not interactive. | |
# Its state will be updated by process_and_store_bubble_token | |
sync_posts_to_bubble_btn = gr.Button( | |
"π Fetch, Analyze & Store Posts to Bubble", | |
variant="primary", | |
visible=False, | |
interactive=False | |
) | |
dashboard_html_output = gr.HTML( | |
"<p style='text-align: center; color: #555;'>System initializing... Status and actions will appear shortly. " | |
"If data is found in Bubble, the 'Fetch, Analyze & Store' button will become active.</p>" | |
) | |
# Event: When URL token or org URN is loaded/changed, process it. | |
# This will update token_state and the sync_posts_to_bubble_btn. | |
# Using org_urn_display.change as the primary trigger after app.load completes. | |
# If get_url_user_token is very fast, app.load might be better, but .change is robust. | |
org_urn_display.change( | |
fn=process_and_store_bubble_token, | |
inputs=[url_user_token_display, org_urn_display, token_state], | |
outputs=[status_box, token_state, sync_posts_to_bubble_btn] # Added button to outputs | |
) | |
# Also trigger if url_user_token_display changes, in case org_urn loads first | |
# but token processing depends on url_user_token_display. | |
# This creates a dependency: if one changes, the function runs with current values of both. | |
url_user_token_display.change( | |
fn=process_and_store_bubble_token, | |
inputs=[url_user_token_display, org_urn_display, token_state], | |
outputs=[status_box, token_state, sync_posts_to_bubble_btn] | |
) | |
# Click handler for the sync button | |
sync_posts_to_bubble_btn.click( | |
fn=guarded_fetch_posts, | |
inputs=[token_state], | |
outputs=[dashboard_html_output] | |
) | |
with gr.TabItem("2οΈβ£ Analytics"): | |
gr.Markdown("View follower count and monthly gains for your organization.") | |
fetch_analytics_btn = gr.Button("π Fetch Follower Analytics", variant="primary") | |
follower_count = gr.Markdown("<p style='text-align: center; color: #555;'>Waiting for token...</p>") | |
with gr.Row(): | |
follower_plot, growth_plot = gr.Plot(), gr.Plot() | |
with gr.Row(): | |
eng_rate_plot = gr.Plot() | |
with gr.Row(): | |
interaction_plot = gr.Plot() | |
with gr.Row(): | |
eb_plot = gr.Plot() | |
with gr.Row(): | |
mentions_vol_plot, mentions_sentiment_plot = gr.Plot(), gr.Plot() | |
fetch_analytics_btn.click( | |
fn=guarded_fetch_analytics, | |
inputs=[token_state], | |
outputs=[follower_count, follower_plot, growth_plot, eng_rate_plot, | |
interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot] | |
) | |
with gr.TabItem("3οΈβ£ Mentions"): | |
gr.Markdown("Analyze sentiment of recent posts that mention your organization.") | |
fetch_mentions_btn = gr.Button("π§ Fetch Mentions & Sentiment", variant="primary") | |
mentions_html = gr.HTML("<p style='text-align: center; color: #555;'>Waiting for token...</p>") | |
mentions_plot = gr.Plot() | |
fetch_mentions_btn.click( | |
fn=run_mentions_and_load, | |
inputs=[token_state], | |
outputs=[mentions_html, mentions_plot] | |
) | |
# Initial check of token status on app load (primarily for the status_box) | |
# The button visibility is handled by process_and_store_bubble_token | |
app.load(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box) | |
# Timer to periodically update the token status display (optional, but good for UX) | |
gr.Timer(15.0).tick(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box) | |
if __name__ == "__main__": | |
if not os.environ.get("Linkedin_client_id"): | |
logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.") | |
# Ensure the app launches. | |
# For testing, you might want share=False or specific server_name/port. | |
# share=True is useful for public sharing via Gradio link. | |
app.launch(server_name="0.0.0.0", server_port=7860, share=True) | |
# app.launch(share=True) # Simpler launch for testing if specific port/host not needed | |