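# NOTE: the following lines are repository-page residue captured with the file; kept as comments.
# mona / pages/tasks.py
# mrradix's picture
# Update pages/tasks.py
# a24b87e verified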
import streamlit as st
import pandas as pd
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from typing import Dict, List, Any, Optional
# Import utilities
from utils.storage import load_data, save_data
from utils.error_handling import show_error, show_success, show_warning, handle_data_exceptions
from utils.logging import get_logger
from utils.state import generate_id, get_timestamp, record_activity, get_session_state, set_session_state
# Module-level logger obtained from the project's logging helper
logger = get_logger(__name__)
# Task status options (used by the add-task form, the sidebar filter and the table editor)
TASK_STATUSES = ["Todo", "In Progress", "Review", "Done", "Cancelled"]
# Task priority options (used by the add-task form, the sidebar filter and the table editor)
TASK_PRIORITIES = ["Low", "Medium", "High", "Critical"]
# Task category options (used by the add-task form, the sidebar filter and the table editor)
TASK_CATEGORIES = ["Development", "Testing", "Documentation", "Bug Fix", "Feature", "Maintenance"]
def create_tasks_page():
    """
    Create the tasks management page.

    Loads the task list into ``st.session_state.tasks`` on first use, renders
    the sidebar (add-task button, status/priority/category/date filters and
    bulk actions) and then the main area: the optional add-task form, the
    statistics row, the editable tasks table and the analytics charts.
    """
    st.title("📋 Task Management")

    # Initialize session state
    if 'tasks' not in st.session_state:
        # NOTE(review): load_tasks_data is wrapped by handle_data_exceptions,
        # whose failure value is not visible here; fall back to an empty list
        # so later list operations never receive None.
        st.session_state.tasks = load_tasks_data() or []

    # The filter widgets carry a version number in their keys. Bumping the
    # version gives Streamlit brand-new widgets, which is what actually puts
    # the filters back to their defaults; a plain rerun keeps widget values.
    filter_version = get_session_state('task_filter_version', 0)
    today = datetime.now().date()

    # Sidebar for task controls
    with st.sidebar:
        st.header("🔧 Task Controls")

        # Add new task button
        if st.button("➕ Add New Task", use_container_width=True):
            set_session_state('show_add_task', True)

        # Filter options
        st.subheader("🔍 Filters")
        status_filter = st.multiselect(
            "Filter by Status",
            TASK_STATUSES,
            default=TASK_STATUSES,
            key=f"task_status_filter_{filter_version}",
        )
        priority_filter = st.multiselect(
            "Filter by Priority",
            TASK_PRIORITIES,
            default=TASK_PRIORITIES,
            key=f"task_priority_filter_{filter_version}",
        )
        category_filter = st.multiselect(
            "Filter by Category",
            TASK_CATEGORIES,
            default=TASK_CATEGORIES,
            key=f"task_category_filter_{filter_version}",
        )

        # Date range filter (last 30 days by default, never in the future)
        st.subheader("📅 Date Range")
        date_range = st.date_input(
            "Select Date Range",
            value=(today - timedelta(days=30), today),
            max_value=today,
            key=f"task_date_range_{filter_version}",
        )

        # Bulk actions
        st.subheader("🔄 Bulk Actions")
        if st.button("Save All Tasks"):
            save_tasks_data(st.session_state.tasks)
            show_success("Tasks saved successfully!")
        if st.button("Reset Filters"):
            set_session_state('task_filter_version', filter_version + 1)
            # st.experimental_rerun was superseded by st.rerun; use the new
            # name when this Streamlit version provides it.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()

    # Main content area
    if get_session_state('show_add_task', False):
        show_add_task_form()

    # Task statistics
    show_task_statistics()

    # Tasks table
    show_tasks_table(status_filter, priority_filter, category_filter, date_range)

    # Task analytics
    show_task_analytics()
def show_add_task_form():
    """
    Display the add task form.

    On "Create Task" with a non-blank title the new task is appended to
    ``st.session_state.tasks``, saved, recorded as a ``task_created``
    activity, and the form is hidden. "Cancel" hides the form without
    creating anything. A blank title shows an error and keeps the form open.
    """
    st.subheader("➕ Add New Task")

    # st.experimental_rerun was superseded by st.rerun; use the new name when
    # this Streamlit version provides it.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun

    with st.form("add_task_form"):
        col1, col2 = st.columns(2)
        with col1:
            title = st.text_input("Task Title*", placeholder="Enter task title")
            status = st.selectbox("Status", TASK_STATUSES, index=0)
            priority = st.selectbox("Priority", TASK_PRIORITIES, index=1)
        with col2:
            category = st.selectbox("Category", TASK_CATEGORIES, index=0)
            assignee = st.text_input("Assignee", placeholder="Assign to...")
            due_date = st.date_input("Due Date", value=datetime.now() + timedelta(days=7))
        description = st.text_area("Description", placeholder="Task description...")
        tags = st.text_input("Tags", placeholder="Enter tags separated by commas")

        # Three columns keep the two buttons narrow; the third stays empty.
        submit_col, cancel_col, _ = st.columns(3)
        with submit_col:
            submitted = st.form_submit_button("Create Task", type="primary")
        with cancel_col:
            cancelled = st.form_submit_button("Cancel")

    if cancelled:
        set_session_state('show_add_task', False)
        rerun()

    if not submitted:
        return

    clean_title = title.strip()
    if not clean_title:
        show_error("Task title is required!")
        return

    new_task = create_new_task(
        title=clean_title,
        description=description.strip(),
        status=status,
        priority=priority,
        category=category,
        assignee=assignee.strip(),
        due_date=due_date,
        tags=tags.strip()
    )
    st.session_state.tasks.append(new_task)
    save_tasks_data(st.session_state.tasks)
    # Log and report the same (stripped) title that was stored on the task.
    record_activity("task_created", {"task_id": new_task["id"], "title": clean_title})
    # NOTE(review): this message is emitted right before a rerun, so it may
    # not stay visible; confirm how show_success renders.
    show_success(f"Task '{clean_title}' created successfully!")
    set_session_state('show_add_task', False)
    rerun()
def create_new_task(title: str, description: str, status: str, priority: str,
                    category: str, assignee: str, due_date, tags: str) -> Dict[str, Any]:
    """
    Create a new task dictionary.

    Args:
        title: Task title.
        description: Free-text description.
        status: Task status label; ``"Done"`` marks the task as completed.
        priority: Task priority label.
        category: Task category label.
        assignee: Name of the person the task is assigned to.
        due_date: A date or datetime, or a falsy value for "no due date".
        tags: Comma-separated tag names; blank entries are dropped.

    Returns:
        A task dict with a generated id, the given fields, ``due_date`` as an
        ISO ``YYYY-MM-DD`` string (or None), identical ``created_at`` and
        ``updated_at`` timestamps, a list of tags and a ``completed`` flag.
    """
    # One timestamp for both fields so a fresh task never looks modified.
    timestamp = get_timestamp()

    # Callers pass both dates (form) and datetimes (sample data); store a
    # date-only string in every case so the field has a single format.
    if isinstance(due_date, datetime):
        due_date = due_date.date()

    return {
        "id": generate_id("task"),
        "title": title,
        "description": description,
        "status": status,
        "priority": priority,
        "category": category,
        "assignee": assignee,
        "due_date": due_date.isoformat() if due_date else None,
        "created_at": timestamp,
        "updated_at": timestamp,
        "tags": [tag.strip() for tag in (tags or "").split(",") if tag.strip()],
        "completed": status == "Done"
    }
def show_task_statistics():
    """
    Display task statistics.

    Shows four metrics for ``st.session_state.tasks``: total count, completed
    count (with the completion percentage as delta), in-progress count and
    overdue count. Shows an info message instead when there are no tasks.
    """
    st.subheader("📊 Task Statistics")
    tasks = st.session_state.tasks
    if not tasks:
        st.info("No tasks available. Create your first task!")
        return

    # Calculate statistics (.get tolerates records that lack a status)
    total_tasks = len(tasks)
    completed_tasks = sum(1 for task in tasks if task.get("status") == "Done")
    in_progress_tasks = sum(1 for task in tasks if task.get("status") == "In Progress")
    overdue_tasks = len(get_overdue_tasks(tasks))
    # total_tasks is at least 1 here because of the early return above.
    completion_rate = completed_tasks / total_tasks * 100

    # Display metrics
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Tasks", total_tasks, delta=None)
    with col2:
        st.metric("Completed", completed_tasks, delta=f"{completion_rate:.1f}%")
    with col3:
        st.metric("In Progress", in_progress_tasks, delta=None)
    with col4:
        has_overdue = overdue_tasks > 0
        st.metric(
            "Overdue",
            overdue_tasks,
            delta="⚠️" if has_overdue else "✅",
            # A text delta counts as "positive"; invert the colour so the
            # warning is not painted as good news.
            delta_color="inverse" if has_overdue else "normal",
        )
def _parse_task_datetime(value):
    """Return *value* parsed as a datetime, or None when it is not a usable ISO timestamp."""
    if isinstance(value, datetime):
        return value
    try:
        return datetime.fromisoformat(value)
    except (ValueError, TypeError):
        return None


def _parse_task_date(value):
    """Return *value* reduced to a calendar date, or None when it cannot be parsed."""
    if hasattr(value, "isoformat") and not isinstance(value, datetime):
        return value  # already a plain date object
    parsed = _parse_task_datetime(value)
    return parsed.date() if parsed is not None else None


def _changed_cells(edited_row, original_row, columns) -> Dict[str, Any]:
    """Collect the non-empty cells of *edited_row* that differ from *original_row*, in storage form."""
    changes = {}
    for column in columns:
        new_value = edited_row[column]
        # Emptied cells are ignored, matching update_task_from_row.
        if pd.isna(new_value):
            continue
        old_value = original_row[column]
        if not pd.isna(old_value) and bool(new_value == old_value):
            continue
        # Tasks keep dates as ISO strings, so convert date-like values back.
        if hasattr(new_value, "isoformat"):
            new_value = new_value.isoformat()
        changes[column] = new_value
    return changes


def show_tasks_table(status_filter: List[str], priority_filter: List[str],
                     category_filter: List[str], date_range):
    """
    Display the tasks table with filtering.

    Filters ``st.session_state.tasks``, shows the result in an editable table
    and writes edited cells back to the matching tasks, saving once when at
    least one task changed.

    Args:
        status_filter: Status labels to keep.
        priority_filter: Priority labels to keep.
        category_filter: Category labels to keep.
        date_range: Date range value as returned by the sidebar date input.
    """
    st.subheader("📋 Tasks List")
    tasks = st.session_state.tasks
    if not tasks:
        st.info("No tasks to display.")
        return

    # Apply filters
    filtered_tasks = filter_tasks(tasks, status_filter, priority_filter, category_filter, date_range)
    if not filtered_tasks:
        st.warning("No tasks match the current filters.")
        return

    # Convert to DataFrame for display
    df = pd.DataFrame(filtered_tasks)

    # Select columns to display
    display_columns = ["title", "status", "priority", "category", "assignee", "due_date", "created_at"]
    available_columns = [col for col in display_columns if col in df.columns]
    display_df = df[available_columns].copy()

    # Dates are stored as ISO strings, but the date/datetime column types
    # below only accept real date/datetime data, so convert before display.
    if "due_date" in display_df.columns:
        display_df["due_date"] = display_df["due_date"].map(_parse_task_date)
    if "created_at" in display_df.columns:
        # NOTE(review): assumes all timestamps share one timezone convention.
        display_df["created_at"] = pd.to_datetime(
            display_df["created_at"].map(_parse_task_datetime), errors="coerce"
        )

    # Display the table
    edited_df = st.data_editor(
        display_df,
        use_container_width=True,
        hide_index=True,
        column_config={
            "title": st.column_config.TextColumn("Title", max_chars=50),
            "status": st.column_config.SelectboxColumn(
                "Status",
                options=TASK_STATUSES,
                required=True
            ),
            "priority": st.column_config.SelectboxColumn(
                "Priority",
                options=TASK_PRIORITIES,
                required=True
            ),
            "category": st.column_config.SelectboxColumn(
                "Category",
                options=TASK_CATEGORIES,
                required=True
            ),
            "due_date": st.column_config.DateColumn("Due Date"),
            "created_at": st.column_config.DatetimeColumn("Created")
        },
        # NOTE(review): rows added or deleted in the editor are not persisted
        # by the handler below; only edits to existing rows are.
        num_rows="dynamic"
    )

    # Handle updates: only touch tasks whose cells really changed.
    updated_count = 0
    for idx, edited_row in edited_df.iterrows():
        if idx not in display_df.index:
            continue  # row added in the editor, no task behind it
        changes = _changed_cells(edited_row, display_df.loc[idx], available_columns)
        if changes:
            # idx is the row's position in filtered_tasks (default RangeIndex).
            update_task_from_row(filtered_tasks[idx]["id"], pd.Series(changes, dtype=object))
            updated_count += 1

    if updated_count:
        save_tasks_data(st.session_state.tasks)
        show_success("Tasks updated successfully!")
def show_task_analytics():
    """
    Display task analytics charts.

    Draws a pie chart of tasks by status, a bar chart of tasks by priority
    and a line chart of tasks created per day for ``st.session_state.tasks``.
    Draws no charts when there are no tasks.
    """
    st.subheader("📈 Task Analytics")
    tasks = st.session_state.tasks
    if not tasks:
        return

    col1, col2 = st.columns(2)
    with col1:
        # Status distribution (records without a status are grouped as "Unknown")
        status_counts = pd.Series([t.get("status", "Unknown") for t in tasks]).value_counts()
        fig_status = px.pie(
            values=status_counts.values,
            names=status_counts.index,
            title="Tasks by Status"
        )
        st.plotly_chart(fig_status, use_container_width=True)
    with col2:
        # Priority distribution (records without a priority are grouped as "Unknown")
        priority_counts = pd.Series([t.get("priority", "Unknown") for t in tasks]).value_counts()
        fig_priority = px.bar(
            x=priority_counts.index,
            y=priority_counts.values,
            title="Tasks by Priority"
        )
        st.plotly_chart(fig_priority, use_container_width=True)

    # Tasks over time
    tasks_df = pd.DataFrame(tasks)
    if "created_at" not in tasks_df.columns:
        return

    # Unparseable timestamps become NaT and are left out of the timeline
    # instead of aborting the whole analytics section.
    created = pd.to_datetime(tasks_df["created_at"], errors="coerce")
    tasks_df["created_date"] = created.dt.date
    dated_df = tasks_df.dropna(subset=["created_date"])
    if dated_df.empty:
        return

    daily_counts = dated_df.groupby("created_date").size().reset_index(name="count")
    fig_timeline = px.line(
        daily_counts,
        x="created_date",
        y="count",
        title="Tasks Created Over Time"
    )
    st.plotly_chart(fig_timeline, use_container_width=True)
@handle_data_exceptions
def load_tasks_data() -> List[Dict[str, Any]]:
    """
    Read the task list from ``tasks.json``.

    When storage yields ``None``, a set of sample tasks is generated, written
    to ``tasks.json`` and returned instead.
    """
    stored = load_data("tasks.json", "json")
    if stored is not None:
        return stored

    # Nothing stored yet: seed storage with the demonstration tasks
    seeded = create_sample_tasks()
    save_data(seeded, "tasks.json", "json")
    return seeded
@handle_data_exceptions
def save_tasks_data(tasks: List[Dict[str, Any]]) -> bool:
    """
    Write the given task list to ``tasks.json`` and pass back the result of
    the storage call.
    """
    outcome = save_data(tasks, "tasks.json", "json")
    return outcome
def create_sample_tasks() -> List[Dict[str, Any]]:
    """
    Build the three demonstration tasks.

    Each entry is created through ``create_new_task`` with a due date given
    as an offset in days from the current time.
    """
    # (title, description, status, priority, category, assignee, due-in-days, tags)
    blueprints = (
        (
            "Setup project structure",
            "Initialize the project with proper folder structure and configuration files",
            "Done", "High", "Development", "Developer 1", -2,
            "setup, initialization",
        ),
        (
            "Implement user authentication",
            "Add login/logout functionality with session management",
            "In Progress", "High", "Development", "Developer 2", 5,
            "auth, security",
        ),
        (
            "Write unit tests",
            "Create comprehensive unit tests for core functionality",
            "Todo", "Medium", "Testing", "QA Team", 10,
            "testing, quality",
        ),
    )
    return [
        create_new_task(
            name,
            details,
            state,
            level,
            kind,
            owner,
            datetime.now() + timedelta(days=due_in_days),
            labels,
        )
        for name, details, state, level, kind, owner, due_in_days, labels in blueprints
    ]
def _to_date(value):
    """Reduce a datetime to its calendar date; return any other value unchanged."""
    return value.date() if isinstance(value, datetime) else value


def filter_tasks(tasks: List[Dict], status_filter: List[str], priority_filter: List[str],
                 category_filter: List[str], date_range) -> List[Dict]:
    """
    Filter tasks based on criteria.

    Args:
        tasks: Task dictionaries to filter.
        status_filter: Status labels to keep.
        priority_filter: Priority labels to keep.
        category_filter: Category labels to keep.
        date_range: A ``(start, end)`` pair of dates or datetimes (inclusive)
            applied to each task's ``created_at``. Any other value, such as a
            one-element tuple from a half-finished selection, disables the
            date filter.

    Returns:
        The matching tasks in their original order. With an active date
        range, tasks without ``created_at`` are kept and tasks whose
        ``created_at`` cannot be parsed are dropped.
    """
    has_range = isinstance(date_range, (list, tuple)) and len(date_range) == 2
    if has_range:
        # Accept datetime bounds too: comparing a datetime with a date raises
        # TypeError, which would otherwise filter out every task.
        start_date, end_date = (_to_date(bound) for bound in date_range)

    filtered = []
    for task in tasks:
        # Status filter
        if task.get("status") not in status_filter:
            continue
        # Priority filter
        if task.get("priority") not in priority_filter:
            continue
        # Category filter
        if task.get("category") not in category_filter:
            continue
        # Date filter: only consulted when a complete range is selected, so a
        # bad timestamp cannot hide a task while no date filter is active.
        created_at = task.get("created_at")
        if has_range and created_at:
            try:
                task_date = datetime.fromisoformat(created_at).date()
                in_range = start_date <= task_date <= end_date
            except (ValueError, TypeError):
                in_range = False
            if not in_range:
                continue
        filtered.append(task)
    return filtered
def get_overdue_tasks(tasks: List[Dict]) -> List[Dict]:
    """
    Get overdue tasks.

    A task is overdue when it has a due date before today and is still open,
    i.e. its status is neither ``"Done"`` nor ``"Cancelled"``.

    Args:
        tasks: Task dictionaries; ``due_date`` may be an ISO string, a date,
            a datetime, or missing/empty.

    Returns:
        The overdue tasks in their original order. Tasks whose due date
        cannot be interpreted are skipped.
    """
    # Finished and cancelled work can no longer be late.
    closed_statuses = ("Done", "Cancelled")
    today = datetime.now().date()

    overdue = []
    for task in tasks:
        raw_due = task.get("due_date")
        if not raw_due or task.get("status") in closed_statuses:
            continue
        try:
            if isinstance(raw_due, datetime):
                due_date = raw_due.date()
            elif hasattr(raw_due, "isoformat"):
                due_date = raw_due  # already a plain date object
            else:
                due_date = datetime.fromisoformat(raw_due).date()
            is_late = due_date < today
        except (ValueError, TypeError):
            continue
        if is_late:
            overdue.append(task)
    return overdue
def update_task_from_row(task_id: str, row: pd.Series):
    """
    Update task from DataFrame row.

    Finds the task with the given id in ``st.session_state.tasks`` and copies
    over every non-null row value whose key already exists on the task.
    ``updated_at`` is refreshed only when at least one field changed.

    Args:
        task_id: Id of the task to update.
        row: Field values keyed by task field name.
    """
    for task in st.session_state.tasks:
        if task.get("id") != task_id:
            continue

        changed = False
        for key, value in row.items():
            if key not in task or not pd.notna(value):
                continue
            # Tasks keep dates as ISO strings; table editors hand back
            # date/datetime objects, so convert before storing.
            if hasattr(value, "isoformat"):
                value = value.isoformat()
            if task[key] != value:
                task[key] = value
                changed = True

        if changed:
            task["updated_at"] = get_timestamp()
        break