Spaces:
Running
Running
Update Bubble_API_Calls.py
Browse files- Bubble_API_Calls.py +20 -0
Bubble_API_Calls.py
CHANGED
@@ -3,6 +3,7 @@ import os
|
|
3 |
import json
|
4 |
import requests
|
5 |
|
|
|
6 |
def fetch_linkedin_token_from_bubble(url_user_token_str: str):
|
7 |
"""
|
8 |
Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
|
@@ -86,3 +87,22 @@ def fetch_linkedin_token_from_bubble(url_user_token_str: str):
|
|
86 |
|
87 |
print(status_message) # Log the final status message
|
88 |
return parsed_token_dict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
3 |
import json
|
4 |
import requests
|
5 |
|
6 |
+
|
7 |
def fetch_linkedin_token_from_bubble(url_user_token_str: str):
|
8 |
"""
|
9 |
Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
|
|
|
87 |
|
88 |
print(status_message) # Log the final status message
|
89 |
return parsed_token_dict
|
90 |
+
|
91 |
+
|
92 |
+
|
93 |
+
|
94 |
+
def bulk_upload_to_bubble(data, data_type):
|
95 |
+
api_token = os.environ.get("Bubble_API")
|
96 |
+
|
97 |
+
url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
|
98 |
+
|
99 |
+
headers = {
|
100 |
+
"Authorization": f"Bearer {api_token}",
|
101 |
+
"Content-Type": "application/json"
|
102 |
+
}
|
103 |
+
|
104 |
+
response = requests.post(url, headers=headers, json=data)
|
105 |
+
if response.status_code == 200:
|
106 |
+
print(f"Successfully uploaded {len(data)} records to {data_type}.")
|
107 |
+
else:
|
108 |
+
print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
|