|
import streamlit as st |
|
import pandas as pd |
|
from datetime import datetime, timedelta |
|
import plotly.express as px |
|
import plotly.graph_objects as go |
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
from utils.storage import load_data, save_data |
|
from utils.error_handling import show_error, show_success, show_warning, handle_data_exceptions |
|
from utils.logging import get_logger |
|
from utils.state import generate_id, get_timestamp, record_activity, get_session_state, set_session_state |
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
TASK_STATUSES = ["Todo", "In Progress", "Review", "Done", "Cancelled"] |
|
TASK_PRIORITIES = ["Low", "Medium", "High", "Critical"] |
|
TASK_CATEGORIES = ["Development", "Testing", "Documentation", "Bug Fix", "Feature", "Maintenance"] |
|
|
|
def create_tasks_page(): |
|
""" |
|
Create the tasks management page |
|
""" |
|
st.title("π Task Management") |
|
|
|
|
|
if 'tasks' not in st.session_state: |
|
st.session_state.tasks = load_tasks_data() |
|
|
|
|
|
with st.sidebar: |
|
st.header("π§ Task Controls") |
|
|
|
|
|
if st.button("β Add New Task", use_container_width=True): |
|
set_session_state('show_add_task', True) |
|
|
|
|
|
st.subheader("π Filters") |
|
|
|
status_filter = st.multiselect( |
|
"Filter by Status", |
|
TASK_STATUSES, |
|
default=TASK_STATUSES |
|
) |
|
|
|
priority_filter = st.multiselect( |
|
"Filter by Priority", |
|
TASK_PRIORITIES, |
|
default=TASK_PRIORITIES |
|
) |
|
|
|
category_filter = st.multiselect( |
|
"Filter by Category", |
|
TASK_CATEGORIES, |
|
default=TASK_CATEGORIES |
|
) |
|
|
|
|
|
st.subheader("π
Date Range") |
|
date_range = st.date_input( |
|
"Select Date Range", |
|
value=(datetime.now() - timedelta(days=30), datetime.now()), |
|
max_value=datetime.now() |
|
) |
|
|
|
|
|
st.subheader("π Bulk Actions") |
|
if st.button("Save All Tasks"): |
|
save_tasks_data(st.session_state.tasks) |
|
show_success("Tasks saved successfully!") |
|
|
|
if st.button("Reset Filters"): |
|
st.experimental_rerun() |
|
|
|
|
|
if get_session_state('show_add_task', False): |
|
show_add_task_form() |
|
|
|
|
|
show_task_statistics() |
|
|
|
|
|
show_tasks_table(status_filter, priority_filter, category_filter, date_range) |
|
|
|
|
|
show_task_analytics() |
|
|
|
def show_add_task_form(): |
|
""" |
|
Display the add task form |
|
""" |
|
st.subheader("β Add New Task") |
|
|
|
with st.form("add_task_form"): |
|
col1, col2 = st.columns(2) |
|
|
|
with col1: |
|
title = st.text_input("Task Title*", placeholder="Enter task title") |
|
status = st.selectbox("Status", TASK_STATUSES, index=0) |
|
priority = st.selectbox("Priority", TASK_PRIORITIES, index=1) |
|
|
|
with col2: |
|
category = st.selectbox("Category", TASK_CATEGORIES, index=0) |
|
assignee = st.text_input("Assignee", placeholder="Assign to...") |
|
due_date = st.date_input("Due Date", value=datetime.now() + timedelta(days=7)) |
|
|
|
description = st.text_area("Description", placeholder="Task description...") |
|
tags = st.text_input("Tags", placeholder="Enter tags separated by commas") |
|
|
|
col1, col2, col3 = st.columns(3) |
|
|
|
with col1: |
|
submitted = st.form_submit_button("Create Task", type="primary") |
|
|
|
with col2: |
|
if st.form_submit_button("Cancel"): |
|
set_session_state('show_add_task', False) |
|
st.experimental_rerun() |
|
|
|
if submitted: |
|
if title.strip(): |
|
new_task = create_new_task( |
|
title=title.strip(), |
|
description=description.strip(), |
|
status=status, |
|
priority=priority, |
|
category=category, |
|
assignee=assignee.strip(), |
|
due_date=due_date, |
|
tags=tags.strip() |
|
) |
|
|
|
st.session_state.tasks.append(new_task) |
|
save_tasks_data(st.session_state.tasks) |
|
record_activity("task_created", {"task_id": new_task["id"], "title": title}) |
|
|
|
show_success(f"Task '{title}' created successfully!") |
|
set_session_state('show_add_task', False) |
|
st.experimental_rerun() |
|
else: |
|
show_error("Task title is required!") |
|
|
|
def create_new_task(title: str, description: str, status: str, priority: str, |
|
category: str, assignee: str, due_date, tags: str) -> Dict[str, Any]: |
|
""" |
|
Create a new task dictionary |
|
""" |
|
return { |
|
"id": generate_id("task"), |
|
"title": title, |
|
"description": description, |
|
"status": status, |
|
"priority": priority, |
|
"category": category, |
|
"assignee": assignee, |
|
"due_date": due_date.isoformat() if due_date else None, |
|
"created_at": get_timestamp(), |
|
"updated_at": get_timestamp(), |
|
"tags": [tag.strip() for tag in tags.split(",") if tag.strip()], |
|
"completed": status == "Done" |
|
} |
|
|
|
def show_task_statistics(): |
|
""" |
|
Display task statistics |
|
""" |
|
st.subheader("π Task Statistics") |
|
|
|
tasks = st.session_state.tasks |
|
|
|
if not tasks: |
|
st.info("No tasks available. Create your first task!") |
|
return |
|
|
|
|
|
total_tasks = len(tasks) |
|
completed_tasks = len([t for t in tasks if t["status"] == "Done"]) |
|
in_progress_tasks = len([t for t in tasks if t["status"] == "In Progress"]) |
|
overdue_tasks = len(get_overdue_tasks(tasks)) |
|
|
|
|
|
col1, col2, col3, col4 = st.columns(4) |
|
|
|
with col1: |
|
st.metric( |
|
"Total Tasks", |
|
total_tasks, |
|
delta=None |
|
) |
|
|
|
with col2: |
|
completion_rate = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0 |
|
st.metric( |
|
"Completed", |
|
completed_tasks, |
|
delta=f"{completion_rate:.1f}%" |
|
) |
|
|
|
with col3: |
|
st.metric( |
|
"In Progress", |
|
in_progress_tasks, |
|
delta=None |
|
) |
|
|
|
with col4: |
|
st.metric( |
|
"Overdue", |
|
overdue_tasks, |
|
delta="β οΈ" if overdue_tasks > 0 else "β
" |
|
) |
|
|
|
def show_tasks_table(status_filter: List[str], priority_filter: List[str], |
|
category_filter: List[str], date_range): |
|
""" |
|
Display the tasks table with filtering |
|
""" |
|
st.subheader("π Tasks List") |
|
|
|
tasks = st.session_state.tasks |
|
|
|
if not tasks: |
|
st.info("No tasks to display.") |
|
return |
|
|
|
|
|
filtered_tasks = filter_tasks(tasks, status_filter, priority_filter, category_filter, date_range) |
|
|
|
if not filtered_tasks: |
|
st.warning("No tasks match the current filters.") |
|
return |
|
|
|
|
|
df = pd.DataFrame(filtered_tasks) |
|
|
|
|
|
display_columns = ["title", "status", "priority", "category", "assignee", "due_date", "created_at"] |
|
available_columns = [col for col in display_columns if col in df.columns] |
|
|
|
|
|
edited_df = st.data_editor( |
|
df[available_columns], |
|
use_container_width=True, |
|
hide_index=True, |
|
column_config={ |
|
"title": st.column_config.TextColumn("Title", max_chars=50), |
|
"status": st.column_config.SelectboxColumn( |
|
"Status", |
|
options=TASK_STATUSES, |
|
required=True |
|
), |
|
"priority": st.column_config.SelectboxColumn( |
|
"Priority", |
|
options=TASK_PRIORITIES, |
|
required=True |
|
), |
|
"category": st.column_config.SelectboxColumn( |
|
"Category", |
|
options=TASK_CATEGORIES, |
|
required=True |
|
), |
|
"due_date": st.column_config.DateColumn("Due Date"), |
|
"created_at": st.column_config.DatetimeColumn("Created") |
|
}, |
|
num_rows="dynamic" |
|
) |
|
|
|
|
|
if not edited_df.equals(df[available_columns]): |
|
|
|
for idx, row in edited_df.iterrows(): |
|
if idx < len(filtered_tasks): |
|
task_id = filtered_tasks[idx]["id"] |
|
update_task_from_row(task_id, row) |
|
|
|
save_tasks_data(st.session_state.tasks) |
|
show_success("Tasks updated successfully!") |
|
|
|
def show_task_analytics(): |
|
""" |
|
Display task analytics charts |
|
""" |
|
st.subheader("π Task Analytics") |
|
|
|
tasks = st.session_state.tasks |
|
|
|
if not tasks: |
|
return |
|
|
|
col1, col2 = st.columns(2) |
|
|
|
with col1: |
|
|
|
status_counts = pd.Series([t["status"] for t in tasks]).value_counts() |
|
fig_status = px.pie( |
|
values=status_counts.values, |
|
names=status_counts.index, |
|
title="Tasks by Status" |
|
) |
|
st.plotly_chart(fig_status, use_container_width=True) |
|
|
|
with col2: |
|
|
|
priority_counts = pd.Series([t["priority"] for t in tasks]).value_counts() |
|
fig_priority = px.bar( |
|
x=priority_counts.index, |
|
y=priority_counts.values, |
|
title="Tasks by Priority" |
|
) |
|
st.plotly_chart(fig_priority, use_container_width=True) |
|
|
|
|
|
tasks_df = pd.DataFrame(tasks) |
|
if "created_at" in tasks_df.columns: |
|
tasks_df["created_date"] = pd.to_datetime(tasks_df["created_at"]).dt.date |
|
daily_counts = tasks_df.groupby("created_date").size().reset_index(name="count") |
|
|
|
fig_timeline = px.line( |
|
daily_counts, |
|
x="created_date", |
|
y="count", |
|
title="Tasks Created Over Time" |
|
) |
|
st.plotly_chart(fig_timeline, use_container_width=True) |
|
|
|
@handle_data_exceptions |
|
def load_tasks_data() -> List[Dict[str, Any]]: |
|
""" |
|
Load tasks from storage |
|
""" |
|
tasks = load_data("tasks.json", "json") |
|
if tasks is None: |
|
|
|
sample_tasks = create_sample_tasks() |
|
save_data(sample_tasks, "tasks.json", "json") |
|
return sample_tasks |
|
return tasks |
|
|
|
@handle_data_exceptions |
|
def save_tasks_data(tasks: List[Dict[str, Any]]) -> bool: |
|
""" |
|
Save tasks to storage |
|
""" |
|
return save_data(tasks, "tasks.json", "json") |
|
|
|
def create_sample_tasks() -> List[Dict[str, Any]]: |
|
""" |
|
Create sample tasks for demonstration |
|
""" |
|
sample_tasks = [ |
|
create_new_task( |
|
"Setup project structure", |
|
"Initialize the project with proper folder structure and configuration files", |
|
"Done", |
|
"High", |
|
"Development", |
|
"Developer 1", |
|
datetime.now() - timedelta(days=2), |
|
"setup, initialization" |
|
), |
|
create_new_task( |
|
"Implement user authentication", |
|
"Add login/logout functionality with session management", |
|
"In Progress", |
|
"High", |
|
"Development", |
|
"Developer 2", |
|
datetime.now() + timedelta(days=5), |
|
"auth, security" |
|
), |
|
create_new_task( |
|
"Write unit tests", |
|
"Create comprehensive unit tests for core functionality", |
|
"Todo", |
|
"Medium", |
|
"Testing", |
|
"QA Team", |
|
datetime.now() + timedelta(days=10), |
|
"testing, quality" |
|
) |
|
] |
|
|
|
return sample_tasks |
|
|
|
def filter_tasks(tasks: List[Dict], status_filter: List[str], priority_filter: List[str], |
|
category_filter: List[str], date_range) -> List[Dict]: |
|
""" |
|
Filter tasks based on criteria |
|
""" |
|
filtered = [] |
|
|
|
for task in tasks: |
|
|
|
if task["status"] not in status_filter: |
|
continue |
|
|
|
|
|
if task["priority"] not in priority_filter: |
|
continue |
|
|
|
|
|
if task["category"] not in category_filter: |
|
continue |
|
|
|
|
|
if "created_at" in task and task["created_at"]: |
|
try: |
|
task_date = datetime.fromisoformat(task["created_at"]).date() |
|
if len(date_range) == 2: |
|
start_date, end_date = date_range |
|
if not (start_date <= task_date <= end_date): |
|
continue |
|
except (ValueError, TypeError): |
|
continue |
|
|
|
filtered.append(task) |
|
|
|
return filtered |
|
|
|
def get_overdue_tasks(tasks: List[Dict]) -> List[Dict]: |
|
""" |
|
Get overdue tasks |
|
""" |
|
overdue = [] |
|
today = datetime.now().date() |
|
|
|
for task in tasks: |
|
if task["status"] != "Done" and task.get("due_date"): |
|
try: |
|
due_date = datetime.fromisoformat(task["due_date"]).date() |
|
if due_date < today: |
|
overdue.append(task) |
|
except (ValueError, TypeError): |
|
continue |
|
|
|
return overdue |
|
|
|
def update_task_from_row(task_id: str, row: pd.Series): |
|
""" |
|
Update task from DataFrame row |
|
""" |
|
for task in st.session_state.tasks: |
|
if task["id"] == task_id: |
|
for key, value in row.items(): |
|
if key in task and pd.notna(value): |
|
task[key] = value |
|
task["updated_at"] = get_timestamp() |
|
break |