Spaces:
Running
Running
# ============================================================================= | |
# βββββββββββββ IMPORTS βββββββββββββ | |
# ============================================================================= | |
import base64 | |
import glob | |
import hashlib | |
import json | |
import os | |
import pandas as pd | |
import pytz | |
import random | |
import re | |
import shutil | |
import streamlit as st | |
import time | |
import traceback | |
import uuid | |
import zipfile | |
from PIL import Image | |
from azure.cosmos import CosmosClient, PartitionKey, exceptions | |
from datetime import datetime | |
from git import Repo | |
from github import Github | |
from gradio_client import Client, handle_file | |
import tempfile | |
import io | |
import requests | |
import numpy as np | |
from urllib.parse import quote | |
# ============================================================================= | |
# βββββββββββββ EXTERNAL HELP LINKS (Always visible in sidebar) βββββββββββββ | |
# ============================================================================= | |
external_links = [ | |
{"title": "CosmosDB GenAI Full Text Search", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search", "emoji": "π»"}, | |
{"title": "CosmosDB SQL API Client Library", "url": "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python", "emoji": "π»"}, | |
{"title": "CosmosDB Index and Query Vectors", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query", "emoji": "π»"}, | |
{"title": "CosmosDB NoSQL Materialized Views", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views", "emoji": "π»"}, | |
{"title": "LangChain Vector Store Guide", "url": "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/", "emoji": "π»"}, | |
{"title": "Vector Database Prompt Engineering RAG for Python", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations", "emoji": "π»"}, | |
{"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "π»"}, | |
{"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "π"}, | |
{"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "π»"}, | |
{"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "π"}, | |
{"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "π"}, | |
] | |
# ============================================================================= | |
# βββββββββββββ APP CONFIGURATION βββββββββββββ | |
# ============================================================================= | |
Site_Name = 'π GitCosmos' | |
title = "π GitCosmos" | |
helpURL = 'https://huggingface.co/awacke1' | |
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/' | |
icons = 'πππ«' | |
st.set_page_config( | |
page_title=title, | |
page_icon=icons, | |
layout="wide", | |
initial_sidebar_state="auto", | |
menu_items={ | |
'Get Help': helpURL, | |
'Report a bug': bugURL, | |
'About': title | |
} | |
) | |
# Cosmos DB & App URLs | |
ENDPOINT = "https://acae-afd.documents.azure.com:443/" | |
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") | |
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") | |
Key = os.environ.get("Key") | |
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI" | |
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer' | |
# ============================================================================= | |
# βββββββββββββ HELPER FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def get_download_link(file_path): | |
with open(file_path, "rb") as file: | |
contents = file.read() | |
b64 = base64.b64encode(contents).decode() | |
file_name = os.path.basename(file_path) | |
return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name} π</a>' | |
def generate_unique_id(): | |
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') | |
unique_uuid = str(uuid.uuid4()) | |
return_value = f"{timestamp}-{unique_uuid}" | |
st.write('New ID: ' + return_value) | |
return return_value | |
def generate_filename(prompt, file_type): | |
central = pytz.timezone('US/Central') | |
safe_date_time = datetime.now(central).strftime("%m%d_%H%M") | |
safe_prompt = re.sub(r'\W+', '', prompt)[:90] | |
return f"{safe_date_time}{safe_prompt}.{file_type}" | |
def create_file(filename, prompt, response, should_save=True): | |
if not should_save: | |
return | |
with open(filename, 'w', encoding='utf-8') as file: | |
file.write(prompt + "\n\n" + response) | |
def load_file(file_name): | |
with open(file_name, "r", encoding='utf-8') as file: | |
content = file.read() | |
return content | |
def display_glossary_entity(k): | |
search_urls = { | |
"π": lambda k: f"/?q={k}", | |
"π": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}", | |
"π": lambda k: f"https://www.google.com/search?q={quote(k)}", | |
"π₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}", | |
} | |
links_md = ' '.join([f"<a href='{url(k)}' target='_blank'>{emoji}</a>" for emoji, url in search_urls.items()]) | |
st.markdown(f"{k} {links_md}", unsafe_allow_html=True) | |
def create_zip_of_files(files): | |
zip_name = "all_files.zip" | |
with zipfile.ZipFile(zip_name, 'w') as zipf: | |
for file in files: | |
zipf.write(file) | |
return zip_name | |
def get_video_html(video_path, width="100%"): | |
video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}" | |
return f''' | |
<video width="{width}" controls autoplay loop> | |
<source src="{video_url}" type="video/mp4"> | |
Your browser does not support video. | |
</video> | |
''' | |
def get_audio_html(audio_path, width="100%"): | |
audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}" | |
return f''' | |
<audio controls style="width:{width}"> | |
<source src="{audio_url}" type="audio/mpeg"> | |
Your browser does not support audio. | |
</audio> | |
''' | |
def preprocess_text(text): | |
text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n') | |
text = text.replace('"', '\\"') | |
text = re.sub(r'[\t]', ' ', text) | |
text = re.sub(r'[^\x00-\x7F]+', '', text) | |
return text.strip() | |
def sanitize_json_text(text): | |
text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) | |
text = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t") | |
return text | |
# ============================================================================= | |
# βββββββββββββ COSMOS DB FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def get_databases(client): | |
return [db['id'] for db in client.list_databases()] | |
def get_containers(database): | |
return [container['id'] for container in database.list_containers()] | |
def get_documents(container, limit=None): | |
query = "SELECT * FROM c ORDER BY c._ts DESC" | |
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) | |
return items | |
def insert_record(container, record): | |
try: | |
container.create_item(body=record) | |
return True, "Inserted! π" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error: {str(e)} π¨" | |
except Exception as e: | |
return False, f"Error: {str(e)} π±" | |
def update_record(container, updated_record): | |
try: | |
container.upsert_item(body=updated_record) | |
return True, f"Updated {updated_record['id']} π οΈ" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error: {str(e)} π¨" | |
except Exception as e: | |
return False, f"Error: {traceback.format_exc()} π±" | |
def delete_record(container, record): | |
try: | |
if "id" not in record: | |
return False, "Record must contain an 'id' field. π" | |
doc_id = record["id"] | |
partition_key_value = record.get("pk", doc_id) | |
container.delete_item(item=doc_id, partition_key=partition_key_value) | |
return True, f"Record {doc_id} successfully deleted from Cosmos DB. ποΈ" | |
except exceptions.CosmosResourceNotFoundError: | |
return True, f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). ποΈ" | |
except exceptions.CosmosHttpResponseError as e: | |
return False, f"HTTP error deleting {doc_id}: {str(e)}. π¨" | |
except Exception as e: | |
return False, f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. π±" | |
def archive_current_container(database_name, container_name, client): | |
try: | |
base_dir = "./cosmos_archive_current_container" | |
if os.path.exists(base_dir): | |
shutil.rmtree(base_dir) | |
os.makedirs(base_dir) | |
db_client = client.get_database_client(database_name) | |
container_client = db_client.get_container_client(container_name) | |
items = list(container_client.read_all_items()) | |
container_dir = os.path.join(base_dir, container_name) | |
os.makedirs(container_dir) | |
for item in items: | |
item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}") | |
with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f: | |
json.dump(item, f, indent=2) | |
archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
shutil.make_archive(archive_name, 'zip', base_dir) | |
return get_download_link(f"{archive_name}.zip") | |
except Exception as e: | |
return f"Archive error: {str(e)} π’" | |
# ============================================================================= | |
# βββββββββββββ ADVANCED COSMOS FUNCTIONS (omitted for brevity) βββββββββββββ | |
# ============================================================================= | |
# β¦ (your existing advanced cosmos functions remain here) β¦ | |
# ============================================================================= | |
# βββββββββββββ NEW COSMOSDB DEMO FUNCTIONS βββββββββββββ | |
# Each function below corresponds to one of your provided code snippets. | |
# They expose a simple UI (with text inputs and buttons) to run the demo code. | |
# ============================================================================= | |
def demo_create_database(): | |
st.markdown("### Demo: Create Database") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_name") | |
if st.button("Create Database", key="btn_create_db"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.create_database(database_name) | |
st.success(f"Database '{database_name}' created.") | |
st.write(database) | |
except exceptions.CosmosResourceExistsError: | |
database = client_demo.get_database_client(database_name) | |
st.info(f"Database '{database_name}' already exists.") | |
st.write(database) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_create_container(): | |
st.markdown("### Demo: Create Container") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_for_container") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_name") | |
partition_key = st.text_input("Enter Partition Key Path", value="/productName", key="demo_partition_key") | |
if st.button("Create Container", key="btn_create_container"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.create_container(id=container_name, partition_key=PartitionKey(path=partition_key)) | |
st.success(f"Container '{container_name}' created.") | |
st.write(container) | |
except exceptions.CosmosResourceExistsError: | |
container = database.get_container_client(container_name) | |
st.info(f"Container '{container_name}' already exists.") | |
st.write(container) | |
except exceptions.CosmosHttpResponseError as e: | |
st.error(f"HTTP error: {str(e)}") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_create_analytical_container(): | |
st.markdown("### Demo: Create Analytical Store Enabled Container") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_for_analytical") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_analytical") | |
partition_key = st.text_input("Enter Partition Key Path", value="/productName", key="demo_partition_key_analytical") | |
if st.button("Create Analytical Container", key="btn_create_analytical"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.create_container( | |
id=container_name, | |
partition_key=PartitionKey(path=partition_key), | |
analytical_storage_ttl=-1 | |
) | |
st.success(f"Analytical container '{container_name}' created.") | |
st.write(container) | |
except exceptions.CosmosResourceExistsError: | |
container = database.get_container_client(container_name) | |
st.info(f"Container '{container_name}' already exists.") | |
st.write(container) | |
except exceptions.CosmosHttpResponseError as e: | |
st.error(f"HTTP error: {str(e)}") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_get_existing_container(): | |
st.markdown("### Demo: Get Existing Container") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_get") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_get") | |
if st.button("Get Container", key="btn_get_container"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
st.success(f"Retrieved container '{container_name}'.") | |
st.write(container) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_insert_data(): | |
st.markdown("### Demo: Insert Data") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_insert") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_insert") | |
num_items = st.number_input("Number of items to insert", min_value=1, max_value=20, value=9, key="demo_num_items") | |
if st.button("Insert Data", key="btn_insert_data"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
for i in range(1, int(num_items) + 1): | |
container.upsert_item({ | |
'id': f'item{i}', | |
'productName': 'Widget', | |
'productModel': f'Model {i}' | |
}) | |
st.success(f"Inserted {num_items} items.") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_delete_data(): | |
st.markdown("### Demo: Delete Data") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_delete") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_delete") | |
query_model = st.text_input("Product Model to delete", value="Model 2", key="demo_query_model") | |
if st.button("Delete Data", key="btn_delete_data"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
items = list(container.query_items( | |
query=f'SELECT * FROM products p WHERE p.productModel = "{query_model}"', | |
enable_cross_partition_query=True | |
)) | |
count = 0 | |
for item in items: | |
container.delete_item(item, partition_key=item.get("productName", "Widget")) | |
count += 1 | |
st.success(f"Deleted {count} items with productModel = '{query_model}'.") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_query_database(): | |
st.markdown("### Demo: Query Database") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_query") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_query") | |
query_str = st.text_area("Enter SQL Query", value='SELECT * FROM mycontainer r WHERE r.id="item3"', key="demo_query_str") | |
if st.button("Run Query", key="btn_query_database"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
results = list(container.query_items(query=query_str, enable_cross_partition_query=True)) | |
if results: | |
for item in results: | |
st.json(item) | |
else: | |
st.info("No results found.") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_parameterized_query(): | |
st.markdown("### Demo: Parameterized Query") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_param") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_param") | |
model_value = st.text_input("Enter productModel value", value="Model 7", key="demo_model_value") | |
if st.button("Run Parameterized Query", key="btn_param_query"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
discontinued_items = container.query_items( | |
query='SELECT * FROM products p WHERE p.productModel = @model', | |
parameters=[{"name": "@model", "value": model_value}], | |
enable_cross_partition_query=True | |
) | |
for item in discontinued_items: | |
st.json(item) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_get_db_properties(): | |
st.markdown("### Demo: Get Database Properties") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_props") | |
if st.button("Get Properties", key="btn_db_props"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
props = database.read() | |
st.json(props) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_get_throughput(): | |
st.markdown("### Demo: Get Throughput (Database & Container)") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_throughput") | |
container_name = st.text_input("Enter Container Name (for container throughput)", value="testContainer", key="demo_container_throughput") | |
if st.button("Get Throughput", key="btn_get_throughput"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
db_offer = database.get_throughput() | |
st.write(f"Database Offer: {db_offer.properties['id']} with throughput {db_offer.properties['content']['offerThroughput']}") | |
try: | |
container = database.get_container_client(container_name) | |
container_offer = container.get_throughput() | |
st.write(f"Container Offer: {container_offer.properties['id']} with throughput {container_offer.properties['content']['offerThroughput']}") | |
except exceptions.CosmosHttpResponseError as e: | |
st.error(f"Container throughput error: {str(e)}") | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_modify_container_properties(): | |
st.markdown("### Demo: Modify Container Properties (Set default TTL)") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_modify") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_modify") | |
new_ttl = st.number_input("Enter new default TTL (seconds)", min_value=0, value=10, key="demo_new_ttl") | |
if st.button("Modify Container", key="btn_modify_container"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
# Note: replace_container is used here as per your snippet. | |
database.replace_container( | |
container, | |
partition_key=PartitionKey(path="/productName"), | |
default_ttl=new_ttl, | |
) | |
container_props = container.read() | |
st.write("New default TTL:", container_props.get("defaultTtl")) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
def demo_item_response_headers(): | |
st.markdown("### Demo: Using Item Point Operation Response Headers") | |
database_name = st.text_input("Enter Database Name", value="testDatabase", key="demo_db_headers") | |
container_name = st.text_input("Enter Container Name", value="products", key="demo_container_headers") | |
if st.button("Create Item & Show Headers", key="btn_item_headers"): | |
url = os.environ.get("ACCOUNT_URI") | |
key_env = os.environ.get("ACCOUNT_KEY") | |
if not url or not key_env: | |
st.error("Missing ACCOUNT_URI or ACCOUNT_KEY environment variables.") | |
return | |
client_demo = CosmosClient(url, credential=key_env) | |
try: | |
database = client_demo.get_database_client(database_name) | |
container = database.get_container_client(container_name) | |
operation_response = container.create_item({"id": "test_item", "productName": "test_item"}) | |
headers = operation_response.get_response_headers() | |
st.write("ETag:", headers.get("etag")) | |
st.write("Request Charge:", headers.get("x-ms-request-charge")) | |
except Exception as e: | |
st.error(f"Error: {str(e)}") | |
# ============================================================================= | |
# βββββββββββββ EXISTING FUNCTIONS (GitHub, File Management, etc.) βββββββββββββ | |
# ============================================================================= | |
# β¦ (your other functions remain unchanged) β¦ | |
# ============================================================================= | |
# βββββββββββββ MAIN FUNCTION βββββββββββββ | |
# ============================================================================= | |
def main(): | |
st.markdown("### π GitCosmos - Cosmos & Git Hub") | |
st.markdown(f"[π Portal]({CosmosDBUrl})") | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
st.session_state.setdefault("current_container", None) | |
if Key: | |
st.session_state.primary_key = Key | |
st.session_state.logged_in = True | |
else: | |
st.error("Missing Cosmos Key πβ") | |
return | |
# ββ New: CosmosDB Demo Features Section in Sidebar ββ | |
st.sidebar.markdown("## CosmosDB Demo Features") | |
demo_feature = st.sidebar.selectbox("Select a Demo", | |
["Select", "Create Database", "Create Container", "Create Analytical Container", | |
"Get Existing Container", "Insert Data", "Delete Data", "Query Database", | |
"Parameterized Query", "Get Database Properties", "Get Throughput", | |
"Modify Container Properties", "Item Response Headers"], | |
key="demo_select") | |
if demo_feature != "Select": | |
if demo_feature == "Create Database": | |
demo_create_database() | |
elif demo_feature == "Create Container": | |
demo_create_container() | |
elif demo_feature == "Create Analytical Container": | |
demo_create_analytical_container() | |
elif demo_feature == "Get Existing Container": | |
demo_get_existing_container() | |
elif demo_feature == "Insert Data": | |
demo_insert_data() | |
elif demo_feature == "Delete Data": | |
demo_delete_data() | |
elif demo_feature == "Query Database": | |
demo_query_database() | |
elif demo_feature == "Parameterized Query": | |
demo_parameterized_query() | |
elif demo_feature == "Get Database Properties": | |
demo_get_db_properties() | |
elif demo_feature == "Get Throughput": | |
demo_get_throughput() | |
elif demo_feature == "Modify Container Properties": | |
demo_modify_container_properties() | |
elif demo_feature == "Item Response Headers": | |
demo_item_response_headers() | |
# ββ Existing Sidebar Items (Item Management, File Management, etc.) | |
st.sidebar.markdown("## π οΈ Item Management") | |
if st.sidebar.button("New Item"): | |
if st.session_state.get("current_container"): | |
new_doc = new_item_default(st.session_state.current_container) | |
if new_doc: | |
st.session_state.doc_editor = json.dumps(new_doc, indent=2) | |
else: | |
st.warning("No container selected!") | |
st.sidebar.text_input("New Field Key", key="new_field_key") | |
st.sidebar.text_input("New Field Value", key="new_field_value") | |
if st.sidebar.button("Add Field"): | |
if "doc_editor" in st.session_state: | |
add_field_to_doc() | |
else: | |
st.warning("No document loaded to add a field.") | |
if st.sidebar.button("New AI Record"): | |
if st.session_state.get("current_container"): | |
new_ai_record(st.session_state.current_container) | |
else: | |
st.warning("No container selected!") | |
if st.sidebar.button("New Links Record"): | |
if st.session_state.get("current_container"): | |
new_links_record(st.session_state.current_container) | |
else: | |
st.warning("No container selected!") | |
st.sidebar.markdown("## π Vector Search") | |
search_documents_ui(st.session_state.get("current_container")) | |
show_sidebar_data_grid() | |
# β¦ (rest of your main() function continues unchanged) β¦ | |
# For brevity, your remaining navigation, file management, and document editing UI remain as-is. | |
if __name__ == "__main__": | |
main() | |