Spaces:
Running
Running
File size: 7,095 Bytes
6140a89 dec8c1d 6140a89 a056cb8 6140a89 fc82495 a056cb8 182a396 49502e6 a056cb8 182a396 e0f2ba1 49502e6 a056cb8 9b2b6de a056cb8 9b2b6de c350578 a056cb8 9b2b6de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# bubble_api_calls.py
import os
import json
import requests
import pandas as pd
def fetch_linkedin_token_from_bubble(url_user_token_str: str):
"""
Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
The token is expected in a 'Raw_text' field as a JSON string, which is then parsed.
Args:
url_user_token_str: The state value (token from URL) to query Bubble.
Returns:
tuple: (parsed_token_dict, status_message)
parsed_token_dict is the dictionary containing the token (e.g., {"access_token": "value"})
or None if an error occurred or token not found.
status_message is a string describing the outcome of the API call.
"""
bubble_api_key = os.environ.get("Bubble_API")
if not bubble_api_key:
error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
print(error_msg)
return None, error_msg
if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
status_msg = f"ℹ️ No valid user token from URL to query Bubble. ({url_user_token_str})"
print(status_msg)
return None, status_msg
base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
params = {'constraints': json.dumps(constraints)}
headers = {"Authorization": f"Bearer {bubble_api_key}"}
status_message = f"Attempting to fetch token from Bubble for state: {url_user_token_str}..."
print(status_message)
parsed_token_dict = None
response = None
try:
response = requests.get(base_url, params=params, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
results = data.get("response", {}).get("results", [])
if results:
raw_text_from_bubble = results[0].get("Raw_text", None)
if raw_text_from_bubble and isinstance(raw_text_from_bubble, str):
try:
temp_parsed_dict = json.loads(raw_text_from_bubble)
if isinstance(temp_parsed_dict, dict) and "access_token" in temp_parsed_dict:
parsed_token_dict = temp_parsed_dict # Successfully parsed and has access_token
status_message = f"✅ LinkedIn Token successfully fetched and parsed from Bubble 'Raw_text' for state: {url_user_token_str}"
elif isinstance(temp_parsed_dict, dict):
status_message = (f"⚠️ Bubble API: 'access_token' key missing in parsed 'Raw_text' dictionary for state: {url_user_token_str}. Parsed: {temp_parsed_dict}")
else: # Not a dict
status_message = (f"⚠️ Bubble API: 'Raw_text' field did not contain a valid JSON dictionary string. "
f"Content type: {type(raw_text_from_bubble)}, Value: {raw_text_from_bubble}")
except json.JSONDecodeError as e:
status_message = (f"⚠️ Bubble API: Error decoding 'Raw_text' JSON string: {e}. "
f"Content: {raw_text_from_bubble}")
elif raw_text_from_bubble: # It exists but is not a string
status_message = (f"⚠️ Bubble API: 'Raw_text' field was not a string. "
f"Type: {type(raw_text_from_bubble)}, Value: {raw_text_from_bubble}")
else: # Raw_text not found or null
status_message = (f"⚠️ Bubble API: Token field ('Raw_text') "
f"not found or is null in response for state: {url_user_token_str}. Result: {results[0]}")
else: # No results from Bubble for the given state
status_message = f"❌ Bubble API: No results found for state: {url_user_token_str}"
except requests.exceptions.HTTPError as http_err:
error_details = response.text if response else "No response content"
status_message = f"❌ Bubble API HTTP error: {http_err} - Response: {error_details}"
except requests.exceptions.Timeout:
status_message = "❌ Bubble API Request timed out."
except requests.exceptions.RequestException as req_err:
status_message = f"❌ Bubble API Request error: {req_err}"
except json.JSONDecodeError as json_err: # Error decoding the main Bubble response
error_details = response.text if response else "No response content"
status_message = f"❌ Bubble API main response JSON decode error: {json_err}. Response: {error_details}"
except Exception as e:
status_message = f"❌ An unexpected error occurred while fetching from Bubble: {str(e)}"
print(status_message) # Log the final status message
return parsed_token_dict
def fetch_linkedin_posts_data_from_bubble(org_urn: str, data_type:str):
bubble_api_key = os.environ.get("Bubble_API")
if not bubble_api_key:
error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
print(error_msg)
return None, error_msg
base_url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}"
constraints = [{"key": "organization_urn", "constraint_type": "equals", "value": org_urn}]
params = {'constraints': json.dumps(constraints)}
headers = {"Authorization": f"Bearer {bubble_api_key}"}
status_message = f"Attempting to fetch posts from Bubble for urn: {org_urn}..."
print(status_message)
try:
response = requests.get(base_url, params=params, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
results = data.get("response", {}).get("results", [])
if results:
df = pd.DataFrame(results)
print(f"Successfully retrieved {len(df)} posts.")
return df, None
else:
print("No posts found for the given org_urn.")
return pd.DataFrame(), None
except requests.exceptions.RequestException as e:
error_msg = f"❌ Bubble API Error: {str(e)}"
print(error_msg)
return None, error_msg
def bulk_upload_to_bubble(data, data_type):
api_token = os.environ.get("Bubble_API")
url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "text/plain"
}
# Convert list of dicts to newline-separated JSON strings
payload = "\n".join(json.dumps(item) for item in data)
response = requests.post(url, headers=headers, data=payload)
print("Payload being sent:")
print(payload)
if response.status_code == 200:
print(f"Successfully uploaded {len(data)} records to {data_type}.")
else:
print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
|