Spaces:
Running
Running
Update Bubble_API_Calls.py
Browse files- Bubble_API_Calls.py +33 -0
Bubble_API_Calls.py
CHANGED
|
@@ -88,7 +88,40 @@ def fetch_linkedin_token_from_bubble(url_user_token_str: str):
|
|
| 88 |
print(status_message) # Log the final status message
|
| 89 |
return parsed_token_dict
|
| 90 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 91 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 92 |
|
| 93 |
|
| 94 |
def bulk_upload_to_bubble(data, data_type):
|
|
|
|
| 88 |
print(status_message) # Log the final status message
|
| 89 |
return parsed_token_dict
|
| 90 |
|
| 91 |
+
def fetch_posts(org_urn: str):
|
| 92 |
+
bubble_api_key = os.environ.get("Bubble_API")
|
| 93 |
+
if not bubble_api_key:
|
| 94 |
+
error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
|
| 95 |
+
print(error_msg)
|
| 96 |
+
return None, error_msg
|
| 97 |
|
| 98 |
+
base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/LI_posts"
|
| 99 |
+
constraints = [{"key": "author_urn", "constraint_type": "equals", "value": org_urn}]
|
| 100 |
+
params = {'constraints': json.dumps(constraints)}
|
| 101 |
+
headers = {"Authorization": f"Bearer {bubble_api_key}"}
|
| 102 |
+
|
| 103 |
+
status_message = f"Attempting to fetch posts from Bubble for urn: {org_urn}..."
|
| 104 |
+
print(status_message)
|
| 105 |
+
|
| 106 |
+
try:
|
| 107 |
+
response = requests.get(base_url, params=params, headers=headers, timeout=15)
|
| 108 |
+
response.raise_for_status()
|
| 109 |
+
|
| 110 |
+
data = response.json()
|
| 111 |
+
results = data.get("response", {}).get("results", [])
|
| 112 |
+
|
| 113 |
+
if results:
|
| 114 |
+
df = pd.DataFrame(results)
|
| 115 |
+
print(f"Successfully retrieved {len(df)} posts.")
|
| 116 |
+
return df, None
|
| 117 |
+
else:
|
| 118 |
+
print("No posts found for the given org_urn.")
|
| 119 |
+
return pd.DataFrame(), None
|
| 120 |
+
|
| 121 |
+
except requests.exceptions.RequestException as e:
|
| 122 |
+
error_msg = f"❌ Bubble API Error: {str(e)}"
|
| 123 |
+
print(error_msg)
|
| 124 |
+
return None, error_msg
|
| 125 |
|
| 126 |
|
| 127 |
def bulk_upload_to_bubble(data, data_type):
|