LinkedinMonitor / apis /Bubble_API_Calls.py
GuglielmoTor's picture
Update apis/Bubble_API_Calls.py
483353d verified
# apis/Bubble_API_Calls.py
"""
This module provides functions to read data from the Bubble.io API.
It is used to fetch the LinkedIn token and all pre-processed application data.
All data writing/updating functions have been removed.
"""
import os
import json
import requests
import pandas as pd
import logging
from typing import Dict, Any, Optional, List, Tuple
from config import BUBBLE_API_KEY_PRIVATE_ENV_VAR, BUBBLE_API_ENDPOINT_ENV_VAR, BUBBLE_APP_NAME_ENV_VAR
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def fetch_linkedin_token_from_bubble(url_user_token_str: str) -> Optional[dict]:
"""
Fetches LinkedIn access token from Bubble.io using a state value.
The token is expected in a 'Raw_text' field as a JSON string.
"""
bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)
if not bubble_api_key:
logger.error("Bubble API environment variables key not set.")
return None
if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
logger.info(f"No valid user token provided to query Bubble: {url_user_token_str}")
return None
base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
params = {'constraints': json.dumps(constraints)}
headers = {"Authorization": f"Bearer {bubble_api_key}"}
logger.info(f"Attempting to fetch LinkedIn token from Bubble for state: {url_user_token_str}")
try:
response = requests.get(base_url, params=params, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
results = data.get("response", {}).get("results", [])
if not results:
logger.warning(f"No token results found in Bubble for state: {url_user_token_str}")
return None
raw_text = results[0].get("Raw_text")
if not raw_text or not isinstance(raw_text, str):
logger.warning(f"Token 'Raw_text' field is missing or not a string. Value: {raw_text}")
return None
parsed_token = json.loads(raw_text)
if isinstance(parsed_token, dict) and "access_token" in parsed_token:
logger.info("Successfully fetched and parsed LinkedIn token from Bubble.")
return parsed_token
else:
logger.error(f"Parsed token from Bubble is not a valid dictionary with an access_token. Parsed value: {parsed_token}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Bubble API request error while fetching token: {e}", exc_info=True)
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON from Bubble token response: {e}", exc_info=True)
except Exception as e:
logger.error(f"An unexpected error occurred while fetching the token from Bubble: {e}", exc_info=True)
return None
def fetch_linkedin_posts_data_from_bubble(
data_type: str,
constraint_key: str,
constraint_type: str,
constraint_value: any,
additional_constraints: list = None
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
"""
Fetches data from the Bubble.io Data API, handling pagination to retrieve all results.
"""
bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)
if not bubble_api_key:
error_msg = "Bubble API not set."
logger.error(error_msg)
return None, error_msg
base_url = f"{BUBBLE_API_ENDPOINT_ENV_VAR}/{data_type}"
headers = {"Authorization": f"Bearer {bubble_api_key}"}
constraints = [{"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value}]
if additional_constraints:
constraints.extend(additional_constraints)
all_results = []
cursor = 0
logger.info(f"Fetching data from Bubble type '{data_type}' where '{constraint_key}' is '{constraint_value}'...")
while True:
params = {'constraints': json.dumps(constraints), 'cursor': cursor, 'limit': 100}
try:
response = requests.get(base_url, params=params, headers=headers, timeout=30)
response.raise_for_status()
data = response.json().get("response", {})
results_on_page = data.get("results", [])
if results_on_page:
all_results.extend(results_on_page)
remaining = data.get("remaining", 0)
if remaining == 0:
break
cursor += len(results_on_page)
except requests.exceptions.RequestException as e:
error_msg = f"Bubble API Error fetching '{data_type}': {e}"
logger.error(error_msg, exc_info=True)
return None, error_msg
except json.JSONDecodeError as e:
error_msg = f"JSON Decode Error fetching '{data_type}': {e}. Response text: {response.text}"
logger.error(error_msg, exc_info=True)
return None, error_msg
if not all_results:
logger.info(f"No data found in Bubble for the given constraints in data type '{data_type}'.")
return pd.DataFrame(), None
logger.info(f"Successfully retrieved a total of {len(all_results)} records from '{data_type}'.")
return pd.DataFrame(all_results), None