Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
|
@@ -29,6 +29,14 @@ from Linkedin_Data_API_Calls import (
|
|
| 29 |
# Configure logging
|
| 30 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
| 31 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 32 |
def check_token_status(token_state):
    """Return a short status string saying whether a LinkedIn token is present.

    Args:
        token_state: State mapping, or None. Only its "token" entry is read.

    Returns:
        "✅ Token available" when ``token_state`` is truthy and holds a truthy
        "token" value; otherwise "❌ Token not available".
    """
    # NOTE(review): the emoji prefixes were mis-decoded in this copy of the
    # file (the literal was split across two lines); the check-mark / cross
    # glyphs are restored here -- confirm against the repository version.
    has_token = bool(token_state and token_state.get("token"))
    return "✅ Token available" if has_token else "❌ Token not available"
|
|
@@ -47,7 +55,6 @@ def process_and_store_bubble_token(url_user_token, org_urn, token_state):
|
|
| 47 |
}
|
| 48 |
new_state.update({"org_urn": org_urn, "bubble_posts_df": new_state.get("bubble_posts_df"), "fetch_count_for_api": new_state.get("fetch_count_for_api", 0)})
|
| 49 |
|
| 50 |
-
# Default button update: hidden and non-interactive
|
| 51 |
button_update = gr.update(visible=False, interactive=False, value="π Sync LinkedIn Posts")
|
| 52 |
|
| 53 |
client_id = os.environ.get("Linkedin_client_id")
|
|
@@ -74,28 +81,25 @@ def process_and_store_bubble_token(url_user_token, org_urn, token_state):
|
|
| 74 |
new_state["token"] = None
|
| 75 |
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
|
| 76 |
|
| 77 |
-
# Fetch posts from Bubble
|
| 78 |
current_org_urn = new_state.get("org_urn")
|
| 79 |
-
bubble_posts_df = None
|
| 80 |
if current_org_urn:
|
| 81 |
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
|
| 82 |
try:
|
| 83 |
fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
|
| 84 |
if error_message:
|
| 85 |
logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
|
|
|
|
| 86 |
else:
|
| 87 |
-
bubble_posts_df = fetched_df
|
| 88 |
-
new_state["bubble_posts_df"] = bubble_posts_df
|
| 89 |
except Exception as e:
|
| 90 |
logging.error(f"β Error fetching posts from Bubble: {e}. Treating as no data.")
|
| 91 |
-
new_state["bubble_posts_df"] =
|
| 92 |
else:
|
| 93 |
logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")
|
|
|
|
|
|
|
| 94 |
|
| 95 |
-
# Logic for determining fetch/update based on bubble_posts_df
|
| 96 |
-
# DATE_COLUMN_NAME is now 'published_at' and contains ISO datetime strings.
|
| 97 |
DATE_COLUMN_NAME = 'published_at'
|
| 98 |
-
DEFAULT_INITIAL_FETCH_COUNT = 100 # Standard number of posts for initial fetch
|
| 99 |
|
| 100 |
if new_state["bubble_posts_df"] is None or new_state["bubble_posts_df"].empty:
|
| 101 |
logging.info(f"βΉοΈ No posts found in Bubble or DataFrame is empty. Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible.")
|
|
@@ -103,7 +107,7 @@ def process_and_store_bubble_token(url_user_token, org_urn, token_state):
|
|
| 103 |
button_update = gr.update(value=f"π Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} LinkedIn Posts", visible=True, interactive=True)
|
| 104 |
else:
|
| 105 |
try:
|
| 106 |
-
df_for_date_check = new_state["bubble_posts_df"].copy()
|
| 107 |
if DATE_COLUMN_NAME not in df_for_date_check.columns:
|
| 108 |
logging.warning(f"Date column '{DATE_COLUMN_NAME}' not found in Bubble posts DataFrame. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
|
| 109 |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
|
|
@@ -113,7 +117,6 @@ def process_and_store_bubble_token(url_user_token, org_urn, token_state):
|
|
| 113 |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
|
| 114 |
button_update = gr.update(value=f"π Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Empty)", visible=True, interactive=True)
|
| 115 |
else:
|
| 116 |
-
# Convert ISO datetime strings to datetime objects
|
| 117 |
df_for_date_check[DATE_COLUMN_NAME] = pd.to_datetime(df_for_date_check[DATE_COLUMN_NAME], errors='coerce', utc=True)
|
| 118 |
last_post_date_utc = df_for_date_check[DATE_COLUMN_NAME].dropna().max()
|
| 119 |
|
|
@@ -149,10 +152,6 @@ def process_and_store_bubble_token(url_user_token, org_urn, token_state):
|
|
| 149 |
return token_status_message, new_state, button_update
|
| 150 |
|
| 151 |
def guarded_fetch_posts(token_state):
|
| 152 |
-
"""
|
| 153 |
-
Fetches LinkedIn posts based on 'fetch_count_for_api' in token_state,
|
| 154 |
-
analyzes them, and uploads to Bubble.
|
| 155 |
-
"""
|
| 156 |
logging.info("Starting guarded_fetch_posts process.")
|
| 157 |
if not token_state or not token_state.get("token"):
|
| 158 |
logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
|
|
@@ -162,54 +161,83 @@ def guarded_fetch_posts(token_state):
|
|
| 162 |
token_dict = token_state.get("token")
|
| 163 |
org_urn = token_state.get('org_urn')
|
| 164 |
fetch_count_value = token_state.get('fetch_count_for_api')
|
|
|
|
| 165 |
|
| 166 |
if not org_urn:
|
| 167 |
-
logging.error("Organization URN (org_urn) not found in token_state
|
| 168 |
return "<p style='color:red; text-align:center;'>β Configuration error: Organization URN missing.</p>"
|
| 169 |
if not client_id or client_id == "ENV VAR MISSING":
|
| 170 |
-
logging.error("Client ID not found or missing in token_state
|
| 171 |
return "<p style='color:red; text-align:center;'>β Configuration error: LinkedIn Client ID missing.</p>"
|
| 172 |
|
| 173 |
if fetch_count_value == 0:
|
| 174 |
-
logging.info("
|
| 175 |
return "<p style='color:green; text-align:center;'>β
Data is already up-to-date. No new posts fetched.</p>"
|
| 176 |
|
| 177 |
-
if fetch_count_value is None: # Should ideally not happen with new logic, but as a safeguard
|
| 178 |
-
logging.warning("fetch_count_for_api is None in guarded_fetch_posts. This might indicate an issue. Defaulting to fetching a standard amount if your API supports it or all.")
|
| 179 |
-
# Depending on your API, None might mean fetch all or a default.
|
| 180 |
-
# If your API requires a specific count for "all", you might need to adjust here or in fetch_linkedin_posts_core.
|
| 181 |
-
|
| 182 |
try:
|
| 183 |
-
logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count
|
| 184 |
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_value)
|
| 185 |
|
| 186 |
if not processed_raw_posts:
|
| 187 |
-
logging.info("No posts
|
| 188 |
-
return "<p style='color:orange; text-align:center;'>βΉοΈ No new LinkedIn posts found to process
|
| 189 |
-
|
| 190 |
-
|
| 191 |
-
|
| 192 |
-
|
| 193 |
-
|
| 194 |
-
|
| 195 |
-
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 204 |
|
| 205 |
-
logging.info("Step 6: Uploading data to Bubble.")
|
| 206 |
-
bulk_upload_to_bubble(li_posts, "LI_posts")
|
| 207 |
-
bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
|
| 208 |
-
bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
|
| 209 |
|
| 210 |
-
|
| 211 |
-
logging.info(f"Successfully completed: {
|
| 212 |
-
return f"<p style='color:green; text-align:center;'>β
{
|
| 213 |
|
| 214 |
except ValueError as ve:
|
| 215 |
logging.error(f"ValueError during LinkedIn data processing: {ve}")
|
|
@@ -242,17 +270,11 @@ def run_mentions_and_load(token_state):
|
|
| 242 |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
|
| 243 |
title="LinkedIn Post Viewer & Analytics") as app:
|
| 244 |
|
| 245 |
-
# Define DEFAULT_INITIAL_FETCH_COUNT here if needed by guarded_fetch_posts for its messages,
|
| 246 |
-
# or ensure it's passed/accessible if logic depends on it there.
|
| 247 |
-
# For now, it's only used within process_and_store_bubble_token.
|
| 248 |
-
DEFAULT_INITIAL_FETCH_COUNT = 100
|
| 249 |
-
|
| 250 |
-
|
| 251 |
token_state = gr.State(value={
|
| 252 |
"token": None,
|
| 253 |
"client_id": None,
|
| 254 |
"org_urn": None,
|
| 255 |
-
"bubble_posts_df":
|
| 256 |
"fetch_count_for_api": 0
|
| 257 |
})
|
| 258 |
|
|
|
|
| 29 |
# Configure logging
|
| 30 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
| 31 |
|
| 32 |
+
# --- Global Constants ---
|
| 33 |
+
# Standard number of posts for initial fetch
|
| 34 |
+
DEFAULT_INITIAL_FETCH_COUNT = 100
|
| 35 |
+
# Key for post URN in data processed from LinkedIn (e.g., in detailed_posts)
|
| 36 |
+
LINKEDIN_POST_URN_KEY = 'id'
|
| 37 |
+
# Column name for post URN in the DataFrame fetched from Bubble (bubble_posts_df)
|
| 38 |
+
BUBBLE_POST_URN_COLUMN_NAME = 'id' # Adjust if your Bubble 'LI_posts' table uses a different column name for URNs
|
| 39 |
+
|
| 40 |
def check_token_status(token_state):
    """Return a short status string saying whether a LinkedIn token is present.

    Args:
        token_state: State mapping, or None. Only its "token" entry is read.

    Returns:
        "✅ Token available" when ``token_state`` is truthy and holds a truthy
        "token" value; otherwise "❌ Token not available".
    """
    # NOTE(review): the emoji prefixes were mis-decoded in this copy of the
    # file (the literal was split across two lines); the check-mark / cross
    # glyphs are restored here -- confirm against the repository version.
    has_token = bool(token_state and token_state.get("token"))
    return "✅ Token available" if has_token else "❌ Token not available"
|
|
|
|
| 55 |
}
|
| 56 |
new_state.update({"org_urn": org_urn, "bubble_posts_df": new_state.get("bubble_posts_df"), "fetch_count_for_api": new_state.get("fetch_count_for_api", 0)})
|
| 57 |
|
|
|
|
| 58 |
button_update = gr.update(visible=False, interactive=False, value="π Sync LinkedIn Posts")
|
| 59 |
|
| 60 |
client_id = os.environ.get("Linkedin_client_id")
|
|
|
|
| 81 |
new_state["token"] = None
|
| 82 |
logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")
|
| 83 |
|
|
|
|
| 84 |
current_org_urn = new_state.get("org_urn")
|
|
|
|
| 85 |
if current_org_urn:
|
| 86 |
logging.info(f"Attempting to fetch posts from Bubble for org_urn: {current_org_urn}")
|
| 87 |
try:
|
| 88 |
fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, "LI_posts")
|
| 89 |
if error_message:
|
| 90 |
logging.warning(f"Error reported by fetch_linkedin_posts_data_from_bubble: {error_message}. Treating as no data.")
|
| 91 |
+
new_state["bubble_posts_df"] = pd.DataFrame() # Ensure it's an empty DataFrame
|
| 92 |
else:
|
| 93 |
+
new_state["bubble_posts_df"] = fetched_df if fetched_df is not None else pd.DataFrame()
|
|
|
|
| 94 |
except Exception as e:
|
| 95 |
logging.error(f"β Error fetching posts from Bubble: {e}. Treating as no data.")
|
| 96 |
+
new_state["bubble_posts_df"] = pd.DataFrame()
|
| 97 |
else:
|
| 98 |
logging.warning("Org URN not available in state. Cannot fetch posts from Bubble.")
|
| 99 |
+
new_state["bubble_posts_df"] = pd.DataFrame()
|
| 100 |
+
|
| 101 |
|
|
|
|
|
|
|
| 102 |
DATE_COLUMN_NAME = 'published_at'
|
|
|
|
| 103 |
|
| 104 |
if new_state["bubble_posts_df"] is None or new_state["bubble_posts_df"].empty:
|
| 105 |
logging.info(f"βΉοΈ No posts found in Bubble or DataFrame is empty. Button to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts will be visible.")
|
|
|
|
| 107 |
button_update = gr.update(value=f"π Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} LinkedIn Posts", visible=True, interactive=True)
|
| 108 |
else:
|
| 109 |
try:
|
| 110 |
+
df_for_date_check = new_state["bubble_posts_df"].copy()
|
| 111 |
if DATE_COLUMN_NAME not in df_for_date_check.columns:
|
| 112 |
logging.warning(f"Date column '{DATE_COLUMN_NAME}' not found in Bubble posts DataFrame. Assuming initial fetch of {DEFAULT_INITIAL_FETCH_COUNT} posts.")
|
| 113 |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
|
|
|
|
| 117 |
new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
|
| 118 |
button_update = gr.update(value=f"π Fetch Initial {DEFAULT_INITIAL_FETCH_COUNT} (Date Column Empty)", visible=True, interactive=True)
|
| 119 |
else:
|
|
|
|
| 120 |
df_for_date_check[DATE_COLUMN_NAME] = pd.to_datetime(df_for_date_check[DATE_COLUMN_NAME], errors='coerce', utc=True)
|
| 121 |
last_post_date_utc = df_for_date_check[DATE_COLUMN_NAME].dropna().max()
|
| 122 |
|
|
|
|
| 152 |
return token_status_message, new_state, button_update
|
| 153 |
|
| 154 |
def guarded_fetch_posts(token_state):
|
|
|
|
|
|
|
|
|
|
|
|
|
| 155 |
logging.info("Starting guarded_fetch_posts process.")
|
| 156 |
if not token_state or not token_state.get("token"):
|
| 157 |
logging.error("Access denied for guarded_fetch_posts. No LinkedIn token available.")
|
|
|
|
| 161 |
token_dict = token_state.get("token")
|
| 162 |
org_urn = token_state.get('org_urn')
|
| 163 |
fetch_count_value = token_state.get('fetch_count_for_api')
|
| 164 |
+
bubble_posts_df = token_state.get("bubble_posts_df") # Get existing posts
|
| 165 |
|
| 166 |
if not org_urn:
|
| 167 |
+
logging.error("Organization URN (org_urn) not found in token_state.")
|
| 168 |
return "<p style='color:red; text-align:center;'>β Configuration error: Organization URN missing.</p>"
|
| 169 |
if not client_id or client_id == "ENV VAR MISSING":
|
| 170 |
+
logging.error("Client ID not found or missing in token_state.")
|
| 171 |
return "<p style='color:red; text-align:center;'>β Configuration error: LinkedIn Client ID missing.</p>"
|
| 172 |
|
| 173 |
if fetch_count_value == 0:
|
| 174 |
+
logging.info("Data is fresh. No new posts fetched based on date check.")
|
| 175 |
return "<p style='color:green; text-align:center;'>β
Data is already up-to-date. No new posts fetched.</p>"
|
| 176 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 177 |
try:
|
| 178 |
+
logging.info(f"Step 1: Fetching core posts for org_urn: {org_urn}. Fetch count: {fetch_count_value}")
|
| 179 |
processed_raw_posts, stats_map, _ = fetch_linkedin_posts_core(client_id, token_dict, org_urn, count=fetch_count_value)
|
| 180 |
|
| 181 |
if not processed_raw_posts:
|
| 182 |
+
logging.info("No posts retrieved from LinkedIn API.")
|
| 183 |
+
return "<p style='color:orange; text-align:center;'>βΉοΈ No new LinkedIn posts found to process.</p>"
|
| 184 |
+
|
| 185 |
+
# --- Filter out posts already in Bubble ---
|
| 186 |
+
existing_post_urns = set()
|
| 187 |
+
if bubble_posts_df is not None and not bubble_posts_df.empty and BUBBLE_POST_URN_COLUMN_NAME in bubble_posts_df.columns:
|
| 188 |
+
existing_post_urns = set(bubble_posts_df[BUBBLE_POST_URN_COLUMN_NAME].dropna().astype(str))
|
| 189 |
+
logging.info(f"Found {len(existing_post_urns)} existing post URNs in Bubble data.")
|
| 190 |
+
else:
|
| 191 |
+
logging.info("No existing posts found in Bubble data or URN column missing; all fetched posts will be considered new.")
|
| 192 |
+
|
| 193 |
+
# Filter processed_raw_posts before compiling detailed_posts
|
| 194 |
+
new_raw_posts = [
|
| 195 |
+
post for post in processed_raw_posts
|
| 196 |
+
if str(post.get(LINKEDIN_POST_URN_KEY)) not in existing_post_urns
|
| 197 |
+
]
|
| 198 |
+
|
| 199 |
+
if not new_raw_posts:
|
| 200 |
+
logging.info("All fetched LinkedIn posts are already present in Bubble. No new posts to add.")
|
| 201 |
+
return "<p style='color:green; text-align:center;'>β
All fetched posts already exist in Bubble. Data is up-to-date.</p>"
|
| 202 |
+
|
| 203 |
+
logging.info(f"Identified {len(new_raw_posts)} new posts to process after filtering against Bubble data.")
|
| 204 |
+
|
| 205 |
+
# Continue processing only with new_raw_posts
|
| 206 |
+
post_urns_to_process = [post[LINKEDIN_POST_URN_KEY] for post in new_raw_posts if post.get(LINKEDIN_POST_URN_KEY)]
|
| 207 |
+
|
| 208 |
+
logging.info("Step 2: Fetching comments for new posts via LinkedIn API.")
|
| 209 |
+
# Adjust stats_map if it's keyed by URNs; ensure it's relevant for new_raw_posts
|
| 210 |
+
# For simplicity, assuming fetch_comments and subsequent steps can handle potentially fewer URNs
|
| 211 |
+
all_comments_data = fetch_comments(client_id, token_dict, post_urns_to_process, stats_map)
|
| 212 |
+
|
| 213 |
+
logging.info("Step 3: Analyzing sentiment for new posts.")
|
| 214 |
+
sentiments_per_post = analyze_sentiment(all_comments_data) # Assumes all_comments_data is now for new posts
|
| 215 |
+
|
| 216 |
+
logging.info("Step 4: Compiling detailed data for new posts.")
|
| 217 |
+
# Pass new_raw_posts to compile_detailed_posts
|
| 218 |
+
detailed_new_posts = compile_detailed_posts(new_raw_posts, stats_map, sentiments_per_post)
|
| 219 |
+
|
| 220 |
+
logging.info("Step 5: Preparing data for Bubble (only new posts).")
|
| 221 |
+
# Pass detailed_new_posts to prepare_data_for_bubble
|
| 222 |
+
li_posts, li_post_stats, li_post_comments = prepare_data_for_bubble(detailed_new_posts, all_comments_data)
|
| 223 |
+
|
| 224 |
+
logging.info(f"Step 6: Uploading {len(li_posts)} new posts and their related data to Bubble.")
|
| 225 |
+
if li_posts: # Ensure there's actually something to upload
|
| 226 |
+
bulk_upload_to_bubble(li_posts, "LI_posts")
|
| 227 |
+
if li_post_stats:
|
| 228 |
+
bulk_upload_to_bubble(li_post_stats, "LI_post_stats")
|
| 229 |
+
if li_post_comments:
|
| 230 |
+
bulk_upload_to_bubble(li_post_comments, "LI_post_comments")
|
| 231 |
+
|
| 232 |
+
action_message = f"uploaded {len(li_posts)} new post(s)"
|
| 233 |
+
else:
|
| 234 |
+
action_message = "found no new posts to upload after detailed processing"
|
| 235 |
+
logging.info("No new posts to upload after final preparation for Bubble.")
|
| 236 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 237 |
|
| 238 |
+
final_message_verb = "Initial data fetch" if fetch_count_value == DEFAULT_INITIAL_FETCH_COUNT and not existing_post_urns else "Data update"
|
| 239 |
+
logging.info(f"Successfully completed: {final_message_verb}. {action_message} to Bubble.")
|
| 240 |
+
return f"<p style='color:green; text-align:center;'>β
{final_message_verb} complete. Successfully {action_message} to Bubble.</p>"
|
| 241 |
|
| 242 |
except ValueError as ve:
|
| 243 |
logging.error(f"ValueError during LinkedIn data processing: {ve}")
|
|
|
|
| 270 |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
|
| 271 |
title="LinkedIn Post Viewer & Analytics") as app:
|
| 272 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 273 |
token_state = gr.State(value={
|
| 274 |
"token": None,
|
| 275 |
"client_id": None,
|
| 276 |
"org_urn": None,
|
| 277 |
+
"bubble_posts_df": pd.DataFrame(), # Initialize with empty DataFrame
|
| 278 |
"fetch_count_for_api": 0
|
| 279 |
})
|
| 280 |
|