"""Integrations page: connect services, import/export data, backup and restore."""

import csv
import datetime
import html
import io
import json
import os
import random
import re
import shutil
import time
import zipfile
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import gradio as gr

# Import utilities
from utils.storage import load_data, save_data
from utils.state import generate_id, get_timestamp, record_activity

_INTEGRATIONS_FILE = "integrations.json"

_DATA_TYPE_CHOICES = [
    "Tasks",
    "Notes",
    "Goals",
    "Focus Sessions",
    "Mood Entries",
    "All Data",
]
_FILE_FORMAT_CHOICES = ["JSON", "CSV", "Markdown"]
_FORMAT_EXTENSIONS = {"JSON": ".json", "CSV": ".csv", "Markdown": ".md"}

# Services that refuse to connect unless an API key is stored in settings.
_API_KEY_SERVICES = frozenset(
    {
        "OpenWeatherMap",
        "GitHub",
        "Google Calendar",
        "Telegram",
        "News API",
        "Crypto API",
    }
)

# (accordion title, [(service name, short description), ...])
_SERVICE_CATALOG = [
    (
        "Productivity",
        [
            ("Google Calendar", "Calendar sync"),
            ("Microsoft To Do", "Task sync"),
            ("Notion", "Notes sync"),
        ],
    ),
    (
        "Storage",
        [
            ("Google Drive", "File backup"),
            ("Dropbox", "File backup"),
            ("OneDrive", "File backup"),
        ],
    ),
    (
        "Other",
        [
            ("Weather API", "Weather data"),
            ("Spotify", "Music integration"),
        ],
    ),
]

# History table layouts: (record key, default) for every column after "Date".
_IMPORT_COLUMNS = (("type", ""), ("format", ""), ("items", 0), ("status", ""))
_EXPORT_COLUMNS = (("type", ""), ("format", ""), ("items", 0), ("size", "0 KB"))
_BACKUP_COLUMNS = (
    ("name", ""),
    ("description", ""),
    ("size", "0 KB"),
    ("status", ""),
)

_MOOD_FACTORS = ["Work", "Sleep", "Exercise", "Nutrition", "Stress"]
_DUMMY_ITEM_COUNT = 10  # Just generate 10 items for demo

# NOTE(review): the wrapper markup below was reconstructed; confirm tag and
# class names against the app's stylesheet.
_EMPTY_SERVICES_HTML = "<div class='connected-services'>No services connected yet</div>"


def _safe_get(data: Any, path: Sequence[str], default: Any = None) -> Any:
    """Walk nested dicts along *path*, returning *default* when a step is missing."""
    current = data
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def _uploaded_path(file_obj: Any) -> str:
    """Return the filesystem path of an uploaded file.

    Accepts either an object exposing ``.name`` (temp-file wrapper) or a plain
    path string; returns ``""`` for a falsy upload.
    """
    if not file_obj:
        return ""
    return str(getattr(file_obj, "name", file_obj))


def _format_timestamp(timestamp: Any) -> Any:
    """Format an ISO timestamp as ``YYYY-MM-DD HH:MM``; return it unchanged if unparsable."""
    try:
        return datetime.datetime.fromisoformat(timestamp).strftime("%Y-%m-%d %H:%M")
    except (TypeError, ValueError):
        return timestamp


def _format_history(
    history: List[Dict[str, Any]], columns: Sequence[Tuple[str, Any]]
) -> List[List[Any]]:
    """Convert history records into dataframe rows, newest first.

    Args:
        history: Records carrying a ``"timestamp"`` key plus the keys in *columns*.
        columns: ``(key, default)`` pairs for the cells following the date cell.

    Returns:
        One row per record: formatted date followed by the requested values.
    """
    newest_first = sorted(history, key=lambda rec: rec.get("timestamp", ""), reverse=True)
    return [
        [_format_timestamp(rec.get("timestamp", ""))]
        + [rec.get(key, default) for key, default in columns]
        for rec in newest_first
    ]


