|
import csv
import datetime
import io
import json
import os
import random
import shutil
import time
from typing import Dict, List, Any, Union, Optional

import gradio as gr

from utils.storage import load_data, save_data
from utils.state import generate_id, get_timestamp, record_activity
|
|
|
def create_integrations_page(state: Dict[str, Any]) -> None:
    """
    Create the Integrations page with options to connect external services and import/export data

    Args:
        state: Application state
    """
    # Lazily load persisted integration data the first time this page is built;
    # load_data returns the given default structure if the file doesn't exist.
    if "integrations" not in state:
        state["integrations"] = load_data("integrations.json", {
            "connected_services": [],
            "import_history": [],
            "export_history": []
        })

    with gr.Column(elem_id="integrations-page"):
        gr.Markdown("# 🔄 Integrations")
        gr.Markdown("*Connect with external services and import/export your data*")

        # Shared notification banner updated by the connect/disconnect handlers.
        settings_notification = gr.Markdown("", elem_id="settings-notification")

        with gr.Tabs():

            # ---- Tab 1: connect / disconnect external services ----
            with gr.TabItem("Connected Services"):
                with gr.Row():

                    with gr.Column(scale=1):
                        gr.Markdown("### Available Services")

                        # NOTE(review): create_service_item is defined further
                        # down in this file — confirm it is already in scope
                        # when these layout statements execute.
                        with gr.Accordion("Productivity", open=True):
                            with gr.Group():
                                create_service_item("Google Calendar", "Calendar sync", state)
                                create_service_item("Microsoft To Do", "Task sync", state)
                                create_service_item("Notion", "Notes sync", state)

                        with gr.Accordion("Storage", open=True):
                            with gr.Group():
                                create_service_item("Google Drive", "File backup", state)
                                create_service_item("Dropbox", "File backup", state)
                                create_service_item("OneDrive", "File backup", state)

                        with gr.Accordion("Other", open=True):
                            with gr.Group():
                                create_service_item("Weather API", "Weather data", state)
                                create_service_item("Spotify", "Music integration", state)

                    with gr.Column(scale=1):
                        gr.Markdown("### Connected Services")

                        # HTML list re-rendered by render_connected_services().
                        connected_services_container = gr.HTML(
                            "<div id='connected-services-list'>No services connected yet</div>"
                        )

                        refresh_connections_btn = gr.Button("Refresh Connections")

            # ---- Tab 2: import data from an uploaded file ----
            with gr.TabItem("Import Data"):
                with gr.Row():

                    with gr.Column(scale=1):
                        gr.Markdown("### Import Data")

                        import_type = gr.Dropdown(
                            choices=[
                                "Tasks", "Notes", "Goals", "Focus Sessions",
                                "Mood Entries", "All Data"
                            ],
                            label="Data Type",
                            value="Tasks"
                        )

                        import_format = gr.Dropdown(
                            choices=["JSON", "CSV", "Markdown"],
                            label="File Format",
                            value="JSON"
                        )

                        import_file = gr.File(
                            label="Upload File",
                            file_types=[".json", ".csv", ".md", ".txt"]
                        )

                        import_options = gr.CheckboxGroup(
                            choices=[
                                "Replace existing data",
                                "Preserve IDs",
                                "Import timestamps"
                            ],
                            label="Import Options"
                        )

                        import_btn = gr.Button("Import Data", variant="primary")
                        import_status = gr.Markdown("")

                    with gr.Column(scale=1):
                        gr.Markdown("### Import History")

                        # Populated by format_import_history() after each import.
                        import_history = gr.Dataframe(
                            headers=["Date", "Type", "Format", "Items", "Status"],
                            datatype=["str", "str", "str", "number", "str"],
                            row_count=10,
                            col_count=(5, "fixed")
                        )

            # ---- Tab 3: export data to a downloadable file ----
            with gr.TabItem("Export Data"):
                with gr.Row():

                    with gr.Column(scale=1):
                        gr.Markdown("### Export Data")

                        export_type = gr.Dropdown(
                            choices=[
                                "Tasks", "Notes", "Goals", "Focus Sessions",
                                "Mood Entries", "All Data"
                            ],
                            label="Data Type",
                            value="All Data"
                        )

                        export_format = gr.Dropdown(
                            choices=["JSON", "CSV", "Markdown"],
                            label="File Format",
                            value="JSON"
                        )

                        export_options = gr.CheckboxGroup(
                            choices=[
                                "Include IDs",
                                "Include timestamps",
                                "Pretty print (JSON)"
                            ],
                            value=["Include IDs", "Include timestamps"],
                            label="Export Options"
                        )

                        export_btn = gr.Button("Export Data", variant="primary")
                        export_download = gr.File(label="Download Exported Data", interactive=False)
                        export_status = gr.Markdown("")

                    with gr.Column(scale=1):
                        gr.Markdown("### Export History")

                        # Populated by format_export_history() after each export.
                        export_history = gr.Dataframe(
                            headers=["Date", "Type", "Format", "Items", "Size"],
                            datatype=["str", "str", "str", "number", "str"],
                            row_count=10,
                            col_count=(5, "fixed")
                        )

            # ---- Tab 4: full backup to .zip and restore ----
            with gr.TabItem("Backup & Restore"):
                with gr.Row():

                    with gr.Column(scale=1):
                        gr.Markdown("### Create Backup")

                        # Default name includes today's date for convenience.
                        backup_name = gr.Textbox(
                            label="Backup Name",
                            placeholder="e.g., Weekly Backup",
                            value=f"Backup {datetime.datetime.now().strftime('%Y-%m-%d')}"
                        )

                        backup_description = gr.Textbox(
                            label="Description (Optional)",
                            placeholder="Add a description for this backup...",
                            lines=2
                        )

                        backup_btn = gr.Button("Create Backup", variant="primary")
                        backup_status = gr.Markdown("")
                        backup_download = gr.File(label="Download Backup", interactive=False)

                    with gr.Column(scale=1):
                        gr.Markdown("### Restore from Backup")

                        restore_file = gr.File(
                            label="Upload Backup File",
                            file_types=[".zip"]
                        )

                        restore_options = gr.CheckboxGroup(
                            choices=[
                                "Overwrite existing data",
                                "Backup current data before restore"
                            ],
                            value=["Backup current data before restore"],
                            label="Restore Options"
                        )

                        restore_btn = gr.Button("Restore from Backup", variant="primary")
                        restore_status = gr.Markdown("")

                with gr.Accordion("Backup History", open=True):
                    # Populated by format_backup_history() after each backup.
                    backup_history = gr.Dataframe(
                        headers=["Date", "Name", "Description", "Size", "Status"],
                        datatype=["str", "str", "str", "str", "str"],
                        row_count=5,
                        col_count=(5, "fixed")
                    )
