# mona/utils/data_analysis.py
# Hosting-page residue: "mrradix's picture", "Upload 48 files", commit 8e4018d (verified).
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import datetime
from typing import Dict, List, Any, Union, Optional, Tuple
import calendar
from collections import Counter, defaultdict
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions
from utils.storage import load_data, safe_get
from utils.config import FILE_PATHS, UI_COLORS
# Module-level logger, created by the project's setup_logger helper and keyed by this module's name
logger = setup_logger(__name__)
@handle_exceptions
def filter_data_by_time_period(data: List[Dict[str, Any]], time_period: str, timestamp_key: str = "created_at") -> List[Dict[str, Any]]:
    """
    Keep only the items whose timestamp falls inside a recent time window.

    Args:
        data: List of data items (tasks, notes, goals, etc.)
        time_period: One of "Last 7 Days", "Last 30 Days", "Last 90 Days" or "All Time"
        timestamp_key: Key under which each item stores its timestamp

    Returns:
        The items stamped at or after the cutoff. The input list itself is
        returned unchanged when it is empty, when "All Time" is requested, or
        when the period name is not recognised.
    """
    logger.debug(f"Filtering data by time period: {time_period}")

    if time_period == "All Time" or not data:
        return data

    # Window length in days for every supported period label
    window_days = {"Last 7 Days": 7, "Last 30 Days": 30, "Last 90 Days": 90}
    if time_period not in window_days:
        logger.warning(f"Unknown time period: {time_period}, returning all data")
        return data

    # Everything is compared as epoch seconds
    window = datetime.timedelta(days=window_days[time_period])
    cutoff_timestamp = (datetime.datetime.now() - window).timestamp()

    kept = []
    for entry in data:
        stamp = entry.get(timestamp_key)
        if not stamp:
            # Items without a timestamp can never match a bounded window
            continue

        if isinstance(stamp, str):
            # ISO-8601 text; a trailing 'Z' is rewritten so fromisoformat accepts it
            try:
                stamp = datetime.datetime.fromisoformat(stamp.replace('Z', '+00:00')).timestamp()
            except ValueError:
                logger.warning(f"Could not parse timestamp: {stamp}")
                continue

        if stamp >= cutoff_timestamp:
            kept.append(entry)

    logger.debug(f"Filtered data from {len(data)} to {len(kept)} items")
    return kept