def _format_file_size(size_bytes: Union[int, float]) -> str:
    """Format a byte count in human-readable form (B, KB, MB, GB)."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 ** 2:
        return f"{size_bytes / 1024:.1f} KB"
    if size_bytes < 1024 ** 3:
        return f"{size_bytes / 1024 ** 2:.1f} MB"
    return f"{size_bytes / 1024 ** 3:.1f} GB"


def _safe_file_stem(name: Optional[str]) -> str:
    """Turn a user-supplied backup name into a filename stem without path separators."""
    stem = re.sub(r"[^\w.-]+", "_", name or "").strip("._")
    return stem or "backup"


def _render_connected_services(services: List[Dict[str, Any]]) -> str:
    """Render the connected-services list as an HTML fragment.

    Service names and dates are HTML-escaped because they originate from
    persisted data rather than from literals in this module.
    """
    if not services:
        return _EMPTY_SERVICES_HTML

    cards = []
    for service in services:
        name = html.escape(str(service.get("name", "Unknown")))
        connected = html.escape(str(_format_timestamp(service.get("connected_at", ""))))
        status_color = "green" if service.get("status", "unknown") == "active" else "red"
        cards.append(
            "<div class='connected-service'>"
            f"<div><span style='color: {status_color};'>●</span> <strong>{name}</strong></div>"
            f"<div>Connected: {connected}</div>"
            "</div>"
        )
    return "<div class='connected-services'>" + "".join(cards) + "</div>"


def _random_tags() -> List[str]:
    """Return between zero and three sample tags."""
    return [f"tag{j}" for j in range(1, random.randint(1, 4))]


def _random_factors() -> List[str]:
    """Return between one and three distinct sample mood factors."""
    return random.sample(_MOOD_FACTORS, k=random.randint(1, 3))


def _dummy_record(
    data_type: str, index: int, include_ids: bool, include_timestamps: bool
) -> Dict[str, Any]:
    """Build one sample record (used by the JSON and CSV exports)."""
    number = index + 1
    record: Dict[str, Any] = {}
    if include_ids:
        record["id"] = f"dummy-id-{number}"
    if include_timestamps:
        moment = datetime.datetime.now() - datetime.timedelta(days=index)
        record["timestamp"] = moment.isoformat()

    # Add type-specific fields
    if data_type == "Tasks":
        record["title"] = f"Sample Task {number}"
        record["description"] = f"This is a sample task description for task {number}"
        record["status"] = random.choice(["todo", "in_progress", "done"])
        record["priority"] = random.choice(["low", "medium", "high"])
    elif data_type == "Notes":
        record["title"] = f"Sample Note {number}"
        record["content"] = f"This is the content of sample note {number}."
        record["tags"] = _random_tags()
    elif data_type == "Goals":
        record["title"] = f"Sample Goal {number}"
        record["description"] = f"This is a sample goal description for goal {number}"
        record["progress"] = random.randint(0, 100)
    elif data_type == "Focus Sessions":
        record["duration_minutes"] = random.randint(15, 60)
        record["description"] = f"Focus session {number}"
    elif data_type == "Mood Entries":
        record["mood"] = random.choice(["Great", "Good", "Neutral", "Low", "Very Low"])
        record["energy"] = random.randint(1, 10)
        record["factors"] = _random_factors()
    else:  # All Data - just add a type field
        record["type"] = random.choice(["task", "note", "goal", "focus", "mood"])
        record["title"] = f"Sample Item {number}"
    return record


def _dummy_markdown_item(
    data_type: str, index: int, include_ids: bool, include_timestamps: bool
) -> List[str]:
    """Build the Markdown lines for one sample item (heading through trailing blank)."""
    number = index + 1
    lines = [f"## Item {number}"]
    if include_ids:
        lines.append(f"**ID:** dummy-id-{number}")
    if include_timestamps:
        moment = datetime.datetime.now() - datetime.timedelta(days=index)
        lines.append(f"**Timestamp:** {moment.strftime('%Y-%m-%d %H:%M:%S')}")

    # Add type-specific fields
    if data_type == "Tasks":
        lines.append(f"**Title:** Sample Task {number}")
        lines.append(f"**Description:** This is a sample task description for task {number}")
        lines.append(f"**Status:** {random.choice(['To Do', 'In Progress', 'Done'])}")
        lines.append(f"**Priority:** {random.choice(['Low', 'Medium', 'High'])}")
    elif data_type == "Notes":
        lines.append(f"**Title:** Sample Note {number}")
        lines.append("**Content:**")
        lines.append(f"This is the content of sample note {number}.")
        lines.append(f"**Tags:** {', '.join(_random_tags())}")
    elif data_type == "Goals":
        lines.append(f"**Title:** Sample Goal {number}")
        lines.append(f"**Description:** This is a sample goal description for goal {number}")
        lines.append(f"**Progress:** {random.randint(0, 100)}%")
    elif data_type == "Focus Sessions":
        lines.append(f"**Duration:** {random.randint(15, 60)} minutes")
        lines.append(f"**Description:** Focus session {number}")
    elif data_type == "Mood Entries":
        mood = random.choice(["Great", "Good", "Neutral", "Low", "Very Low"])
        lines.append(f"**Mood:** {mood}")
        lines.append(f"**Energy:** {random.randint(1, 10)}/10")
        lines.append(f"**Factors:** {', '.join(_random_factors())}")
    else:  # All Data
        item_type = random.choice(["Task", "Note", "Goal", "Focus Session", "Mood Entry"])
        lines.append(f"**Type:** {item_type}")
        lines.append(f"**Title:** Sample {item_type} {number}")

    lines.append("")
    return lines


def _generate_dummy_export(
    data_type: str, file_format: str, options: Optional[List[str]]
) -> str:
    """Generate sample export content for the given data type and format.

    Args:
        data_type: One of the data-type dropdown choices.
        file_format: ``"JSON"``, ``"CSV"``; anything else produces Markdown.
        options: Selected export options (``"Include IDs"``,
            ``"Include timestamps"``, ``"Pretty print (JSON)"``); may be empty.

    Returns:
        The serialized sample data as text.
    """
    selected = options or []
    include_ids = "Include IDs" in selected
    include_timestamps = "Include timestamps" in selected

    if file_format in ("JSON", "CSV"):
        records = [
            _dummy_record(data_type, i, include_ids, include_timestamps)
            for i in range(_DUMMY_ITEM_COUNT)
        ]
        if file_format == "JSON":
            indent = 2 if "Pretty print (JSON)" in selected else None
            return json.dumps(records, indent=indent)

        # csv.writer quotes cells containing commas (e.g. joined tags/factors),
        # which plain ",".join() would split into extra columns.
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(records[0].keys())
        for record in records:
            writer.writerow(
                [",".join(v) if isinstance(v, list) else v for v in record.values()]
            )
        return buffer.getvalue().rstrip("\n")

    # Markdown
    now_text = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    lines = [f"# {data_type} Export", f"Generated on {now_text}", ""]
    for i in range(_DUMMY_ITEM_COUNT):
        lines.extend(_dummy_markdown_item(data_type, i, include_ids, include_timestamps))
    return "\n".join(lines)


def create_integrations_page(state: Dict[str, Any]) -> None:
    """
    Create the Integrations page with options to connect external services and import/export data

    Must be called inside an active Gradio layout context. Loads
    ``state["integrations"]`` from ``integrations.json`` when absent, builds the
    four tabs, wires the event handlers and records a page-visit activity.

    Args:
        state: Application state
    """
    # Initialize integrations data if not present
    if "integrations" not in state:
        state["integrations"] = load_data(_INTEGRATIONS_FILE, {
            "connected_services": [],
            "import_history": [],
            "export_history": []
        })

    # ------------------------------------------------------------------
    # State helpers
    # ------------------------------------------------------------------
    def history_list(key: str) -> List[Dict[str, Any]]:
        """Return the named history list, creating it if the stored data lacks it."""
        return state["integrations"].setdefault(key, [])

    def persist() -> None:
        """Write the integrations section of the state to disk."""
        save_data(_INTEGRATIONS_FILE, state["integrations"])

    def append_history(key: str, record: Dict[str, Any]) -> None:
        """Append *record* to the named history list and persist."""
        history_list(key).append(record)
        persist()

    def import_rows() -> List[List[Any]]:
        return _format_history(history_list("import_history"), _IMPORT_COLUMNS)

    def export_rows() -> List[List[Any]]:
        return _format_history(history_list("export_history"), _EXPORT_COLUMNS)

    def backup_rows() -> List[List[Any]]:
        return _format_history(history_list("backup_history"), _BACKUP_COLUMNS)

    # ------------------------------------------------------------------
    # Event handlers (none of them reference UI components directly)
    # ------------------------------------------------------------------
    def toggle_service_connection(service_name: str, connect: bool):
        """Connect or disconnect a service.

        Returns:
            ``(services HTML, button update, notification text)``.
        """
        connected_services = state["integrations"].get("connected_services", [])

        if connect:
            # Check if API key is required and available for this service
            if service_name in _API_KEY_SERVICES:
                api_key = _safe_get(state, ["settings", "api_keys", service_name], "")
                if not api_key:
                    return (
                        _render_connected_services(connected_services),
                        gr.update(value="Connect", variant="primary"),
                        f"⚠️ API key required for {service_name}. Please add it in Settings > API Keys."
                    )

            # Connect service (in a real app, this would show an auth flow)
            # For demo purposes, we'll just add it to the list
            if all(s.get("name") != service_name for s in connected_services):
                connected_services.append({
                    "name": service_name,
                    "connected_at": get_timestamp(),
                    "status": "active"
                })
            record_activity(state, f"Connected to {service_name}")
            btn_text, btn_variant = "Disconnect", "stop"
            notification = f"✅ Successfully connected to {service_name}"
        else:
            # Disconnect service
            connected_services = [
                s for s in connected_services if s.get("name") != service_name
            ]
            record_activity(state, f"Disconnected from {service_name}")
            btn_text, btn_variant = "Connect", "primary"
            notification = f"✅ Disconnected from {service_name}"

        # Update state and save
        state["integrations"]["connected_services"] = connected_services
        persist()

        return (
            _render_connected_services(connected_services),
            gr.update(value=btn_text, variant=btn_variant),
            notification,
        )

    def make_toggle_handler(service_name: str) -> Callable[[str], Any]:
        """Bind *service_name* now so each button toggles its own service."""
        def on_click(button_label: str):
            return toggle_service_connection(service_name, button_label == "Connect")
        return on_click

    def import_data(data_type, file_format, file_path, options):
        """Import data from a file (simulated).

        Returns:
            ``(status message, import-history dataframe update)``.
        """
        path = _uploaded_path(file_path)
        if not path:
            return "Please upload a file to import", gr.update()

        try:
            # In a real app, this would parse the file and import the data
            # For demo purposes, we'll just simulate the import

            # Check if file format matches extension
            ext = os.path.splitext(path)[1]
            expected_ext = _FORMAT_EXTENSIONS.get(file_format, "")
            if expected_ext and ext.lower() != expected_ext.lower():
                return (
                    f"File format mismatch. Expected {expected_ext} for {file_format} format.",
                    gr.update(),
                )

            # Read the beginning of the file to make sure it is readable text
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                f.read(1000)

            # Simulate parsing and importing
            time.sleep(1)  # Simulate processing time

            # Generate random number of imported items
            num_items = random.randint(5, 50)

            # Record import in history
            append_history("import_history", {
                "timestamp": get_timestamp(),
                "type": data_type,
                "format": file_format,
                "items": num_items,
                "status": "Success",
                "file": os.path.basename(path)
            })

            # Record activity
            record_activity(state, f"Imported {data_type} data from {file_format} file")

            return (
                f"Successfully imported {num_items} {data_type.lower()} from {file_format} file",
                gr.update(value=import_rows()),
            )
        except Exception as e:
            # UI boundary: record the failed import and report it to the user
            append_history("import_history", {
                "timestamp": get_timestamp(),
                "type": data_type,
                "format": file_format,
                "items": 0,
                "status": f"Failed: {str(e)}",
                "file": os.path.basename(path)
            })
            return f"Import failed: {str(e)}", gr.update(value=import_rows())

    def export_data(data_type, file_format, options):
        """Export data to a file (sample content).

        Returns:
            ``(status message, download file update, export-history update)``.
        """
        try:
            # In a real app, this would fetch and format the data
            # For demo purposes, we'll just create a dummy file

            # Create exports directory if it doesn't exist
            os.makedirs("exports", exist_ok=True)

            # Generate filename; unknown formats fall back to Markdown
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            extension = _FORMAT_EXTENSIONS.get(file_format, ".md")
            stem = data_type.lower().replace(" ", "_")
            filename = os.path.join("exports", f"{stem}_{timestamp}{extension}")

            # Generate dummy content based on data type and format
            content = _generate_dummy_export(data_type, file_format, options)

            # Write to file
            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)

            size_str = _format_file_size(os.path.getsize(filename))

            # Generate random number of exported items
            num_items = random.randint(10, 100)

            # Record export in history
            append_history("export_history", {
                "timestamp": get_timestamp(),
                "type": data_type,
                "format": file_format,
                "items": num_items,
                "size": size_str,
                "file": os.path.basename(filename)
            })

            # Record activity
            record_activity(state, f"Exported {data_type} data to {file_format} file")

            return (
                f"Successfully exported {num_items} {data_type.lower()} to {file_format} file",
                gr.update(value=filename, visible=True),
                gr.update(value=export_rows()),
            )
        except Exception as e:
            # UI boundary: record the failed export and report it to the user
            append_history("export_history", {
                "timestamp": get_timestamp(),
                "type": data_type,
                "format": file_format,
                "items": 0,
                "size": "0 KB",
                "status": f"Failed: {str(e)}"
            })
            return (
                f"Export failed: {str(e)}",
                gr.update(visible=False),
                gr.update(value=export_rows()),
            )

    def create_backup(name, description):
        """Create a backup of all data (a placeholder zip archive).

        Returns:
            ``(status message, download file update, backup-history update)``.
        """
        try:
            # In a real app, this would zip all data files
            # For demo purposes, we'll just create a dummy zip file

            # Create backups directory if it doesn't exist
            os.makedirs("backups", exist_ok=True)

            # Generate filename from a sanitized name so it cannot leave backups/
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = os.path.join("backups", f"{_safe_file_stem(name)}_{timestamp}.zip")

            payload = (
                f"This is a dummy backup file for {name}\n"
                f"Description: {description}\n"
                f"Created: {datetime.datetime.now().isoformat()}\n"
                "\nThis would contain all your app data in a real implementation."
            )

            # Write the archive member straight from memory: no shared temp
            # file in the working directory, nothing to clean up on failure.
            with zipfile.ZipFile(filename, "w") as zipf:
                zipf.writestr("dummy_data.txt", payload)

            size_str = _format_file_size(os.path.getsize(filename))

            # Record backup in history
            append_history("backup_history", {
                "timestamp": get_timestamp(),
                "name": name,
                "description": description,
                "size": size_str,
                "status": "Success",
                "file": os.path.basename(filename)
            })

            # Record activity
            record_activity(state, f"Created backup: {name}")

            return (
                f"Successfully created backup: {name}",
                gr.update(value=filename, visible=True),
                gr.update(value=backup_rows()),
            )
        except Exception as e:
            # UI boundary: record the failed backup and report it to the user
            append_history("backup_history", {
                "timestamp": get_timestamp(),
                "name": name,
                "description": description,
                "size": "0 KB",
                "status": f"Failed: {str(e)}",
                "file": ""
            })
            return (
                f"Backup failed: {str(e)}",
                gr.update(visible=False),
                gr.update(value=backup_rows()),
            )

    def restore_from_backup(file_path, options):
        """Restore data from a backup file (simulated); returns a status message."""
        path = _uploaded_path(file_path)
        if not path:
            return "Please upload a backup file to restore"

        try:
            # In a real app, this would extract the zip and restore the data
            # For demo purposes, we'll just simulate the restore

            # Check if it's a zip file
            if os.path.splitext(path)[1].lower() != ".zip":
                return "Invalid backup file. Please upload a .zip file."

            # Simulate restore process
            time.sleep(2)  # Simulate processing time

            # Record activity
            record_activity(state, "Restored data from backup")

            return "Successfully restored data from backup"
        except Exception as e:
            return f"Restore failed: {str(e)}"

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------
    # (service name, button) pairs; click handlers are attached after the
    # connected-services container exists, since it is one of their outputs.
    service_buttons: List[Tuple[str, Any]] = []
    connected_names = {
        s.get("name") for s in state["integrations"].get("connected_services", [])
    }

    def create_service_item(service_name: str, service_description: str) -> None:
        """Create a UI component for a service connection"""
        slug = service_name.lower().replace(" ", "-")
        with gr.Group(elem_id=f"service-{slug}"):
            with gr.Row():
                with gr.Column(scale=3):
                    gr.Markdown(f"**{service_name}**")
                    gr.Markdown(f"*{service_description}*")
                with gr.Column(scale=1):
                    # Check if service is already connected
                    if service_name in connected_names:
                        connect_btn = gr.Button("Disconnect", variant="stop")
                    else:
                        connect_btn = gr.Button("Connect", variant="primary")
        service_buttons.append((service_name, connect_btn))

    with gr.Column(elem_id="integrations-page"):
        gr.Markdown("# 🔄 Integrations")
        gr.Markdown("*Connect with external services and import/export your data*")

        # Notification area for messages
        settings_notification = gr.Markdown("", elem_id="settings-notification")

        # Main tabs for different integration features
        with gr.Tabs():
            # Connected Services Tab
            with gr.TabItem("Connected Services"):
                with gr.Row():
                    # Left column - Available services
                    with gr.Column(scale=1):
                        gr.Markdown("### Available Services")

                        # Service categories
                        for category, services in _SERVICE_CATALOG:
                            with gr.Accordion(category, open=True):
                                with gr.Group():
                                    for service_name, service_description in services:
                                        create_service_item(service_name, service_description)

                    # Right column - Connected services
                    with gr.Column(scale=1):
                        gr.Markdown("### Connected Services")

                        connected_services_container = gr.HTML(
                            _render_connected_services(
                                state["integrations"].get("connected_services", [])
                            )
                        )

                        refresh_connections_btn = gr.Button("Refresh Connections")

            # Import Data Tab
            with gr.TabItem("Import Data"):
                with gr.Row():
                    # Left column - Import options
                    with gr.Column(scale=1):
                        gr.Markdown("### Import Data")

                        import_type = gr.Dropdown(
                            choices=list(_DATA_TYPE_CHOICES),
                            label="Data Type",
                            value="Tasks"
                        )
                        import_format = gr.Dropdown(
                            choices=list(_FILE_FORMAT_CHOICES),
                            label="File Format",
                            value="JSON"
                        )
                        import_file = gr.File(
                            label="Upload File",
                            file_types=[".json", ".csv", ".md", ".txt"]
                        )
                        import_options = gr.CheckboxGroup(
                            choices=[
                                "Replace existing data",
                                "Preserve IDs",
                                "Import timestamps"
                            ],
                            label="Import Options"
                        )
                        import_btn = gr.Button("Import Data", variant="primary")
                        import_status = gr.Markdown("")

                    # Right column - Import history
                    with gr.Column(scale=1):
                        gr.Markdown("### Import History")

                        import_history = gr.Dataframe(
                            value=import_rows() or None,
                            headers=["Date", "Type", "Format", "Items", "Status"],
                            datatype=["str", "str", "str", "number", "str"],
                            row_count=10,
                            col_count=(5, "fixed")
                        )

            # Export Data Tab
            with gr.TabItem("Export Data"):
                with gr.Row():
                    # Left column - Export options
                    with gr.Column(scale=1):
                        gr.Markdown("### Export Data")

                        export_type = gr.Dropdown(
                            choices=list(_DATA_TYPE_CHOICES),
                            label="Data Type",
                            value="All Data"
                        )
                        export_format = gr.Dropdown(
                            choices=list(_FILE_FORMAT_CHOICES),
                            label="File Format",
                            value="JSON"
                        )
                        export_options = gr.CheckboxGroup(
                            choices=[
                                "Include IDs",
                                "Include timestamps",
                                "Pretty print (JSON)"
                            ],
                            value=["Include IDs", "Include timestamps"],
                            label="Export Options"
                        )
                        export_btn = gr.Button("Export Data", variant="primary")
                        export_download = gr.File(
                            label="Download Exported Data", interactive=False
                        )
                        export_status = gr.Markdown("")

                    # Right column - Export history
                    with gr.Column(scale=1):
                        gr.Markdown("### Export History")

                        export_history = gr.Dataframe(
                            value=export_rows() or None,
                            headers=["Date", "Type", "Format", "Items", "Size"],
                            datatype=["str", "str", "str", "number", "str"],
                            row_count=10,
                            col_count=(5, "fixed")
                        )

            # Backup & Restore Tab
            with gr.TabItem("Backup & Restore"):
                with gr.Row():
                    # Left column - Backup options
                    with gr.Column(scale=1):
                        gr.Markdown("### Create Backup")

                        backup_name = gr.Textbox(
                            label="Backup Name",
                            placeholder="e.g., Weekly Backup",
                            value=f"Backup {datetime.datetime.now().strftime('%Y-%m-%d')}"
                        )
                        backup_description = gr.Textbox(
                            label="Description (Optional)",
                            placeholder="Add a description for this backup...",
                            lines=2
                        )
                        backup_btn = gr.Button("Create Backup", variant="primary")
                        backup_status = gr.Markdown("")
                        backup_download = gr.File(label="Download Backup", interactive=False)

                    # Right column - Restore options
                    with gr.Column(scale=1):
                        gr.Markdown("### Restore from Backup")

                        restore_file = gr.File(
                            label="Upload Backup File",
                            file_types=[".zip"]
                        )
                        restore_options = gr.CheckboxGroup(
                            choices=[
                                "Overwrite existing data",
                                "Backup current data before restore"
                            ],
                            value=["Backup current data before restore"],
                            label="Restore Options"
                        )
                        restore_btn = gr.Button("Restore from Backup", variant="primary")
                        restore_status = gr.Markdown("")

                # Backup history
                with gr.Accordion("Backup History", open=True):
                    backup_history = gr.Dataframe(
                        value=backup_rows() or None,
                        headers=["Date", "Name", "Description", "Size", "Status"],
                        datatype=["str", "str", "str", "str", "str"],
                        row_count=5,
                        col_count=(5, "fixed")
                    )

    # ------------------------------------------------------------------
    # Connect UI components to functions
    # ------------------------------------------------------------------
    # Connected Services tab connections
    for service_name, connect_btn in service_buttons:
        # The button is its own input: its current label decides the direction.
        connect_btn.click(
            make_toggle_handler(service_name),
            inputs=[connect_btn],
            outputs=[connected_services_container, connect_btn, settings_notification]
        )

    refresh_connections_btn.click(
        lambda: _render_connected_services(
            state["integrations"].get("connected_services", [])
        ),
        outputs=[connected_services_container]
    )

    # Import Data tab connections
    import_btn.click(
        import_data,
        inputs=[import_type, import_format, import_file, import_options],
        outputs=[import_status, import_history]
    )

    # Export Data tab connections
    export_btn.click(
        export_data,
        inputs=[export_type, export_format, export_options],
        outputs=[export_status, export_download, export_history]
    )

    # Backup & Restore tab connections
    backup_btn.click(
        create_backup,
        inputs=[backup_name, backup_description],
        outputs=[backup_status, backup_download, backup_history]
    )
    restore_btn.click(
        restore_from_backup,
        inputs=[restore_file, restore_options],
        outputs=[restore_status]
    )

    # Record page visit in activity
    record_activity(state, "Viewed Integrations Page")