|
def create_service_item(service_name, service_description, state):
    """Render one service row with a Connect/Disconnect toggle button.

    Args:
        service_name: Display name of the external service.
        service_description: One-line description shown under the name.
        state: Application state dict holding state["integrations"].
    """
    with gr.Group(elem_id=f"service-{service_name.lower().replace(' ', '-')}"):
        with gr.Row():
            with gr.Column(scale=3):
                gr.Markdown(f"**{service_name}**")
                gr.Markdown(f"*{service_description}*")

            with gr.Column(scale=1):
                # Button label reflects the persisted connection state.
                is_connected = service_name in [s.get("name") for s in state["integrations"].get("connected_services", [])]

                if is_connected:
                    connect_btn = gr.Button("Disconnect", variant="stop")
                else:
                    connect_btn = gr.Button("Connect", variant="primary")

                # BUGFIX: the original created a throwaway hidden gr.Textbox
                # per service just to pass the constant service name into the
                # handler. Bind it via a lambda default argument instead; the
                # button passes its own current label as `btn_text`.
                # NOTE(review): connected_services_container and
                # settings_notification come from an enclosing scope — confirm
                # this function is nested inside create_integrations_page.
                connect_btn.click(
                    lambda btn_text, svc=service_name: toggle_service_connection(svc, btn_text == "Connect"),
                    inputs=[connect_btn],
                    outputs=[connected_services_container, connect_btn, settings_notification]
                )
