mona / pages /search.py
mrradix's picture
Upload 48 files
8e4018d verified
import gradio as gr
from typing import Dict, List, Any, Union, Optional
import datetime
# Import utilities
from utils.storage import load_data, save_data, safe_get
from utils.state import generate_id, get_timestamp, record_activity
from utils.ai_models import search_content
from utils.config import FILE_PATHS
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions
# Initialize logger
logger = setup_logger(__name__)
@handle_exceptions
def create_search_page(state: Dict[str, Any]) -> None:
"""
Create the smart search page with advanced search capabilities
Args:
state: Application state
"""
logger.info("Creating search page")
# Create the search page layout
with gr.Column(elem_id="search-page"):
gr.Markdown("# πŸ” Smart Search")
gr.Markdown("*Advanced semantic search across all your content*")
# Search bar with suggestions
with gr.Row(elem_id="search-bar"):
search_input = gr.Textbox(
label="Search",
placeholder="Enter search terms or questions...",
elem_id="search-input"
)
# AI Q&A toggle
ai_qa_toggle = gr.Checkbox(
label="AI Q&A Mode",
value=False,
info="Enable to get AI-generated answers to your questions"
)
# Search suggestions (will appear as you type)
search_suggestions = gr.HTML(
visible=False,
elem_classes="search-suggestions-container",
elem_id="search-suggestions"
)
with gr.Row():
# Filters
content_types = gr.CheckboxGroup(
choices=["πŸ“ Tasks", "πŸ““ Notes", "🎯 Goals", "πŸ€– AI History"],
value=["πŸ“ Tasks", "πŸ““ Notes", "🎯 Goals", "πŸ€– AI History"],
label="Content Types"
)
time_period = gr.Dropdown(
choices=["All Time", "Today", "This Week", "This Month", "This Year", "Custom Range"],
value="All Time",
label="Time Period"
)
sort_by = gr.Dropdown(
choices=["Relevance", "Date (Newest First)", "Date (Oldest First)"],
value="Relevance",
label="Sort By"
)
# Advanced filters (initially collapsed)
with gr.Accordion("Advanced Filters", open=False):
with gr.Row():
# Tags filter
tags_filter = gr.Dropdown(
choices=get_all_tags(state) if 'get_all_tags' in globals() else [],
multiselect=True,
label="Filter by Tags",
elem_id="tags-filter"
)
# Status filter (for tasks and goals)
status_filter = gr.Dropdown(
choices=["Any Status", "Not Started", "In Progress", "Completed", "On Hold"],
value="Any Status",
label="Filter by Status",
elem_id="status-filter"
)
# Priority filter (for tasks)
priority_filter = gr.Dropdown(
choices=["Any Priority", "Low", "Medium", "High", "Urgent"],
value="Any Priority",
label="Filter by Priority",
elem_id="priority-filter"
)
# Advanced search features
with gr.Row():
advanced_features_checkboxes = gr.CheckboxGroup(
choices=[
"Knowledge Graph",
"Content Clustering",
"Duplicate Detection",
"Trend Analysis",
"Information Gaps"
],
label="Enable Advanced Features",
value=[]
)
# Search button
search_btn = gr.Button("πŸ” Search", variant="primary")
# AI Q&A Results (only visible when AI Q&A mode is enabled)
with gr.Group(visible=False) as ai_qa_group:
gr.Markdown("### πŸ€– AI-Generated Answer")
ai_answer = gr.Markdown()
gr.Markdown("#### Sources")
ai_sources = gr.Dataframe(
headers=["Source", "Content", "Relevance"],
datatype=["str", "str", "number"],
label="Sources",
col_count=(3, "fixed"),
interactive=False
)
# Results tabs
with gr.Tabs():
# All Results tab
with gr.TabItem("All Results"):
all_results = gr.Dataframe(
headers=["Type", "Title", "Content", "Date"],
datatype=["str", "str", "str", "str"],
label="All Results"
)
# Tasks tab
with gr.TabItem("Tasks"):
task_results = gr.Dataframe(
headers=["Title", "Description", "Status", "Due Date"],
datatype=["str", "str", "str", "str"],
label="Task Results"
)
# Notes tab
with gr.TabItem("Notes"):
note_results = gr.Dataframe(
headers=["Title", "Content", "Tags", "Date"],
datatype=["str", "str", "str", "str"],
label="Note Results"
)
# Goals tab
with gr.TabItem("Goals"):
goal_results = gr.Dataframe(
headers=["Title", "Description", "Progress", "Due Date"],
datatype=["str", "str", "str", "str"],
label="Goal Results"
)
# AI History tab
with gr.TabItem("AI History"):
ai_results = gr.Dataframe(
headers=["Query", "Response", "Date"],
datatype=["str", "str", "str"],
label="AI History Results"
)
# Insights tab
with gr.TabItem("Insights"):
insights_html = gr.HTML(
value="<div class='no-results'>Enable advanced features to see insights.</div>",
elem_id="insights-content"
)
# Related content section
with gr.Accordion("Related Content", open=False, visible=False) as related_content_section:
gr.Markdown("### πŸ”— Related Content")
related_content = gr.Dataframe(
headers=["Type", "Title", "Similarity"],
datatype=["str", "str", "number"],
col_count=(3, "fixed"),
interactive=False
)
# Saved Searches Section
with gr.Accordion("Saved Searches", open=False):
with gr.Row():
with gr.Column(scale=3):
saved_search_name = gr.Textbox(
label="Save Current Search As",
placeholder="Enter a name for this search...",
visible=False
)
with gr.Column(scale=1):
save_search_btn = gr.Button("πŸ’Ύ Save Search")
save_current_search_btn = gr.Button("βœ… Save", visible=False)
with gr.Row():
saved_searches_list = gr.Dataframe(
headers=["Name", "Query", "Filters", "Date Saved"],
datatype=["str", "str", "str", "str"],
label="Your Saved Searches",
value=lambda: get_saved_searches(state)
)
with gr.Row():
with gr.Column():
load_saved_search_btn = gr.Button("πŸ“‚ Load Selected Search")
with gr.Column():
delete_saved_search_btn = gr.Button("πŸ—‘οΈ Delete Selected Search")
# Function to get all tags from the system
def get_all_tags(state):
"""Get all unique tags from notes, tasks, and goals"""
all_tags = set()
# Get tags from notes
notes = safe_get(state, "notes", [])
for note in notes:
if "tags" in note and isinstance(note["tags"], list):
all_tags.update(note["tags"])
# Get tags from tasks
tasks = safe_get(state, "tasks", [])
for task in tasks:
if "tags" in task and isinstance(task["tags"], list):
all_tags.update(task["tags"])
# Get tags from goals
goals = safe_get(state, "goals", [])
for goal in goals:
if "tags" in goal and isinstance(goal["tags"], list):
all_tags.update(goal["tags"])
return sorted(list(all_tags))
# Function to perform search
@handle_exceptions
def search(
query: str,
types: List[str],
period: str,
sort: str
) -> tuple:
"""Perform search across content types"""
logger.info(f"Performing search: {query}")
# Initialize results
results = {
"all": [],
"tasks": [],
"notes": [],
"goals": [],
"ai": []
}
# Get time filter
now = datetime.datetime.now()
if period == "Past Week":
cutoff = now - datetime.timedelta(days=7)
elif period == "Past Month":
cutoff = now - datetime.timedelta(days=30)
elif period == "Past Year":
cutoff = now - datetime.timedelta(days=365)
else:
cutoff = None
# Search tasks
if "Tasks" in types:
tasks = safe_get(state, "tasks", [])
for task in tasks:
if matches_search(task, query, cutoff):
# Format for task results
task_result = [
task.get("title", ""),
task.get("description", ""),
task.get("status", ""),
task.get("due_date", "")
]
results["tasks"].append(task_result)
# Format for all results
all_result = [
"Task",
task.get("title", ""),
task.get("description", ""),
task.get("created_at", "")
]
results["all"].append(all_result)
# Search notes
if "Notes" in types:
notes = safe_get(state, "notes", [])
for note in notes:
if matches_search(note, query, cutoff):
# Format for note results
note_result = [
note.get("title", ""),
note.get("content", ""),
", ".join(note.get("tags", [])),
note.get("created_at", "")
]
results["notes"].append(note_result)
# Format for all results
all_result = [
"Note",
note.get("title", ""),
note.get("content", ""),
note.get("created_at", "")
]
results["all"].append(all_result)
# Search goals
if "Goals" in types:
goals = safe_get(state, "goals", [])
for goal in goals:
if matches_search(goal, query, cutoff):
# Format for goal results
goal_result = [
goal.get("title", ""),
goal.get("description", ""),
f"{goal.get('progress', 0)}%",
goal.get("due_date", "")
]
results["goals"].append(goal_result)
# Format for all results
all_result = [
"Goal",
goal.get("title", ""),
goal.get("description", ""),
goal.get("created_at", "")
]
results["all"].append(all_result)
# Search AI history
if "AI History" in types:
ai_history = safe_get(state, "ai_history", [])
for entry in ai_history:
if matches_search(entry, query, cutoff):
# Format for AI results
ai_result = [
entry.get("query", ""),
entry.get("response", ""),
entry.get("timestamp", "")
]
results["ai"].append(ai_result)
# Format for all results
all_result = [
"AI History",
entry.get("query", ""),
entry.get("response", ""),
entry.get("timestamp", "")
]
results["all"].append(all_result)
# Sort results if needed
if sort == "Date (Newest)":
for key in results:
results[key].sort(key=lambda x: x[3], reverse=True)
elif sort == "Date (Oldest)":
for key in results:
results[key].sort(key=lambda x: x[3])
# Record search activity
record_activity(state, "Performed Search", {
"query": query,
"types": types,
"period": period,
"sort": sort
})
return (
results["all"],
results["tasks"],
results["notes"],
results["goals"],
results["ai"]
)
# Function to check if item matches search criteria
@handle_exceptions
def matches_search(item: Dict[str, Any], query: str, cutoff: Optional[datetime.datetime]) -> bool:
"""Check if an item matches search criteria"""
# Check time period if cutoff is specified
if cutoff:
timestamp = item.get("timestamp", "") or item.get("created_at", "")
if timestamp:
try:
date = datetime.datetime.fromisoformat(timestamp)
if date < cutoff:
return False
except:
pass
# Check content match
query = query.lower()
for value in item.values():
if isinstance(value, str) and query in value.lower():
return True
elif isinstance(value, list):
for v in value:
if isinstance(v, str) and query in v.lower():
return True
return False
# Function to generate search suggestions
def generate_suggestions(query):
"""Generate search suggestions based on partial query"""
if not query or len(query) < 2:
return gr.update(visible=False)
# Get recent searches and popular content
recent_searches = safe_get(state, "search_history", [])
recent_searches = [s.get("query") for s in recent_searches if s.get("query", "").lower().startswith(query.lower())][:5]
# Get popular content titles
tasks = safe_get(state, "tasks", [])
notes = safe_get(state, "notes", [])
goals = safe_get(state, "goals", [])
popular_titles = []
for item in tasks + notes + goals:
title = item.get("title", "")
if title and query.lower() in title.lower():
popular_titles.append(title)
popular_titles = popular_titles[:5] # Limit to 5 suggestions
# Format suggestions
suggestions_html = "<div class='search-suggestions'>\n"
if recent_searches:
suggestions_html += "<p><strong>Recent Searches:</strong></p>\n<ul>\n"
for search in recent_searches:
suggestions_html += f"<li><a href='#' onclick='document.querySelector(\"#search-input\").value = \"{search}\"; return false;'>{search}</a></li>\n"
suggestions_html += "</ul>\n"
if popular_titles:
suggestions_html += "<p><strong>Popular Content:</strong></p>\n<ul>\n"
for title in popular_titles:
suggestions_html += f"<li><a href='#' onclick='document.querySelector(\"#search-input\").value = \"{title}\"; return false;'>{title}</a></li>\n"
suggestions_html += "</ul>\n"
suggestions_html += "</div>"
if recent_searches or popular_titles:
return gr.update(value=suggestions_html, visible=True)
else:
return gr.update(visible=False)
# Function to perform semantic search
@handle_exceptions
def semantic_search(query, content_types, time_period, sort_by, ai_qa_mode, tags=None, status=None, priority=None, advanced_features=None):
"""Perform semantic search across all content based on query and filters with advanced discovery features"""
if not query.strip():
return [], [], [], [], [], gr.update(visible=False), "", [], gr.update(visible=False), ""
# Clean content types (remove icons)
content_types = [ct.split(" ", 1)[1].lower() if " " in ct else ct.lower() for ct in content_types]
# Initialize results
all_results = []
tasks_results = []
notes_results = []
goals_results = []
ai_history_results = []
# Collect all items for searching
all_items = []
# Search tasks
if "tasks" in content_types:
tasks = safe_get(state, "tasks", [])
for task in tasks:
task["_type"] = "task"
all_items.append(task)
# Search notes
if "notes" in content_types:
notes = safe_get(state, "notes", [])
for note in notes:
note["_type"] = "note"
all_items.append(note)
# Search goals
if "goals" in content_types:
goals = safe_get(state, "goals", [])
for goal in goals:
goal["_type"] = "goal"
all_items.append(goal)
# Search AI history
if "ai history" in content_types:
ai_history = safe_get(state, "ai_history", [])
for entry in ai_history:
entry["_type"] = "ai_history"
all_items.append(entry)
# Apply time period filter
filtered_items = []
for item in all_items:
if matches_time_period(item, time_period):
filtered_items.append(item)
# Apply advanced filters if provided
if tags and len(tags) > 0:
filtered_items = [item for item in filtered_items if "tags" in item and any(tag in item["tags"] for tag in tags)]
if status and status != "Any Status":
filtered_items = [item for item in filtered_items if "status" in item and item["status"] == status]
if priority and priority != "Any Priority":
filtered_items = [item for item in filtered_items if "priority" in item and item["priority"] == priority]
# Initialize insights dictionary for advanced features
insights = {}
insights_html = ""
# Perform semantic search using search_content function
try:
search_results = search_content(query, filtered_items)
# Apply advanced search features if requested
if advanced_features and len(advanced_features) > 0:
# Knowledge graph generation
if "Knowledge Graph" in advanced_features:
from utils.ai_models import build_knowledge_graph
insights["knowledge_graph"] = build_knowledge_graph([item for item, _ in search_results[:20]])
# Content clustering
if "Content Clustering" in advanced_features:
from utils.ai_models import cluster_content
insights["clusters"] = cluster_content([item for item, _ in search_results], num_clusters=5)
# Duplicate detection
if "Duplicate Detection" in advanced_features:
from utils.ai_models import detect_duplicates
insights["potential_duplicates"] = detect_duplicates([item for item, _ in search_results])
# Trend analysis
if "Trend Analysis" in advanced_features:
from utils.ai_models import identify_trends
insights["trends"] = identify_trends([item for item, _ in search_results])
# Information gaps
if "Information Gaps" in advanced_features:
from utils.ai_models import identify_information_gaps
insights["information_gaps"] = identify_information_gaps([item for item, _ in search_results])
# Format insights for display
insights_html = format_insights(insights)
else:
insights_html = "<div class='no-results'>Enable advanced features to see insights.</div>"
except Exception as e:
# Fallback to keyword search if semantic search fails
logger.error(f"Semantic search failed: {str(e)}. Falling back to keyword search.")
search_results = [(item, calculate_relevance(item, query)) for item in filtered_items if keyword_matches(item, query)]
insights_html = "<div class='no-results'>Enable advanced features to see insights.</div>"
# Process search results
for item, relevance in search_results:
item_type = item.get("_type", "unknown")
if item_type == "task":
# Format for tasks results
tasks_results.append([
item.get("title", "Untitled Task"),
item.get("description", ""),
item.get("status", "Not Started"),
item.get("due_date", "No due date")
])
# Format for all results
all_results.append([
"Task",
item.get("title", "Untitled Task"),
item.get("description", ""),
item.get("created_at", "")
])
elif item_type == "note":
# Format for notes results
content_preview = item.get("content", "")[:50] + "..." if len(item.get("content", "")) > 50 else item.get("content", "")
tags = ", ".join(item.get("tags", []))
notes_results.append([
item.get("title", "Untitled Note"),
content_preview,
tags,
item.get("created_at", "No date")
])
# Format for all results
all_results.append([
"Note",
item.get("title", "Untitled Note"),
content_preview,
item.get("created_at", "No date")
])
elif item_type == "goal":
# Format for goals results
goals_results.append([
item.get("title", "Untitled Goal"),
item.get("description", ""),
f"{item.get('progress', 0)}%",
item.get("due_date", "No target date")
])
# Format for all results
all_results.append([
"Goal",
item.get("title", "Untitled Goal"),
item.get("description", ""),
item.get("created_at", "No date")
])
elif item_type == "ai_history":
# Format for AI history results
prompt = item.get("query", "")
response = item.get("response", "")[:50] + "..." if len(item.get("response", "")) > 50 else item.get("response", "")
ai_history_results.append([
prompt,
response,
item.get("timestamp", "No date")
])
# Format for all results
all_results.append([
"AI History",
prompt,
response,
item.get("timestamp", "No date")
])
# Sort results based on sort_by parameter
if sort_by == "Date (Newest First)":
all_results.sort(key=lambda x: x[3], reverse=True)
tasks_results.sort(key=lambda x: x[3], reverse=True)
notes_results.sort(key=lambda x: x[3], reverse=True)
goals_results.sort(key=lambda x: x[3], reverse=True)
ai_history_results.sort(key=lambda x: x[2], reverse=True)
elif sort_by == "Date (Oldest First)":
all_results.sort(key=lambda x: x[3])
tasks_results.sort(key=lambda x: x[3])
notes_results.sort(key=lambda x: x[3])
goals_results.sort(key=lambda x: x[3])
ai_history_results.sort(key=lambda x: x[2])
# Generate related content
related_content_data = generate_related_content(search_results)
related_content_visible = len(related_content_data) > 0
# Handle AI Q&A mode
ai_answer_text = ""
ai_sources_data = []
if ai_qa_mode and query.strip().endswith("?"):
# This is a question, generate an AI answer
try:
# Get top sources for context
sources = []
for item, relevance in search_results[:5]: # Use top 5 results as sources
if item.get("_type") == "note":
sources.append(item.get("content", ""))
elif item.get("_type") == "ai_history":
sources.append(item.get("response", ""))
# Combine sources into context
context = "\n\n".join(sources)
# Generate answer using question answering
from utils.ai_models import answer_question
ai_answer_text = answer_question(context, query)
# Format sources for display
for i, (item, relevance) in enumerate(search_results[:5]):
source_type = item.get("_type", "unknown").capitalize()
source_title = item.get("title", f"{source_type} {i+1}")
source_content = ""
if item.get("_type") == "note":
source_content = item.get("content", "")[:100] + "..." if len(item.get("content", "")) > 100 else item.get("content", "")
elif item.get("_type") == "ai_history":
source_content = item.get("response", "")[:100] + "..." if len(item.get("response", "")) > 100 else item.get("response", "")
ai_sources_data.append([source_title, source_content, relevance])
except Exception as e:
logger.error(f"AI Q&A generation failed: {str(e)}")
ai_answer_text = "Sorry, I couldn't generate an answer based on your content. Please try a different question."
# Record search activity and update search history
search_record = {
"type": "search",
"query": query,
"content_types": content_types,
"time_period": time_period,
"results_count": len(all_results),
"timestamp": get_timestamp()
}
record_activity(state, "Performed Search", search_record)
# Update search history
search_history = safe_get(state, "search_history", [])
search_history.insert(0, {"query": query, "timestamp": get_timestamp()})
if len(search_history) > 50: # Limit history size
search_history = search_history[:50]
state["search_history"] = search_history
return (
all_results,
tasks_results,
notes_results,
goals_results,
ai_history_results,
gr.update(visible=ai_qa_mode and query.strip().endswith("?")), # ai_qa_group visibility
ai_answer_text, # ai_answer
ai_sources_data, # ai_sources
gr.update(visible=related_content_visible), # related_content_section visibility
insights_html # insights_html for the insights tab
)
# Helper function to format insights for display
def format_insights(insights):
"""Format insights for display"""
if not insights:
return "<div class='no-results'>No insights available. Enable advanced features to see insights.</div>"
html = "<div class='insights-container'>"
# Knowledge Graph
if "knowledge_graph" in insights:
graph = insights["knowledge_graph"]
html += "<div class='insight-section'>"
html += "<h3>Knowledge Graph</h3>"
html += "<p>Connections between your content:</p>"
# Simple visualization of nodes and edges
html += "<div class='knowledge-graph'>"
html += "<h4>Nodes:</h4><ul>"
for node in graph.get("nodes", [])[:10]: # Limit to 10 nodes for display
html += f"<li>{node.get('label')} ({node.get('type')})</li>"
html += "</ul>"
html += "<h4>Connections:</h4><ul>"
for edge in graph.get("edges", [])[:10]: # Limit to 10 edges for display
html += f"<li>{edge.get('source')} β†’ {edge.get('target')} (strength: {edge.get('weight', 0):.2f})</li>"
html += "</ul>"
html += "</div></div>"
# Content Clusters
if "clusters" in insights:
clusters = insights["clusters"]
html += "<div class='insight-section'>"
html += "<h3>Content Clusters</h3>"
html += "<p>Your content organized by topic:</p>"
html += "<div class='clusters'>"
for cluster_name, items in clusters.items():
html += f"<div class='cluster'><h4>{cluster_name}</h4><ul>"
for item in items[:5]: # Limit to 5 items per cluster
title = item.get("title", "Untitled")
item_type = item.get("type", "item")
html += f"<li>{title} ({item_type})</li>"
html += "</ul></div>"
html += "</div></div>"
# Potential Duplicates
if "potential_duplicates" in insights:
duplicates = insights["potential_duplicates"]
html += "<div class='insight-section'>"
html += "<h3>Potential Duplicates</h3>"
if not duplicates:
html += "<p>No potential duplicates found.</p>"
else:
html += "<p>Items that might be duplicates:</p>"
html += "<div class='duplicates'>"
for i, group in enumerate(duplicates[:3]): # Limit to 3 duplicate groups
html += f"<div class='duplicate-group'><h4>Group {i+1}</h4><ul>"
for item in group:
title = item.get("title", "Untitled")
item_type = item.get("type", "item")
html += f"<li>{title} ({item_type})</li>"
html += "</ul></div>"
html += "</div>"
html += "</div>"
# Trends
if "trends" in insights:
trends = insights["trends"]
html += "<div class='insight-section'>"
html += "<h3>Content Trends</h3>"
# Trending topics
if "trending_topics" in trends:
html += "<h4>Trending Topics</h4><ul>"
trending_topics = trends["trending_topics"]
for month, topics in list(trending_topics.items())[:3]: # Show last 3 months
html += f"<li>{month}: "
topic_str = ", ".join([f"{topic} ({count})" for topic, count in topics[:3]])
html += f"{topic_str}</li>"
html += "</ul>"
# Growth rates
if "growth_rates" in trends:
html += "<h4>Content Growth</h4><ul>"
growth_rates = trends["growth_rates"]
for month, rate in list(growth_rates.items())[:3]: # Show last 3 months
html += f"<li>{month}: {rate:.1f}%</li>"
html += "</ul>"
html += "</div>"
# Information Gaps
if "information_gaps" in insights:
gaps = insights["information_gaps"]
html += "<div class='insight-section'>"
html += "<h3>Information Gaps</h3>"
if not gaps:
html += "<p>No significant information gaps detected.</p>"
else:
html += "<p>Areas that might need more content:</p><ul>"
for gap in gaps[:5]: # Limit to 5 gaps
gap_type = gap.get("type", "")
if gap_type == "underdeveloped_topic":
html += f"<li>Limited content on: {gap.get('topic', 'Unknown')}</li>"
elif gap_type == "missing_connection":
topics = gap.get("topics", [])
html += f"<li>Missing connection between: {' and '.join(topics)}</li>"
else:
html += f"<li>{gap.get('description', 'Unknown gap')}</li>"
html += "</ul>"
html += "</div>"
html += "</div>"
return html
# Helper function to check if an item matches the search criteria with keywords
def keyword_matches(item, query):
"""Check if an item matches the search query using keywords"""
query_lower = query.lower()
# Search in all string fields
for key, value in item.items():
if key.startswith("_"): # Skip internal fields
continue
if isinstance(value, str) and query_lower in value.lower():
return True
elif isinstance(value, list) and all(isinstance(x, str) for x in value):
# Search in list of strings (like tags)
for string_item in value:
if query_lower in string_item.lower():
return True
return False
# Helper function to check if an item matches the time period filter
def matches_time_period(item, time_period):
"""Check if an item falls within the specified time period"""
if time_period == "All Time":
return True
# Get the timestamp from the item
timestamp = None
for key in ["timestamp", "created_at", "date_created", "due_date", "target_date"]:
if key in item and item[key]:
timestamp = item[key]
break
if not timestamp:
return False
try:
item_date = datetime.datetime.fromisoformat(timestamp)
now = datetime.datetime.now()
if time_period == "Today":
return item_date.date() == now.date()
elif time_period == "This Week":
start_of_week = now - datetime.timedelta(days=now.weekday())
return start_of_week.date() <= item_date.date() <= now.date()
elif time_period == "This Month":
return item_date.year == now.year and item_date.month == now.month
elif time_period == "This Year":
return item_date.year == now.year
elif time_period == "Custom Range":
# In a real implementation, this would use custom date range inputs
# For now, default to last 30 days
return now - datetime.timedelta(days=30) <= item_date <= now
return True
except:
return False
# Helper function to calculate relevance score
def calculate_relevance(item, query):
"""Calculate a simple relevance score for an item based on the query"""
query_lower = query.lower()
score = 0
for key, value in item.items():
if key.startswith("_"): # Skip internal fields
continue
if isinstance(value, str):
# Count occurrences of query in the value
occurrences = value.lower().count(query_lower)
# Title and content fields are more important
if key in ["title", "content", "description"]:
score += occurrences * 2
else:
score += occurrences
elif isinstance(value, list) and all(isinstance(x, str) for x in value):
# For lists of strings (like tags)
for string_item in value:
score += string_item.lower().count(query_lower)
# Normalize score between 0 and 1
return min(score / 10, 1.0) # Cap at 1.0
# Function to generate related content
def generate_related_content(search_results):
"""Generate related content based on search results"""
if not search_results or len(search_results) == 0:
return []
related_content = []
seen_items = set() # To avoid duplicates
# Get top search results
top_results = search_results[:3]
for item, _ in top_results:
item_type = item.get("_type", "unknown")
item_id = f"{item_type}_{item.get('id', '')}"
if item_id in seen_items:
continue
seen_items.add(item_id)
# Find similar items
similar_items = find_similar_items(item, search_results)
for similar_item, similarity in similar_items:
similar_id = f"{similar_item.get('_type', 'unknown')}_{similar_item.get('id', '')}"
if similar_id in seen_items:
continue
seen_items.add(similar_id)
# Format for display
item_type_display = similar_item.get("_type", "unknown").capitalize()
title = similar_item.get("title", f"{item_type_display} item")
related_content.append([item_type_display, title, similarity])
return related_content[:5] # Limit to 5 items
# Helper function to find similar items
def find_similar_items(item, all_items):
"""Find items similar to the given item"""
similar_items = []
# Extract item features for comparison
item_text = ""
for key, value in item.items():
if key.startswith("_") or key in ["id", "timestamp", "created_at"]:
continue
if isinstance(value, str):
item_text += value + " "
elif isinstance(value, list) and all(isinstance(x, str) for x in value):
item_text += " ".join(value) + " "
item_text = item_text.lower()
# Compare with other items
for other_item, relevance in all_items:
# Skip the same item
if other_item.get("id") == item.get("id") and other_item.get("_type") == item.get("_type"):
continue
# Extract other item features
other_text = ""
for key, value in other_item.items():
if key.startswith("_") or key in ["id", "timestamp", "created_at"]:
continue
if isinstance(value, str):
other_text += value + " "
elif isinstance(value, list) and all(isinstance(x, str) for x in value):
other_text += " ".join(value) + " "
other_text = other_text.lower()
# Calculate similarity (simple word overlap for now)
item_words = set(item_text.split())
other_words = set(other_text.split())
if not item_words or not other_words:
continue
common_words = item_words.intersection(other_words)
similarity = len(common_words) / max(len(item_words), len(other_words))
if similarity > 0.1: # Threshold for similarity
similar_items.append((other_item, similarity))
# Sort by similarity
similar_items.sort(key=lambda x: x[1], reverse=True)
return similar_items[:3] # Return top 3 similar items
# Function to get saved searches
def get_saved_searches(state):
"""Get saved searches from state"""
saved_searches = safe_get(state, "saved_searches", [])
# Format for display
display_data = []
for search in saved_searches:
filters = f"Types: {', '.join(search.get('content_types', []))} | Period: {search.get('time_period', 'All Time')}"
display_data.append([
search.get("name", "Unnamed Search"),
search.get("query", ""),
filters,
search.get("date_saved", "")
])
return display_data
# Function to save a search
def save_search(name, query, content_types, time_period, sort_by, tags=None, status=None, priority=None, advanced_features=None):
"""Save a search configuration"""
if not name.strip():
return gr.update(value="Please enter a name for this search"), get_saved_searches(state)
# Create search object
search = {
"name": name,
"query": query,
"content_types": content_types,
"time_period": time_period,
"sort_by": sort_by,
"date_saved": get_timestamp(),
"advanced_filters": {
"tags": tags if tags else [],
"status": status if status else "Any Status",
"priority": priority if priority else "Any Priority",
"advanced_features": advanced_features if advanced_features else []
}
}
# Add to state
saved_searches = safe_get(state, "saved_searches", [])
saved_searches.append(search)
state["saved_searches"] = saved_searches
# Record activity
record_activity(state, "Saved Search", {
"search_name": name,
"query": query
})
return gr.update(value=""), get_saved_searches(state)
# Function to load a saved search
def load_saved_search(selected_row):
"""Load a saved search configuration"""
if not selected_row or len(selected_row) == 0:
return [gr.update()] * 10 # No updates if nothing selected
# Get the selected search name
search_name = selected_row[0][0]
# Find the search in saved searches
search = None
for s in safe_get(state, "saved_searches", []):
if s.get("name") == search_name:
search = s
break
if not search:
return [gr.update()] * 10
# Extract advanced filters
advanced_filters = search.get("advanced_filters", {})
tags = advanced_filters.get("tags", [])
status = advanced_filters.get("status", "Any Status")
priority = advanced_filters.get("priority", "Any Priority")
advanced_features = advanced_filters.get("advanced_features", [])
# Return updates for all relevant components
return [
gr.update(value=search.get("query", "")), # search_input
gr.update(value=search.get("content_types", [])), # content_types
gr.update(value=search.get("time_period", "All Time")), # time_period
gr.update(value=search.get("sort_by", "Relevance")), # sort_by
gr.update(value=tags), # tags_filter
gr.update(value=status), # status_filter
gr.update(value=priority), # priority_filter
gr.update(value=advanced_features), # advanced_features_checkboxes
gr.update(value=search_name), # saved_search_name
gr.update() # No update for saved_searches_list
]
# Function to delete a saved search
def delete_saved_search(selected_row):
"""Delete a saved search"""
if not selected_row or len(selected_row) == 0:
return get_saved_searches(state)
# Get the selected search name
search_name = selected_row[0][0]
# Remove from state
saved_searches = safe_get(state, "saved_searches", [])
state["saved_searches"] = [s for s in saved_searches if s.get("name") != search_name]
return get_saved_searches(state)
# Set up search input to generate suggestions
search_input.change(
generate_suggestions,
inputs=[search_input],
outputs=[search_suggestions]
)
# Connect the search button
search_btn.click(
semantic_search,
inputs=[
search_input,
content_types,
time_period,
sort_by,
ai_qa_toggle,
tags_filter,
status_filter,
priority_filter,
advanced_features_checkboxes
],
outputs=[
all_results,
task_results,
note_results,
goal_results,
ai_results,
ai_qa_group,
ai_answer,
ai_sources,
related_content_section,
insights_tab
]
)
# Connect the save search button
save_search_btn.click(
lambda: gr.update(visible=True),
inputs=[],
outputs=[saved_search_name]
)
# Connect the save current search button
save_current_search_btn.click(
save_search,
inputs=[
saved_search_name,
search_input,
content_types,
time_period,
sort_by,
tags_filter,
status_filter,
priority_filter,
advanced_features_checkboxes
],
outputs=[
saved_search_name,
saved_searches_list
]
)
# Connect the load saved search button
load_saved_search_btn.click(
load_saved_search,
inputs=[saved_searches_list],
outputs=[
search_input,
content_types,
time_period,
sort_by,
tags_filter,
status_filter,
priority_filter,
advanced_features_checkboxes,
saved_search_name,
saved_searches_list
]
)
# Connect the delete saved search button
delete_saved_search_btn.click(
delete_saved_search,
inputs=[saved_searches_list],
outputs=[saved_searches_list]
)
# Toggle AI Q&A group visibility
ai_qa_toggle.change(
lambda value: gr.update(visible=value),
inputs=[ai_qa_toggle],
outputs=[ai_qa_group]
)
# Record page visit in activity
record_activity(state, "Viewed Smart Search Page")