watsonx.ai_Function_Deployment_MNB_V2 / cloudant_helper_functions.py
MilanM's picture
Create cloudant_helper_functions.py
312bffc verified
raw
history blame
1.53 kB
def cloudant_retrieve_documents(
    client: CloudantV1,
    db_name: str,
    selectors: dict,
    fields,
    sort: dict = None,
    limit: int = 100,
):
    """Retrieve documents from a Cloudant database via ``post_find``.

    Args:
        client: Initialized Cloudant client; only ``post_find`` is called on it.
        db_name: Name of the database to query.
        selectors: Selector forwarded unchanged as the ``selector`` argument.
        fields: Either a list of field names, or a dict holding that list
            under the ``"fields"`` key.
        sort: Optional sort specification. Accepted shapes:
            a list of ``{field: direction}`` dicts (forwarded as is),
            a dict wrapping such a list under the ``"sort"`` key, or
            a single ``{field: direction}`` dict. ``None`` (or an empty
            dict) sends no sort.
        limit: Maximum number of documents requested (default 100).

    Returns:
        Whatever ``post_find(...).get_result()`` returns.

    Raises:
        ValueError: If ``fields`` is neither a list nor a dict.
        KeyError: If ``fields`` is a dict without a ``"fields"`` key.
    """
    # Validate before touching ``fields`` so bad input fails with one clear error.
    if not isinstance(fields, (list, dict)):
        raise ValueError("fields must be a list of strings or a dict")
    field_names = fields if isinstance(fields, list) else fields["fields"]

    # Normalise dict-shaped sort specs to the list form sent to post_find.
    # Previously a dict was forwarded untouched, i.e. as a JSON object rather
    # than a list of sort entries.
    if isinstance(sort, dict):
        if not sort:
            sort_spec = None
        elif isinstance(sort.get("sort"), list):
            sort_spec = sort["sort"]
        else:
            sort_spec = [sort]
    else:
        sort_spec = sort

    # NOTE(review): an earlier revision built an ``_id``-descending default
    # here but never passed it to post_find. It is intentionally not enabled:
    # it has never been in effect, and the server may reject a sort whose
    # field is not covered by the selector/index -- confirm before adding it.
    return client.post_find(
        db=db_name,
        selector=selectors,
        fields=field_names,
        sort=sort_spec,
        limit=limit,
    ).get_result()
def ensure_database_exists(client: CloudantV1, db_name: str) -> bool:
    """
    Check if database exists and create it if it doesn't.

    The existence check calls ``get_database_information``. If that call
    fails with an error carrying an integer HTTP status other than 404
    (read from ``status_code``, falling back to ``code``), the error is
    re-raised as is instead of being mistaken for "database missing".
    Errors without such a status still lead to a creation attempt.

    Args:
        client (CloudantV1): Initialized Cloudant client
        db_name (str): Database name to check/create

    Returns:
        bool: True if database exists or was created successfully

    Raises:
        Exception: The lookup error when its status is not 404, or whatever
            ``put_database`` raises when creation fails.
    """
    try:
        # Try to get database info - will raise error if doesn't exist
        client.get_database_information(db=db_name)
        return True
    except Exception as lookup_error:
        # Presumably the SDK's API errors expose the HTTP status as
        # ``status_code`` (older releases: ``code``) -- duck-typed here so no
        # extra import is required.
        status = getattr(lookup_error, "status_code", None)
        if status is None:
            status = getattr(lookup_error, "code", None)
        # Auth/permission/server failures are not "database missing";
        # surface them rather than masking them behind a create attempt.
        if isinstance(status, int) and status != 404:
            raise

    try:
        # Database doesn't exist, try to create it
        client.put_database(db=db_name)
        return True
    except Exception as e:
        print(f"Failed to create database {db_name}: {str(e)}")
        raise