|
|
|
|
|
def toggle_service_connection(service_name, connect):
    """Connect or disconnect a service and persist the change.

    Args:
        service_name: Name of the service to toggle.
        connect: True to connect, False to disconnect.

    Returns:
        Tuple of (connected-services HTML, button update, notification text).
    """
    # NOTE(review): `state` is taken from an enclosing scope — confirm this
    # function is nested inside create_integrations_page.
    connected_services = state["integrations"].get("connected_services", [])

    if connect:
        # Services that require an API key configured in Settings first.
        api_key_required = service_name in [
            "OpenWeatherMap", "GitHub", "Google Calendar",
            "Telegram", "News API", "Crypto API"
        ]

        if api_key_required:
            # BUGFIX: the original called safe_get(), which is never imported
            # or defined in this module and raised NameError here. Plain
            # chained dict lookups give the same "default to empty" behavior.
            api_key = state.get("settings", {}).get("api_keys", {}).get(service_name, "")

            if not api_key:
                # Refuse the connection and keep the button in "Connect" state.
                return (
                    render_connected_services(connected_services),
                    gr.update(value="Connect", variant="primary"),
                    f"⚠️ API key required for {service_name}. Please add it in Settings > API Keys."
                )

        # Idempotent: only append if the service isn't already connected.
        if service_name not in [s.get("name") for s in connected_services]:
            connected_services.append({
                "name": service_name,
                "connected_at": get_timestamp(),
                "status": "active"
            })
            record_activity(state, f"Connected to {service_name}")

        btn_text = "Disconnect"
        btn_variant = "stop"
        notification = f"✅ Successfully connected to {service_name}"
    else:
        # Remove every entry for this service name.
        connected_services = [s for s in connected_services if s.get("name") != service_name]
        record_activity(state, f"Disconnected from {service_name}")

        btn_text = "Connect"
        btn_variant = "primary"
        notification = f"✅ Disconnected from {service_name}"

    # Persist the updated connection list.
    state["integrations"]["connected_services"] = connected_services
    save_data("integrations.json", state["integrations"])

    html = render_connected_services(connected_services)
    return html, gr.update(value=btn_text, variant=btn_variant), notification
|
|
|
|
|
def render_connected_services(services):
    """Render the connected-services list as an HTML fragment.

    Args:
        services: List of dicts with "name", "connected_at" (ISO-8601 string)
            and "status" keys.

    Returns:
        HTML string for the #connected-services-list container.
    """
    if not services:
        return "<div id='connected-services-list'>No services connected yet</div>"

    html = "<div id='connected-services-list'>"

    for service in services:
        name = service.get("name", "Unknown")
        connected_at = service.get("connected_at", "")
        status = service.get("status", "unknown")

        # Show a friendly date; fall back to the raw value when it isn't a
        # parseable ISO timestamp. BUGFIX: was a bare `except:`, which also
        # swallowed SystemExit/KeyboardInterrupt.
        try:
            date_obj = datetime.datetime.fromisoformat(connected_at)
            date_str = date_obj.strftime("%Y-%m-%d %H:%M")
        except (ValueError, TypeError):
            date_str = connected_at

        # Green dot for an active connection, red otherwise.
        status_color = "green" if status == "active" else "red"

        html += f"""
        <div class='connected-service-item'>
            <div class='service-header'>
                <span class='service-name'>{name}</span>
                <span class='service-status' style='color: {status_color};'>●</span>
            </div>
            <div class='service-details'>
                <span class='service-date'>Connected: {date_str}</span>
            </div>
        </div>
        """

    html += "</div>"
    return html
