# -*- coding: utf-8 -*-
"""Gradio app for viewing LinkedIn organization posts and analytics.

A user token and organization URN arrive via URL parameters. The user token
is exchanged for a LinkedIn token through Bubble, existing posts are loaded
from Bubble, and a sync button is offered when posts need to be fetched or
refreshed from LinkedIn.
"""
import html
import json
import logging
import os
from datetime import datetime  # Used for pd.Timestamp

import gradio as gr
import pandas as pd  # Ensure pandas is imported

# Import functions from your custom modules
from Data_Fetching_and_Rendering import fetch_and_render_dashboard
from analytics_fetch_and_rendering import fetch_and_render_analytics
from mentions_dashboard import generate_mentions_dashboard
from gradio_utils import get_url_user_token
from Bubble_API_Calls import (
    fetch_linkedin_token_from_bubble,
    bulk_upload_to_bubble,
    fetch_linkedin_posts_data_from_bubble,
)
from Linkedin_Data_API_Calls import (
    fetch_linkedin_posts_core,
    fetch_comments,
    analyze_sentiment,
    compile_detailed_posts,
    prepare_data_for_bubble,
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Column of the Bubble posts DataFrame holding ISO datetime strings.
DATE_COLUMN_NAME = 'published_at'
# Standard number of posts for an initial fetch. Shared by the sync planning
# logic and by guarded_fetch_posts (to label the action it performed).
DEFAULT_INITIAL_FETCH_COUNT = 100
# Number of posts requested per week of staleness when updating.
POSTS_PER_WEEK = 10
# Data younger than this many days is considered fresh.
STALE_AFTER_DAYS = 7


def check_token_status(token_state):
    """Return a human-readable status string for the LinkedIn token in *token_state*."""
    return "✅ Token available" if token_state and token_state.get("token") else "❌ Token not available"


def _initial_fetch_plan(reason=None):
    """Return ``(fetch_count, button_update)`` for an initial fetch.

    *reason*, when given, is shown in parentheses on the button instead of
    the default "LinkedIn Posts" wording.
    """
    if reason:
        label = f"🔄 Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} ({reason})"
    else:
        label = f"🔄 Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} LinkedIn Posts"
    return DEFAULT_INITIAL_FETCH_COUNT, gr.update(value=label, visible=True, interactive=True)


def _plan_posts_sync(posts_df):
    """Decide how many posts to fetch and how the sync button should look.

    Args:
        posts_df: DataFrame of posts loaded from Bubble, or ``None``.

    Returns:
        A ``(fetch_count, button_update)`` tuple. ``fetch_count`` is 0 when
        the newest post is less than ``STALE_AFTER_DAYS`` days old; in that
        case the button is hidden.
    """
    if posts_df is None or posts_df.empty:
        logging.info(f"ℹ️ No posts found in Bubble or DataFrame is empty. Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible.")
        return _initial_fetch_plan()

    try:
        if DATE_COLUMN_NAME not in posts_df.columns:
            logging.warning(f"Date column '{DATE_COLUMN_NAME}' not found in Bubble posts DataFrame. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
            return _initial_fetch_plan("Date Column Missing")

        if posts_df[DATE_COLUMN_NAME].isnull().all():
            logging.warning(f"Date column '{DATE_COLUMN_NAME}' contains all null values. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
            return _initial_fetch_plan("Date Column Empty")

        # Convert ISO datetime strings to timezone-aware datetimes. The
        # conversion yields a new Series, so the caller's DataFrame is untouched.
        parsed_dates = pd.to_datetime(posts_df[DATE_COLUMN_NAME], errors='coerce', utc=True)
        last_post_date_utc = parsed_dates.dropna().max()

        if pd.isna(last_post_date_utc):
            logging.warning(f"No valid dates found in '{DATE_COLUMN_NAME}' after conversion. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
            return _initial_fetch_plan("No Valid Dates")

        # Compare whole calendar days in UTC.
        today_utc = pd.Timestamp.now(tz='UTC').normalize()
        last_post_date_utc_normalized = last_post_date_utc.normalize()
        time_difference_days = (today_utc - last_post_date_utc_normalized).days
        logging.info(f"Last post date (UTC, normalized): {last_post_date_utc_normalized}, Today (UTC, normalized): {today_utc}, Difference: {time_difference_days} days.")

        if time_difference_days >= STALE_AFTER_DAYS:
            num_weeks = max(1, time_difference_days // 7)
            fetch_count = num_weeks * POSTS_PER_WEEK
            button_label = f"🔄 Update Last {num_weeks} Week(s) (~{fetch_count} Posts)"
            logging.info(f"Data is {time_difference_days} days old. Update needed for {num_weeks} weeks, ~{fetch_count} posts.")
            return fetch_count, gr.update(value=button_label, visible=True, interactive=True)

        logging.info(f"Data is fresh ({time_difference_days} days old). No update needed now.")
        return 0, gr.update(visible=False, interactive=False)
    except Exception as e:
        # Best effort: any failure while inspecting dates falls back to an initial fetch.
        logging.error(f"Error processing dates from Bubble posts: {e}. Defaulting to initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        return _initial_fetch_plan("Date Error")


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """Refresh the session state from the URL parameters and Bubble.

    Fetches the LinkedIn token from Bubble using *url_user_token*, loads the
    organization's posts from Bubble, and decides whether an initial fetch or
    an update of LinkedIn posts is needed.

    Args:
        url_user_token: User token read from the URL (or an error text
            containing "not found" / "Could not access").
        org_urn: Organization URN read from the URL.
        token_state: Current state dict, or a falsy value to start fresh.

    Returns:
        ``(token_status_message, new_state, button_update)`` matching the
        ``status_box``, ``token_state`` and sync button outputs.
    """
    # The user token is a credential: log only whether it was supplied.
    logging.info(f"Processing token (URL user token supplied: {bool(url_user_token)}), Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": None, "fetch_count_for_api": 0
    }
    new_state["org_urn"] = org_urn
    new_state.setdefault("bubble_posts_df", None)
    new_state.setdefault("fetch_count_for_api", 0)

    client_id = os.environ.get("Linkedin_client_id")
    if not client_id:
        logging.error("CRITICAL ERROR: 'Linkedin_client_id' environment variable not set.")
        new_state["client_id"] = "ENV VAR MISSING"
    else:
        new_state["client_id"] = client_id

    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info("Attempting to fetch LinkedIn token from Bubble with the supplied user token.")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("✅ LinkedIn Token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                # The response may carry credential material; log its type only.
                logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response type: {type(parsed_linkedin_token).__name__}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}")
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    # Fetch posts from Bubble
    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
        bubble_posts_df = None
        try:
            fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
            if error_message:
                logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
            else:
                bubble_posts_df = fetched_df
        except Exception as e:
            logging.error(f"❌ Error fetching posts from Bubble: {e}. Treating as no data.")
        # Always overwrite, so a failed fetch never leaves stale posts in the state.
        new_state["bubble_posts_df"] = bubble_posts_df
    else:
        logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")

    fetch_count, button_update = _plan_posts_sync(new_state["bubble_posts_df"])
    new_state['fetch_count_for_api'] = fetch_count

    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. LinkedIn Token Status: {token_status_message}. Button update: {button_update}. Fetch count for API: {new_state['fetch_count_for_api']}")
    return token_status_message, new_state, button_update


def guarded_fetch_posts(token_state):
    """Fetch LinkedIn posts, analyze them, and upload the results to Bubble.

    The number of posts requested is ``token_state['fetch_count_for_api']``.
    Nothing is fetched when the token, organization URN or client ID is
    missing, or when the fetch count is 0.

    Returns:
        A status string for the dashboard HTML output.
    """
    logging.info("Starting guarded_fetch_posts process.")
    if not token_state or not token_state.get("token"):
        logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
        return "\n\n❌ Access denied. LinkedIn token not available.\n\n"

    client_id = token_state.get("client_id")
    token_dict = token_state.get("token")
    org_urn = token_state.get('org_urn')
    fetch_count_value = token_state.get('fetch_count_for_api')

    if not org_urn:
        logging.error("Organization URN (org_urn) not found in token_state for guarded_fetch_posts.")
        return "\n\n❌ Configuration error: Organization URN missing.\n\n"

    if not client_id or client_id == "ENV VAR MISSING":
        logging.error("Client ID not found or missing in token_state for guarded_fetch_posts.")
        return "\n\n❌ Configuration error: LinkedIn Client ID missing.\n\n"

    if fetch_count_value == 0:
        logging.info("guarded_fetch_posts called, but fetch_count_for_api is 0. Data is fresh.")
        return "\n\n✅ Data is already up-to-date. No new posts fetched.\n\n"

    if fetch_count_value is None:
        # Safeguard only: the state always carries a count. None is passed
        # through unchanged; what it means is up to fetch_linkedin_posts_core.
        logging.warning("fetch_count_for_api is None in guarded_fetch_posts. This might indicate an issue. Defaulting to fetching a standard amount if your API supports it or all.")

    try:
        logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count parameter for API: {fetch_count_value}")
        processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_value)

        if not processed_raw_posts:
            logging.info("No posts found to process via LinkedIn API after step 1.")
            return "\n\nℹ️ No new LinkedIn posts found to process at this time.\n\n"

        post_urns = [post["id"] for post in processed_raw_posts if post.get("id")]
        logging.info(f"Extracted {len(post_urns)} post URNs for further processing.")

        logging.info("Step 2: Fetching comments via LinkedIn API.")
        all_comments_data = fetch_comments(client_id, token_dict, post_urns, stats_map)

        logging.info("Step 3: Analyzing sentiment.")
        sentiments_per_post = analyze_sentiment(all_comments_data)

        logging.info("Step 4: Compiling detailed posts.")
        detailed_posts = compile_detailed_posts(processed_raw_posts, stats_map, sentiments_per_post)

        logging.info("Step 5: Preparing data for Bubble.")
        li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_posts, all_comments_data)

        logging.info("Step 6: Uploading data to Bubble.")
        bulk_upload_to_bubble(li_posts, "LI_posts")
        bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
        bulk_upload_to_bubble(li_post_comments, "LI_post_comments")

        if fetch_count_value == DEFAULT_INITIAL_FETCH_COUNT:
            action_performed = f"Initial data fetch (~{fetch_count_value} posts)"
        else:
            action_performed = f"Data update (target: ~{fetch_count_value} posts)"
        logging.info(f"Successfully completed: {action_performed}. Uploaded posts and comments to Bubble.")
        return f"\n\n✅ {action_performed} complete. Posts and comments from LinkedIn uploaded to Bubble.\n\n"

    except ValueError as ve:
        logging.error(f"ValueError during LinkedIn data processing: {ve}")
        # Escaped because the message is rendered in an HTML component.
        return f"\n\n❌ Error: {html.escape(str(ve))}\n\n"
    except Exception:
        logging.exception("An unexpected error occurred in guarded_fetch_posts.")
        return "\n\n❌ An unexpected error occurred. Please check logs.\n\n"


def guarded_fetch_dashboard(token_state):
    """Return a short dashboard summary of the Bubble posts held in *token_state*."""
    if not token_state or not token_state.get("token"):
        return "❌ Access denied. No token available for dashboard."

    posts_df = token_state.get("bubble_posts_df")
    if posts_df is not None and not posts_df.empty:
        return f"\n\nDashboard would show {len(posts_df)} posts from Bubble.\n\n"
    return "\n\nNo posts loaded from Bubble yet for the dashboard.\n\n"


def guarded_fetch_analytics(token_state):
    """Run ``fetch_and_render_analytics`` if a LinkedIn token is available.

    Without a token, returns an error message followed by seven ``None``
    values, one per plot output wired to the analytics button.
    """
    if not token_state or not token_state.get("token"):
        return ("❌ Access denied. No token available for analytics.",
                None, None, None, None, None, None, None)
    return fetch_and_render_analytics(token_state.get("client_id"), token_state.get("token"))


def run_mentions_and_load(token_state):
    """Run ``generate_mentions_dashboard`` if a LinkedIn token is available.

    Without a token, returns ``(error_message, None)`` for the HTML and plot
    outputs wired to the mentions button.
    """
    if not token_state or not token_state.get("token"):
        return ("❌ Access denied. No token available for mentions.", None)
    return generate_mentions_dashboard(token_state.get("client_id"), token_state.get("token"))


# --- Gradio UI Blocks ---
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
               title="LinkedIn Post Viewer & Analytics") as app:

    token_state = gr.State(value={
        "token": None, "client_id": None, "org_urn": None,
        "bubble_posts_df": None, "fetch_count_for_api": 0
    })

    gr.Markdown("# 🚀 LinkedIn Organization Post Viewer & Analytics")
    gr.Markdown("Token is supplied via URL parameter for Bubble.io lookup. Then explore dashboard and analytics.")

    url_user_token_display = gr.Textbox(label="User Token (from URL - Hidden)", interactive=False, visible=False)
    status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
    org_urn_display = gr.Textbox(label="Organization URN (from URL - Hidden)", interactive=False, visible=False)

    # Populate the hidden textboxes from the URL; their change events below
    # trigger the token/posts processing.
    app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display])

    with gr.Tabs():
        with gr.TabItem("1️⃣ Dashboard & Sync"):
            gr.Markdown("System checks for existing data in Bubble. The button below will activate if new posts need to be fetched or updated from LinkedIn.")

            sync_posts_to_bubble_btn = gr.Button(
                value="🔄 Sync LinkedIn Posts",
                variant="primary",
                visible=False,
                interactive=False
            )

            dashboard_html_output = gr.HTML(
                "\n\nSystem initializing... "
                "Checking for existing data in Bubble and LinkedIn token.\n\n"
            )

            org_urn_display.change(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_posts_to_bubble_btn]
            )

            url_user_token_display.change(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_posts_to_bubble_btn]
            )

            # After a sync, re-run the processing so the state and button
            # reflect the data now stored in Bubble.
            sync_posts_to_bubble_btn.click(
                fn=guarded_fetch_posts,
                inputs=[token_state],
                outputs=[dashboard_html_output]
            ).then(
                fn=process_and_store_bubble_token,
                inputs=[url_user_token_display, org_urn_display, token_state],
                outputs=[status_box, token_state, sync_posts_to_bubble_btn]
            )

        with gr.TabItem("2️⃣ Analytics"):
            gr.Markdown("View follower count and monthly gains for your organization (requires LinkedIn token).")
            fetch_analytics_btn = gr.Button("📈 Fetch Follower Analytics", variant="primary")
            follower_count = gr.Markdown("\n\nWaiting for LinkedIn token...\n\n")

            with gr.Row():
                follower_plot, growth_plot = gr.Plot(), gr.Plot()
            with gr.Row():
                eng_rate_plot = gr.Plot()
            with gr.Row():
                interaction_plot = gr.Plot()
            with gr.Row():
                eb_plot = gr.Plot()
            with gr.Row():
                mentions_vol_plot, mentions_sentiment_plot = gr.Plot(), gr.Plot()

            fetch_analytics_btn.click(
                fn=guarded_fetch_analytics,
                inputs=[token_state],
                outputs=[follower_count, follower_plot, growth_plot, eng_rate_plot,
                         interaction_plot, eb_plot, mentions_vol_plot, mentions_sentiment_plot]
            )

        with gr.TabItem("3️⃣ Mentions"):
            gr.Markdown("Analyze sentiment of recent posts that mention your organization (requires LinkedIn token).")
            fetch_mentions_btn = gr.Button("🧠 Fetch Mentions & Sentiment", variant="primary")
            mentions_html = gr.HTML("\n\nWaiting for LinkedIn token...\n\n")
            mentions_plot = gr.Plot()

            fetch_mentions_btn.click(
                fn=run_mentions_and_load,
                inputs=[token_state],
                outputs=[mentions_html, mentions_plot]
            )

    # Show the token status on load and refresh it every 15 seconds.
    app.load(fn=check_token_status, inputs=[token_state], outputs=status_box)
    gr.Timer(15.0).tick(fn=check_token_status, inputs=[token_state], outputs=status_box)


if __name__ == "__main__":
    if not os.environ.get("Linkedin_client_id"):
        logging.warning("WARNING: The 'Linkedin_client_id' environment variable is not set. The application may not function correctly for LinkedIn API calls.")
    app.launch(server_name="0.0.0.0", server_port=7860, share=True)