@handle_exceptions
def create_completion_rate_chart(data: List[Dict[str, Any]], title: str = "Completion Rate",
                                 completed_key: str = "completed") -> go.Figure:
    """
    Build a donut chart splitting items into completed and incomplete.

    Args:
        data: List of data items (tasks, goals, etc.)
        title: Chart title
        completed_key: Key whose truthy value marks an item as completed

    Returns:
        Plotly figure object
    """
    total = len(data)
    logger.debug(f"Creating completion rate chart with {total} items")

    # An item counts as completed when its completion flag is truthy
    completed = len([item for item in data if safe_get(item, completed_key, False)])
    incomplete = total - completed

    # Both percentages are reported as 0 for empty input (avoids dividing by zero)
    completed_pct = completed / total * 100 if data else 0
    incomplete_pct = 100 - completed_pct if data else 0

    donut = go.Pie(
        labels=[f"Completed ({completed_pct:.1f}%)", f"Incomplete ({incomplete_pct:.1f}%)"],
        values=[completed, incomplete],
        hole=0.4,
        marker_colors=[UI_COLORS["success"], UI_COLORS["warning"]]
    )
    fig = go.Figure(data=[donut])

    # Horizontal legend centred underneath the chart
    legend_below = dict(orientation="h", yanchor="bottom", y=-0.2, xanchor="center", x=0.5)
    fig.update_layout(
        title=title,
        showlegend=True,
        legend=legend_below,
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig
@handle_exceptions
def create_status_distribution_chart(tasks: List[Dict[str, Any]], title: str = "Task Status Distribution") -> go.Figure:
    """
    Build a bar chart with one bar per task status.

    Args:
        tasks: List of task items
        title: Chart title

    Returns:
        Plotly figure object
    """
    logger.debug(f"Creating status distribution chart with {len(tasks)} tasks")

    # Tasks without a status are counted as "To Do"
    status_counts = Counter(safe_get(task, "status", "To Do") for task in tasks)

    # Known statuses in display order, each with its bar colour
    known_palette = {
        "To Do": UI_COLORS["warning"],
        "In Progress": UI_COLORS["info"],
        "Done": UI_COLORS["success"],
    }

    # Known statuses that actually occur come first, in the predefined order...
    statuses = [name for name in known_palette if name in status_counts]
    colors = [known_palette[name] for name in statuses]

    # ...followed by any other status in first-seen order, all sharing one colour
    extras = [name for name in status_counts if name not in known_palette]
    statuses += extras
    colors += [UI_COLORS["secondary"] for _ in extras]

    counts = [status_counts[name] for name in statuses]

    bars = go.Bar(
        x=statuses,
        y=counts,
        marker_color=colors,
        text=counts,
        textposition='auto',
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title=title,
        xaxis_title="Status",
        yaxis_title="Number of Tasks",
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig
@handle_exceptions
def create_priority_distribution_chart(tasks: List[Dict[str, Any]], title: str = "Task Priority Distribution") -> go.Figure:
    """
    Build a bar chart with one bar per task priority.

    Args:
        tasks: List of task items
        title: Chart title

    Returns:
        Plotly figure object
    """
    logger.debug(f"Creating priority distribution chart with {len(tasks)} tasks")

    # Tasks without a priority are counted as "Medium"
    priority_counts = Counter(safe_get(task, "priority", "Medium") for task in tasks)

    # Known priorities from least to most pressing, each with its bar colour
    known_palette = {
        "Low": UI_COLORS["success"],
        "Medium": UI_COLORS["info"],
        "High": UI_COLORS["warning"],
        "Urgent": UI_COLORS["danger"],
    }

    # Known priorities that actually occur come first, in the predefined order...
    priorities = [name for name in known_palette if name in priority_counts]
    colors = [known_palette[name] for name in priorities]

    # ...followed by any other priority in first-seen order, all sharing one colour
    extras = [name for name in priority_counts if name not in known_palette]
    priorities += extras
    colors += [UI_COLORS["secondary"] for _ in extras]

    counts = [priority_counts[name] for name in priorities]

    bars = go.Bar(
        x=priorities,
        y=counts,
        marker_color=colors,
        text=counts,
        textposition='auto',
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title=title,
        xaxis_title="Priority",
        yaxis_title="Number of Tasks",
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig
@handle_exceptions
def create_time_series_chart(data: List[Dict[str, Any]], title: str = "Activity Over Time",
                             timestamp_key: str = "created_at", group_by: str = "day") -> go.Figure:
    """
    Create a filled line chart counting items per day, week or month.

    Args:
        data: List of data items (tasks, notes, goals, activity, etc.)
        title: Chart title
        timestamp_key: Key in the data items that contains the timestamp
            (ISO-8601 string or epoch seconds)
        group_by: Time grouping ("day", "week", "month"); any other value is
            bucketed per day and labelled with the raw "YYYY-MM-DD" keys

    Returns:
        Plotly figure object. An empty titled figure is returned when there is
        no data or no item carries a usable timestamp.
    """
    logger.debug(f"Creating time series chart with {len(data)} items, grouped by {group_by}")

    def _empty_figure() -> go.Figure:
        # Placeholder figure used whenever there is nothing to plot
        empty = go.Figure()
        empty.update_layout(title=title, height=300)
        return empty

    if not data:
        return _empty_figure()

    # Convert timestamps to datetime objects, skipping anything unusable
    dates = []
    for item in data:
        timestamp = item.get(timestamp_key)
        if not timestamp:
            continue
        try:
            if isinstance(timestamp, str):
                # A trailing 'Z' is rewritten so fromisoformat accepts it
                date = datetime.datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            else:
                date = datetime.datetime.fromtimestamp(timestamp)
        except (TypeError, ValueError, OverflowError, OSError):
            # One malformed value must not abort the whole chart
            logger.warning(f"Could not parse timestamp: {timestamp}")
            continue
        dates.append(date)

    if not dates:
        return _empty_figure()

    # Warn once instead of once per data point
    if group_by not in ("day", "week", "month"):
        logger.warning(f"Unknown group_by value: {group_by}, using day")

    def _bucket_key(moment: datetime.datetime) -> str:
        # Sortable string key of the bucket that `moment` falls into
        if group_by == "month":
            return moment.strftime("%Y-%m")
        if group_by == "week":
            # Weeks are keyed by their Monday
            moment = moment - datetime.timedelta(days=moment.weekday())
        return moment.strftime("%Y-%m-%d")

    date_counts = Counter(_bucket_key(moment) for moment in dates)

    # The zero-padded keys sort chronologically as plain strings
    sorted_dates = sorted(date_counts)
    counts = [date_counts[key] for key in sorted_dates]

    # Format x-axis labels based on grouping
    if group_by == "month":
        x_labels = [datetime.datetime.strptime(key, "%Y-%m").strftime("%b %Y") for key in sorted_dates]
    elif group_by in ("day", "week"):
        bucket_starts = [datetime.datetime.strptime(key, "%Y-%m-%d") for key in sorted_dates]
        # "Jan 05" is ambiguous once the buckets cover more than one year; identical
        # labels would share a single x position, so the year is appended then.
        spans_years = len({start.year for start in bucket_starts}) > 1
        day_format = "%b %d, %Y" if spans_years else "%b %d"
        prefix = "Week of " if group_by == "week" else ""
        x_labels = [f"{prefix}{start.strftime(day_format)}" for start in bucket_starts]
    else:
        x_labels = sorted_dates

    # Translucent fill derived from the primary colour
    # (assumes UI_COLORS["primary"] is a "#rrggbb" string, as the slicing requires)
    primary = UI_COLORS["primary"]
    red, green, blue = (int(primary[start:start + 2], 16) for start in (1, 3, 5))

    # Create line chart
    fig = go.Figure(data=go.Scatter(
        x=x_labels,
        y=counts,
        mode='lines+markers',
        line=dict(color=primary, width=2),
        marker=dict(size=8, color=primary),
        fill='tozeroy',
        fillcolor=f"rgba({red}, {green}, {blue}, 0.2)"
    ))
    fig.update_layout(
        title=title,
        xaxis_title="Date",
        yaxis_title="Count",
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig
@handle_exceptions
def create_activity_heatmap(data: List[Dict[str, Any]], title: str = "Activity Heatmap",
                            timestamp_key: str = "created_at") -> go.Figure:
    """
    Build a weekday-by-hour heatmap of how many items were stamped in each slot.

    Args:
        data: List of data items (tasks, notes, goals, activity, etc.)
        title: Chart title
        timestamp_key: Key in the data items that contains the timestamp

    Returns:
        Plotly figure object (an empty titled figure when `data` is empty)
    """
    logger.debug(f"Creating activity heatmap with {len(data)} items")

    if not data:
        placeholder = go.Figure()
        placeholder.update_layout(title=title, height=400)
        return placeholder

    # Rows are weekdays (0 = Monday ... 6 = Sunday), columns are hours 0-23
    slot_counts = np.zeros((7, 24))
    for entry in data:
        raw = entry.get(timestamp_key)
        if not raw:
            continue

        if not isinstance(raw, str):
            # Non-string values are treated as epoch seconds
            moment = datetime.datetime.fromtimestamp(raw)
        else:
            try:
                moment = datetime.datetime.fromisoformat(raw.replace('Z', '+00:00'))
            except ValueError:
                logger.warning(f"Could not parse timestamp: {raw}")
                continue

        slot_counts[moment.weekday(), moment.hour] += 1

    weekday_names = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    hour_labels = [f"{h:02d}:00" for h in range(24)]

    heatmap = go.Heatmap(
        z=slot_counts,
        x=hour_labels,
        y=weekday_names,
        colorscale='Viridis',
        hoverongaps=False,
        colorbar=dict(title="Count")
    )
    fig = go.Figure(data=heatmap)
    fig.update_layout(
        title=title,
        xaxis_title="Hour of Day",
        yaxis_title="Day of Week",
        margin=dict(l=20, r=20, t=40, b=20),
        height=400
    )
    return fig
@handle_exceptions
def create_calendar_heatmap(data: List[Dict[str, Any]], title: str = "Calendar Heatmap",
                            timestamp_key: str = "created_at", year: Optional[int] = None) -> go.Figure:
    """
    Build a 4x3 grid of month heatmaps showing the per-day item count for one year.

    Args:
        data: List of data items (tasks, notes, goals, activity, etc.)
        title: Chart title
        timestamp_key: Key in the data items that contains the timestamp
        year: Year to display (defaults to the current year)

    Returns:
        Plotly figure object (an empty titled figure when `data` is empty)
    """
    logger.debug(f"Creating calendar heatmap with {len(data)} items")

    if not data:
        placeholder = go.Figure()
        placeholder.update_layout(title=title, height=500)
        return placeholder

    if year is None:
        year = datetime.datetime.now().year

    # Count the items per calendar day, ignoring everything outside the requested year
    per_day = defaultdict(int)
    for entry in data:
        raw = entry.get(timestamp_key)
        if not raw:
            continue

        if not isinstance(raw, str):
            # Non-string values are treated as epoch seconds
            moment = datetime.datetime.fromtimestamp(raw)
        else:
            try:
                moment = datetime.datetime.fromisoformat(raw.replace('Z', '+00:00'))
            except ValueError:
                logger.warning(f"Could not parse timestamp: {raw}")
                continue

        if moment.year == year:
            per_day[moment.strftime("%Y-%m-%d")] += 1

    # Every month shares one colour range: 0 up to the busiest day of the year
    max_count = max(per_day.values(), default=0)
    colorscale = px.colors.sequential.Viridis

    # One subplot per month, laid out in 4 rows of 3
    fig = make_subplots(rows=4, cols=3, subplot_titles=[calendar.month_name[i] for i in range(1, 13)])

    weekday_labels = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    for month in range(1, 13):
        # Weekday of the 1st (0 = Monday) and the number of days in the month
        first_weekday, days_in_month = calendar.monthrange(year, month)

        # 7 weekday rows x 6 week columns; NaN marks cells that hold no day
        month_grid = np.zeros((7, 6))
        month_grid[:] = np.nan

        for day in range(1, days_in_month + 1):
            # Offset from the Monday of the week containing the 1st
            week, weekday = divmod(day - 1 + first_weekday, 7)
            if week < 6:  # Only six week columns are drawn
                day_key = datetime.date(year, month, day).strftime("%Y-%m-%d")
                month_grid[weekday, week] = per_day.get(day_key, 0)

        grid_row, grid_col = divmod(month - 1, 3)
        fig.add_trace(
            go.Heatmap(
                z=month_grid,
                x=list(range(6)),  # Weeks
                y=weekday_labels,  # Days
                colorscale=colorscale,
                showscale=month == 12,  # Only show colorbar for December
                zmin=0,
                zmax=max_count
            ),
            row=grid_row + 1,
            col=grid_col + 1
        )

    fig.update_layout(
        title=title,
        height=800,
        margin=dict(l=20, r=20, t=40, b=20),
    )
    # Week indices carry no meaning for the reader, so x tick labels and grid are hidden
    fig.update_xaxes(showticklabels=False, showgrid=False)
    return fig
@handle_exceptions
def create_tags_distribution_chart(data: List[Dict[str, Any]], title: str = "Tags Distribution",
                                   tags_key: str = "tags", top_n: int = 10) -> go.Figure:
    """
    Build a bar chart of the most frequently used tags.

    Args:
        data: List of data items (tasks, notes, goals, etc.)
        title: Chart title
        tags_key: Key in the data items that contains the list of tags
        top_n: Number of top tags to display

    Returns:
        Plotly figure object (an empty titled figure when no item has tags)
    """
    logger.debug(f"Creating tags distribution chart with {len(data)} items")

    # Only non-empty lists contribute; any other value under tags_key is ignored
    tag_lists = [safe_get(entry, tags_key, []) for entry in data]
    tag_lists = [candidate for candidate in tag_lists if candidate and isinstance(candidate, list)]
    tag_counts = Counter(tag for tag_list in tag_lists for tag in tag_list)

    if not tag_counts:
        placeholder = go.Figure()
        placeholder.update_layout(title=title, height=300)
        return placeholder

    # Most frequent tags first
    ranking = tag_counts.most_common(top_n)
    tag_names = [name for name, _ in ranking]
    tag_totals = [total for _, total in ranking]

    bars = go.Bar(
        x=tag_names,
        y=tag_totals,
        marker_color=UI_COLORS["primary"],
        text=tag_totals,
        textposition='auto',
    )
    fig = go.Figure(data=[bars])
    fig.update_layout(
        title=title,
        xaxis_title="Tag",
        yaxis_title="Count",
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig
@handle_exceptions
def create_sentiment_chart(notes: List[Dict[str, Any]], title: str = "Notes Sentiment Analysis",
                           content_key: str = "content", timestamp_key: str = "created_at") -> go.Figure:
    """
    Create a line chart showing sentiment analysis of notes over time

    Notes without content or without a usable timestamp are skipped. The
    remaining notes are ordered chronologically by their parsed timestamp, so
    ISO-8601 strings and epoch numbers can be mixed freely.

    Args:
        notes: List of note items
        title: Chart title
        content_key: Key in the note items that contains the content
        timestamp_key: Key in the note items that contains the timestamp
            (ISO-8601 string or epoch seconds)

    Returns:
        Plotly figure object (an empty titled figure when nothing can be plotted)
    """
    logger.debug(f"Creating sentiment chart with {len(notes)} notes")

    def _empty_figure() -> go.Figure:
        # Placeholder figure used whenever there is nothing to plot
        empty = go.Figure()
        empty.update_layout(title=title, height=300)
        return empty

    if not notes:
        return _empty_figure()

    # Validate and parse the timestamps BEFORE sorting or scoring: comparing raw
    # values breaks as soon as strings and numbers are mixed, and there is no
    # point running the sentiment model on a note that cannot be plotted.
    plottable = []
    for note in notes:
        content = safe_get(note, content_key, "")
        timestamp = safe_get(note, timestamp_key, None)
        if not content or not timestamp:
            continue
        try:
            if isinstance(timestamp, str):
                # A trailing 'Z' is rewritten so fromisoformat accepts it
                date = datetime.datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            else:
                date = datetime.datetime.fromtimestamp(timestamp)
            # Epoch seconds give one ordering for naive and timezone-aware values
            order = date.timestamp()
        except (TypeError, ValueError, OverflowError, OSError):
            logger.warning(f"Could not parse timestamp: {timestamp}")
            continue
        plottable.append((order, date, note, content))

    if not plottable:
        return _empty_figure()

    # list.sort is stable, so notes sharing a timestamp keep their input order
    plottable.sort(key=lambda entry: entry[0])

    # Resolve the analyzer once instead of retrying the import for every note
    try:
        from utils.ai_models import analyze_sentiment
    except Exception as e:
        logger.warning(f"Error analyzing sentiment: {str(e)}")
        analyze_sentiment = None

    dates = []
    sentiments = []
    texts = []
    for _, date, note, content in plottable:
        title_text = safe_get(note, "title", "Untitled Note")

        # Presumably a score in [-1, 1]; the axis and colour scale below are fixed to that range
        sentiment = 0.0  # neutral, deterministic fallback when no score is available
        if analyze_sentiment is not None:
            try:
                sentiment = analyze_sentiment(content)
            except Exception as e:
                logger.warning(f"Error analyzing sentiment: {str(e)}")

        # Hover text: bold title plus at most the first 100 characters of the content
        snippet = f"{content[:100]}..." if len(content) > 100 else content

        dates.append(date.strftime("%Y-%m-%d"))
        sentiments.append(sentiment)
        texts.append(f"<b>{title_text}</b><br>{snippet}")

    # Create line chart
    fig = go.Figure(data=go.Scatter(
        x=dates,
        y=sentiments,
        mode='lines+markers',
        line=dict(color=UI_COLORS["primary"], width=2),
        marker=dict(
            size=10,
            color=sentiments,
            colorscale=[[0, UI_COLORS["danger"]], [0.5, UI_COLORS["warning"]], [1, UI_COLORS["success"]]],
            cmin=-1,
            cmax=1,
            showscale=True,
            colorbar=dict(title="Sentiment")
        ),
        text=texts,
        hoverinfo="text+x+y"
    ))
    fig.update_layout(
        title=title,
        xaxis_title="Date",
        yaxis_title="Sentiment Score",
        yaxis=dict(range=[-1, 1]),
        margin=dict(l=20, r=20, t=40, b=20),
        height=400
    )
    # Add a horizontal line at y=0
    fig.add_shape(
        type="line",
        x0=dates[0],
        y0=0,
        x1=dates[-1],
        y1=0,
        line=dict(color="gray", width=1, dash="dash")
    )
    return fig
@handle_exceptions
def create_model_usage_distribution(activities: List[Dict[str, Any]], title: str = "AI Model Usage Distribution") -> go.Figure:
    """
    Build a donut chart of how often each kind of AI activity occurs.

    An activity counts as AI-related when its "type" contains "ai_" or "model_".

    Args:
        activities: List of activity items
        title: Chart title

    Returns:
        Plotly figure object (an empty titled figure when there are no AI activities)
    """
    logger.debug(f"Creating model usage distribution chart with {len(activities)} activities")

    def _is_ai_activity(activity: Dict[str, Any]) -> bool:
        kind = safe_get(activity, "type", "")
        return "ai_" in kind or "model_" in kind

    ai_activities = [activity for activity in activities if _is_ai_activity(activity)]

    if not ai_activities:
        placeholder = go.Figure()
        placeholder.update_layout(title=title, height=300)
        return placeholder

    def _display_name(activity: Dict[str, Any]) -> str:
        # Drop the technical markers and title-case the rest for display
        kind = safe_get(activity, "type", "unknown")
        return kind.replace("ai_", "").replace("model_", "").replace("_", " ").title()

    # Counter keeps first-seen order, which fixes the order of the slices
    model_counts = Counter(_display_name(activity) for activity in ai_activities)
    models = list(model_counts.keys())
    counts = list(model_counts.values())

    # Up to ten slices use the default qualitative palette, more use the larger one
    if len(models) <= 10:
        colors = px.colors.qualitative.Plotly[:len(models)]
    else:
        colors = px.colors.qualitative.Alphabet

    donut = go.Pie(
        labels=models,
        values=counts,
        hole=0.4,
        marker=dict(colors=colors),
        textinfo="label+percent",
        insidetextorientation="radial"
    )
    fig = go.Figure(data=[donut])
    fig.update_layout(
        title=title,
        margin=dict(l=20, r=20, t=40, b=20),
        height=300,
        legend=dict(orientation="h", yanchor="bottom", y=-0.2, xanchor="center", x=0.5)
    )
    return fig
@handle_exceptions
def create_model_usage_over_time(activities: List[Dict[str, Any]], title: str = "AI Model Usage Over Time",
                                 timestamp_key: str = "timestamp", group_by: str = "day") -> go.Figure:
    """
    Create a stacked area chart showing AI model usage over time

    An activity counts as AI-related when its "type" contains "ai_" or "model_".

    Args:
        activities: List of activity items
        title: Chart title
        timestamp_key: Key in the activity items that contains the timestamp
            (ISO-8601 string or epoch seconds)
        group_by: Time grouping ("day", "week", "month"); any other value is
            bucketed per day and labelled with the raw "YYYY-MM-DD" keys

    Returns:
        Plotly figure object. An empty titled figure is returned when there are
        no AI activities or none of them carries a usable timestamp.
    """
    logger.debug(f"Creating model usage over time chart with {len(activities)} activities")

    def _empty_figure() -> go.Figure:
        # Placeholder figure used whenever there is nothing to plot
        empty = go.Figure()
        empty.update_layout(title=title, height=400)
        return empty

    # Filter activities related to AI model usage
    ai_activities = []
    for activity in activities:
        activity_type = safe_get(activity, "type", "")
        if "ai_" in activity_type or "model_" in activity_type:
            ai_activities.append(activity)

    if not ai_activities:
        return _empty_figure()

    # Warn once instead of once per activity
    if group_by not in ("day", "week", "month"):
        logger.warning(f"Unknown group_by value: {group_by}, using day")

    def _bucket_key(moment: datetime.datetime) -> str:
        # Sortable string key of the bucket that `moment` falls into
        if group_by == "month":
            return moment.strftime("%Y-%m")
        if group_by == "week":
            # Weeks are keyed by their Monday
            moment = moment - datetime.timedelta(days=moment.weekday())
        return moment.strftime("%Y-%m-%d")

    # Group activities by date bucket and model type
    date_model_counts = defaultdict(lambda: defaultdict(int))
    model_types = set()
    for activity in ai_activities:
        timestamp = safe_get(activity, timestamp_key, None)
        if not timestamp:
            continue
        try:
            if isinstance(timestamp, str):
                # A trailing 'Z' is rewritten so fromisoformat accepts it
                date = datetime.datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            else:
                date = datetime.datetime.fromtimestamp(timestamp)
        except (TypeError, ValueError, OverflowError, OSError):
            # One malformed value must not abort the whole chart
            logger.warning(f"Could not parse timestamp: {timestamp}")
            continue

        # Clean up the activity type for better display
        activity_type = safe_get(activity, "type", "unknown")
        model_type = activity_type.replace("ai_", "").replace("model_", "").replace("_", " ").title()
        model_types.add(model_type)

        date_model_counts[_bucket_key(date)][model_type] += 1

    if not date_model_counts:
        return _empty_figure()

    # The zero-padded keys sort chronologically as plain strings
    sorted_dates = sorted(date_model_counts)

    # Format x-axis labels based on grouping
    if group_by == "month":
        x_labels = [datetime.datetime.strptime(key, "%Y-%m").strftime("%b %Y") for key in sorted_dates]
    elif group_by in ("day", "week"):
        bucket_starts = [datetime.datetime.strptime(key, "%Y-%m-%d") for key in sorted_dates]
        # "Jan 05" is ambiguous once the buckets cover more than one year; identical
        # labels would share a single x position, so the year is appended then.
        spans_years = len({start.year for start in bucket_starts}) > 1
        day_format = "%b %d, %Y" if spans_years else "%b %d"
        prefix = "Week of " if group_by == "week" else ""
        x_labels = [f"{prefix}{start.strftime(day_format)}" for start in bucket_starts]
    else:
        x_labels = sorted_dates

    # Alphabetical order keeps trace order and colours stable between renders
    ordered_types = sorted(model_types)

    # Up to ten traces use the default qualitative palette, more use the larger one
    if len(ordered_types) <= 10:
        colors = px.colors.qualitative.Plotly[:len(ordered_types)]
    else:
        colors = px.colors.qualitative.Alphabet

    # Create stacked area chart: one trace per model type, 0 where a bucket has no usage
    fig = go.Figure()
    for i, model_type in enumerate(ordered_types):
        trace_color = colors[i % len(colors)]
        y_values = [date_model_counts[key].get(model_type, 0) for key in sorted_dates]
        fig.add_trace(go.Scatter(
            x=x_labels,
            y=y_values,
            mode='lines',
            stackgroup='one',  # This makes it a stacked area chart
            name=model_type,
            line=dict(width=0.5, color=trace_color),
            fillcolor=trace_color
        ))
    fig.update_layout(
        title=title,
        xaxis_title="Date",
        yaxis_title="Usage Count",
        margin=dict(l=20, r=20, t=40, b=20),
        height=400,
        legend=dict(orientation="h", yanchor="bottom", y=-0.3, xanchor="center", x=0.5)
    )
    return fig
@handle_exceptions
def create_completion_time_chart(data: List[Dict[str, Any]], title: str = "Completion Time Distribution",
                                 created_key: str = "created_at", completed_key: str = "completed_at") -> go.Figure:
    """
    Create a histogram showing the distribution of completion times

    The completion time of an item is the elapsed time, in days, between its
    creation and completion timestamps. Items missing either timestamp, items
    whose timestamps cannot be parsed, and items with a non-positive duration
    are left out.

    Args:
        data: List of data items (tasks, goals, etc.)
        title: Chart title
        created_key: Key in the data items that contains the creation timestamp
        completed_key: Key in the data items that contains the completion timestamp

    Returns:
        Plotly figure object with dashed markers for the average and the median
        (an empty titled figure when no item qualifies)
    """
    logger.debug(f"Creating completion time chart with {len(data)} items")

    def _epoch_seconds(value: Any) -> float:
        # Both endpoints are reduced to epoch seconds so that timezone-aware and
        # naive values can be subtracted; datetime.timestamp() reads naive values
        # as local time.
        if isinstance(value, str):
            # A trailing 'Z' is rewritten so fromisoformat accepts it
            moment = datetime.datetime.fromisoformat(value.replace('Z', '+00:00'))
        else:
            moment = datetime.datetime.fromtimestamp(value)
        return moment.timestamp()

    seconds_per_day = 60 * 60 * 24
    unusable = (TypeError, ValueError, OverflowError, OSError)

    # Collect the completion time (in days) of every finished item
    completion_times = []
    for item in data:
        created_at = safe_get(item, created_key, None)
        completed_at = safe_get(item, completed_key, None)
        if not created_at or not completed_at:
            continue

        try:
            created_seconds = _epoch_seconds(created_at)
        except unusable:
            logger.warning(f"Could not parse timestamp: {created_at}")
            continue
        try:
            completed_seconds = _epoch_seconds(completed_at)
        except unusable:
            logger.warning(f"Could not parse timestamp: {completed_at}")
            continue

        completion_time = (completed_seconds - created_seconds) / seconds_per_day
        # Only include positive completion times
        if completion_time > 0:
            completion_times.append(completion_time)

    if not completion_times:
        # Return empty chart if no completed items
        fig = go.Figure()
        fig.update_layout(title=title, height=300)
        return fig

    # Create histogram
    fig = go.Figure(data=go.Histogram(
        x=completion_times,
        nbinsx=20,
        marker_color=UI_COLORS["primary"],
        opacity=0.7
    ))

    # Calculate statistics
    avg_time = np.mean(completion_times)
    median_time = np.median(completion_times)

    # Add vertical lines for average and median; yref="paper" makes them span
    # the full plot height regardless of the bar heights
    fig.add_shape(
        type="line",
        x0=avg_time,
        y0=0,
        x1=avg_time,
        y1=1,
        yref="paper",
        line=dict(color=UI_COLORS["success"], width=2, dash="dash"),
    )
    fig.add_shape(
        type="line",
        x0=median_time,
        y0=0,
        x1=median_time,
        y1=1,
        yref="paper",
        line=dict(color=UI_COLORS["warning"], width=2, dash="dash"),
    )

    # Add annotations for average and median; they sit at different heights and
    # point in opposite directions so they stay readable when the values are close
    fig.add_annotation(
        x=avg_time,
        y=0.95,
        yref="paper",
        text=f"Average: {avg_time:.1f} days",
        showarrow=True,
        arrowhead=1,
        ax=40,
        ay=-40,
        bgcolor=UI_COLORS["success"],
        font=dict(color="white")
    )
    fig.add_annotation(
        x=median_time,
        y=0.85,
        yref="paper",
        text=f"Median: {median_time:.1f} days",
        showarrow=True,
        arrowhead=1,
        ax=-40,
        ay=-40,
        bgcolor=UI_COLORS["warning"],
        font=dict(color="white")
    )
    fig.update_layout(
        title=title,
        xaxis_title="Completion Time (days)",
        yaxis_title="Count",
        margin=dict(l=20, r=20, t=40, b=20),
        height=300
    )
    return fig