|
|
|
|
|
def import_data(data_type, file_format, file_path, options):
    """Import data from an uploaded file (simulated import).

    Args:
        data_type: Category being imported (e.g. "Tasks").
        file_format: Declared format ("JSON", "CSV" or "Markdown").
        file_path: Uploaded file object with a ``.name`` path, or None.
        options: Selected import options (not used by the simulation).

    Returns:
        Tuple of (status message, import-history table update).
    """
    if not file_path:
        return "Please upload a file to import", gr.update()

    try:
        # Sanity-check that the uploaded extension matches the chosen format.
        _, ext = os.path.splitext(file_path.name)

        expected_ext = {
            "JSON": ".json",
            "CSV": ".csv",
            "Markdown": ".md"
        }.get(file_format, "")

        if expected_ext and ext.lower() != expected_ext.lower():
            return f"File format mismatch. Expected {expected_ext} for {file_format} format.", gr.update()

        # Read a small sample to confirm the file is readable text.
        # BUGFIX: open with an explicit encoding — the default encoding is
        # platform-dependent.
        with open(file_path.name, "r", encoding="utf-8") as f:
            content = f.read(1000)  # sample only; full parsing is simulated

        # Simulate processing time and an imported-item count.
        time.sleep(1)
        num_items = random.randint(5, 50)

        import_record = {
            "timestamp": get_timestamp(),
            "type": data_type,
            "format": file_format,
            "items": num_items,
            "status": "Success",
            "file": os.path.basename(file_path.name)
        }

        # NOTE(review): `state` comes from an enclosing scope — confirm this
        # function is nested inside create_integrations_page.
        state["integrations"]["import_history"].append(import_record)
        save_data("integrations.json", state["integrations"])

        record_activity(state, f"Imported {data_type} data from {file_format} file")

        history_data = format_import_history(state["integrations"]["import_history"])

        return f"Successfully imported {num_items} {data_type.lower()} from {file_format} file", gr.update(value=history_data)

    except Exception as e:
        # Record the failure in history as well so the user can see it.
        import_record = {
            "timestamp": get_timestamp(),
            "type": data_type,
            "format": file_format,
            "items": 0,
            "status": f"Failed: {str(e)}",
            "file": os.path.basename(file_path.name) if file_path else ""
        }

        state["integrations"]["import_history"].append(import_record)
        save_data("integrations.json", state["integrations"])

        history_data = format_import_history(state["integrations"]["import_history"])

        return f"Import failed: {str(e)}", gr.update(value=history_data)
|
|
|
|
|
def format_import_history(history):
    """Format import history for display in the history dataframe.

    Args:
        history: List of import-record dicts.

    Returns:
        List of [date, type, format, items, status] rows, newest first.
    """
    formatted_history = []

    # Newest entries first (ISO timestamps sort lexicographically).
    sorted_history = sorted(history, key=lambda x: x.get("timestamp", ""), reverse=True)

    for entry in sorted_history:
        timestamp = entry.get("timestamp", "")
        # BUGFIX: was a bare `except:`; only catch parse failures.
        try:
            date = datetime.datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M")
        except (ValueError, TypeError):
            date = timestamp

        data_type = entry.get("type", "")
        file_format = entry.get("format", "")
        items = entry.get("items", 0)
        status = entry.get("status", "")

        formatted_history.append([date, data_type, file_format, items, status])

    return formatted_history
