import streamlit as st import pandas as pd from datetime import datetime, timedelta import plotly.express as px import plotly.graph_objects as go from typing import Dict, List, Any, Optional # Import utilities from utils.storage import load_data, save_data from utils.error_handling import show_error, show_success, show_warning, handle_data_exceptions from utils.logging import get_logger from utils.state import generate_id, get_timestamp, record_activity, get_session_state, set_session_state logger = get_logger(__name__) # Task status options TASK_STATUSES = ["Todo", "In Progress", "Review", "Done", "Cancelled"] TASK_PRIORITIES = ["Low", "Medium", "High", "Critical"] TASK_CATEGORIES = ["Development", "Testing", "Documentation", "Bug Fix", "Feature", "Maintenance"] def create_tasks_page(): """ Create the tasks management page """ st.title("📋 Task Management") # Initialize session state if 'tasks' not in st.session_state: st.session_state.tasks = load_tasks_data() # Sidebar for task controls with st.sidebar: st.header("🔧 Task Controls") # Add new task button if st.button("➕ Add New Task", use_container_width=True): set_session_state('show_add_task', True) # Filter options st.subheader("🔍 Filters") status_filter = st.multiselect( "Filter by Status", TASK_STATUSES, default=TASK_STATUSES ) priority_filter = st.multiselect( "Filter by Priority", TASK_PRIORITIES, default=TASK_PRIORITIES ) category_filter = st.multiselect( "Filter by Category", TASK_CATEGORIES, default=TASK_CATEGORIES ) # Date range filter st.subheader("📅 Date Range") date_range = st.date_input( "Select Date Range", value=(datetime.now() - timedelta(days=30), datetime.now()), max_value=datetime.now() ) # Bulk actions st.subheader("🔄 Bulk Actions") if st.button("Save All Tasks"): save_tasks_data(st.session_state.tasks) show_success("Tasks saved successfully!") if st.button("Reset Filters"): st.experimental_rerun() # Main content area if get_session_state('show_add_task', False): show_add_task_form() # Task statistics show_task_statistics() # Tasks table show_tasks_table(status_filter, priority_filter, category_filter, date_range) # Task analytics show_task_analytics() def show_add_task_form(): """ Display the add task form """ st.subheader("➕ Add New Task") with st.form("add_task_form"): col1, col2 = st.columns(2) with col1: title = st.text_input("Task Title*", placeholder="Enter task title") status = st.selectbox("Status", TASK_STATUSES, index=0) priority = st.selectbox("Priority", TASK_PRIORITIES, index=1) with col2: category = st.selectbox("Category", TASK_CATEGORIES, index=0) assignee = st.text_input("Assignee", placeholder="Assign to...") due_date = st.date_input("Due Date", value=datetime.now() + timedelta(days=7)) description = st.text_area("Description", placeholder="Task description...") tags = st.text_input("Tags", placeholder="Enter tags separated by commas") col1, col2, col3 = st.columns(3) with col1: submitted = st.form_submit_button("Create Task", type="primary") with col2: if st.form_submit_button("Cancel"): set_session_state('show_add_task', False) st.experimental_rerun() if submitted: if title.strip(): new_task = create_new_task( title=title.strip(), description=description.strip(), status=status, priority=priority, category=category, assignee=assignee.strip(), due_date=due_date, tags=tags.strip() ) st.session_state.tasks.append(new_task) save_tasks_data(st.session_state.tasks) record_activity("task_created", {"task_id": new_task["id"], "title": title}) show_success(f"Task '{title}' created successfully!") set_session_state('show_add_task', False) st.experimental_rerun() else: show_error("Task title is required!") def create_new_task(title: str, description: str, status: str, priority: str, category: str, assignee: str, due_date, tags: str) -> Dict[str, Any]: """ Create a new task dictionary """ return { "id": generate_id("task"), "title": title, "description": description, "status": status, "priority": priority, "category": category, "assignee": assignee, "due_date": due_date.isoformat() if due_date else None, "created_at": get_timestamp(), "updated_at": get_timestamp(), "tags": [tag.strip() for tag in tags.split(",") if tag.strip()], "completed": status == "Done" } def show_task_statistics(): """ Display task statistics """ st.subheader("📊 Task Statistics") tasks = st.session_state.tasks if not tasks: st.info("No tasks available. Create your first task!") return # Calculate statistics total_tasks = len(tasks) completed_tasks = len([t for t in tasks if t["status"] == "Done"]) in_progress_tasks = len([t for t in tasks if t["status"] == "In Progress"]) overdue_tasks = len(get_overdue_tasks(tasks)) # Display metrics col1, col2, col3, col4 = st.columns(4) with col1: st.metric( "Total Tasks", total_tasks, delta=None ) with col2: completion_rate = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0 st.metric( "Completed", completed_tasks, delta=f"{completion_rate:.1f}%" ) with col3: st.metric( "In Progress", in_progress_tasks, delta=None ) with col4: st.metric( "Overdue", overdue_tasks, delta="⚠️" if overdue_tasks > 0 else "✅" ) def show_tasks_table(status_filter: List[str], priority_filter: List[str], category_filter: List[str], date_range): """ Display the tasks table with filtering """ st.subheader("📋 Tasks List") tasks = st.session_state.tasks if not tasks: st.info("No tasks to display.") return # Apply filters filtered_tasks = filter_tasks(tasks, status_filter, priority_filter, category_filter, date_range) if not filtered_tasks: st.warning("No tasks match the current filters.") return # Convert to DataFrame for display df = pd.DataFrame(filtered_tasks) # Select columns to display display_columns = ["title", "status", "priority", "category", "assignee", "due_date", "created_at"] available_columns = [col for col in display_columns if col in df.columns] # Display the table edited_df = st.data_editor( df[available_columns], use_container_width=True, hide_index=True, column_config={ "title": st.column_config.TextColumn("Title", max_chars=50), "status": st.column_config.SelectboxColumn( "Status", options=TASK_STATUSES, required=True ), "priority": st.column_config.SelectboxColumn( "Priority", options=TASK_PRIORITIES, required=True ), "category": st.column_config.SelectboxColumn( "Category", options=TASK_CATEGORIES, required=True ), "due_date": st.column_config.DateColumn("Due Date"), "created_at": st.column_config.DatetimeColumn("Created") }, num_rows="dynamic" ) # Handle updates if not edited_df.equals(df[available_columns]): # Update the original tasks for idx, row in edited_df.iterrows(): if idx < len(filtered_tasks): task_id = filtered_tasks[idx]["id"] update_task_from_row(task_id, row) save_tasks_data(st.session_state.tasks) show_success("Tasks updated successfully!") def show_task_analytics(): """ Display task analytics charts """ st.subheader("📈 Task Analytics") tasks = st.session_state.tasks if not tasks: return col1, col2 = st.columns(2) with col1: # Status distribution status_counts = pd.Series([t["status"] for t in tasks]).value_counts() fig_status = px.pie( values=status_counts.values, names=status_counts.index, title="Tasks by Status" ) st.plotly_chart(fig_status, use_container_width=True) with col2: # Priority distribution priority_counts = pd.Series([t["priority"] for t in tasks]).value_counts() fig_priority = px.bar( x=priority_counts.index, y=priority_counts.values, title="Tasks by Priority" ) st.plotly_chart(fig_priority, use_container_width=True) # Tasks over time tasks_df = pd.DataFrame(tasks) if "created_at" in tasks_df.columns: tasks_df["created_date"] = pd.to_datetime(tasks_df["created_at"]).dt.date daily_counts = tasks_df.groupby("created_date").size().reset_index(name="count") fig_timeline = px.line( daily_counts, x="created_date", y="count", title="Tasks Created Over Time" ) st.plotly_chart(fig_timeline, use_container_width=True) @handle_data_exceptions def load_tasks_data() -> List[Dict[str, Any]]: """ Load tasks from storage """ tasks = load_data("tasks.json", "json") if tasks is None: # Create sample tasks sample_tasks = create_sample_tasks() save_data(sample_tasks, "tasks.json", "json") return sample_tasks return tasks @handle_data_exceptions def save_tasks_data(tasks: List[Dict[str, Any]]) -> bool: """ Save tasks to storage """ return save_data(tasks, "tasks.json", "json") def create_sample_tasks() -> List[Dict[str, Any]]: """ Create sample tasks for demonstration """ sample_tasks = [ create_new_task( "Setup project structure", "Initialize the project with proper folder structure and configuration files", "Done", "High", "Development", "Developer 1", datetime.now() - timedelta(days=2), "setup, initialization" ), create_new_task( "Implement user authentication", "Add login/logout functionality with session management", "In Progress", "High", "Development", "Developer 2", datetime.now() + timedelta(days=5), "auth, security" ), create_new_task( "Write unit tests", "Create comprehensive unit tests for core functionality", "Todo", "Medium", "Testing", "QA Team", datetime.now() + timedelta(days=10), "testing, quality" ) ] return sample_tasks def filter_tasks(tasks: List[Dict], status_filter: List[str], priority_filter: List[str], category_filter: List[str], date_range) -> List[Dict]: """ Filter tasks based on criteria """ filtered = [] for task in tasks: # Status filter if task["status"] not in status_filter: continue # Priority filter if task["priority"] not in priority_filter: continue # Category filter if task["category"] not in category_filter: continue # Date filter if "created_at" in task and task["created_at"]: try: task_date = datetime.fromisoformat(task["created_at"]).date() if len(date_range) == 2: start_date, end_date = date_range if not (start_date <= task_date <= end_date): continue except (ValueError, TypeError): continue filtered.append(task) return filtered def get_overdue_tasks(tasks: List[Dict]) -> List[Dict]: """ Get overdue tasks """ overdue = [] today = datetime.now().date() for task in tasks: if task["status"] != "Done" and task.get("due_date"): try: due_date = datetime.fromisoformat(task["due_date"]).date() if due_date < today: overdue.append(task) except (ValueError, TypeError): continue return overdue def update_task_from_row(task_id: str, row: pd.Series): """ Update task from DataFrame row """ for task in st.session_state.tasks: if task["id"] == task_id: for key, value in row.items(): if key in task and pd.notna(value): task[key] = value task["updated_at"] = get_timestamp() break