Spaces:
Running
Running
# -*- coding: utf-8 -*-
import gradio as gr | |
import json | |
import os | |
import logging | |
import html | |
import pandas as pd # Ensure pandas is imported | |
from datetime import datetime # Used for pd.Timestamp | |
# Import functions from your custom modules | |
from Data_Fetching_and_Rendering import fetch_and_render_dashboard | |
from analytics_fetch_and_rendering import fetch_and_render_analytics | |
from mentions_dashboard import generate_mentions_dashboard | |
from gradio_utils import get_url_user_token | |
# Bubble API helpers: LinkedIn token lookup, bulk upload, and retrieval of stored posts
# (fetch_linkedin_posts_data_from_bubble)
from Bubble_API_Calls import ( | |
fetch_linkedin_token_from_bubble, | |
bulk_upload_to_bubble, | |
fetch_linkedin_posts_data_from_bubble | |
) | |
from Linkedin_Data_API_Calls import ( | |
fetch_linkedin_posts_core, | |
fetch_comments, | |
analyze_sentiment, | |
compile_detailed_posts, | |
prepare_data_for_bubble | |
) | |
# Root-logger setup: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def check_token_status(token_state):
    """Return the status-box text telling whether the state holds a LinkedIn token.

    A missing/empty state, or a state whose "token" entry is falsy, counts as
    "not available".
    """
    has_token = bool(token_state) and bool(token_state.get("token"))
    if has_token:
        return "β Token available"
    return "β Token not available"
def _describe_user_token_for_log(url_user_token, is_usable):
    """Return a log-safe description of the URL user token.

    A usable token is a credential, so only its length is disclosed. Anything
    else (empty value, or an error string such as "... not found") is not a
    secret and is shown verbatim because it is useful for debugging.
    """
    if is_usable:
        return f"<redacted, {len(url_user_token)} chars>"
    return repr(url_user_token)


