File size: 14,254 Bytes
fed4e5b
 
 
5c5c0fc
 
fed4e5b
 
 
 
5c5c0fc
fed4e5b
 
 
 
 
5c5c0fc
fed4e5b
 
 
5c5c0fc
 
9da7d8b
fed4e5b
 
5c5c0fc
 
 
 
fed4e5b
 
 
 
 
 
5c5c0fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fed4e5b
 
5c5c0fc
 
fed4e5b
 
 
 
5c5c0fc
fed4e5b
5c5c0fc
 
fed4e5b
 
5c5c0fc
fed4e5b
5c5c0fc
fed4e5b
5c5c0fc
 
fed4e5b
 
 
5c5c0fc
fed4e5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c5c0fc
 
 
 
9da7d8b
 
5c5c0fc
 
 
 
 
 
 
 
 
 
 
 
 
 
fed4e5b
 
5c5c0fc
 
 
 
 
 
fed4e5b
5c5c0fc
 
 
 
fed4e5b
 
5c5c0fc
 
 
 
 
 
fed4e5b
5c5c0fc
 
 
 
 
 
 
 
fed4e5b
5c5c0fc
 
 
 
fed4e5b
5c5c0fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fed4e5b
5c5c0fc
 
 
fed4e5b
5c5c0fc
fed4e5b
 
 
 
 
 
5c5c0fc
fed4e5b
5c5c0fc
 
 
fed4e5b
 
5c5c0fc
fed4e5b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# state_manager.py
"""
Manages the application state, including token processing,
initial data loading from Bubble, and determining sync requirements
based on the operations log.
"""
import pandas as pd
import logging
import os
from datetime import timezone # Python's datetime, not to be confused with pandas'
import gradio as gr

# Assuming Bubble_API_Calls contains fetch_linkedin_token_from_bubble and fetch_linkedin_posts_data_from_bubble
from Bubble_API_Calls import (
    fetch_linkedin_token_from_bubble,
    fetch_linkedin_posts_data_from_bubble # This is generic, used for all tables
)
# Assuming config.py contains all necessary constants
from config import (
    DEFAULT_INITIAL_FETCH_COUNT, DEFAULT_POSTS_UPDATE_FETCH_COUNT,
    BUBBLE_POST_DATE_COLUMN_NAME, BUBBLE_POSTS_TABLE_NAME,
    BUBBLE_POST_STATS_TABLE_NAME,
    BUBBLE_MENTIONS_TABLE_NAME, BUBBLE_MENTIONS_DATE_COLUMN_NAME,
    BUBBLE_FOLLOWER_STATS_TABLE_NAME, FOLLOWER_STATS_TYPE_COLUMN, FOLLOWER_STATS_CATEGORY_COLUMN,
    LINKEDIN_CLIENT_ID_ENV_VAR,
    BUBBLE_OPERATIONS_LOG_TABLE_NAME, BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
    BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN,
    LOG_SUBJECT_POSTS, LOG_SUBJECT_MENTIONS, LOG_SUBJECT_FOLLOWER_STATS
)

def check_token_status(token_state):
    """Checks the status of the LinkedIn token."""
    return "βœ… Token available" if token_state and token_state.get("token") else "❌ Token not available"