|
|
|
|
|
def export_data(data_type, file_format, options):
    """Export data to a file in the local ``exports/`` directory (simulated).

    Args:
        data_type: Category to export (e.g. "All Data").
        file_format: "JSON", "CSV" or anything else (treated as Markdown).
        options: Selected export options, forwarded to the content generator.

    Returns:
        Tuple of (status message, download-file update, history-table update).
    """
    try:
        os.makedirs("exports", exist_ok=True)

        # Timestamped filename avoids collisions between exports.
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"exports/{data_type.lower().replace(' ', '_')}_{timestamp}"

        if file_format == "JSON":
            filename += ".json"
            content_type = "application/json"
        elif file_format == "CSV":
            filename += ".csv"
            content_type = "text/csv"
        else:
            filename += ".md"
            content_type = "text/markdown"

        content = generate_dummy_export(data_type, file_format, options)

        # BUGFIX: write with an explicit encoding — the default encoding is
        # platform-dependent.
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)

        file_size = os.path.getsize(filename)
        size_str = format_file_size(file_size)

        # Simulated item count (the content itself is dummy data).
        num_items = random.randint(10, 100)

        export_record = {
            "timestamp": get_timestamp(),
            "type": data_type,
            "format": file_format,
            "items": num_items,
            "size": size_str,
            "file": os.path.basename(filename)
        }

        # NOTE(review): `state` comes from an enclosing scope — confirm this
        # function is nested inside create_integrations_page.
        state["integrations"]["export_history"].append(export_record)
        save_data("integrations.json", state["integrations"])

        record_activity(state, f"Exported {data_type} data to {file_format} file")

        history_data = format_export_history(state["integrations"]["export_history"])

        return f"Successfully exported {num_items} {data_type.lower()} to {file_format} file", gr.update(value=filename, visible=True), gr.update(value=history_data)

    except Exception as e:
        # Record the failure in history as well. "file" is included so the
        # failure record has the same shape as a success record.
        export_record = {
            "timestamp": get_timestamp(),
            "type": data_type,
            "format": file_format,
            "items": 0,
            "size": "0 KB",
            "status": f"Failed: {str(e)}",
            "file": ""
        }

        state["integrations"]["export_history"].append(export_record)
        save_data("integrations.json", state["integrations"])

        history_data = format_export_history(state["integrations"]["export_history"])

        return f"Export failed: {str(e)}", gr.update(visible=False), gr.update(value=history_data)
|
|
|
|
|
def format_export_history(history):
    """Format export history for display in the history dataframe.

    Args:
        history: List of export-record dicts.

    Returns:
        List of [date, type, format, items, size] rows, newest first.
    """
    formatted_history = []

    # Newest entries first (ISO timestamps sort lexicographically).
    sorted_history = sorted(history, key=lambda x: x.get("timestamp", ""), reverse=True)

    for entry in sorted_history:
        timestamp = entry.get("timestamp", "")
        # BUGFIX: was a bare `except:`; only catch parse failures.
        try:
            date = datetime.datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M")
        except (ValueError, TypeError):
            date = timestamp

        data_type = entry.get("type", "")
        file_format = entry.get("format", "")
        items = entry.get("items", 0)
        size = entry.get("size", "0 KB")

        formatted_history.append([date, data_type, file_format, items, size])

    return formatted_history
