# Source: watsonx.ai_Function_Deployment_MNB_V2 / cloudant_helper_functions.py
# Last commit (MilanM): "Update cloudant_helper_functions.py" (40e9b67, verified)
from ibmcloudant.cloudant_v1 import CloudantV1, Document
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
def cloudant_retrieve_documents(
    client: CloudantV1,
    db_name: str,
    selectors: dict,
    fields,
    sort: dict = None,
    limit: int = 100,
):
    """Retrieve documents from a Cloudant database via ``post_find``.

    Args:
        client: Initialized Cloudant client.
        db_name: Name of the database to query.
        selectors: Selector forwarded to ``post_find`` as ``selector``.
        fields: Either a list of field names, or a wrapper dict of the
            form ``{"fields": [...]}``.
        sort: Optional sort specification. Accepted shapes:

            * a list of ``{field: direction}`` dicts (forwarded as-is),
            * a wrapper dict ``{"sort": [...]}`` (the list is unwrapped),
            * a plain ``{field: direction, ...}`` dict (split into one
              single-key dict per field, in insertion order).

            When omitted, no sort is sent with the query.
        limit: Maximum number of documents to request.

    Returns:
        Whatever ``get_result()`` returns for the ``post_find`` response.

    Raises:
        ValueError: If ``fields`` is neither a list nor a dict.
        KeyError: If ``fields`` is a dict without a ``"fields"`` key.
    """
    # Validate before touching ``fields`` so bad input fails with the
    # intended message rather than somewhere further down.
    if not isinstance(fields, (list, dict)):
        raise ValueError("fields must be a list of strings or a dict")
    field_names = fields if isinstance(fields, list) else fields["fields"]

    # ``post_find`` takes the sort as a list of single-key objects, so a
    # dict-shaped argument has to be normalised instead of being forwarded
    # verbatim.
    sort_spec = sort
    if isinstance(sort, dict):
        wrapped = sort.get("sort")
        if isinstance(wrapped, list):
            sort_spec = wrapped
        else:
            sort_spec = [{name: direction} for name, direction in sort.items()]
        # An empty specification means "no sort", same as omitting it.
        sort_spec = sort_spec or None

    # NOTE: an omitted ``sort`` is forwarded as ``None`` (no explicit
    # ordering), which is what this function has always sent. A descending
    # ``_id`` default is intentionally not applied here because it would
    # change the result order for existing callers.
    response = client.post_find(
        db=db_name,
        selector=selectors,
        fields=field_names,
        sort=sort_spec,
        limit=limit,
    )
    return response.get_result()
def _cloudant_error_status(error: Exception):
    """Return the HTTP status attached to *error*, or None if it has none."""
    status = getattr(error, "status_code", None)
    if status is None:
        # Fallback attribute name; presumably what older SDK core releases
        # expose on their API exception -- verify against the pinned version.
        status = getattr(error, "code", None)
    return status


def ensure_database_exists(client: CloudantV1, db_name: str) -> bool:
    """
    Check if database exists and create it if it doesn't.

    Args:
        client (CloudantV1): Initialized Cloudant client
        db_name (str): Database name to check/create

    Returns:
        bool: True if database exists or was created successfully

    Raises:
        Exception: Re-raises the lookup error when it carries an HTTP status
            other than 404, and re-raises the creation error when the
            database could not be created.
    """
    try:
        # Try to get database info - will raise error if doesn't exist
        client.get_database_information(db=db_name)
    except Exception as lookup_error:
        status = _cloudant_error_status(lookup_error)
        # Only "not found" means the database is missing. Any other HTTP
        # status (auth, rate limit, server error) is a real failure that a
        # create attempt would only obscure. Errors without a status keep
        # falling through to the create attempt.
        if status is not None and status != 404:
            raise
    else:
        return True

    try:
        # Database doesn't exist, try to create it
        client.put_database(db=db_name)
    except Exception as e:
        # 412 is returned when the database already exists, e.g. because it
        # was created concurrently after the lookup above; the goal of this
        # function is met either way.
        if _cloudant_error_status(e) == 412:
            return True
        print(f"Failed to create database {db_name}: {str(e)}")
        raise
    return True