def _plan_linkedin_sync(posts_df):
    """Decide how many posts to request from LinkedIn and how the sync button should look.

    Args:
        posts_df: DataFrame of posts already stored in Bubble, or None.

    Returns:
        Tuple ``(fetch_count, button_update)``: the value to store under
        ``fetch_count_for_api`` and the ``gr.update(...)`` for the sync button.
        ``fetch_count`` is 0 (button hidden) when the newest stored post is
        less than 7 days old.
    """
    date_column = 'published_at'  # expected to hold ISO datetime strings
    initial_count = 100  # standard number of posts for an initial fetch

    def initial_fetch(label_suffix):
        # Every fallback path shows the same "initial fetch" button; only the
        # label suffix tells the user why.
        button = gr.update(
            value=f"π Fetch Initial {initial_count}{label_suffix}",
            visible=True,
            interactive=True,
        )
        return initial_count, button

    if posts_df is None or posts_df.empty:
        logging.info(f"βΉοΈ No posts found in Bubble or DataFrame is empty. Button to fetch initial {initial_count} posts will be visible.")
        return initial_fetch(" LinkedIn Posts")

    try:
        if date_column not in posts_df.columns:
            logging.warning(f"Date column '{date_column}' not found in Bubble posts DataFrame. Assuming initial fetch of {initial_count} posts.")
            return initial_fetch(" (Date Column Missing)")

        if posts_df[date_column].isnull().all():
            logging.warning(f"Date column '{date_column}' contains all null values. Assuming initial fetch of {initial_count} posts.")
            return initial_fetch(" (Date Column Empty)")

        # Convert into a separate Series so the DataFrame kept in state is
        # never modified; unparseable values become NaT and are dropped.
        published = pd.to_datetime(posts_df[date_column], errors='coerce', utc=True)
        last_post_date_utc = published.dropna().max()
        if pd.isna(last_post_date_utc):
            logging.warning(f"No valid dates found in '{date_column}' after conversion. Assuming initial fetch of {initial_count} posts.")
            return initial_fetch(" (No Valid Dates)")

        # Compare whole calendar days (UTC midnight to UTC midnight).
        today_utc = pd.Timestamp('now', tz='UTC').normalize()
        last_post_date_utc_normalized = last_post_date_utc.normalize()
        time_difference_days = (today_utc - last_post_date_utc_normalized).days
        logging.info(f"Last post date (UTC, normalized): {last_post_date_utc_normalized}, Today (UTC, normalized): {today_utc}, Difference: {time_difference_days} days.")

        if time_difference_days < 7:
            logging.info(f"Data is fresh ({time_difference_days} days old). No update needed now.")
            return 0, gr.update(visible=False, interactive=False)

        # At least 7 days old, so the integer division yields >= 1 week.
        num_weeks = time_difference_days // 7
        fetch_count = num_weeks * 10  # roughly 10 posts per elapsed week
        logging.info(f"Data is {time_difference_days} days old. Update needed for {num_weeks} weeks, ~{fetch_count} posts.")
        button_label = f"π Update Last {num_weeks} Week(s) (~{fetch_count} Posts)"
        return fetch_count, gr.update(value=button_label, visible=True, interactive=True)
    except Exception as e:
        logging.error(f"Error processing dates from Bubble posts: {e}. Defaulting to initial fetch of {initial_count} posts.")
        return initial_fetch(" (Date Error)")


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Refresh the session state from Bubble and decide what the sync button offers.

    Steps: read the LinkedIn client id from the environment, exchange the URL
    user token for a LinkedIn token via Bubble, load the organization's stored
    posts from Bubble, then work out whether an initial fetch, an update, or
    nothing is needed.

    Args:
        url_user_token: User token taken from the page URL, or an error string
            containing "not found" / "Could not access" when it is unavailable.
        org_urn: Organization URN taken from the page URL (may be empty).
        token_state: Current session state dict, or None.

    Returns:
        Tuple ``(token_status_message, new_state, button_update)``. The incoming
        state dict is copied, not modified in place.
    """
    has_usable_user_token = bool(
        url_user_token
        and "not found" not in url_user_token
        and "Could not access" not in url_user_token
    )
    # The user token is a credential: never write its value to the logs.
    logging.info(f"Processing token with URL user token: {_describe_user_token_for_log(url_user_token, has_usable_user_token)}, Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": None, "fetch_count_for_api": 0
    }
    new_state["org_urn"] = org_urn
    # Guarantee the keys exist without disturbing values carried over from the
    # previous state.
    new_state.setdefault("bubble_posts_df", None)
    new_state.setdefault("fetch_count_for_api", 0)

    client_id = os.environ.get("Linkedin_client_id")
    if not client_id:
        logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")
        client_id = "ENV VAR MISSING"  # sentinel value that guarded_fetch_posts rejects
    new_state["client_id"] = client_id

    # LinkedIn token lookup: the token stays None unless Bubble returns a dict
    # that carries an "access_token".
    new_state["token"] = None
    if has_usable_user_token:
        logging.info("Attempting to fetch LinkedIn token from Bubble with the supplied user token.")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
        except Exception as e:
            logging.error(f"β Exception while fetching LinkedIn token from Bubble: {e}")
        else:
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("β LinkedIn Token successfully fetched from Bubble.")
            else:
                logging.warning(f"β Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
    else:
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    # Posts already stored in Bubble; any failure is treated as "no data".
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
        try:
            fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
        except Exception as e:
            logging.error(f"β Error fetching posts from Bubble: {e}. Treating as no data.")
            new_state["bubble_posts_df"] = None
        else:
            if error_message:
                logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
                new_state["bubble_posts_df"] = None
            else:
                new_state["bubble_posts_df"] = fetched_df
    else:
        logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")

    fetch_count, button_update = _plan_linkedin_sync(new_state["bubble_posts_df"])
    new_state['fetch_count_for_api'] = fetch_count

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. LinkedIn Token Status: {token_status_message}. Button update: {button_update}. Fetch count for API: {new_state['fetch_count_for_api']}")
    return token_status_message, new_state, button_update
def guarded_fetch_posts(token_state):
    """
    Pull posts from LinkedIn, enrich them, and push the result to Bubble.

    The number of posts requested is token_state['fetch_count_for_api'].
    Nothing is fetched when the token, organization URN or client id is
    missing, or when the count is 0.

    Returns:
        An HTML snippet (string) describing the outcome, for the dashboard area.
    """
    logging.info("Starting guarded_fetch_posts process.")

    state = token_state or {}
    li_token = state.get("token")
    if not li_token:
        logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
        return "<p style='color:red; text-align:center;'>β Access denied. LinkedIn token not available.</p>"

    li_client_id = state.get("client_id")
    organization = state.get('org_urn')
    requested_count = state.get('fetch_count_for_api')

    # Configuration guards, checked in this order: organization, then client id.
    if not organization:
        logging.error("Organization URN (org_urn) not found in token_state for guarded_fetch_posts.")
        return "<p style='color:red; text-align:center;'>β Configuration error: Organization URN missing.</p>"

    if li_client_id == "ENV VAR MISSING" or not li_client_id:
        logging.error("Client ID not found or missing in token_state for guarded_fetch_posts.")
        return "<p style='color:red; text-align:center;'>β Configuration error: LinkedIn Client ID missing.</p>"

    if requested_count == 0:
        logging.info("guarded_fetch_posts called, but fetch_count_for_api is 0. Data is fresh.")
        return "<p style='color:green; text-align:center;'>β Data is already up-to-date. No new posts fetched.</p>"

    if requested_count is None:
        # Safeguard only: the count is passed through as None, and what that
        # means is up to fetch_linkedin_posts_core.
        logging.warning("fetch_count_for_api is None in guarded_fetch_posts. This might indicate an issue. Defaulting to fetching a standard amount if your API supports it or all.")

    try:
        logging.info(f"Step 1: Fetching core posts for org_urn: {organization}. Fetch count parameter for API: {requested_count}")
        raw_posts, stats_by_post, _ = fetch_linkedin_posts_core(li_client_id, li_token, organization, count=requested_count)

        if not raw_posts:
            logging.info("No posts found to process via LinkedIn API after step 1.")
            return "<p style='color:orange; text-align:center;'>βΉοΈ No new LinkedIn posts found to process at this time.</p>"

        # Keep only posts that actually carry an id.
        post_urns = [urn for urn in (entry.get("id") for entry in raw_posts) if urn]
        logging.info(f"Extracted {len(post_urns)} post URNs for further processing.")

        logging.info("Step 2: Fetching comments via LinkedIn API.")
        comments = fetch_comments(li_client_id, li_token, post_urns, stats_by_post)

        logging.info("Step 3: Analyzing sentiment.")
        sentiments = analyze_sentiment(comments)

        logging.info("Step 4: Compiling detailed posts.")
        compiled = compile_detailed_posts(raw_posts, stats_by_post, sentiments)

        logging.info("Step 5: Preparing data for Bubble.")
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(compiled, comments)

        logging.info("Step 6: Uploading data to Bubble.")
        uploads = (
            (li_posts, "LI_posts"),
            (li_post_stats, "LI_post_stats"),
            (li_post_comments, "LI_post_comments"),
        )
        for records, bubble_table in uploads:
            bulk_upload_to_bubble(records, bubble_table)

        # DEFAULT_INITIAL_FETCH_COUNT is looked up in module scope at call time.
        if requested_count == DEFAULT_INITIAL_FETCH_COUNT:
            action_performed = f"Initial data fetch (~{requested_count} posts)"
        else:
            action_performed = f"Data update (target: ~{requested_count} posts)"
        logging.info(f"Successfully completed: {action_performed}. Uploaded posts and comments to Bubble.")
        return f"<p style='color:green; text-align:center;'>β {action_performed} complete. Posts and comments from LinkedIn uploaded to Bubble.</p>"

    except ValueError as ve:
        logging.error(f"ValueError during LinkedIn data processing: {ve}")
        # The message may contain arbitrary text, so escape it before embedding in HTML.
        return f"<p style='color:red; text-align:center;'>β Error: {html.escape(str(ve))}</p>"
    except Exception:
        logging.exception("An unexpected error occurred in guarded_fetch_posts.")
        return "<p style='color:red; text-align:center;'>β An unexpected error occurred. Please check logs.</p>"
def guarded_fetch_dashboard(token_state):
    """Return the dashboard placeholder text for the posts held in the state.

    Without a LinkedIn token an access-denied message is returned; otherwise
    an HTML snippet reporting how many Bubble posts are loaded (or that none are).
    """
    state = token_state or {}
    if not state.get("token"):
        return "β Access denied. No token available for dashboard."

    posts_df = state.get("bubble_posts_df")
    if posts_df is None or posts_df.empty:
        return "<p style='text-align: center; color: #555;'>No posts loaded from Bubble yet for the dashboard.</p>"
    return f"<p style='text-align: center;'>Dashboard would show {len(posts_df)} posts from Bubble.</p>"
def guarded_fetch_analytics(token_state):
    """Run the analytics fetch only when the state holds a LinkedIn token.

    Returns an 8-tuple in both cases: without a token it is an access-denied
    message followed by seven None placeholders; with one it is whatever
    fetch_and_render_analytics returns.
    """
    if token_state and token_state.get("token"):
        return fetch_and_render_analytics(token_state.get("client_id"), token_state.get("token"))

    denied_message = "β Access denied. No token available for analytics."
    return (denied_message,) + (None,) * 7
def run_mentions_and_load(token_state):
    """Build the mentions dashboard only when the state holds a LinkedIn token.

    Without a token, returns ``(access-denied message, None)``; otherwise the
    result of generate_mentions_dashboard.
    """
    if token_state and token_state.get("token"):
        return generate_mentions_dashboard(token_state.get("client_id"), token_state.get("token"))

    denied_message = "β Access denied. No token available for mentions."
    return (denied_message, None)
# --- Gradio UI Blocks ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
               title="LinkedIn Post Viewer & Analytics") as app:

    # A `with` body does not open a new scope, so this is a module-level name.
    # guarded_fetch_posts reads it to word its completion message.
    DEFAULT_INITIAL_FETCH_COUNT = 100

    # Per-session state shared by every callback below.
    initial_token_state = {
        "token": None,
        "client_id": None,
        "org_urn": None,
        "bubble_posts_df": None,
        "fetch_count_for_api": 0
    }
    token_state = gr.State(value=initial_token_state)

    gr.Markdown("# π LinkedIn Organization Post Viewer & Analytics")
    gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.")

    # Hidden textboxes carry the URL parameters; their change events drive the refresh.
    url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False)
    status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
    org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False)

    app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display])

    with gr.Tabs():
        with gr.TabItem("1οΈβ£ Dashboard & Sync"):
            gr.Markdown("System checks for existing data in Bubble. The button below will activate if new posts need to be fetched or updated from LinkedIn.")

            # Starts hidden; process_and_store_bubble_token decides when to show it.
            sync_posts_to_bubble_btn = gr.Button(
                value="π Sync LinkedIn Posts",
                variant="primary",
                visible=False,
                interactive=False
            )

            dashboard_html_output = gr.HTML(
                "<p style='text-align: center; color: #555;'>System initializing... "
                "Checking for existing data in Bubble and LinkedIn token.</p>"
            )

            # One wiring, used three times: when either URL value changes and
            # again after a sync has finished.
            refresh_state_event = dict(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_posts_to_bubble_btn],
            )

            org_urn_display.change(**refresh_state_event)
            url_user_token_display.change(**refresh_state_event)

            sync_click = sync_posts_to_bubble_btn.click(
                fn=guarded_fetch_posts,
                inputs=[token_state],
                outputs=[dashboard_html_output]
            )
            sync_click.then(**refresh_state_event)

        with gr.TabItem("2οΈβ£ Analytics"):
            gr.Markdown("View follower count and monthly gains for your organization (requires LinkedIn token).")
            fetch_analytics_btn = gr.Button("π Fetch Follower Analytics", variant="primary")

            follower_count = gr.Markdown("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")

            with gr.Row():
                follower_plot = gr.Plot()
                growth_plot = gr.Plot()
            with gr.Row():
                eng_rate_plot = gr.Plot()
            with gr.Row():
                interaction_plot = gr.Plot()
            with gr.Row():
                eb_plot = gr.Plot()
            with gr.Row():
                mentions_vol_plot = gr.Plot()
                mentions_sentiment_plot = gr.Plot()

            # Output order matches the 8-tuple returned by guarded_fetch_analytics.
            analytics_outputs = [
                follower_count, follower_plot, growth_plot, eng_rate_plot,
                interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot
            ]
            fetch_analytics_btn.click(
                fn=guarded_fetch_analytics,
                inputs=[token_state],
                outputs=analytics_outputs
            )

        with gr.TabItem("3οΈβ£ Mentions"):
            gr.Markdown("Analyze sentiment of recent posts that mention your organization (requires LinkedIn token).")
            fetch_mentions_btn = gr.Button("π§ Fetch Mentions & Sentiment", variant="primary")
            mentions_html = gr.HTML("<p style='text-align: center; color: #555;'>Waiting for LinkedIn token...</p>")
            mentions_plot = gr.Plot()
            fetch_mentions_btn.click(
                fn=run_mentions_and_load,
                inputs=[token_state],
                outputs=[mentions_html, mentions_plot]
            )

    # Show the token status on page load and refresh it every 15 seconds.
    app.load(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
    status_timer = gr.Timer(15.0)
    status_timer.tick(fn=lambda ts: check_token_status(ts), inputs=[token_state], outputs=status_box)
if __name__ == "__main__":
    # Warn early: without the client id the LinkedIn API calls cannot be made correctly.
    client_id_configured = bool(os.environ.get("Linkedin_client_id"))
    if not client_id_configured:
        logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.")

    # Listen on all interfaces, port 7860, and request a public share link.
    app.launch(
        share=True,
        server_port=7860,
        server_name="0.0.0.0",
    )