|
|
|
|
|
def generate_dummy_export(data_type, file_format, options):
    """Generate dummy export content based on data type and format.

    Args:
        data_type: One of "Tasks", "Notes", "Goals", "Focus Sessions",
            "Mood Entries"; anything else falls back to mixed "All Data" items.
        file_format: "JSON", "CSV", or anything else (treated as Markdown).
        options: Selected export options ("Include IDs", "Include timestamps",
            "Pretty print (JSON)").

    Returns:
        The export payload (10 sample items) as a single string.
    """
    include_ids = "Include IDs" in options
    include_timestamps = "Include timestamps" in options
    pretty_print = "Pretty print (JSON)" in options

    if file_format == "JSON":
        data = []

        for i in range(10):
            item = {}

            if include_ids:
                item["id"] = f"dummy-id-{i+1}"

            if include_timestamps:
                item["timestamp"] = (datetime.datetime.now() - datetime.timedelta(days=i)).isoformat()

            if data_type == "Tasks":
                item["title"] = f"Sample Task {i+1}"
                item["description"] = f"This is a sample task description for task {i+1}"
                item["status"] = random.choice(["todo", "in_progress", "done"])
                item["priority"] = random.choice(["low", "medium", "high"])

            elif data_type == "Notes":
                item["title"] = f"Sample Note {i+1}"
                item["content"] = f"This is the content of sample note {i+1}."
                item["tags"] = [f"tag{j}" for j in range(1, random.randint(1, 4))]

            elif data_type == "Goals":
                item["title"] = f"Sample Goal {i+1}"
                item["description"] = f"This is a sample goal description for goal {i+1}"
                item["progress"] = random.randint(0, 100)

            elif data_type == "Focus Sessions":
                item["duration_minutes"] = random.randint(15, 60)
                item["description"] = f"Focus session {i+1}"

            elif data_type == "Mood Entries":
                item["mood"] = random.choice(["Great", "Good", "Neutral", "Low", "Very Low"])
                item["energy"] = random.randint(1, 10)
                item["factors"] = random.sample(["Work", "Sleep", "Exercise", "Nutrition", "Stress"], k=random.randint(1, 3))

            else:
                item["type"] = random.choice(["task", "note", "goal", "focus", "mood"])
                item["title"] = f"Sample Item {i+1}"

            data.append(item)

        if pretty_print:
            return json.dumps(data, indent=2)
        else:
            return json.dumps(data)

    elif file_format == "CSV":
        # BUGFIX: the original assembled rows with ",".join(row), so fields
        # that themselves contain commas (joined tags / mood factors)
        # corrupted the column layout. csv.writer quotes such fields.
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")

        header = []
        if include_ids:
            header.append("id")
        if include_timestamps:
            header.append("timestamp")

        if data_type == "Tasks":
            header.extend(["title", "description", "status", "priority"])
        elif data_type == "Notes":
            header.extend(["title", "content", "tags"])
        elif data_type == "Goals":
            header.extend(["title", "description", "progress"])
        elif data_type == "Focus Sessions":
            header.extend(["duration_minutes", "description"])
        elif data_type == "Mood Entries":
            header.extend(["mood", "energy", "factors"])
        else:
            header.extend(["type", "title"])

        writer.writerow(header)

        for i in range(10):
            row = []

            if include_ids:
                row.append(f"dummy-id-{i+1}")

            if include_timestamps:
                row.append((datetime.datetime.now() - datetime.timedelta(days=i)).isoformat())

            if data_type == "Tasks":
                row.extend([
                    f"Sample Task {i+1}",
                    f"This is a sample task description for task {i+1}",
                    random.choice(["todo", "in_progress", "done"]),
                    random.choice(["low", "medium", "high"])
                ])

            elif data_type == "Notes":
                tags = ",".join([f"tag{j}" for j in range(1, random.randint(1, 4))])
                row.extend([
                    f"Sample Note {i+1}",
                    f"This is the content of sample note {i+1}.",
                    tags
                ])

            elif data_type == "Goals":
                row.extend([
                    f"Sample Goal {i+1}",
                    f"This is a sample goal description for goal {i+1}",
                    str(random.randint(0, 100))
                ])

            elif data_type == "Focus Sessions":
                row.extend([
                    str(random.randint(15, 60)),
                    f"Focus session {i+1}"
                ])

            elif data_type == "Mood Entries":
                factors = ",".join(random.sample(["Work", "Sleep", "Exercise", "Nutrition", "Stress"], k=random.randint(1, 3)))
                row.extend([
                    random.choice(["Great", "Good", "Neutral", "Low", "Very Low"]),
                    str(random.randint(1, 10)),
                    factors
                ])

            else:
                row.extend([
                    random.choice(["task", "note", "goal", "focus", "mood"]),
                    f"Sample Item {i+1}"
                ])

            writer.writerow(row)

        # Strip the trailing newline to match the previous "\n".join output.
        return buffer.getvalue().rstrip("\n")

    else:
        # Markdown fallback.
        lines = []

        lines.append(f"# {data_type} Export")
        lines.append(f"Generated on {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append("")

        for i in range(10):
            lines.append(f"## Item {i+1}")

            if include_ids:
                lines.append(f"**ID:** dummy-id-{i+1}")

            if include_timestamps:
                timestamp = (datetime.datetime.now() - datetime.timedelta(days=i)).strftime("%Y-%m-%d %H:%M:%S")
                lines.append(f"**Timestamp:** {timestamp}")

            if data_type == "Tasks":
                lines.append(f"**Title:** Sample Task {i+1}")
                lines.append(f"**Description:** This is a sample task description for task {i+1}")
                lines.append(f"**Status:** {random.choice(['To Do', 'In Progress', 'Done'])}")
                lines.append(f"**Priority:** {random.choice(['Low', 'Medium', 'High'])}")

            elif data_type == "Notes":
                lines.append(f"**Title:** Sample Note {i+1}")
                lines.append(f"**Content:**")
                lines.append(f"This is the content of sample note {i+1}.")
                tags = ", ".join([f"tag{j}" for j in range(1, random.randint(1, 4))])
                lines.append(f"**Tags:** {tags}")

            elif data_type == "Goals":
                lines.append(f"**Title:** Sample Goal {i+1}")
                lines.append(f"**Description:** This is a sample goal description for goal {i+1}")
                lines.append(f"**Progress:** {random.randint(0, 100)}%")

            elif data_type == "Focus Sessions":
                lines.append(f"**Duration:** {random.randint(15, 60)} minutes")
                lines.append(f"**Description:** Focus session {i+1}")

            elif data_type == "Mood Entries":
                lines.append(f"**Mood:** {random.choice(['Great', 'Good', 'Neutral', 'Low', 'Very Low'])}")
                lines.append(f"**Energy:** {random.randint(1, 10)}/10")
                factors = ", ".join(random.sample(["Work", "Sleep", "Exercise", "Nutrition", "Stress"], k=random.randint(1, 3)))
                lines.append(f"**Factors:** {factors}")

            else:
                item_type = random.choice(["Task", "Note", "Goal", "Focus Session", "Mood Entry"])
                lines.append(f"**Type:** {item_type}")
                lines.append(f"**Title:** Sample {item_type} {i+1}")

            lines.append("")

        return "\n".join(lines)
|
|
|
|
|
def format_file_size(size_bytes):
    """Return *size_bytes* as a human-readable string (B, KB, MB or GB)."""
    # Byte counts below one kilobyte are shown as plain integers.
    if size_bytes < 1024:
        return f"{size_bytes} B"

    # Walk up the unit ladder, dividing by 1024 at each step; anything that
    # is still >= 1024 GB stays expressed in GB.
    value = size_bytes / 1024
    for unit in ("KB", "MB"):
        if value < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} GB"
