Spaces:
Running
Running
# apis/Bubble_API_Calls.py
"""
This module provides functions to read data from the Bubble.io API.
It is used to fetch the LinkedIn token and all pre-processed application data.
All data writing/updating functions have been removed.
"""
import os | |
import json | |
import requests | |
import pandas as pd | |
import logging | |
from typing import Dict, Any, Optional, List, Tuple | |
from config import BUBBLE_API_KEY_PRIVATE_ENV_VAR, BUBBLE_API_ENDPOINT_ENV_VAR, BUBBLE_APP_NAME_ENV_VAR | |
# Import-time logging setup: configure the root logger at INFO level (this is
# a no-op when the root logger already has handlers) and create the
# module-level logger used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_linkedin_token_from_bubble(url_user_token_str: str) -> Optional[dict]:
    """
    Fetches LinkedIn access token from Bubble.io using a state value.
    The token is expected in a 'Raw_text' field as a JSON string.

    Args:
        url_user_token_str: The 'state' value used to look up the record in
            the Bubble 'Linkedin_access' data type. Empty values, non-string
            values, and strings containing "not found" or "Could not access"
            are rejected without calling the API.

    Returns:
        The parsed token dictionary (always containing an 'access_token'
        key) on success. None when the API key is not configured, the input
        is rejected, no record matches, 'Raw_text' is missing or not a
        string, the parsed JSON is not a dict with 'access_token', or any
        request/decoding error occurs. Errors are logged, never raised.
    """
    bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)
    if not bubble_api_key:
        logger.error("Bubble API environment variables key not set.")
        return None

    # The substring checks need a str; a truthy non-string argument would
    # otherwise raise TypeError here, outside of any error handling.
    # NOTE(review): the two rejected substrings look like placeholder/error
    # texts produced upstream - confirm against the caller.
    if (
        not isinstance(url_user_token_str, str)
        or not url_user_token_str
        or "not found" in url_user_token_str
        or "Could not access" in url_user_token_str
    ):
        logger.info(f"No valid user token provided to query Bubble: {url_user_token_str}")
        return None

    base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
    constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
    params = {'constraints': json.dumps(constraints)}
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    logger.info(f"Attempting to fetch LinkedIn token from Bubble for state: {url_user_token_str}")
    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()

        results = data.get("response", {}).get("results", [])
        if not results:
            logger.warning(f"No token results found in Bubble for state: {url_user_token_str}")
            return None

        # Only the first matching record is used.
        raw_text = results[0].get("Raw_text")
        if not raw_text or not isinstance(raw_text, str):
            logger.warning(f"Token 'Raw_text' field is missing or not a string. Value: {raw_text}")
            return None

        parsed_token = json.loads(raw_text)
        if isinstance(parsed_token, dict) and "access_token" in parsed_token:
            logger.info("Successfully fetched and parsed LinkedIn token from Bubble.")
            return parsed_token

        # Describe the payload's shape instead of dumping it: a malformed
        # token record may still carry credential material that must not
        # end up in the logs.
        if isinstance(parsed_token, dict):
            token_shape = f"dict with keys {sorted(parsed_token)}"
        else:
            token_shape = type(parsed_token).__name__
        logger.error(
            "Parsed token from Bubble is not a valid dictionary with an access_token. "
            f"Parsed value shape: {token_shape}"
        )
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Bubble API request error while fetching token: {e}", exc_info=True)
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from Bubble token response: {e}", exc_info=True)
    except Exception as e:
        # Deliberate catch-all: this lookup is best-effort and must not
        # crash the caller on an unexpected response shape.
        logger.error(f"An unexpected error occurred while fetching the token from Bubble: {e}", exc_info=True)
    return None
def fetch_linkedin_posts_data_from_bubble(
    data_type: str,
    constraint_key: str,
    constraint_type: str,
    constraint_value: Any,
    additional_constraints: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetches data from the Bubble.io Data API, handling pagination to retrieve all results.

    Args:
        data_type: Name of the Bubble data type; appended to the API
            endpoint to build the request URL.
        constraint_key: Field name of the primary search constraint.
        constraint_type: Constraint operator of the primary constraint.
        constraint_value: Value the primary constraint compares against;
            must be JSON-serializable.
        additional_constraints: Optional extra constraint dicts appended
            after the primary one.

    Returns:
        A ``(dataframe, error_message)`` tuple:
        ``(DataFrame of all records, None)`` on success,
        ``(empty DataFrame, None)`` when nothing matches, and
        ``(None, error_message)`` when the API key is missing, a request
        fails, or the response cannot be decoded or has an unexpected shape.
    """
    bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)
    if not bubble_api_key:
        error_msg = "Bubble API not set."
        logger.error(error_msg)
        return None, error_msg

    # NOTE(review): by its name, and by analogy with the API-key constant
    # above, BUBBLE_API_ENDPOINT_ENV_VAR appears to hold the *name* of an
    # environment variable, not the URL itself. Resolve it through the
    # environment and fall back to the constant's own value so the previous
    # behaviour is kept if config actually stores the URL - confirm against
    # config.py.
    api_endpoint = os.environ.get(BUBBLE_API_ENDPOINT_ENV_VAR) or BUBBLE_API_ENDPOINT_ENV_VAR
    base_url = f"{api_endpoint}/{data_type}"
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    constraints = [{"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value}]
    if additional_constraints:
        constraints.extend(additional_constraints)
    # The constraints do not change between pages, so serialize them once.
    constraints_json = json.dumps(constraints)

    all_results = []
    cursor = 0
    logger.info(f"Fetching data from Bubble type '{data_type}' where '{constraint_key}' is '{constraint_value}'...")

    while True:
        params = {'constraints': constraints_json, 'cursor': cursor, 'limit': 100}
        try:
            response = requests.get(base_url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            error_msg = f"Bubble API Error fetching '{data_type}': {e}"
            logger.error(error_msg, exc_info=True)
            return None, error_msg
        except json.JSONDecodeError as e:
            error_msg = f"JSON Decode Error fetching '{data_type}': {e}. Response text: {response.text}"
            logger.error(error_msg, exc_info=True)
            return None, error_msg

        # A body that is not a JSON object (or whose "response" member is
        # not one) would otherwise surface as an uncaught AttributeError.
        data = payload.get("response", {}) if isinstance(payload, dict) else None
        if not isinstance(data, dict):
            error_msg = f"Unexpected response structure fetching '{data_type}'."
            logger.error(error_msg)
            return None, error_msg

        results_on_page = data.get("results", [])
        if not results_on_page:
            # An empty page cannot advance the cursor; stop here rather than
            # re-requesting the same page forever when "remaining" is
            # reported as non-zero.
            break
        all_results.extend(results_on_page)

        if data.get("remaining", 0) == 0:
            break
        cursor += len(results_on_page)

    if not all_results:
        logger.info(f"No data found in Bubble for the given constraints in data type '{data_type}'.")
        return pd.DataFrame(), None

    logger.info(f"Successfully retrieved a total of {len(all_results)} records from '{data_type}'.")
    return pd.DataFrame(all_results), None