import streamlit as st
import pandas as pd
import json
import os
import time
import datetime
from typing import Dict, List, Any, Optional, Tuple
import zipfile
import io
import random

from utils.ui_components import create_card, create_metric_card, create_button, create_toggle
from utils.logging import log_activity, setup_logger
from utils.storage import load_data, save_data

from utils.integrations import (
    github_integration,
    google_calendar_integration,
    telegram_integration,
    email_integration,
    rss_integration,
    weather_integration,
    news_integration,
    crypto_integration
)

from utils.automation import (
    workflow_manager,
    task_manager,
    reminder_manager,
    batch_processor,
    template_manager,
    sync_manager,
    backup_manager,
    cleanup_manager
)

logger = setup_logger(__name__)

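# Registry of supported external services. Each entry maps a service ID to its
# display metadata, the integration object implementing the connection, and the
# settings collected from the user at connect time. The integration objects are
# assumed to expose test_connection() plus the service-specific setters used in
# connect_service() below.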
SERVICE_INFO = {
    "github": {
        "icon": "github",
        "name": "GitHub",
        "description": "Connect to GitHub to manage issues and repositories",
        "integration": github_integration,
        "settings": ["api_token"]
    },
    "google_calendar": {
        "icon": "calendar_month",
        "name": "Google Calendar",
        "description": "Sync events with Google Calendar",
        "integration": google_calendar_integration,
        "settings": ["credentials_json", "token_json"]
    },
    "telegram": {
        "icon": "send",
        "name": "Telegram",
        "description": "Send notifications to Telegram",
        "integration": telegram_integration,
        "settings": ["api_token", "chat_id"]
    },
    "email": {
        "icon": "email",
        "name": "Email",
        "description": "Convert emails to tasks and send notifications",
        "integration": email_integration,
        "settings": ["email", "password", "smtp_server", "imap_server"]
    },
    "rss": {
        "icon": "rss_feed",
        "name": "RSS Feeds",
        "description": "Aggregate content from RSS feeds",
        "integration": rss_integration,
        "settings": []
    },
    "weather": {
        "icon": "cloud",
        "name": "Weather",
        "description": "Get weather data and forecasts",
        "integration": weather_integration,
        "settings": ["api_key", "provider"]
    },
    "news": {
        "icon": "newspaper",
        "name": "News",
        "description": "Get latest news and headlines",
        "integration": news_integration,
        "settings": ["api_key", "provider"]
    },
    "crypto": {
        "icon": "currency_bitcoin",
        "name": "Cryptocurrency",
        "description": "Get cryptocurrency market data",
        "integration": crypto_integration,
        "settings": ["api_key", "provider"]
    }
}

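# Registry of automation features. Each manager object is assumed to expose the
# accessors used in configure_automation_feature() below (for example
# workflow_manager.get_all_workflows() or sync_manager.set_sync_interval()).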
AUTOMATION_INFO = {
    "workflows": {
        "icon": "schema",
        "name": "Smart Workflows",
        "description": "Create IF-THEN automation workflows",
        "manager": workflow_manager
    },
    "scheduled_tasks": {
        "icon": "schedule",
        "name": "Scheduled Tasks",
        "description": "Set up time-based triggers for tasks",
        "manager": task_manager
    },
    "reminders": {
        "icon": "notifications",
        "name": "Auto-Reminders",
        "description": "Configure intelligent notifications",
        "manager": reminder_manager
    },
    "batch_processing": {
        "icon": "dynamic_feed",
        "name": "Batch Processing",
        "description": "Perform mass operations on data",
        "manager": batch_processor
    },
    "templates": {
        "icon": "dashboard_customize",
        "name": "Template Automation",
        "description": "Auto-apply patterns to new items",
        "manager": template_manager
    },
    "data_sync": {
        "icon": "sync",
        "name": "Data Sync",
        "description": "Set up cross-feature synchronization",
        "manager": sync_manager
    },
    "backup": {
        "icon": "backup",
        "name": "Backup Automation",
        "description": "Configure scheduled exports",
        "manager": backup_manager
    },
    "cleanup": {
        "icon": "cleaning_services",
        "name": "Cleanup Tools",
        "description": "Set up automated maintenance",
        "manager": cleanup_manager
    }
}

def create_integrations_page(state: Dict[str, Any]) -> None:
    """Create the integrations page

    Args:
        state: Application state
    """
    st.title("🔄 Integrations & Automation")

    log_activity("page_visit", {"page": "integrations"})

    tabs = st.tabs(["Connected Services", "Automation", "Import Data", "Export Data", "Backup & Restore"])

    with tabs[0]:
        st.header("Connected Services")
        st.write("Connect to external services to extend functionality.")

        col1, col2 = st.columns(2)

        with col1:
            create_service_section(["github", "google_calendar", "telegram", "email"])

        with col2:
            create_service_section(["rss", "weather", "news", "crypto"])

        st.button("Refresh Connections", on_click=refresh_connections)

    with tabs[1]:
        st.header("Automation Features")
        st.write("Configure automated workflows and tasks.")

        col1, col2 = st.columns(2)

        with col1:
            create_automation_section(["workflows", "scheduled_tasks", "reminders", "batch_processing"])

        with col2:
            create_automation_section(["templates", "data_sync", "backup", "cleanup"])

    with tabs[2]:
        st.header("Import Data")
        st.write("Import data from external sources.")

        uploaded_file = st.file_uploader("Choose a file to import", type=["json", "csv", "md", "txt"])

        col1, col2 = st.columns(2)

        with col1:
            import_type = st.selectbox(
                "Import as",
                ["Tasks", "Notes", "Goals", "Focus Sessions", "Mood Entries"]
            )

        with col2:
            import_options = st.multiselect(
                "Options",
                ["Overwrite existing", "Import as new", "Skip duplicates"],
                ["Import as new"]
            )

        if st.button("Import Data") and uploaded_file is not None:
            import_data(uploaded_file, import_type, import_options)

        st.subheader("Import History")
        display_import_history()

    with tabs[3]:
        st.header("Export Data")
        st.write("Export your data to various formats.")

        col1, col2, col3 = st.columns(3)

        with col1:
            export_type = st.selectbox(
                "Export data",
                ["Tasks", "Notes", "Goals", "Focus Sessions", "Mood Entries", "All Data"]
            )

        with col2:
            export_format = st.selectbox(
                "Format",
                ["JSON", "CSV", "Markdown"]
            )

        with col3:
            export_options = st.multiselect(
                "Options",
                ["Include IDs", "Include timestamps", "Pretty print (JSON)"],
                ["Include timestamps"]
            )

        if st.button("Export Data"):
            export_data(export_type, export_format, export_options)

        st.subheader("Export History")
        display_export_history()

    with tabs[4]:
        st.header("Backup & Restore")
        st.write("Create backups of your data and restore when needed.")

        st.subheader("Create Backup")

        col1, col2 = st.columns(2)

        with col1:
            backup_options = st.multiselect(
                "Backup options",
                ["Include IDs", "Include timestamps", "Compress data"],
                ["Include timestamps", "Compress data"]
            )

        with col2:
            backup_types = st.multiselect(
                "Data to backup",
                ["Tasks", "Notes", "Goals", "Focus Sessions", "Mood Entries", "Settings", "All Data"],
                ["All Data"]
            )

        if st.button("Create Backup"):
            create_backup(backup_options, backup_types)

        st.subheader("Restore from Backup")

        restore_file = st.file_uploader("Choose a backup file to restore", type=["zip"])

        restore_options = st.multiselect(
            "Restore options",
            ["Overwrite existing data", "Backup current data before restore"],
            ["Backup current data before restore"]
        )

        if st.button("Restore Data") and restore_file is not None:
            restore_from_backup(restore_file, restore_options)

        st.subheader("Backup History")
        display_backup_history()

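# Each service renders in an expander whose label reflects live connection
# state. The disconnect button binds service_id through a lambda default
# argument so every button targets its own service.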
def create_service_section(service_ids: List[str]):
    """Create a section of service connection cards

    Args:
        service_ids: List of service IDs to display
    """
    for service_id in service_ids:
        service_info = SERVICE_INFO.get(service_id)
        if not service_info:
            continue

        is_connected = check_service_connection(service_id)

        with st.expander(f"{service_info['name']} - {'Connected' if is_connected else 'Not Connected'}"):
            st.write(service_info["description"])

            if not is_connected:
                create_connection_form(service_id)
            else:
                st.success(f"Connected to {service_info['name']}")

                display_service_info(service_id)

                st.button(f"Disconnect {service_info['name']}", key=f"disconnect_{service_id}",
                          on_click=lambda sid=service_id: disconnect_service(sid))

def create_automation_section(feature_ids: List[str]):
    """Create a section of automation feature cards

    Args:
        feature_ids: List of feature IDs to display
    """
    for feature_id in feature_ids:
        feature_info = AUTOMATION_INFO.get(feature_id)
        if not feature_info:
            continue

        is_enabled = check_automation_status(feature_id)

        with st.expander(f"{feature_info['name']} - {'Enabled' if is_enabled else 'Disabled'}"):
            st.write(feature_info["description"])

            configure_automation_feature(feature_id, is_enabled)

            create_toggle(
                f"Enable {feature_info['name']}",
                is_enabled,
                key=f"toggle_{feature_id}",
                on_change=lambda fid=feature_id, val=not is_enabled: toggle_automation_feature(fid, val)
            )

def check_service_connection(service_id: str) -> bool:
    """Check if a service is connected

    Args:
        service_id: Service ID to check

    Returns:
        True if connected, False otherwise
    """
    service_info = SERVICE_INFO.get(service_id)
    if not service_info or not service_info.get("integration"):
        return False

    try:
        return service_info["integration"].test_connection()
    except Exception as e:
        logger.error(f"Error checking connection for {service_id}: {str(e)}")
        return False

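# Persisted automation settings are assumed to follow this shape (illustrative
# example only; no schema is enforced anywhere in this module):
#   {
#       "reminders": {"enabled": True, "channels": {...}, "quiet_hours": {...}},
#       "data_sync": {"enabled": True, "sync_interval": 300, "auto_sync": False}
#   }
# check_automation_status() reads only the per-feature "enabled" flag.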
def check_automation_status(feature_id: str) -> bool:
    """Check if an automation feature is enabled

    Args:
        feature_id: Feature ID to check

    Returns:
        True if enabled, False otherwise
    """
    automation_settings = load_data("automation_settings", default={})
    return automation_settings.get(feature_id, {}).get("enabled", False)

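# Settings whose names contain token/key/password/secret/credentials are
# rendered as password inputs here; connect_service() applies the same keyword
# heuristic when masking values before persistence.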
def create_connection_form(service_id: str):
    """Create a form for connecting to a service

    Args:
        service_id: Service ID to connect
    """
    service_info = SERVICE_INFO.get(service_id)
    if not service_info:
        return

    with st.form(f"connect_{service_id}_form"):
        form_values = {}

        for setting in service_info.get("settings", []):
            if any(keyword in setting for keyword in ["token", "key", "password", "secret", "credentials"]):
                form_values[setting] = st.text_input(
                    setting.replace("_", " ").title(),
                    type="password",
                    key=f"{service_id}_{setting}"
                )
            else:
                form_values[setting] = st.text_input(
                    setting.replace("_", " ").title(),
                    key=f"{service_id}_{setting}"
                )

        submitted = st.form_submit_button(f"Connect to {service_info['name']}")

        if submitted:
            connect_service(service_id, form_values)

def connect_service(service_id: str, settings: Dict[str, str]):
    """Connect to a service

    Args:
        service_id: Service ID to connect
        settings: Service connection settings
    """
    service_info = SERVICE_INFO.get(service_id)
    if not service_info or not service_info.get("integration"):
        st.error(f"Service {service_id} not found")
        return

    try:
        integration = service_info["integration"]

        if service_id == "github":
            integration.set_api_token(settings.get("api_token", ""))
        elif service_id == "google_calendar":
            integration.set_credentials(
                settings.get("credentials_json", ""),
                settings.get("token_json", "")
            )
        elif service_id == "telegram":
            integration.set_api_token(settings.get("api_token", ""))
            integration.set_chat_id(settings.get("chat_id", ""))
        elif service_id == "email":
            integration.set_credentials(
                settings.get("email", ""),
                settings.get("password", ""),
                settings.get("smtp_server", ""),
                settings.get("imap_server", "")
            )
        elif service_id in ("weather", "news", "crypto"):
            integration.set_api_key(
                settings.get("api_key", ""),
                settings.get("provider", "")
            )

        if integration.test_connection():
            connections = load_data("service_connections", default={})
            connections[service_id] = {
                "connected": True,
                "connected_at": datetime.datetime.now().isoformat(),
                "settings": {k: "*****" if any(keyword in k for keyword in ["token", "key", "password", "secret", "credentials"]) else v
                             for k, v in settings.items()}
            }
            save_data("service_connections", connections)

            st.success(f"Successfully connected to {service_info['name']}")

            log_activity("service_connected", {"service": service_id})

            st.experimental_rerun()
        else:
            st.error(f"Failed to connect to {service_info['name']}. Please check your settings.")
    except Exception as e:
        st.error(f"Error connecting to {service_info['name']}: {str(e)}")
        logger.error(f"Error connecting to {service_id}: {str(e)}")

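# Note: secret-looking settings are masked with "*****" before being persisted,
# so service_connections stores connection metadata only; the live credentials
# are assumed to be held by the integration objects themselves. Disconnecting
# below only flags the stored record; it does not revoke remote tokens.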
def disconnect_service(service_id: str):
    """Disconnect from a service

    Args:
        service_id: Service ID to disconnect
    """
    service_info = SERVICE_INFO.get(service_id)
    if not service_info:
        return

    try:
        connections = load_data("service_connections", default={})
        if service_id in connections:
            connections[service_id]["connected"] = False
            connections[service_id]["disconnected_at"] = datetime.datetime.now().isoformat()
            save_data("service_connections", connections)

            log_activity("service_disconnected", {"service": service_id})

            st.success(f"Disconnected from {service_info['name']}")

            st.experimental_rerun()
    except Exception as e:
        st.error(f"Error disconnecting from {service_info['name']}: {str(e)}")
        logger.error(f"Error disconnecting from {service_id}: {str(e)}")

def display_service_info(service_id: str):
    """Display service-specific information

    Args:
        service_id: Service ID to display info for
    """
    service_info = SERVICE_INFO.get(service_id)
    if not service_info or not service_info.get("integration"):
        return

    connections = load_data("service_connections", default={})
    connection_info = connections.get(service_id, {})

    if connection_info.get("connected_at"):
        try:
            connected_at = datetime.datetime.fromisoformat(connection_info["connected_at"])
            st.text(f"Connected since: {connected_at.strftime('%Y-%m-%d %H:%M:%S')}")
        except Exception:
            pass

    try:
        if service_id == "github":
            integration = service_info["integration"]
            user_info = integration.get_user_info()
            if user_info:
                st.text(f"Connected as: {user_info.get('login', 'Unknown')}")

                col1, col2 = st.columns(2)
                with col1:
                    repos = integration.get_repositories()
                    create_metric_card("Repositories", len(repos), "github")

                with col2:
                    issues = integration.get_issues()
                    create_metric_card("Open Issues", len(issues), "bug_report")

        elif service_id == "google_calendar":
            integration = service_info["integration"]
            calendars = integration.get_calendars()

            if calendars:
                col1, col2 = st.columns(2)
                with col1:
                    create_metric_card("Calendars", len(calendars), "calendar_month")

                with col2:
                    events = integration.get_upcoming_events(limit=10)
                    create_metric_card("Upcoming Events", len(events), "event")

                if st.checkbox("Show calendars", key="show_calendars"):
                    st.write("Available calendars:")
                    for calendar in calendars:
                        st.text(f"• {calendar.get('summary', 'Unnamed')}")

        elif service_id == "telegram":
            integration = service_info["integration"]
            chat_info = integration.get_chat_info()

            if chat_info:
                st.text(f"Connected to chat: {chat_info.get('title', chat_info.get('username', 'Unknown'))}")

            if st.button("Send Test Notification", key="test_telegram"):
                integration.send_message("Test notification from MONA")
                st.success("Test notification sent!")

        elif service_id == "email":
            integration = service_info["integration"]
            email_info = integration.get_connection_info()

            if email_info:
                st.text(f"Connected as: {email_info.get('email', 'Unknown')}")

                col1, col2 = st.columns(2)
                with col1:
                    unread = integration.get_unread_count()
                    create_metric_card("Unread Emails", unread, "mail")

                with col2:
                    create_metric_card("Converted to Tasks",
                                       email_info.get("converted_count", 0),
                                       "task_alt")

        elif service_id == "rss":
            integration = service_info["integration"]
            feeds = integration.get_feeds()

            if feeds:
                col1, col2 = st.columns(2)
                with col1:
                    create_metric_card("RSS Feeds", len(feeds), "rss_feed")

                with col2:
                    entries = integration.get_recent_entries(limit=50)
                    create_metric_card("Recent Entries", len(entries), "article")

                if st.checkbox("Show feeds", key="show_feeds"):
                    st.write("Subscribed feeds:")
                    for feed in feeds:
                        st.text(f"• {feed.get('title', 'Unnamed')}")

        elif service_id == "weather":
            integration = service_info["integration"]
            weather_info = integration.get_connection_info()

            if weather_info:
                st.text(f"Provider: {weather_info.get('provider', 'Unknown')}")

            location = st.text_input("Get weather for location:", "New York")
            if location:
                try:
                    weather = integration.get_current_weather(location)
                    if weather:
                        col1, col2 = st.columns(2)
                        with col1:
                            st.metric("Temperature", f"{weather.get('temp', 'N/A')}°C")
                        with col2:
                            st.metric("Conditions", weather.get('conditions', 'N/A'))
                except Exception as e:
                    st.error(f"Error getting weather: {str(e)}")

        elif service_id == "news":
            integration = service_info["integration"]
            news_info = integration.get_connection_info()

            if news_info:
                st.text(f"Provider: {news_info.get('provider', 'Unknown')}")

            if st.button("Get Top Headlines", key="get_headlines"):
                try:
                    headlines = integration.get_top_headlines(limit=5)
                    if headlines:
                        st.write("Top Headlines:")
                        for headline in headlines:
                            st.markdown(f"**{headline.get('title', 'No title')}**")
                            st.text(f"Source: {headline.get('source', 'Unknown')}")
                            st.text("---")
                except Exception as e:
                    st.error(f"Error getting headlines: {str(e)}")

        elif service_id == "crypto":
            integration = service_info["integration"]
            crypto_info = integration.get_connection_info()

            if crypto_info:
                st.text(f"Provider: {crypto_info.get('provider', 'Unknown')}")

            if st.button("Get Top Cryptocurrencies", key="get_crypto"):
                try:
                    coins = integration.get_top_coins(limit=5)
                    if coins:
                        st.write("Top Cryptocurrencies:")
                        for coin in coins:
                            col1, col2 = st.columns(2)
                            with col1:
                                st.markdown(f"**{coin.get('name', 'Unknown')} ({coin.get('symbol', 'N/A')})**")
                            with col2:
                                st.metric("Price", f"${coin.get('price', 0):.2f}",
                                          f"{coin.get('change_24h', 0):.2f}%")
                except Exception as e:
                    st.error(f"Error getting cryptocurrency data: {str(e)}")

    except Exception as e:
        st.error(f"Error displaying service info: {str(e)}")
        logger.error(f"Error displaying info for {service_id}: {str(e)}")

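# Per-feature configuration UI. Each branch reads the saved feature settings,
# renders its controls, writes the (possibly updated) settings back through
# save_data(), and pushes the new values into the corresponding manager object.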
def configure_automation_feature(feature_id: str, is_enabled: bool):
    """Configure an automation feature

    Args:
        feature_id: Feature ID to configure
        is_enabled: Whether the feature is enabled
    """
    feature_info = AUTOMATION_INFO.get(feature_id)
    if not feature_info:
        return

    automation_settings = load_data("automation_settings", default={})
    feature_settings = automation_settings.get(feature_id, {})

    try:
        if feature_id == "workflows":
            st.subheader("Workflows")

            workflows = workflow_manager.get_all_workflows()
            create_metric_card("Active Workflows",
                               len([w for w in workflows if w.enabled]),
                               "schema")

            if st.button("Create New Workflow", key="create_workflow"):
                st.session_state["redirect"] = "/workflows/new"
                st.experimental_rerun()

            if st.button("View All Workflows", key="view_workflows"):
                st.session_state["redirect"] = "/workflows"
                st.experimental_rerun()

        elif feature_id == "scheduled_tasks":
            st.subheader("Scheduled Tasks")

            tasks = task_manager.get_all_tasks()
            create_metric_card("Scheduled Tasks",
                               len([t for t in tasks if t.enabled]),
                               "schedule")

            if st.button("Create New Scheduled Task", key="create_task"):
                st.session_state["redirect"] = "/tasks/scheduled/new"
                st.experimental_rerun()

            if st.button("View All Scheduled Tasks", key="view_tasks"):
                st.session_state["redirect"] = "/tasks/scheduled"
                st.experimental_rerun()

        elif feature_id == "reminders":
            st.subheader("Reminders")

            reminders = reminder_manager.get_all_reminders()
            create_metric_card("Active Reminders",
                               len([r for r in reminders if r.enabled]),
                               "notifications")

            st.write("Notification Channels:")
            channels = feature_settings.get("channels", {
                "app": True,
                "email": False,
                "telegram": False
            })

            channels["app"] = st.checkbox(
                "App Notifications",
                value=channels.get("app", True),
                key="reminder_channel_app"
            )

            email_connected = check_service_connection("email")
            channels["email"] = st.checkbox(
                "Email Notifications",
                value=channels.get("email", False),
                disabled=not email_connected,
                key="reminder_channel_email"
            )

            if not email_connected and st.checkbox("Connect Email", key="connect_email_for_reminders"):
                st.session_state["active_tab"] = 0
                st.experimental_rerun()

            telegram_connected = check_service_connection("telegram")
            channels["telegram"] = st.checkbox(
                "Telegram Notifications",
                value=channels.get("telegram", False),
                disabled=not telegram_connected,
                key="reminder_channel_telegram"
            )

            if not telegram_connected and st.checkbox("Connect Telegram", key="connect_telegram_for_reminders"):
                st.session_state["active_tab"] = 0
                st.experimental_rerun()

            feature_settings["channels"] = channels
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            st.write("Quiet Hours:")
            quiet_hours = feature_settings.get("quiet_hours", {
                "enabled": False,
                "start": "22:00",
                "end": "07:00"
            })

            quiet_hours["enabled"] = st.checkbox(
                "Enable Quiet Hours",
                value=quiet_hours.get("enabled", False),
                key="quiet_hours_enabled"
            )

            col1, col2 = st.columns(2)

            with col1:
                # Seed the picker with the saved start time instead of a fixed default
                start_default = datetime.datetime.strptime(quiet_hours.get("start", "22:00"), "%H:%M").time()
                quiet_hours["start"] = st.time_input(
                    "Start Time",
                    start_default,
                    key="quiet_hours_start"
                ).strftime("%H:%M")

            with col2:
                end_default = datetime.datetime.strptime(quiet_hours.get("end", "07:00"), "%H:%M").time()
                quiet_hours["end"] = st.time_input(
                    "End Time",
                    end_default,
                    key="quiet_hours_end"
                ).strftime("%H:%M")

            feature_settings["quiet_hours"] = quiet_hours
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

        elif feature_id == "batch_processing":
            st.subheader("Batch Processing")

            operations = batch_processor.get_available_operations()

            st.write("Available Batch Operations:")
            for op in operations:
                st.text(f"• {op.replace('_', ' ').title()}")

            if st.button("Go to Batch Processing", key="go_to_batch"):
                st.session_state["redirect"] = "/batch"
                st.experimental_rerun()

        elif feature_id == "templates":
            st.subheader("Templates")

            templates = template_manager.get_all_templates()

            template_types = {}
            for template in templates:
                if template.template_type not in template_types:
                    template_types[template.template_type] = []
                template_types[template.template_type].append(template)

            cols = st.columns(len(template_types) if template_types else 1)

            for i, (template_type, type_templates) in enumerate(template_types.items()):
                with cols[i % len(cols)]:
                    create_metric_card(
                        f"{template_type.title()} Templates",
                        len(type_templates),
                        "dashboard_customize"
                    )

            if st.button("Manage Templates", key="manage_templates"):
                st.session_state["redirect"] = "/templates"
                st.experimental_rerun()

        elif feature_id == "data_sync":
            st.subheader("Data Synchronization")

            rules = sync_manager.get_all_rules()
            create_metric_card("Sync Rules",
                               len([r for r in rules if r.enabled]),
                               "sync")

            sync_interval = feature_settings.get("sync_interval", 300)
            sync_interval = st.slider(
                "Sync Interval (seconds)",
                min_value=60,
                max_value=3600,
                value=sync_interval,
                step=60,
                key="sync_interval"
            )

            feature_settings["sync_interval"] = sync_interval
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            sync_manager.set_sync_interval(sync_interval)

            auto_sync = feature_settings.get("auto_sync", False)
            auto_sync = st.checkbox(
                "Enable Auto-Sync",
                value=auto_sync,
                key="auto_sync"
            )

            feature_settings["auto_sync"] = auto_sync
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            if auto_sync and not sync_manager.auto_sync:
                sync_manager.start_auto_sync()
            elif not auto_sync and sync_manager.auto_sync:
                sync_manager.stop_auto_sync()

            if st.button("Sync Now", key="sync_now"):
                results = sync_manager.sync_all()
                st.success(f"Synced {sum(results.values())} items across {len(results)} rules")

            if st.button("Manage Sync Rules", key="manage_sync_rules"):
                st.session_state["redirect"] = "/sync"
                st.experimental_rerun()

        elif feature_id == "backup":
            st.subheader("Backup Automation")

            schedules = backup_manager.get_all_schedules()
            create_metric_card("Backup Schedules",
                               len([s for s in schedules if s.enabled]),
                               "backup")

            backup_dir = feature_settings.get("backup_dir", backup_manager.backup_dir)
            backup_dir = st.text_input(
                "Backup Directory",
                value=backup_dir,
                key="backup_dir"
            )

            feature_settings["backup_dir"] = backup_dir
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            backup_manager.set_backup_directory(backup_dir)

            auto_backup = feature_settings.get("auto_backup", False)
            auto_backup = st.checkbox(
                "Enable Auto-Backup",
                value=auto_backup,
                key="auto_backup"
            )

            feature_settings["auto_backup"] = auto_backup
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            if auto_backup and not backup_manager.auto_backup:
                backup_manager.start_auto_backup()
            elif not auto_backup and backup_manager.auto_backup:
                backup_manager.stop_auto_backup()

            if st.button("Backup Now", key="backup_now"):
                if schedules:
                    schedule = next((s for s in schedules if s.enabled), schedules[0])
                    backup_path = backup_manager.run_backup(schedule.id)
                    st.success(f"Backup created: {os.path.basename(backup_path)}")
                else:
                    st.error("No backup schedules found. Please create one first.")

            if st.button("Manage Backup Schedules", key="manage_backup_schedules"):
                st.session_state["redirect"] = "/backup"
                st.experimental_rerun()

        elif feature_id == "cleanup":
            st.subheader("Cleanup Tools")

            rules = cleanup_manager.get_all_rules()
            create_metric_card("Cleanup Rules",
                               len([r for r in rules if r.enabled]),
                               "cleaning_services")

            cleanup_interval = feature_settings.get("cleanup_interval", 86400)
            cleanup_interval_hours = st.slider(
                "Cleanup Interval (hours)",
                min_value=1,
                max_value=168,
                value=cleanup_interval // 3600,
                key="cleanup_interval"
            )
            cleanup_interval = cleanup_interval_hours * 3600

            feature_settings["cleanup_interval"] = cleanup_interval
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            cleanup_manager.set_cleanup_interval(cleanup_interval)

            auto_cleanup = feature_settings.get("auto_cleanup", False)
            auto_cleanup = st.checkbox(
                "Enable Auto-Cleanup",
                value=auto_cleanup,
                key="auto_cleanup"
            )

            feature_settings["auto_cleanup"] = auto_cleanup
            automation_settings[feature_id] = feature_settings
            save_data("automation_settings", automation_settings)

            if auto_cleanup and not cleanup_manager.auto_cleanup:
                cleanup_manager.start_auto_cleanup()
            elif not auto_cleanup and cleanup_manager.auto_cleanup:
                cleanup_manager.stop_auto_cleanup()

            if st.button("Run Cleanup Now", key="cleanup_now"):
                results = cleanup_manager.run_all_rules()
                st.success(f"Processed {sum(results.values())} items across {len(results)} rules")

            if st.button("Manage Cleanup Rules", key="manage_cleanup_rules"):
                st.session_state["redirect"] = "/cleanup"
                st.experimental_rerun()

    except Exception as e:
        st.error(f"Error configuring {feature_info['name']}: {str(e)}")
        logger.error(f"Error configuring {feature_id}: {str(e)}")

def toggle_automation_feature(feature_id: str, enabled: bool):
    """Toggle an automation feature on or off

    Args:
        feature_id: Feature ID to toggle
        enabled: Whether to enable or disable the feature
    """
    feature_info = AUTOMATION_INFO.get(feature_id)
    if not feature_info:
        return

    try:
        automation_settings = load_data("automation_settings", default={})

        if feature_id not in automation_settings:
            automation_settings[feature_id] = {}

        automation_settings[feature_id]["enabled"] = enabled
        save_data("automation_settings", automation_settings)

        if feature_id == "data_sync":
            if enabled and not sync_manager.auto_sync and automation_settings[feature_id].get("auto_sync", False):
                sync_manager.start_auto_sync()
            elif not enabled and sync_manager.auto_sync:
                sync_manager.stop_auto_sync()

        elif feature_id == "backup":
            if enabled and not backup_manager.auto_backup and automation_settings[feature_id].get("auto_backup", False):
                backup_manager.start_auto_backup()
            elif not enabled and backup_manager.auto_backup:
                backup_manager.stop_auto_backup()

        elif feature_id == "cleanup":
            if enabled and not cleanup_manager.auto_cleanup and automation_settings[feature_id].get("auto_cleanup", False):
                cleanup_manager.start_auto_cleanup()
            elif not enabled and cleanup_manager.auto_cleanup:
                cleanup_manager.stop_auto_cleanup()

        log_activity("automation_toggled", {
            "feature": feature_id,
            "enabled": enabled
        })

        st.experimental_rerun()

    except Exception as e:
        st.error(f"Error toggling {feature_info['name']}: {str(e)}")
        logger.error(f"Error toggling {feature_id}: {str(e)}")

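# on_click callback for the "Refresh Connections" button: re-tests every
# connection marked as connected and records a disconnect timestamp for any
# that fail.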
def refresh_connections():
    """Refresh all service connections"""
    connections = load_data("service_connections", default={})

    for service_id, connection in connections.items():
        if connection.get("connected", False):
            service_info = SERVICE_INFO.get(service_id)
            if service_info and service_info.get("integration"):
                try:
                    is_connected = service_info["integration"].test_connection()

                    connection["connected"] = is_connected
                    if not is_connected:
                        connection["disconnected_at"] = datetime.datetime.now().isoformat()
                except Exception as e:
                    logger.error(f"Error refreshing connection for {service_id}: {str(e)}")
                    connection["connected"] = False
                    connection["disconnected_at"] = datetime.datetime.now().isoformat()

    save_data("service_connections", connections)

    log_activity("connections_refreshed", {})

    st.success("Connections refreshed")

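# Import pipeline: the uploaded file is dispatched on its extension to a
# format-specific importer, and successful imports are appended to the
# persisted import history. The format-specific importers below are stubs
# that only simulate the work.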
def import_data(file, import_type, options):
    """Import data from a file

    Args:
        file: Uploaded file
        import_type: Type of data to import
        options: Import options
    """
    try:
        content = file.read()

        file_format = file.name.split('.')[-1].lower()

        if file_format == 'json':
            import_json(content, import_type, options)
        elif file_format == 'csv':
            import_csv(content, import_type, options)
        elif file_format in ['md', 'txt']:
            import_markdown(content, import_type, options)
        else:
            st.error(f"Unsupported file format: {file_format}")
            return

        record_import_history(file.name, import_type, file_format, options)

        st.success(f"Successfully imported {import_type} from {file.name}")

        log_activity("data_imported", {
            "file_name": file.name,
            "import_type": import_type,
            "file_format": file_format
        })
    except Exception as e:
        st.error(f"Error importing data: {str(e)}")
        logger.error(f"Error importing data: {str(e)}")

def import_json(content, import_type, options):
    """Import data from JSON

    Args:
        content: File content
        import_type: Type of data to import
        options: Import options
    """
    time.sleep(1)

    st.info(f"Simulated import of {import_type} from JSON")


def import_csv(content, import_type, options):
    """Import data from CSV

    Args:
        content: File content
        import_type: Type of data to import
        options: Import options
    """
    time.sleep(1)

    st.info(f"Simulated import of {import_type} from CSV")


def import_markdown(content, import_type, options):
    """Import data from Markdown

    Args:
        content: File content
        import_type: Type of data to import
        options: Import options
    """
    time.sleep(1)

    st.info(f"Simulated import of {import_type} from Markdown")

def record_import_history(file_name, import_type, file_format, options):
    """Record import history

    Args:
        file_name: Name of imported file
        import_type: Type of data imported
        file_format: Format of imported file
        options: Import options used
    """
    import_history = load_data("import_history", default=[])

    import_record = {
        "file_name": file_name,
        "import_type": import_type,
        "file_format": file_format,
        "options": options,
        "timestamp": datetime.datetime.now().isoformat(),
        "status": "success"
    }

    import_history.append(import_record)

    save_data("import_history", import_history)

def display_import_history():
    """Display import history"""
    import_history = load_data("import_history", default=[])

    if not import_history:
        st.info("No import history found")
        return

    history_data = []
    for record in import_history:
        history_data.append({
            "File": record.get("file_name", "Unknown"),
            "Type": record.get("import_type", "Unknown"),
            "Format": record.get("file_format", "Unknown"),
            "Date": datetime.datetime.fromisoformat(record.get("timestamp", datetime.datetime.now().isoformat())).strftime("%Y-%m-%d %H:%M"),
            "Status": record.get("status", "Unknown")
        })

    history_df = pd.DataFrame(history_data)
    st.dataframe(history_df)

def export_data(export_type, export_format, options):
    """Export data to a file

    Args:
        export_type: Type of data to export
        export_format: Format to export to
        options: Export options
    """
    try:
        if export_format == "JSON":
            content, file_name = export_json(export_type, options)
        elif export_format == "CSV":
            content, file_name = export_csv(export_type, options)
        elif export_format == "Markdown":
            content, file_name = export_markdown(export_type, options)
        else:
            st.error(f"Unsupported export format: {export_format}")
            return

        st.download_button(
            label=f"Download {export_type} as {export_format}",
            data=content,
            file_name=file_name,
            mime=get_mime_type(export_format)
        )

        record_export_history(file_name, export_type, export_format, options)

        log_activity("data_exported", {
            "file_name": file_name,
            "export_type": export_type,
            "export_format": export_format
        })
    except Exception as e:
        st.error(f"Error exporting data: {str(e)}")
        logger.error(f"Error exporting data: {str(e)}")

def get_mime_type(export_format):
    """Get MIME type for export format

    Args:
        export_format: Export format

    Returns:
        MIME type string
    """
    if export_format == "JSON":
        return "application/json"
    elif export_format == "CSV":
        return "text/csv"
    elif export_format == "Markdown":
        return "text/markdown"
    else:
        return "text/plain"

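# Format-specific exporters. Each returns a (content, file_name) pair; file
# names follow the pattern <export_type>_<YYYYmmdd_HHMMSS>.<extension>.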
def export_json(export_type, options):
    """Export data to JSON

    Args:
        export_type: Type of data to export
        options: Export options

    Returns:
        Tuple of (content, file_name)
    """
    dummy_content = generate_dummy_content(export_type, options)

    pretty_print = "Pretty print (JSON)" in options
    json_content = json.dumps(dummy_content, indent=2 if pretty_print else None)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"{export_type.lower().replace(' ', '_')}_{timestamp}.json"

    return json_content, file_name


def export_csv(export_type, options):
    """Export data to CSV

    Args:
        export_type: Type of data to export
        options: Export options

    Returns:
        Tuple of (content, file_name)
    """
    dummy_content = generate_dummy_content(export_type, options)

    csv_content = generate_dummy_csv(dummy_content, export_type, options)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"{export_type.lower().replace(' ', '_')}_{timestamp}.csv"

    return csv_content, file_name


def export_markdown(export_type, options):
    """Export data to Markdown

    Args:
        export_type: Type of data to export
        options: Export options

    Returns:
        Tuple of (content, file_name)
    """
    dummy_content = generate_dummy_content(export_type, options)

    md_content = generate_dummy_markdown(dummy_content, export_type, options)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"{export_type.lower().replace(' ', '_')}_{timestamp}.md"

    return md_content, file_name

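# All exported and backed-up content below is synthetic demo data generated on
# the fly; nothing is read from real user storage. Record counts and field
# values are illustrative only.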
def generate_dummy_content(export_type, options):
    """Generate dummy content for export

    Args:
        export_type: Type of data to export
        options: Export options

    Returns:
        A list of dictionaries, or a dict of lists keyed by data type when
        export_type is "All Data"
    """
    include_ids = "Include IDs" in options
    include_timestamps = "Include timestamps" in options

    if export_type == "Tasks":
        return generate_dummy_tasks(5, include_ids, include_timestamps)
    elif export_type == "Notes":
        return generate_dummy_notes(5, include_ids, include_timestamps)
    elif export_type == "Goals":
        return generate_dummy_goals(3, include_ids, include_timestamps)
    elif export_type == "Focus Sessions":
        return generate_dummy_focus_sessions(4, include_ids, include_timestamps)
    elif export_type == "Mood Entries":
        return generate_dummy_mood_entries(7, include_ids, include_timestamps)
    elif export_type == "All Data":
        return {
            "tasks": generate_dummy_tasks(5, include_ids, include_timestamps),
            "notes": generate_dummy_notes(5, include_ids, include_timestamps),
            "goals": generate_dummy_goals(3, include_ids, include_timestamps),
            "focus_sessions": generate_dummy_focus_sessions(4, include_ids, include_timestamps),
            "mood_entries": generate_dummy_mood_entries(7, include_ids, include_timestamps)
        }
    else:
        return []

def generate_dummy_tasks(count, include_ids=False, include_timestamps=False):
    """Generate dummy tasks for export

    Args:
        count: Number of tasks to generate
        include_ids: Whether to include IDs
        include_timestamps: Whether to include timestamps

    Returns:
        List of dummy tasks
    """
    tasks = []
    for i in range(count):
        task = {
            "title": f"Task {i+1}",
            "description": f"This is a dummy task {i+1} for export demonstration",
            "status": random.choice(["Not Started", "In Progress", "Completed"]),
            "priority": random.choice(["Low", "Medium", "High"]),
            "tags": [f"tag{j}" for j in range(random.randint(1, 3))]
        }

        if include_ids:
            task["id"] = f"task_{i+1}"

        if include_timestamps:
            task["created_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 30))).isoformat()
            task["updated_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 5))).isoformat()

        tasks.append(task)

    return tasks

def generate_dummy_notes(count, include_ids=False, include_timestamps=False):
    """Generate dummy notes for export

    Args:
        count: Number of notes to generate
        include_ids: Whether to include IDs
        include_timestamps: Whether to include timestamps

    Returns:
        List of dummy notes
    """
    notes = []
    for i in range(count):
        note = {
            "title": f"Note {i+1}",
            "content": f"This is a dummy note {i+1} for export demonstration. It contains some text content.",
            "category": random.choice(["Personal", "Work", "Ideas", "Reference"]),
            "tags": [f"tag{j}" for j in range(random.randint(1, 3))]
        }

        if include_ids:
            note["id"] = f"note_{i+1}"

        if include_timestamps:
            note["created_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 30))).isoformat()
            note["updated_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 5))).isoformat()

        notes.append(note)

    return notes

def generate_dummy_goals(count, include_ids=False, include_timestamps=False):
    """Generate dummy goals for export

    Args:
        count: Number of goals to generate
        include_ids: Whether to include IDs
        include_timestamps: Whether to include timestamps

    Returns:
        List of dummy goals
    """
    goals = []
    for i in range(count):
        goal = {
            "title": f"Goal {i+1}",
            "description": f"This is a dummy goal {i+1} for export demonstration",
            "status": random.choice(["Not Started", "In Progress", "Completed"]),
            "deadline": (datetime.datetime.now() + datetime.timedelta(days=random.randint(10, 100))).isoformat(),
            "progress": random.randint(0, 100),
            "category": random.choice(["Personal", "Work", "Health", "Finance"]),
            "tags": [f"tag{j}" for j in range(random.randint(1, 3))]
        }

        if include_ids:
            goal["id"] = f"goal_{i+1}"

        if include_timestamps:
            goal["created_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 30))).isoformat()
            goal["updated_at"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 5))).isoformat()

        goals.append(goal)

    return goals

def generate_dummy_focus_sessions(count, include_ids=False, include_timestamps=False):
    """Generate dummy focus sessions for export

    Args:
        count: Number of focus sessions to generate
        include_ids: Whether to include IDs
        include_timestamps: Whether to include timestamps

    Returns:
        List of dummy focus sessions
    """
    sessions = []
    for i in range(count):
        duration = random.randint(15, 120)
        session = {
            "title": f"Focus Session {i+1}",
            "duration": duration,
            "duration_unit": "minutes",
            "task": f"Task {random.randint(1, 5)}",
            "completed": random.choice([True, False]),
            "rating": random.randint(1, 5)
        }

        if include_ids:
            session["id"] = f"session_{i+1}"

        if include_timestamps:
            # Derive end_time from start_time so a session never ends before it begins
            start_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 14))
            session["start_time"] = start_time.isoformat()
            session["end_time"] = (start_time + datetime.timedelta(minutes=duration)).isoformat()

        sessions.append(session)

    return sessions

def generate_dummy_mood_entries(count, include_ids=False, include_timestamps=False):
    """Generate dummy mood entries for export

    Args:
        count: Number of mood entries to generate
        include_ids: Whether to include IDs
        include_timestamps: Whether to include timestamps

    Returns:
        List of dummy mood entries
    """
    entries = []
    for i in range(count):
        entry = {
            "mood": random.choice(["Happy", "Sad", "Anxious", "Excited", "Tired", "Calm"]),
            "rating": random.randint(1, 10),
            "notes": f"Mood entry {i+1} for export demonstration",
            "factors": random.sample(["Sleep", "Exercise", "Diet", "Work", "Social", "Weather"], k=random.randint(1, 3))
        }

        if include_ids:
            entry["id"] = f"mood_{i+1}"

        if include_timestamps:
            entry["timestamp"] = (datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))).isoformat()

        entries.append(entry)

    return entries

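# CSV conventions used below: list values are joined with '|', booleans are
# lowercased, embedded double quotes are doubled, and any field containing a
# comma, quote, or newline is wrapped in double quotes.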
def generate_dummy_csv(data, export_type, options):
    """Generate dummy CSV content

    Args:
        data: Data to convert to CSV
        export_type: Type of data being exported
        options: Export options

    Returns:
        CSV content as string
    """
    # CSV is a flat format, so "All Data" falls back to exporting tasks only
    if export_type == "All Data":
        data = data.get("tasks", [])

    if not data:
        return ""

    headers = list(data[0].keys())

    csv_lines = [','.join(headers)]

    for item in data:
        values = []
        for header in headers:
            value = item.get(header, "")

            if isinstance(value, list):
                value = '|'.join(str(v) for v in value)
            elif isinstance(value, bool):
                value = str(value).lower()

            value = str(value)
            # Quote any field containing a comma, quote, or newline,
            # doubling embedded quotes per RFC 4180
            needs_quoting = any(ch in value for ch in (',', '"', '\n'))
            value = value.replace('"', '""')
            if needs_quoting:
                value = f'"{value}"'

            values.append(value)

        csv_lines.append(','.join(values))

    return '\n'.join(csv_lines)

def generate_dummy_markdown(data, export_type, options):
    """Generate dummy Markdown content

    Args:
        data: Data to convert to Markdown
        export_type: Type of data being exported
        options: Export options

    Returns:
        Markdown content as string
    """
    if export_type == "All Data":
        md_lines = [f"# {export_type} Export\n"]

        for data_type, items in data.items():
            if items:
                md_lines.append(f"## {data_type.replace('_', ' ').title()}\n")

                for item in items:
                    md_lines.append(f"### {item.get('title', 'Untitled')}\n")

                    for key, value in item.items():
                        if key != 'title':
                            if isinstance(value, list):
                                value_str = ', '.join(str(v) for v in value)
                            else:
                                value_str = str(value)

                            md_lines.append(f"- **{key.replace('_', ' ').title()}**: {value_str}\n")

                    md_lines.append("\n")

        return ''.join(md_lines)
    else:
        md_lines = [f"# {export_type} Export\n\n"]

        for item in data:
            md_lines.append(f"## {item.get('title', 'Untitled')}\n")

            for key, value in item.items():
                if key != 'title':
                    if isinstance(value, list):
                        value_str = ', '.join(str(v) for v in value)
                    else:
                        value_str = str(value)

                    md_lines.append(f"- **{key.replace('_', ' ').title()}**: {value_str}\n")

            md_lines.append("\n")

        return ''.join(md_lines)

def record_export_history(file_name, export_type, export_format, options):
    """Record export history

    Args:
        file_name: Name of exported file
        export_type: Type of data exported
        export_format: Format of exported file
        options: Export options used
    """
    export_history = load_data("export_history", default=[])

    export_record = {
        "file_name": file_name,
        "export_type": export_type,
        "export_format": export_format,
        "options": options,
        "timestamp": datetime.datetime.now().isoformat(),
        "status": "success"
    }

    export_history.append(export_record)

    save_data("export_history", export_history)

def display_export_history():
    """Display export history"""
    export_history = load_data("export_history", default=[])

    if not export_history:
        st.info("No export history found")
        return

    history_data = []
    for record in export_history:
        history_data.append({
            "File": record.get("file_name", "Unknown"),
            "Type": record.get("export_type", "Unknown"),
            "Format": record.get("export_format", "Unknown"),
            "Date": datetime.datetime.fromisoformat(record.get("timestamp", datetime.datetime.now().isoformat())).strftime("%Y-%m-%d %H:%M"),
            "Status": record.get("status", "Unknown")
        })

    history_df = pd.DataFrame(history_data)
    st.dataframe(history_df)

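# Backup archives are plain zip files containing metadata.json plus one JSON
# file per selected data type (tasks.json, notes.json, and so on).
# restore_from_backup() validates this layout before proceeding.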
def create_backup(options, backup_types):
    """Create a backup of data

    Args:
        options: Backup options
        backup_types: Types of data to backup
    """
    try:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file_name = f"mona_backup_{timestamp}.zip"

        zip_buffer = io.BytesIO()

        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            metadata = {
                "backup_date": datetime.datetime.now().isoformat(),
                "backup_types": backup_types,
                "options": options,
                "version": "1.0"
            }

            zip_file.writestr("metadata.json", json.dumps(metadata, indent=2))

            if "All Data" in backup_types or "Tasks" in backup_types:
                tasks = generate_dummy_tasks(10, "Include IDs" in options, "Include timestamps" in options)
                zip_file.writestr("tasks.json", json.dumps(tasks, indent=2))

            if "All Data" in backup_types or "Notes" in backup_types:
                notes = generate_dummy_notes(10, "Include IDs" in options, "Include timestamps" in options)
                zip_file.writestr("notes.json", json.dumps(notes, indent=2))

            if "All Data" in backup_types or "Goals" in backup_types:
                goals = generate_dummy_goals(5, "Include IDs" in options, "Include timestamps" in options)
                zip_file.writestr("goals.json", json.dumps(goals, indent=2))

            if "All Data" in backup_types or "Focus Sessions" in backup_types:
                sessions = generate_dummy_focus_sessions(8, "Include IDs" in options, "Include timestamps" in options)
                zip_file.writestr("focus_sessions.json", json.dumps(sessions, indent=2))

            if "All Data" in backup_types or "Mood Entries" in backup_types:
                entries = generate_dummy_mood_entries(15, "Include IDs" in options, "Include timestamps" in options)
                zip_file.writestr("mood_entries.json", json.dumps(entries, indent=2))

            if "All Data" in backup_types or "Settings" in backup_types:
                settings = {
                    "theme": "dark",
                    "language": "en",
                    "notifications": True,
                    "auto_backup": True,
                    "backup_frequency": "weekly"
                }
                zip_file.writestr("settings.json", json.dumps(settings, indent=2))

        st.download_button(
            label="Download Backup",
            data=zip_buffer.getvalue(),
            file_name=backup_file_name,
            mime="application/zip"
        )

        record_backup_history(backup_file_name, backup_types, options)

        log_activity("backup_created", {
            "file_name": backup_file_name,
            "backup_types": backup_types
        })

        st.success(f"Backup created: {backup_file_name}")
    except Exception as e:
        st.error(f"Error creating backup: {str(e)}")
        logger.error(f"Error creating backup: {str(e)}")

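# Restore first validates the upload (zip check plus metadata.json presence),
# optionally snapshots current data, then simulates the actual restore.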
def restore_from_backup(file, options):
    """Restore data from a backup file

    Args:
        file: Uploaded backup file
        options: Restore options
    """
    try:
        if not zipfile.is_zipfile(file):
            st.error("Invalid backup file. Please upload a valid backup zip file.")
            return

        if "Backup current data before restore" in options:
            create_backup(["Include timestamps", "Compress data"], ["All Data"])

        with zipfile.ZipFile(file) as zip_file:
            if "metadata.json" not in zip_file.namelist():
                st.error("Invalid backup file. Metadata not found.")
                return

            with zip_file.open("metadata.json") as metadata_file:
                metadata = json.load(metadata_file)

            st.write("Backup Information:")
            # Guard against archives whose metadata lacks a backup_date
            backup_date = metadata.get("backup_date")
            if backup_date:
                st.write(f"- Date: {datetime.datetime.fromisoformat(backup_date).strftime('%Y-%m-%d %H:%M:%S')}")
            st.write(f"- Types: {', '.join(metadata.get('backup_types', []))}")
            st.write(f"- Version: {metadata.get('version', 'Unknown')}")

            time.sleep(1)

            st.info("Simulated restore from backup")

            record_restore_history(file.name, metadata.get('backup_types', []), options)

            log_activity("backup_restored", {
                "file_name": file.name,
                "backup_types": metadata.get('backup_types', [])
            })

            st.success(f"Successfully restored from {file.name}")
    except Exception as e:
        st.error(f"Error restoring from backup: {str(e)}")
        logger.error(f"Error restoring from backup: {str(e)}")

def record_backup_history(file_name, backup_types, options):
    """Record backup history

    Args:
        file_name: Name of backup file
        backup_types: Types of data backed up
        options: Backup options used
    """
    backup_history = load_data("backup_history", default=[])

    backup_record = {
        "file_name": file_name,
        "backup_types": backup_types,
        "options": options,
        "timestamp": datetime.datetime.now().isoformat(),
        "status": "success"
    }

    backup_history.append(backup_record)

    save_data("backup_history", backup_history)

def record_restore_history(file_name, backup_types, options):
    """Record restore history

    Args:
        file_name: Name of backup file
        backup_types: Types of data restored
        options: Restore options used
    """
    backup_history = load_data("backup_history", default=[])

    restore_record = {
        "file_name": file_name,
        "backup_types": backup_types,
        "options": options,
        "timestamp": datetime.datetime.now().isoformat(),
        "status": "restored"
    }

    backup_history.append(restore_record)

    save_data("backup_history", backup_history)

def display_backup_history():
    """Display backup history"""
    backup_history = load_data("backup_history", default=[])

    if not backup_history:
        st.info("No backup history found")
        return

    history_data = []
    for record in backup_history:
        history_data.append({
            "File": record.get("file_name", "Unknown"),
            "Types": ", ".join(record.get("backup_types", [])),
            "Date": datetime.datetime.fromisoformat(record.get("timestamp", datetime.datetime.now().isoformat())).strftime("%Y-%m-%d %H:%M"),
            "Status": record.get("status", "Unknown")
        })

    history_data.sort(key=lambda x: x["Date"], reverse=True)

    history_df = pd.DataFrame(history_data)
    st.dataframe(history_df)