|
|
|
|
|
def create_backup(name, description):
    """Create a (simulated) backup zip under the local ``backups/`` directory.

    Args:
        name: User-supplied backup name (embedded in the file name).
        description: Optional free-text description stored in the backup.

    Returns:
        Tuple of (status message, download-file update, history-table update).
    """
    import tempfile
    import zipfile

    try:
        os.makedirs("backups", exist_ok=True)

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"backups/{name.replace(' ', '_')}_{timestamp}.zip"

        # BUGFIX: the original wrote a fixed "dummy_data.txt" into the
        # working directory (concurrent calls clobbered each other) and
        # leaked it if zipping raised before os.remove. Use a temp file and
        # clean it up in a finally block.
        tmp_fd, tmp_path = tempfile.mkstemp(suffix=".txt", text=True)
        try:
            with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
                f.write(f"This is a dummy backup file for {name}\n")
                f.write(f"Description: {description}\n")
                f.write(f"Created: {datetime.datetime.now().isoformat()}\n")
                f.write("\nThis would contain all your app data in a real implementation.")

            with zipfile.ZipFile(filename, "w") as zipf:
                # arcname keeps the archive entry name stable regardless of
                # where the temp file actually lives.
                zipf.write(tmp_path, arcname="dummy_data.txt")
        finally:
            os.remove(tmp_path)

        file_size = os.path.getsize(filename)
        size_str = format_file_size(file_size)

        # NOTE(review): `state` comes from an enclosing scope — confirm this
        # function is nested inside create_integrations_page.
        if "backup_history" not in state["integrations"]:
            state["integrations"]["backup_history"] = []

        backup_record = {
            "timestamp": get_timestamp(),
            "name": name,
            "description": description,
            "size": size_str,
            "status": "Success",
            "file": os.path.basename(filename)
        }

        state["integrations"]["backup_history"].append(backup_record)
        save_data("integrations.json", state["integrations"])

        record_activity(state, f"Created backup: {name}")

        history_data = format_backup_history(state["integrations"].get("backup_history", []))

        return f"Successfully created backup: {name}", gr.update(value=filename, visible=True), gr.update(value=history_data)

    except Exception as e:
        # Record the failure in history as well so the user can see it.
        if "backup_history" not in state["integrations"]:
            state["integrations"]["backup_history"] = []

        backup_record = {
            "timestamp": get_timestamp(),
            "name": name,
            "description": description,
            "size": "0 KB",
            "status": f"Failed: {str(e)}",
            "file": ""
        }

        state["integrations"]["backup_history"].append(backup_record)
        save_data("integrations.json", state["integrations"])

        history_data = format_backup_history(state["integrations"].get("backup_history", []))

        return f"Backup failed: {str(e)}", gr.update(visible=False), gr.update(value=history_data)
