Spaces:
Running
Running
# ============================================================================= | |
# βββββββββββββ IMPORTS βββββββββββββ | |
# ============================================================================= | |
import base64 | |
import glob | |
import json | |
import os | |
import pandas as pd | |
import pytz | |
import re | |
import shutil | |
import streamlit as st | |
import time | |
import uuid | |
import zipfile | |
from azure.cosmos import CosmosClient, PartitionKey, exceptions | |
from datetime import datetime | |
# ============================================================================= | |
# βββββββββββββ APP CONFIGURATION βββββββββββββ | |
# ============================================================================= | |
Site_Name = 'π GitCosmos' | |
title = "π GitCosmos" | |
st.set_page_config( | |
page_title=title, | |
page_icon='πππ«', | |
layout="wide", | |
initial_sidebar_state="auto" | |
) | |
# Cosmos DB Configuration | |
ENDPOINT = "https://acae-afd.documents.azure.com:443/" | |
Key = os.environ.get("Key") # Ensure this is set in your environment | |
# ============================================================================= | |
# βββββββββββββ HELPER FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def generate_unique_id(): | |
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') | |
unique_uuid = str(uuid.uuid4()) | |
return f"{timestamp}-{unique_uuid}" | |
def get_download_link(file_path): | |
with open(file_path, "rb") as file: | |
contents = file.read() | |
b64 = base64.b64encode(contents).decode() | |
file_name = os.path.basename(file_path) | |
return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name} π</a>' | |
def sanitize_json_text(text): | |
text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) | |
text = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") | |
return text | |
# ============================================================================= | |
# βββββββββββββ COSMOS DB FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def get_databases(client): | |
return [db['id'] for db in client.list_databases()] | |
def get_containers(database): | |
return [container['id'] for container in database.list_containers()] | |
def get_documents(container, limit=None): | |
query = "SELECT * FROM c ORDER BY c._ts DESC" | |
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) | |
return items | |
def insert_record(container, record): | |
try: | |
container.create_item(body=record) | |
return True, "Inserted! π" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error: {str(e)} π¨" | |
except Exception as e: | |
return False, f"Error: {str(e)} π±" | |
def update_record(container, updated_record): | |
try: | |
container.upsert_item(body=updated_record) | |
return True, f"Updated {updated_record['id']} π οΈ" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error: {str(e)} π¨" | |
except Exception as e: | |
return False, f"Error: {str(e)} π±" | |
def delete_record(container, record): | |
try: | |
doc_id = record["id"] | |
partition_key_value = record.get("pk", doc_id) | |
container.delete_item(item=doc_id, partition_key=partition_key_value) | |
return True, f"Record {doc_id} deleted. ποΈ" | |
except exceptions.CosmosResourceNotFoundError: | |
return True, f"Record {doc_id} not found. ποΈ" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error: {str(e)} π¨" | |
except Exception as e: | |
return False, f"Error: {str(e)} π±" | |
def archive_current_container(database_name, container_name, client): | |
try: | |
base_dir = "./cosmos_archive" | |
if os.path.exists(base_dir): | |
shutil.rmtree(base_dir) | |
os.makedirs(base_dir) | |
db_client = client.get_database_client(database_name) | |
container_client = db_client.get_container_client(container_name) | |
items = list(container_client.read_all_items()) | |
container_dir = os.path.join(base_dir, container_name) | |
os.makedirs(container_dir) | |
for item in items: | |
item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}") | |
with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f: | |
json.dump(item, f, indent=2) | |
archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
shutil.make_archive(archive_name, 'zip', base_dir) | |
return get_download_link(f"{archive_name}.zip") | |
except Exception as e: | |
return f"Archive error: {str(e)} π’" | |
def create_new_container(database, container_id, partition_key_path): | |
try: | |
container = database.create_container( | |
id=container_id, | |
partition_key=PartitionKey(path=partition_key_path) | |
) | |
return container | |
except exceptions.CosmosResourceExistsError: | |
return database.get_container_client(container_id) | |
except exceptions.CosmosHttpResponseError as e: | |
st.error(f"Error creating container: {str(e)}") | |
return None | |
# ============================================================================= | |
# βββββββββββββ UI FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def edit_all_documents(container): | |
st.markdown("### π All Documents") | |
documents = get_documents(container) | |
if not documents: | |
st.info("No documents in this container.") | |
return | |
for doc in documents: | |
ts = doc.get("_ts", 0) | |
dt = datetime.fromtimestamp(ts) if ts else datetime.now() | |
formatted_ts = dt.strftime("%I:%M %p %m/%d/%Y") | |
header = f"{doc.get('name', 'Unnamed')} - {formatted_ts}" | |
with st.expander(header): | |
doc_key = f"editor_{doc['id']}" | |
if doc_key not in st.session_state: | |
st.session_state[doc_key] = json.dumps(doc, indent=2) | |
edited_content = st.text_area("Edit JSON", value=st.session_state[doc_key], height=300, key=doc_key) | |
if st.button("πΎ Save", key=f"save_{doc['id']}"): | |
try: | |
updated_doc = json.loads(sanitize_json_text(edited_content)) | |
# Preserve system fields and identity | |
updated_doc['id'] = doc['id'] | |
updated_doc['pk'] = doc.get('pk', doc['id']) | |
for field in ['_ts', '_rid', '_self', '_etag', '_attachments']: | |
updated_doc.pop(field, None) # Remove system fields before upsert | |
success, message = update_record(container, updated_doc) | |
if success: | |
st.success(f"Saved {doc['id']}") | |
st.session_state[doc_key] = json.dumps(updated_doc, indent=2) | |
else: | |
st.error(message) | |
except json.JSONDecodeError: | |
st.error("Invalid JSON format.") | |
except Exception as e: | |
st.error(f"Save error: {str(e)}") | |
if st.button("ποΈ Delete", key=f"delete_{doc['id']}"): | |
success, message = delete_record(container, doc) | |
if success: | |
st.success(message) | |
st.rerun() | |
else: | |
st.error(message) | |
def new_item_default(container): | |
new_id = generate_unique_id() | |
default_doc = { | |
"id": new_id, | |
"pk": new_id, | |
"name": "New Document", | |
"content": "Start editing here...", | |
"timestamp": datetime.now().isoformat(), | |
"type": "sample" | |
} | |
success, message = insert_record(container, default_doc) | |
if success: | |
st.success("New document created! β¨") | |
st.rerun() | |
else: | |
st.error(f"Error creating new item: {message}") | |
# ============================================================================= | |
# βββββββββββββ MAIN FUNCTION βββββββββββββ | |
# ============================================================================= | |
def main(): | |
st.title("π GitCosmos - CosmosDB Manager") | |
# Initialize session state | |
if "client" not in st.session_state: | |
if not Key: | |
st.error("Missing CosmosDB Key πβ") | |
return | |
st.session_state.client = CosmosClient(ENDPOINT, credential=Key) | |
st.session_state.selected_database = None | |
st.session_state.selected_container = None | |
# Sidebar: Hierarchical Navigation | |
st.sidebar.title("π Navigator") | |
# Databases Section | |
st.sidebar.subheader("Databases") | |
databases = get_databases(st.session_state.client) | |
selected_db = st.sidebar.selectbox("ποΈ Select Database", databases, key="db_select") | |
if selected_db != st.session_state.selected_database: | |
st.session_state.selected_database = selected_db | |
st.session_state.selected_container = None | |
st.rerun() | |
# Containers Section | |
if st.session_state.selected_database: | |
database = st.session_state.client.get_database_client(st.session_state.selected_database) | |
st.sidebar.subheader("Containers") | |
if st.sidebar.button("π New Container"): | |
with st.sidebar.form("new_container_form"): | |
container_id = st.text_input("Container ID", "new-container") | |
partition_key = st.text_input("Partition Key", "/pk") | |
if st.form_submit_button("Create"): | |
container = create_new_container(database, container_id, partition_key) | |
if container: | |
st.success(f"Container '{container_id}' created!") | |
st.rerun() | |
containers = get_containers(database) | |
selected_container = st.sidebar.selectbox("π Select Container", containers, key="container_select") | |
if selected_container != st.session_state.selected_container: | |
st.session_state.selected_container = selected_container | |
st.rerun() | |
# Actions Section | |
st.sidebar.subheader("Actions") | |
if st.session_state.selected_container and st.sidebar.button("π¦ Export Container"): | |
download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client) | |
if download_link.startswith('<a'): | |
st.sidebar.markdown(download_link, unsafe_allow_html=True) | |
else: | |
st.sidebar.error(download_link) | |
# Items Section | |
st.sidebar.subheader("Items") | |
if st.session_state.selected_container: | |
if st.sidebar.button("β New Item"): | |
container = database.get_container_client(st.session_state.selected_container) | |
new_item_default(container) | |
# Central Area: Display and Edit All Documents | |
if st.session_state.selected_container: | |
container = database.get_container_client(st.session_state.selected_container) | |
edit_all_documents(container) | |
else: | |
st.info("Select a database and container to view and edit documents.") | |
if __name__ == "__main__": | |
main() |