def get_last_sync_attempt_date(operations_log_df, subject, org_urn):
    """
    Retrieves the last sync attempt date for a given subject and organization URN
    from the operations log DataFrame.

    Args:
        operations_log_df (pd.DataFrame): DataFrame containing operations log data.
                                          Expected columns defined in config:
                                          BUBBLE_OPERATIONS_LOG_DATE_COLUMN,
                                          BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN,
                                          BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN.
        subject (str): The subject of the sync operation (e.g., "post", "mention").
        org_urn (str): The organization URN.

    Returns:
        pd.Timestamp: The last sync attempt date (UTC), or pd.NaT if no relevant log entry is found.
    """
    if operations_log_df.empty or not org_urn:
        return pd.NaT

    # Ensure required columns exist
    required_cols = [BUBBLE_OPERATIONS_LOG_DATE_COLUMN, BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN, BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN]
    if not all(col in operations_log_df.columns for col in required_cols):
        logging.warning(f"Operations log DF is missing one or more required columns: {required_cols}")
        return pd.NaT

    try:
        # Filter for the specific subject and organization URN
        # Ensure data types are consistent for comparison, especially org_urn
        filtered_df = operations_log_df[
            (operations_log_df[BUBBLE_OPERATIONS_LOG_SUBJECT_COLUMN].astype(str) == str(subject)) &
            (operations_log_df[BUBBLE_OPERATIONS_LOG_ORG_URN_COLUMN].astype(str) == str(org_urn))
        ]

        if filtered_df.empty:
            return pd.NaT

        # Convert date column to datetime objects (UTC) and find the maximum (latest)
        # The dates should ideally be stored in UTC or converted upon fetch.
        # Assuming fetch_linkedin_posts_data_from_bubble handles date parsing correctly or provides strings.
        dates = pd.to_datetime(filtered_df[BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)
        return dates.dropna().max()
    except Exception as e:
        logging.error(f"Error processing operations log for last sync attempt date: {e}", exc_info=True)
        return pd.NaT


def process_and_store_bubble_token(url_user_token, org_urn, token_state):
    """
    Processes user token, fetches LinkedIn token, fetches existing Bubble data (posts, mentions, follower stats, operations log),
    and determines if a sync is needed for each data type based on the operations log.
    Updates token state and UI for the sync button.
    """
    logging.info(f"Processing token with URL user token: '{url_user_token}', Org URN: '{org_urn}'")

    new_state = token_state.copy() if token_state else {}
    new_state.update({
        "token": new_state.get("token"), # Preserve existing token if any
        "client_id": new_state.get("client_id"),
        "org_urn": org_urn,
        "bubble_posts_df": new_state.get("bubble_posts_df", pd.DataFrame()),
        "fetch_count_for_api": 0, # Will be determined based on log
        "bubble_mentions_df": new_state.get("bubble_mentions_df", pd.DataFrame()),
        "mentions_should_sync_now": False, # Will be determined based on log
        "bubble_follower_stats_df": new_state.get("bubble_follower_stats_df", pd.DataFrame()),
        "fs_should_sync_now": False, # Will be determined based on log
        "bubble_operations_log_df": new_state.get("bubble_operations_log_df", pd.DataFrame()), # NEW
        "url_user_token_temp_storage": url_user_token
    })

    button_update = gr.update(visible=False, interactive=False, value="πŸ”„ Sync LinkedIn Data")

    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id: logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info(f"Attempting to fetch LinkedIn token from Bubble with user token: {url_user_token}")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("βœ… LinkedIn Token successfully fetched from Bubble.")
            else:
                new_state["token"] = None
                logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
        except Exception as e:
            new_state["token"] = None
            logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
    else:
        new_state["token"] = None
        logging.info("No valid URL user token provided for LinkedIn token fetch, or an error was indicated.")

    current_org_urn = new_state.get("org_urn")
    if current_org_urn:
        data_tables_to_fetch = {
            "bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
            "bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
            "bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
            "bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME, # NEW
            "bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME
        }
        for state_key, table_name in data_tables_to_fetch.items():
            logging.info(f"Attempting to fetch {table_name} from Bubble for org_urn: {current_org_urn}")
            try:
                fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(current_org_urn, table_name)
                new_state[state_key] = pd.DataFrame() if error_message or fetched_df is None else fetched_df
                if error_message: logging.warning(f"Error fetching {table_name} from Bubble: {error_message}.")
                # Ensure date column in operations log is parsed correctly if it's fetched as string
                if state_key == "bubble_operations_log_df" and not new_state[state_key].empty and BUBBLE_OPERATIONS_LOG_DATE_COLUMN in new_state[state_key].columns:
                    new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN] = pd.to_datetime(new_state[state_key][BUBBLE_OPERATIONS_LOG_DATE_COLUMN], errors='coerce', utc=True)

            except Exception as e:
                logging.error(f"❌ Error fetching {table_name} from Bubble: {e}.", exc_info=True)
                new_state[state_key] = pd.DataFrame()
    else:
        logging.warning("Org URN not available in state. Cannot fetch data from Bubble.")
        for key in ["bubble_posts_df", "bubble_mentions_df", "bubble_follower_stats_df", "bubble_operations_log_df"]:
            new_state[key] = pd.DataFrame()

    # --- Determine sync needs based on Operations Log ---
    ops_log_df = new_state.get("bubble_operations_log_df", pd.DataFrame())
    now_utc = pd.Timestamp.now(tz='UTC')

    # 1. Posts Sync Logic
    last_post_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_POSTS, current_org_urn)
    if pd.isna(last_post_sync_attempt):
        logging.info(f"ℹ️ No previous '{LOG_SUBJECT_POSTS}' sync attempt logged. Setting to fetch initial {DEFAULT_INITIAL_FETCH_COUNT} posts.")
        new_state['fetch_count_for_api'] = DEFAULT_INITIAL_FETCH_COUNT
    else:
        days_since_last_attempt = (now_utc.normalize() - last_post_sync_attempt.normalize()).days
        if days_since_last_attempt >= 7:
            # Dynamic fetch count based on how many weeks have passed, or a fixed update count
            # For simplicity, using DEFAULT_POSTS_UPDATE_FETCH_COUNT
            new_state['fetch_count_for_api'] = DEFAULT_POSTS_UPDATE_FETCH_COUNT
            logging.info(f"Posts sync attempt is {days_since_last_attempt} days old. Setting fetch count to {new_state['fetch_count_for_api']}.")
        else:
            new_state['fetch_count_for_api'] = 0
            logging.info(f"Posts sync attempt was recent ({days_since_last_attempt} days ago). No new posts fetch scheduled based on log.")

    # 2. Mentions Sync Logic
    last_mention_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_MENTIONS, current_org_urn)
    if pd.isna(last_mention_sync_attempt):
        new_state['mentions_should_sync_now'] = True
        logging.info(f"Mentions sync needed: No previous '{LOG_SUBJECT_MENTIONS}' sync attempt logged.")
    else:
        days_since_last_attempt_mentions = (now_utc.normalize() - last_mention_sync_attempt.normalize()).days
        if days_since_last_attempt_mentions >= 7:
            new_state['mentions_should_sync_now'] = True
            logging.info(f"Mentions sync needed: Last attempt was {days_since_last_attempt_mentions} days ago.")
        else:
            new_state['mentions_should_sync_now'] = False
            logging.info(f"Mentions sync attempt was recent ({days_since_last_attempt_mentions} days ago). Sync not scheduled.")

    # 3. Follower Stats Sync Logic
    last_fs_sync_attempt = get_last_sync_attempt_date(ops_log_df, LOG_SUBJECT_FOLLOWER_STATS, current_org_urn)
    fs_df_current = new_state.get("bubble_follower_stats_df", pd.DataFrame())
    
    demographics_missing = False
    if fs_df_current.empty:
        demographics_missing = True # If entire table is empty, demographics are missing
        logging.info("Follower stats: Main table is empty, considering demographics missing.")
    elif FOLLOWER_STATS_TYPE_COLUMN not in fs_df_current.columns:
        demographics_missing = True # If type column is missing, cannot check demographics
        logging.info(f"Follower stats: Column '{FOLLOWER_STATS_TYPE_COLUMN}' is missing, considering demographics missing.")
    else:
        # Check if any rows exist that are NOT 'follower_gains_monthly'
        if fs_df_current[fs_df_current[FOLLOWER_STATS_TYPE_COLUMN] != 'follower_gains_monthly'].empty:
            demographics_missing = True
            logging.info("Follower stats: Demographic data (non-monthly types) is missing.")

    time_based_need_fs = False
    if pd.isna(last_fs_sync_attempt):
        time_based_need_fs = True
        logging.info(f"Follower stats sync needed: No previous '{LOG_SUBJECT_FOLLOWER_STATS}' sync attempt logged.")
    else:
        start_of_current_month = now_utc.normalize().replace(day=1)
        # Ensure last_fs_sync_attempt is timezone-aware (should be by get_last_sync_attempt_date)
        if last_fs_sync_attempt.tzinfo is None: # Should not happen if get_last_sync_attempt_date works
            last_fs_sync_attempt = last_fs_sync_attempt.tz_localize('UTC')
        
        if last_fs_sync_attempt < start_of_current_month:
            time_based_need_fs = True
            logging.info(f"Follower stats sync needed: Last attempt {last_fs_sync_attempt.date()} is before current month start {start_of_current_month.date()}.")

    if time_based_need_fs or demographics_missing:
        new_state['fs_should_sync_now'] = True
        if demographics_missing and not time_based_need_fs:
            logging.info("Follower stats sync triggered: Demographic data missing, even if last sync attempt is recent.")
        elif time_based_need_fs:
             logging.info("Follower stats sync triggered by schedule.")
    else:
        new_state['fs_should_sync_now'] = False
        logging.info("Follower stats sync not currently required by schedule or data presence.")

    # Update Sync Button based on determined needs
    sync_actions = []
    if new_state.get('fetch_count_for_api', 0) > 0:
        sync_actions.append(f"Posts ({new_state['fetch_count_for_api']})")
    if new_state.get('mentions_should_sync_now', False):
        sync_actions.append("Mentions")
    if new_state.get('fs_should_sync_now', False):
        sync_actions.append("Follower Stats")

    if new_state["token"] and sync_actions:
        button_label = f"πŸ”„ Sync LinkedIn Data ({', '.join(sync_actions)})"
        button_update = gr.update(value=button_label, visible=True, interactive=True)
    elif new_state["token"]:
        button_label = "βœ… Data Up-to-Date (based on sync log)"
        button_update = gr.update(value=button_label, visible=True, interactive=False)
    else: # No token
        button_update = gr.update(visible=False, interactive=False, value="πŸ”„ Sync LinkedIn Data")


    token_status_message = check_token_status(new_state)
    logging.info(f"Token processing complete. Status: {token_status_message}. Button: {button_update.get('value', 'N/A') if button_update else 'N/A'}. Sync actions needed: {sync_actions}")
    return token_status_message, new_state, button_update