|
|
|
|
|
def restore_from_backup(file_path, options):
    """Restore application data from an uploaded backup archive (simulated).

    Args:
        file_path: Uploaded file object with a ``.name`` path, or None.
        options: Selected restore options (not used by the simulation).

    Returns:
        A human-readable status message.
    """
    # Guard: nothing to do without an upload.
    if not file_path:
        return "Please upload a backup file to restore"

    try:
        # Only .zip archives are accepted as backups.
        _, extension = os.path.splitext(file_path.name)
        if extension.lower() != ".zip":
            return "Invalid backup file. Please upload a .zip file."

        # Simulate the time a real restore would take.
        time.sleep(2)

        record_activity(state, "Restored data from backup")

        return "Successfully restored data from backup"

    except Exception as e:
        return f"Restore failed: {str(e)}"
|
|
|
|
|
def format_backup_history(history):
    """Format backup history for display in the history dataframe.

    Args:
        history: List of backup-record dicts.

    Returns:
        List of [date, name, description, size, status] rows, newest first.
    """
    formatted_history = []

    # Newest entries first (ISO timestamps sort lexicographically).
    sorted_history = sorted(history, key=lambda x: x.get("timestamp", ""), reverse=True)

    for entry in sorted_history:
        timestamp = entry.get("timestamp", "")
        # BUGFIX: was a bare `except:`; only catch parse failures.
        try:
            date = datetime.datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M")
        except (ValueError, TypeError):
            date = timestamp

        name = entry.get("name", "")
        description = entry.get("description", "")
        size = entry.get("size", "0 KB")
        status = entry.get("status", "")

        formatted_history.append([date, name, description, size, status])

    return formatted_history
|
|
|
|
|
|
|
|
|
# ---- Event wiring and initial values ------------------------------------
# NOTE(review): these statements reference components and handlers defined
# earlier in this file; they appear to belong to the tail of
# create_integrations_page's body — confirm nesting/indentation.

# Re-render the connected-services HTML from the persisted state.
refresh_connections_btn.click(
    lambda: render_connected_services(state["integrations"].get("connected_services", [])),
    outputs=[connected_services_container]
)

# Run the (simulated) import and refresh the import-history table.
import_btn.click(
    import_data,
    inputs=[import_type, import_format, import_file, import_options],
    outputs=[import_status, import_history]
)

# Run the (simulated) export: status text, downloadable file, history table.
export_btn.click(
    export_data,
    inputs=[export_type, export_format, export_options],
    outputs=[export_status, export_download, export_history]
)

# Create a backup zip and refresh the backup-history table.
backup_btn.click(
    create_backup,
    inputs=[backup_name, backup_description],
    outputs=[backup_status, backup_download, backup_history]
)

# Run the (simulated) restore; only updates the status message.
restore_btn.click(
    restore_from_backup,
    inputs=[restore_file, restore_options],
    outputs=[restore_status]
)

# Seed each component with the currently persisted data so the page shows
# real values on first render. NOTE(review): assigning .value after
# construction — presumably picked up when Gradio serves the page; confirm
# against the Gradio version in use.
connected_services_container.value = render_connected_services(
    state["integrations"].get("connected_services", [])
)

import_history.value = format_import_history(
    state["integrations"].get("import_history", [])
)

export_history.value = format_export_history(
    state["integrations"].get("export_history", [])
)

backup_history.value = format_backup_history(
    state["integrations"].get("backup_history", [])
)

# Log the page view in the activity feed.
record_activity(state, "Viewed Integrations Page")