Spaces:
Paused
Paused
import base64 | |
import io | |
import os | |
import pandas as pd | |
from docx import Document | |
from io import BytesIO, StringIO | |
import dash # Version 3.0.3 (or compatible) | |
import dash_bootstrap_components as dbc # Version 2.0.2 (or compatible) | |
from dash import html, dcc, Input, Output, State, callback_context, ALL, no_update # Import no_update | |
from dash.exceptions import PreventUpdate # <-- Import PreventUpdate from here | |
import google.generativeai as genai | |
from docx.shared import Pt | |
from docx.enum.style import WD_STYLE_TYPE | |
from PyPDF2 import PdfReader | |
import logging | |
import uuid # For unique IDs if needed with pattern matching | |
import xlsxwriter # Needed for Excel export engine | |
# --- Logging Configuration --- | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# --- Initialize Dash app --- | |
# Using Bootstrap for layout and styling. Added meta tags for responsiveness. | |
# dash==3.0.3 | |
# dash-bootstrap-components==2.0.2 | |
app = dash.Dash(__name__, | |
external_stylesheets=[dbc.themes.BOOTSTRAP], | |
suppress_callback_exceptions=True, # Needed because controls are dynamically added | |
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}]) | |
server = app.server # Expose server for Gunicorn | |
# --- Configure Gemini AI --- | |
# IMPORTANT: Set the GEMINI_API_KEY environment variable before running the app. | |
try: | |
# Prefer direct CUDA GPU configuration in app.py - Note: This is not applicable for cloud-based APIs like Gemini. Configuration happens via API key. | |
api_key = os.environ.get("GEMINI_API_KEY") | |
if not api_key: | |
logging.warning("GEMINI_API_KEY environment variable not found. AI features will be disabled.") | |
model = None | |
else: | |
genai.configure(api_key=api_key) | |
# Specify a model compatible with function calling or more advanced generation if needed. | |
# Using 'gemini-1.5-pro-latest' for potential better performance, check compatibility if issues arise. | |
model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25') # Updated model | |
logging.info("Gemini AI configured successfully using 'gemini-2.5-pro-preview-03-25'.") | |
except Exception as e: | |
logging.error(f"Error configuring Gemini AI: {e}", exc_info=True) | |
model = None | |
# --- Global Variables --- | |
# Using dictionaries to store session-specific data might be better for multi-user, | |
# but for simplicity with current constraints, we use global vars. | |
# Consider using dcc.Store for better state management in complex scenarios. | |
uploaded_files = {} # {filename: content_text} | |
# Stores the *results* of generation/review steps | |
shredded_document = None # Text content of the shredded PWS/requirements | |
pink_review_document = None # Text content of the generated Pink Review | |
red_review_document = None # Text content of the generated Red Review | |
gold_review_document = None # Text content of the generated Gold Review | |
loe_document = None # Text content of the generated LOE | |
virtual_board_document = None # Text content of the generated Virtual Board | |
# Stores the *generated* proposal drafts | |
pink_document = None # Text content of the generated Pink Team document | |
red_document = None # Text content of the generated Red Team document | |
gold_document = None # Text content of the generated Gold Team document | |
# Store uploaded content specifically for review inputs | |
uploaded_pink_content = None | |
uploaded_red_content = None | |
uploaded_gold_content = None | |
# Store the currently displayed document and its type for download/chat | |
current_display_document = None | |
current_display_type = None | |
# --- Document Types --- | |
# Descriptions adjusted slightly for clarity | |
document_types = { | |
"Shred": "Generate a requirements spreadsheet from the PWS/Source Docs, identifying action words (shall, will, perform, etc.) by section.", | |
"Pink": "Create a compliant and compelling Pink Team proposal draft based on the Shredded requirements.", | |
"Pink Review": "Evaluate a Pink Team draft against Shredded requirements. Output findings (compliance, gaps, recommendations) in a spreadsheet.", | |
"Red": "Create a Red Team proposal draft, addressing feedback from the Pink Review and enhancing compliance/compellingness.", | |
"Red Review": "Evaluate a Red Team draft against Shredded requirements and Pink Review findings. Output findings in a spreadsheet.", | |
"Gold": "Create a Gold Team proposal draft, addressing feedback from the Red Review for final compliance and polish.", | |
"Gold Review": "Perform a final compliance review of the Gold Team draft against Shredded requirements and Red Review findings. Output findings.", | |
"Virtual Board": "Simulate a source selection board evaluation of the final proposal against PWS/Shred requirements and evaluation criteria (Sec L/M). Output evaluation.", | |
"LOE": "Generate a Level of Effort (LOE) estimate spreadsheet based on the Shredded requirements." | |
} | |
# --- Layout Definition --- | |
# Using Dash Bootstrap Components for layout and Cards for logical separation. | |
# Single form layout functions for modern design. | |
app.layout = dbc.Container(fluid=True, className="dbc", children=[ | |
# Title Row - Full Width | |
dbc.Row( | |
dbc.Col(html.H1("Proposal AI Assistant", className="text-center my-4", style={'color': '#1C304A'}), width=12) | |
), | |
# Progress Indicator Row (Initially Hidden) - Full Width below title, above columns | |
dbc.Row( | |
dbc.Col( | |
# Blinking triple dot for progress | |
dcc.Loading( | |
id="loading-indicator", | |
type="dots", # Changed type to dots as requested | |
children=[html.Div(id="loading-output", style={'height': '10px'})], # Placeholder content | |
overlay_style={"visibility":"hidden", "opacity": 0}, # Make overlay invisible | |
style={'visibility':'hidden', 'height': '30px'}, # Hide initially via style, give some height | |
fullscreen=False, # Keep it contained | |
className="justify-content-center" | |
), | |
width=12, | |
className="text-center mb-3" # Center the dots | |
) | |
), | |
# Main Content Row (Two Columns) | |
dbc.Row([ | |
# Left Column (Navigation / Upload) - 30% width, light gray background | |
dbc.Col( | |
dbc.Card( | |
dbc.CardBody([ | |
html.H4("1. Upload Source Documents", className="card-title"), | |
dcc.Upload( | |
id='upload-document', | |
children=html.Div(['Drag and Drop or ', html.A('Select PWS/Source Files')]), | |
style={ | |
'width': '100%', 'height': '60px', 'lineHeight': '60px', | |
'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px', | |
'textAlign': 'center', 'margin': '10px 0', 'backgroundColor': '#ffffff' # White background for contrast | |
}, | |
multiple=True # Allow multiple source files | |
), | |
# Use Card for file list for better visual grouping | |
dbc.Card( | |
dbc.CardBody( | |
html.Div(id='file-list', style={'maxHeight': '150px', 'overflowY': 'auto', 'fontSize': '0.9em'}) | |
), className="mb-3" , style={'backgroundColor': '#ffffff'} | |
), | |
html.Hr(), | |
html.H4("2. Select Action", className="card-title mt-3"), | |
# Buttons for actions - Use Card for button group | |
dbc.Card( | |
dbc.CardBody([ | |
# Use primary button style defined in CSS request (implicitly via dbc class) | |
*[dbc.Button( | |
doc_type, | |
id={'type': 'action-button', 'index': doc_type}, # Use pattern-matching ID | |
color="primary", # Primary style | |
className="mb-2 w-100 d-block", # d-block for full width buttons | |
style={'textAlign': 'left', 'whiteSpace': 'normal', 'height': 'auto', 'wordWrap': 'break-word'} # Allow wrap | |
) for doc_type in document_types.keys()] | |
]) | |
) | |
]) | |
, color="light"), # Use Bootstrap 'light' color for card background -> light gray | |
width=12, lg=4, # Full width on small screens, 30% (4/12) on large | |
className="mb-3 mb-lg-0", # Margin bottom on small screens | |
style={'padding': '15px'} | |
), | |
# Right Column (Status / Preview / Controls / Chat) - 70% width, white background | |
dbc.Col( | |
dbc.Card( | |
dbc.CardBody([ | |
# Status Bar | |
dbc.Alert(id='status-bar', children="Upload source documents and select an action.", color="info"), | |
# Dynamic Controls for Reviews - Use Card for visual separation | |
dbc.Card(id='review-controls-card', children=[dbc.CardBody(id='review-controls')], className="mb-3", style={'display': 'none'}), # Hidden initially | |
# Document Preview Area - Use Card | |
dbc.Card( | |
dbc.CardBody([ | |
html.H5("Document Preview / Output", className="card-title"), | |
# Wrap preview in Loading | |
dcc.Loading( | |
id="loading-preview", # Separate loading for preview | |
type="circle", | |
children=[html.Div(id='document-preview', style={'whiteSpace': 'pre-wrap', 'maxHeight': '400px', 'overflowY': 'auto', 'border': '1px solid #ccc', 'padding': '10px', 'borderRadius': '5px', 'background': '#f8f9fa'})] | |
) | |
]), className="mb-3" | |
), | |
dbc.Button("Download Output", id="btn-download", color="success", className="mt-3 me-2", style={'display': 'none'}), # Hidden initially, add margin | |
dcc.Download(id="download-document"), | |
html.Hr(), | |
# Chat Section - Use Card | |
dbc.Card( | |
dbc.CardBody([ | |
html.H5("Refine Output (Chat)", className="card-title"), | |
# Wrap chat in loading | |
dcc.Loading( | |
id="chat-loading", | |
type="circle", | |
children=[ | |
dbc.Textarea(id="chat-input", placeholder="Enter instructions to refine the document shown above...", className="mb-2", style={'whiteSpace': 'normal', 'wordWrap': 'break-word'}), # Ensure word wrap | |
# Button Group for Send and Clear Chat | |
dbc.ButtonGroup([ | |
dbc.Button("Send Chat", id="btn-send-chat", color="secondary"), # Use secondary style | |
dbc.Button("Clear Chat", id="btn-clear-chat", color="tertiary") # Use tertiary style | |
], className="mb-3"), | |
html.Div(id="chat-output", style={'whiteSpace': 'pre-wrap', 'marginTop': '10px', 'border': '1px solid #eee', 'padding': '10px', 'borderRadius': '5px', 'minHeight': '50px'}) # Add border/padding | |
] | |
) | |
]), className="mb-3" | |
) | |
]) | |
), | |
width=12, lg=8, # Full width on small screens, 70% (8/12) on large | |
style={'backgroundColor': '#ffffff', 'padding': '15px'} # White background | |
) | |
]) | |
], style={'maxWidth': '100%', 'padding': '0 15px'}) # Max width and padding for container | |
# --- Helper Functions --- | |
def process_document(contents, filename): | |
"""Processes uploaded file content (PDF or DOCX) and returns text, or None and error message.""" | |
if contents is None: | |
logging.warning(f"process_document called with None contents for {filename}") | |
return None, f"Error: No content provided for {filename}." | |
try: | |
content_type, content_string = contents.split(',') | |
decoded = base64.b64decode(content_string) | |
logging.info(f"Processing file: {filename}") | |
text = None | |
error_message = None | |
if filename.lower().endswith('.docx'): | |
doc = Document(io.BytesIO(decoded)) | |
# Extract text, ensuring paragraphs are separated and empty ones are skipped | |
text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) | |
logging.info(f"Successfully processed DOCX: {filename}") | |
elif filename.lower().endswith('.pdf'): | |
pdf = PdfReader(io.BytesIO(decoded)) | |
extracted_pages = [] | |
for i, page in enumerate(pdf.pages): | |
try: | |
page_text = page.extract_text() | |
if page_text: | |
extracted_pages.append(page_text) | |
except Exception as page_e: | |
logging.warning(f"Could not extract text from page {i+1} of {filename}: {page_e}") | |
text = "\n\n".join(extracted_pages) # Separate pages clearly | |
if not text: | |
logging.warning(f"No text extracted from PDF: {filename}. It might be image-based or corrupted.") | |
error_message = f"Error: No text could be extracted from PDF {filename}. It might be image-based or require OCR." | |
else: | |
logging.info(f"Successfully processed PDF: {filename}") | |
else: | |
logging.warning(f"Unsupported file format: {filename}") | |
error_message = f"Unsupported file format: {filename}. Please upload PDF or DOCX." | |
return text, error_message | |
except Exception as e: | |
logging.error(f"Error processing document {filename}: {e}", exc_info=True) | |
return None, f"Error processing file {filename}: {str(e)}" | |
def get_combined_uploaded_text(): | |
"""Combines text content of all successfully uploaded files, separated clearly.""" | |
if not uploaded_files: | |
return "" | |
# Join contents with a separator indicating file breaks | |
return "\n\n--- FILE BREAK ---\n\n".join(uploaded_files.values()) | |
def generate_ai_document(doc_type, input_docs, context_docs=None): | |
"""Generates document using Gemini AI. Updates current_display.""" | |
global current_display_document, current_display_type # Allow modification | |
if not model: | |
logging.error("Gemini AI model not initialized.") | |
return "Error: AI Model not configured. Please check API Key." | |
if not input_docs or not any(doc.strip() for doc in input_docs if doc): # Check if list exists and has non-empty content | |
logging.warning(f"generate_ai_document called for {doc_type} with no valid input documents.") | |
return f"Error: Missing required input document(s) for {doc_type} generation." | |
# Combine input documents into a single string | |
combined_input = "\n\n---\n\n".join(filter(None, input_docs)) | |
combined_context = "\n\n---\n\n".join(filter(None, context_docs)) if context_docs else "" | |
# Enhanced prompt structure based on user feedback and best practices | |
prompt = f"""**Objective:** Generate the '{doc_type}' document. | |
**Your Role:** Act as an expert proposal writer/analyst. | |
**Core Instructions:** | |
1. **Adhere Strictly to the Task:** Generate *only* the content for the '{doc_type}'. Do not add introductions, summaries, or conversational filler unless it's part of the requested document format itself. | |
2. **Follow Format Guidelines:** | |
* **Spreadsheet Types (Shred, Reviews, LOE, Board):** Structure output clearly. Use Markdown tables or a delimited format (like CSV) suitable for parsing. Define clear columns (e.g., `PWS_Section | Requirement | Finding | Recommendation` for reviews; `Section | Task | Estimated_Hours | Resource_Type` for LOE). Use '|' as the primary delimiter for tables. | |
* **Proposal Sections (Pink, Red, Gold):** Write professional, compelling prose. Use active voice ("MicroHealth will..."). Directly address requirements from context (Shredded PWS). Detail the 'how' (technical approach, methodology, workflow, tools). Incorporate innovation and benefits (efficiency, quality, outcomes). Substantiate claims (e.g., cite Gartner, Forrester if applicable). Clearly state roles/responsibilities (labor categories). Ensure compliance with Section L/M (Evaluation Criteria) from context. Avoid vague terms ('might', 'could', 'potentially'); be assertive and confident. Use paragraphs primarily; limit bullet points to lists where essential. | |
3. **Utilize Provided Documents:** | |
* **Context Document(s):** Use these as the primary reference or baseline (e.g., Shredded Requirements are the basis for compliance). | |
* **Primary Input Document(s):** This is the main subject of the task (e.g., the PWS to be Shredded, the Pink draft to be Reviewed, the Review findings to incorporate into the next draft). | |
**Provided Documents:** | |
**Context Document(s) (e.g., Shredded Requirements, PWS Section L/M):** | |
```text | |
{combined_context if combined_context else "N/A"} | |
``` | |
**Primary Input Document(s) (e.g., PWS text, Pink Draft text, Review Findings text):** | |
```text | |
{combined_input} | |
``` | |
**Detailed Instructions for '{doc_type}':** | |
{document_types.get(doc_type, "Generate the requested document based on the inputs and context.")} | |
**Begin '{doc_type}' Output (Use Markdown table format for spreadsheet types):** | |
""" | |
logging.info(f"Generating AI document for: {doc_type}") | |
# logging.debug(f"Prompt for {doc_type}: {prompt[:500]}...") # Uncomment for debugging prompt starts | |
try: | |
# Increased timeout might be needed for complex generations | |
response = model.generate_content(prompt) # Consider adding request_options={'timeout': 300} if needed | |
# Handle potential safety blocks or empty responses | |
# Accessing response text might differ slightly based on API version/model behavior | |
generated_text = "" | |
try: | |
if hasattr(response, 'text'): | |
generated_text = response.text | |
elif hasattr(response, 'parts') and response.parts: | |
# Concatenate text from parts if necessary | |
generated_text = "".join(part.text for part in response.parts if hasattr(part, 'text')) | |
else: | |
# Check for finish_reason if available | |
finish_reason = getattr(response, 'prompt_feedback', {}).get('block_reason') or getattr(response, 'candidates', [{}])[0].get('finish_reason') | |
logging.warning(f"Gemini AI response for {doc_type} has no text/parts. Finish Reason: {finish_reason}. Response: {response}") | |
generated_text = f"Error: AI returned no content for {doc_type}. Possible reason: {finish_reason}. Check Gemini safety settings or prompt complexity." | |
except Exception as resp_err: | |
logging.error(f"Error extracting text from Gemini response for {doc_type}: {resp_err}", exc_info=True) | |
generated_text = f"Error: Could not parse AI response for {doc_type}." | |
if not generated_text.strip() and not generated_text.startswith("Error:"): | |
logging.warning(f"Gemini AI returned empty text for {doc_type}.") | |
generated_text = f"Error: AI returned empty content for {doc_type}. Please try again or adjust the input documents." | |
logging.info(f"Successfully generated document for: {doc_type}") | |
# Update global state for download/chat *only if successful* | |
if not generated_text.startswith("Error:"): | |
current_display_document = generated_text | |
current_display_type = doc_type | |
else: | |
# Ensure error message is displayed if AI returns an error internally or extraction failed | |
current_display_document = generated_text | |
current_display_type = doc_type # Still set type so user knows what failed | |
return generated_text | |
except Exception as e: | |
logging.error(f"Error during Gemini AI call for {doc_type}: {e}", exc_info=True) | |
# Update display with error message | |
current_display_document = f"Error generating document via AI for {doc_type}: {str(e)}" | |
current_display_type = doc_type | |
return current_display_document | |
# --- Callbacks --- | |
# 1. Handle File Uploads (Source Documents) | |
def handle_file_upload(list_of_contents, list_of_names, existing_files_display): | |
global uploaded_files | |
# Reset downstream data when new source files are uploaded, as context changes | |
global shredded_document, pink_document, pink_review_document, red_document, red_review_document, gold_document, gold_review_document, loe_document, virtual_board_document, current_display_document, current_display_type, uploaded_pink_content, uploaded_red_content, uploaded_gold_content | |
status_message = "Please upload source documents (.pdf, .docx) and select an action." | |
if list_of_contents is None: | |
raise PreventUpdate | |
new_files_display = [] | |
processed_count = 0 | |
error_count = 0 | |
reset_needed = False | |
if existing_files_display is None: | |
existing_files_display = [] | |
# Get current filenames from the display to avoid duplicates | |
current_filenames = set() | |
if existing_files_display: | |
# Handle potential list vs single item | |
file_list_items = existing_files_display if isinstance(existing_files_display, list) else [existing_files_display] | |
for item in file_list_items: | |
# Check structure carefully based on Div/Button/Span | |
if isinstance(item, html.Div) and len(item.children) > 1 and isinstance(item.children[1], html.Span): | |
current_filenames.add(item.children[1].children) | |
for i, (content, name) in enumerate(zip(list_of_contents, list_of_names)): | |
if name in current_filenames: | |
logging.warning(f"Skipping duplicate upload attempt for source file: {name}") | |
continue # Avoid processing duplicates | |
file_content_text, error = process_document(content, name) | |
if error: | |
logging.error(f"Failed to process source file {name}: {error}") | |
error_count += 1 | |
status_message = f"Error processing {name}. {error}" # Show last error | |
continue # Skip adding failed files | |
if file_content_text is not None: # Allow empty files if processing is successful | |
uploaded_files[name] = file_content_text | |
# Use dbc.Button for remove, styled small | |
new_files_display.append(html.Div([ | |
dbc.Button('X', id={'type': 'remove-file', 'index': name}, size="sm", color="danger", className="me-2 py-0 px-1", n_clicks=0), | |
html.Span(name, title=name) # Add tooltip with full name | |
], className="d-flex align-items-center mb-1")) | |
processed_count += 1 | |
current_filenames.add(name) # Add to tracking set | |
reset_needed = True # Mark that downstream docs should be cleared | |
if reset_needed: | |
logging.info("New source files uploaded, resetting downstream generated documents.") | |
shredded_document = None | |
pink_document = None | |
pink_review_document = None | |
red_document = None | |
red_review_document = None | |
gold_document = None | |
gold_review_document = None | |
loe_document = None | |
virtual_board_document = None | |
current_display_document = None # Clear preview | |
current_display_type = None | |
uploaded_pink_content = None # Also clear review uploads if source changes | |
uploaded_red_content = None | |
uploaded_gold_content = None | |
if processed_count > 0: | |
status_message = f"Successfully uploaded {processed_count} source file(s). Ready for 'Shred' or other actions." | |
elif error_count > 0 and processed_count == 0: | |
status_message = "Failed to process uploaded file(s). Check logs. Ensure they are valid PDF/DOCX with extractable text." | |
elif not new_files_display: # Means only duplicates were uploaded or upload was empty | |
status_message = "No new valid source files were added." | |
# Combine existing and new display items | |
final_display_list = (existing_files_display if isinstance(existing_files_display, list) else [existing_files_display] if existing_files_display else []) + new_files_display | |
return final_display_list, status_message | |
# 2. Handle File Removal (Source Documents) | |
def handle_file_remove(n_clicks, current_file_list_display): | |
global uploaded_files | |
# Reset downstream data when a source file is removed | |
global shredded_document, pink_document, pink_review_document, red_document, red_review_document, gold_document, gold_review_document, loe_document, virtual_board_document, current_display_document, current_display_type, uploaded_pink_content, uploaded_red_content, uploaded_gold_content | |
triggered_id_dict = callback_context.triggered_id | |
# Check if the callback was triggered by a pattern-matching ID and n_clicks increased | |
if not triggered_id_dict or not isinstance(triggered_id_dict, dict) or 'index' not in triggered_id_dict: | |
raise PreventUpdate | |
# Check if any click count is > 0 (or just check the specific one that triggered) | |
# Ensure n_clicks is a list before using any() | |
if not n_clicks or not any(nc for nc in n_clicks if nc is not None): # Check if any click occurred | |
raise PreventUpdate | |
file_to_remove = triggered_id_dict['index'] | |
logging.info(f"Attempting to remove source file: {file_to_remove}") | |
if file_to_remove in uploaded_files: | |
del uploaded_files[file_to_remove] | |
logging.info(f"Removed {file_to_remove} from uploaded_files dictionary.") | |
# Reset downstream docs since context changed | |
logging.info("Source file removed, resetting downstream generated documents.") | |
shredded_document = None | |
pink_document = None | |
pink_review_document = None | |
red_document = None | |
red_review_document = None | |
gold_document = None | |
gold_review_document = None | |
loe_document = None | |
virtual_board_document = None | |
current_display_document = None # Clear preview | |
current_display_type = None | |
uploaded_pink_content = None # Also clear review uploads | |
uploaded_red_content = None | |
uploaded_gold_content = None | |
# Filter the display list | |
updated_file_list_display = [] | |
if current_file_list_display: | |
# Handle potential list vs single item | |
file_list_items = current_file_list_display if isinstance(current_file_list_display, list) else [current_file_list_display] | |
updated_file_list_display = [ | |
item for item in file_list_items | |
# Check structure carefully based on Div/Button/Span | |
if not (isinstance(item, html.Div) and | |
item.children and isinstance(item.children[0], dbc.Button) and | |
isinstance(item.children[0].id, dict) and | |
item.children[0].id.get('index') == file_to_remove) | |
] | |
status_message = f"Removed source file: {file_to_remove}. " | |
if not uploaded_files: | |
status_message += "No source files remaining. Please upload documents." | |
else: | |
status_message += "Ready for 'Shred' or other actions." | |
# If the list is now empty, return an empty list instead of None | |
return updated_file_list_display if updated_file_list_display else [], status_message | |
# 3. Handle Action Button Clicks (Show Controls or Trigger Generation) | |
def handle_action_button(n_clicks): | |
global shredded_document, pink_document, red_document, gold_document, pink_review_document, red_review_document, gold_review_document, loe_document, virtual_board_document | |
# Reset potentially uploaded review files when a *new* main action is selected from the left nav | |
global uploaded_pink_content, uploaded_red_content, uploaded_gold_content | |
global current_display_document, current_display_type # Need to update preview/download state | |
triggered_id_dict = callback_context.triggered_id | |
if not triggered_id_dict or not isinstance(triggered_id_dict, dict) or 'index' not in triggered_id_dict: | |
raise PreventUpdate | |
# Check if any click count is > 0 | |
if not n_clicks or not any(nc for nc in n_clicks if nc is not None): | |
raise PreventUpdate | |
action_type = triggered_id_dict['index'] | |
logging.info(f"Action button clicked: {action_type}") | |
# Default states | |
review_controls_style = {'display': 'none'} # Hide review controls by default | |
review_controls_children = [] | |
status_message = f"Selected action: {action_type}" | |
doc_preview_children = no_update # Avoid clearing preview unless needed | |
loading_style = {'visibility':'hidden', 'height': '30px'} # Hide loading dots by default | |
download_style = {'display': 'none'} # Hide download button by default | |
# Reset previously uploaded review files content when a *new* action is selected. | |
# This prevents using an old uploaded file for a new review type accidentally. | |
uploaded_pink_content = None | |
uploaded_red_content = None | |
uploaded_gold_content = None | |
logging.debug("Cleared any previously uploaded review document content.") | |
# --- Actions Requiring Review Controls (Pink/Red/Gold Review) --- | |
if action_type in ["Pink Review", "Red Review", "Gold Review"]: | |
review_controls_style = {'display': 'block'} # Show the review controls card | |
base_doc_type = action_type.split(" ")[0] # Pink, Red, or Gold | |
prereq_doc = None | |
prereq_doc_name = "" | |
generated_doc_to_review = None | |
generated_doc_name = f"Generated {base_doc_type} Document" | |
upload_file_prompt = f"Select {base_doc_type} File" | |
# Check common prerequisite: Shredded document | |
if not shredded_document: | |
status_message = "Error: Please 'Shred' the source documents first." | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
current_display_document = None # Clear potentially stale preview | |
current_display_type = None | |
return review_controls_style, [], status_message, doc_preview_children, loading_style, download_style | |
# Check specific prerequisites and get the document to review | |
if action_type == "Pink Review": | |
prereq_doc = shredded_document # Base requirement | |
prereq_doc_name = "Shredded Document" | |
generated_doc_to_review = pink_document # Generated Pink to review | |
elif action_type == "Red Review": | |
prereq_doc = pink_review_document # Need Pink review results | |
prereq_doc_name = "Pink Review Document" | |
generated_doc_to_review = red_document # Generated Red to review | |
elif action_type == "Gold Review": | |
prereq_doc = red_review_document # Need Red review results | |
prereq_doc_name = "Red Review Document" | |
generated_doc_to_review = gold_document # Generated Gold to review | |
# Check if the specific prerequisite (like Pink Review for Red Review) exists | |
if prereq_doc is None and action_type != "Pink Review": # Shred is checked above | |
status_message = f"Error: Please complete '{prereq_doc_name.replace(' Document','')}' first." | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
current_display_document = None | |
current_display_type = None | |
return review_controls_style, [], status_message, doc_preview_children, loading_style, download_style | |
# Configure Radio Items based on whether the generated version exists | |
radio_options = [] | |
default_value = 'upload' # Default to upload as requested | |
if generated_doc_to_review: | |
radio_options.append({'label': f'Use {generated_doc_name}', 'value': 'generated'}) | |
radio_options.append({'label': f'Upload {base_doc_type} Document', 'value': 'upload'}) | |
# Keep default 'upload' unless generated is the *only* option (which shouldn't happen here) | |
else: | |
# If generated doesn't exist, only allow upload | |
radio_options.append({'label': f'Upload {base_doc_type} Document', 'value': 'upload'}) | |
status_message = f"Warning: No '{base_doc_type}' document was generated in this session. You must upload one to proceed with {action_type}." | |
# Build the controls | |
review_controls_children = [ | |
html.H5(f"Configure Input for {action_type}"), | |
dbc.Label(f"Select {base_doc_type} document source:"), | |
dbc.RadioItems( | |
id='review-source-radio', # Single ID for the radio group | |
options=radio_options, | |
value=default_value, # Default to 'upload' | |
inline=True, | |
className='mb-2' | |
), | |
# Single Upload component, dynamically shown/hidden by radio button callback | |
dcc.Upload( | |
id='upload-review-doc', # Single ID for the upload component | |
children=html.Div(['Drag and Drop or ', html.A(upload_file_prompt)]), | |
style={'display': 'block' if default_value == 'upload' else 'none', # Show/hide based on default value | |
'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', | |
'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px 0', | |
'backgroundColor': '#f8f9fa'}, | |
multiple=False | |
), | |
html.Div(id='review-upload-status', className='mb-2 text-muted small'), # For upload confirmation/error | |
# Generate Review button with pattern-matching ID | |
dbc.Button(f"Generate {action_type}", id={'type': 'generate-review-button', 'index': action_type}, color="primary") | |
] | |
# Clear preview when showing controls, provide instruction | |
doc_preview_children = html.Div(f"Configure input source for {base_doc_type} document and click 'Generate {action_type}'.", style={'padding':'10px'}) | |
status_message = f"Ready to configure input for {action_type}." | |
current_display_document = None # Clear internal state for preview/download too | |
current_display_type = None | |
download_style = {'display': 'none'} # Ensure download button is hidden | |
# --- Actions Triggering Direct Generation (Shred, Pink, Red, Gold, LOE, Virtual Board) --- | |
else: | |
review_controls_style = {'display': 'none'} # Hide review controls | |
review_controls_children = [] | |
loading_style = {'visibility':'visible', 'height': '30px'} # Show loading dots | |
doc_preview_children = "" # Clear preview while loading/generating | |
status_message = f"Generating {action_type}..." | |
download_style = {'display': 'none'} # Hide download during generation | |
# Determine inputs based on action type | |
input_docs = [] | |
context_docs = [] | |
generation_possible = True | |
if action_type == "Shred": | |
source_docs_text = get_combined_uploaded_text() | |
if not source_docs_text: | |
status_message = "Error: Please upload source document(s) first." | |
generation_possible = False | |
else: | |
input_docs = [source_docs_text] | |
elif action_type == "Pink": | |
if not shredded_document: | |
status_message = "Error: Please 'Shred' the source documents first." | |
generation_possible = False | |
else: | |
input_docs = [get_combined_uploaded_text()] # Pink is based on source docs | |
context_docs = [shredded_document] # With context of shredded requirements | |
elif action_type == "Red": | |
if not shredded_document or not pink_review_document: | |
status_message = "Error: Please complete 'Shred' and 'Pink Review' first." | |
generation_possible = False | |
else: | |
# Red uses Pink Review findings as primary input to address them | |
input_docs = [pink_review_document] | |
# Context includes Shredded requirements and maybe original Pink? Let's stick to Shred+Review for now. | |
context_docs = [shredded_document] | |
elif action_type == "Gold": | |
if not shredded_document or not red_review_document: | |
status_message = "Error: Please complete 'Shred' and 'Red Review' first." | |
generation_possible = False | |
else: | |
# Gold uses Red Review findings as primary input | |
input_docs = [red_review_document] | |
context_docs = [shredded_document] | |
elif action_type in ["LOE", "Virtual Board"]: | |
if not shredded_document: | |
status_message = f"Error: Please 'Shred' the source documents first before generating {action_type}." | |
generation_possible = False | |
else: | |
# These likely only need the shredded requirements as input | |
input_docs = [shredded_document] | |
else: | |
status_message = f"Action '{action_type}' is not recognized for direct generation." | |
generation_possible = False | |
# Perform generation if possible | |
if generation_possible: | |
result_doc = generate_ai_document(action_type, input_docs, context_docs) | |
# Store result in the correct global variable AND current_display vars | |
if result_doc and not result_doc.startswith("Error:"): | |
current_display_document = result_doc # Set for preview/download/chat | |
current_display_type = action_type | |
if action_type == "Shred": shredded_document = result_doc | |
elif action_type == "Pink": pink_document = result_doc | |
elif action_type == "Red": red_document = result_doc | |
elif action_type == "Gold": gold_document = result_doc | |
elif action_type == "LOE": loe_document = result_doc | |
elif action_type == "Virtual Board": virtual_board_document = result_doc | |
# Reviews are handled separately | |
doc_preview_children = dcc.Markdown(result_doc, style={'wordWrap': 'break-word', 'overflowX': 'auto'}) # Allow horizontal scroll for wide tables | |
status_message = f"{action_type} generated successfully." | |
download_style = {'display': 'inline-block', 'marginRight': '10px'} # Show download button on success | |
else: | |
# If generation failed, result_doc contains the error message from generate_ai_document | |
doc_preview_children = html.Div(result_doc, className="text-danger") # Display error in preview | |
status_message = f"Error generating {action_type}. See preview for details." | |
current_display_document = result_doc # Show error in preview but prevent download/chat | |
current_display_type = action_type | |
download_style = {'display': 'none'} # Hide download button on error | |
else: | |
# Generation not possible due to prerequisites | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
current_display_document = None # No doc generated | |
current_display_type = None | |
download_style = {'display': 'none'} | |
loading_style = {'visibility':'hidden', 'height': '30px'} # Hide loading dots when finished/failed | |
return review_controls_style, review_controls_children, status_message, doc_preview_children, loading_style, download_style | |
# 4. Toggle Review Upload Component Visibility based on Radio Button | |
def toggle_review_upload_visibility(radio_value, current_style): | |
# Preserves existing style attributes while toggling 'display' | |
new_style = current_style.copy() if current_style else {} | |
should_display = (radio_value == 'upload') | |
new_display_value = 'block' if should_display else 'none' | |
# Prevent update if display style is already correct | |
if current_style and ('display' in current_style and current_style['display'] == new_display_value): | |
raise PreventUpdate | |
else: | |
logging.debug(f"Toggling review upload visibility. Radio: {radio_value}, New display: {new_display_value}") | |
new_style['display'] = new_display_value | |
# Make sure other style defaults are present if creating new style dict | |
if not current_style: | |
new_style.update({ | |
'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', | |
'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px 0', | |
'backgroundColor': '#f8f9fa' | |
}) | |
return new_style | |
# 5. Handle Upload of Document for Review Input | |
def handle_review_upload(contents, filename, review_controls_children): | |
global uploaded_pink_content, uploaded_red_content, uploaded_gold_content | |
if contents is None or filename is None or not review_controls_children: | |
# No file uploaded or controls not populated yet (e.g., user selected action but hasn't uploaded) | |
raise PreventUpdate | |
# Determine which review type this upload is for. Find the H5 title. | |
review_type = None | |
base_type = None | |
try: | |
# Assuming the first child is the H5 title like "Configure Input for Pink Review" | |
if isinstance(review_controls_children, list) and len(review_controls_children) > 0 and isinstance(review_controls_children[0], html.H5): | |
title_text = review_controls_children[0].children | |
# Extract "Pink Review", "Red Review", etc. | |
parts = title_text.split(" for ") | |
if len(parts) > 1: | |
review_type = parts[1].strip() | |
base_type = review_type.split(" ")[0] # Pink, Red, Gold | |
except Exception as e: | |
logging.warning(f"Could not reliably determine review type from review controls children: {e}") | |
if not review_type or not base_type: | |
logging.warning("handle_review_upload: Could not determine review type from controls.") | |
return html.Div("Error determining review type.", className="text-danger small"), "Internal error: Could not determine review type." | |
logging.info(f"Handling upload of file '{filename}' for {review_type} input.") | |
file_content_text, error = process_document(contents, filename) | |
upload_status_display = "" | |
status_bar_message = "" | |
# Clear previous uploads for this type before storing new one | |
if base_type == "Pink": uploaded_pink_content = None | |
elif base_type == "Red": uploaded_red_content = None | |
elif base_type == "Gold": uploaded_gold_content = None | |
if error: | |
status_bar_message = f"Error processing uploaded {base_type} file: {error}" | |
upload_status_display = html.Div(f"Failed to load {filename}: {error}", className="text-danger small") | |
elif file_content_text is None: # Handle case where process_document returns (None, None) | |
status_bar_message = f"Error processing uploaded {base_type} file: No content could be extracted." | |
upload_status_display = html.Div(f"Failed to load {filename}: No text extracted.", className="text-danger small") | |
else: | |
status_bar_message = f"Uploaded '{filename}' successfully for {review_type} input." | |
upload_status_display = html.Div(f"Using uploaded file: {filename}", className="text-success small") | |
# Store the content in the correct variable | |
if base_type == "Pink": uploaded_pink_content = file_content_text | |
elif base_type == "Red": uploaded_red_content = file_content_text | |
elif base_type == "Gold": uploaded_gold_content = file_content_text | |
logging.info(f"Stored uploaded content for {base_type} review input.") | |
return upload_status_display, status_bar_message | |
# 6. Generate Review Document on Button Click | |
def generate_review_document(n_clicks, source_option, button_ids): | |
global shredded_document, pink_document, red_document, gold_document | |
global pink_review_document, red_review_document, gold_review_document | |
global uploaded_pink_content, uploaded_red_content, uploaded_gold_content | |
global current_display_document, current_display_type # Update preview state | |
triggered_id_dict = callback_context.triggered_id | |
if not triggered_id_dict or not isinstance(triggered_id_dict, dict) or 'index' not in triggered_id_dict: | |
raise PreventUpdate | |
# Check if any click count is > 0 | |
if not n_clicks or not any(nc for nc in n_clicks if nc is not None): | |
raise PreventUpdate | |
review_type = triggered_id_dict['index'] # e.g., "Pink Review" | |
base_type = review_type.split(" ")[0] # e.g., "Pink" | |
logging.info(f"Generate button clicked for: {review_type}, Source option chosen: {source_option}") | |
doc_preview_children = "" # Clear preview | |
status_message = f"Generating {review_type}..." | |
loading_style = {'visibility':'visible', 'height': '30px'} # Show loading dots | |
download_style = {'display': 'none'} # Hide download initially | |
current_display_document = None # Clear display state | |
current_display_type = None | |
# --- Prerequisite Check --- | |
if not shredded_document: | |
status_message = "Error: 'Shredded' document is missing. Please perform 'Shred' first." | |
loading_style = {'visibility':'hidden', 'height': '30px'} | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
return doc_preview_children, status_message, loading_style, download_style | |
# --- Determine Input Document based on Radio Choice --- | |
input_document_content = None | |
input_doc_source_name = "" # For logging/status messages | |
if source_option == 'generated': | |
input_doc_source_name = f"Generated {base_type} Document" | |
if base_type == "Pink": input_document_content = pink_document | |
elif base_type == "Red": input_document_content = red_document | |
elif base_type == "Gold": input_document_content = gold_document | |
if not input_document_content: | |
status_message = f"Error: Cannot use 'generated' option. The {input_doc_source_name} was not found (was it generated successfully?). Please generate or upload it." | |
loading_style = {'visibility':'hidden', 'height': '30px'} | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
return doc_preview_children, status_message, loading_style, download_style | |
elif source_option == 'upload': | |
input_doc_source_name = f"Uploaded {base_type} Document" | |
if base_type == "Pink": input_document_content = uploaded_pink_content | |
elif base_type == "Red": input_document_content = uploaded_red_content | |
elif base_type == "Gold": input_document_content = uploaded_gold_content | |
if not input_document_content: | |
status_message = f"Error: Cannot use 'upload' option. No {base_type} document was successfully uploaded and processed for this review step. Please upload a valid file." | |
loading_style = {'visibility':'hidden', 'height': '30px'} | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
return doc_preview_children, status_message, loading_style, download_style | |
else: | |
status_message = f"Error: Invalid source option '{source_option}' selected." | |
loading_style = {'visibility':'hidden', 'height': '30px'} | |
doc_preview_children = html.Div(status_message, className="text-danger") | |
return doc_preview_children, status_message, loading_style, download_style | |
# --- Generate Review Document --- | |
logging.info(f"Generating {review_type} using '{input_doc_source_name}' as input and Shredded document as context.") | |
# Reviews need the document being reviewed (Pink/Red/Gold) as primary input | |
# and the Shredded PWS as context/requirements basis. | |
review_result = generate_ai_document(review_type, [input_document_content], context_docs=[shredded_document]) | |
if review_result and not review_result.startswith("Error:"): | |
doc_preview_children = dcc.Markdown(review_result, style={'wordWrap': 'break-word', 'overflowX': 'auto'}) # Allow horizontal scroll | |
status_message = f"{review_type} generated successfully using {input_doc_source_name}." | |
# Store the result in the correct global variable | |
if review_type == "Pink Review": pink_review_document = review_result | |
elif review_type == "Red Review": red_review_document = review_result | |
elif review_type == "Gold Review": gold_review_document = review_result | |
# Update display state | |
current_display_document = review_result | |
current_display_type = review_type | |
download_style = {'display': 'inline-block', 'marginRight': '10px'} # Show download button | |
else: | |
# review_result contains the error message | |
doc_preview_children = html.Div(f"Error generating {review_type}: {review_result}", className="text-danger") | |
status_message = f"Failed to generate {review_type}. See preview for details." | |
current_display_document = review_result # Show error, but don't allow download/chat | |
current_display_type = review_type | |
download_style = {'display': 'none'} | |
loading_style = {'visibility':'hidden', 'height': '30px'} # Hide loading dots | |
return doc_preview_children, status_message, loading_style, download_style | |
# 7. Handle Chat Interaction (Send and Clear) | |
def handle_chat(send_clicks, clear_clicks, chat_input): | |
global current_display_document, current_display_type | |
# Also need to update the specific underlying document variable (e.g., pink_document) | |
# so the chat changes persist if that document is used later. | |
global shredded_document, pink_document, red_document, gold_document, pink_review_document, red_review_document, gold_review_document, loe_document, virtual_board_document | |
# Determine which button was pressed | |
ctx = callback_context | |
if not ctx.triggered: | |
raise PreventUpdate | |
button_id = ctx.triggered[0]['prop_id'].split('.')[0] | |
# --- Handle Clear Chat --- | |
if button_id == 'btn-clear-chat' and clear_clicks > 0: | |
logging.info("Clear chat button clicked.") | |
return html.Div("Chat cleared.", className="text-muted fst-italic"), no_update, "Chat cleared.", "" # Clear output, no preview change, update status, clear input | |
# --- Handle Send Chat --- | |
if button_id == 'btn-send-chat' and send_clicks > 0: | |
if not chat_input or not chat_input.strip(): | |
# No input to send | |
return html.Div("Please enter an instruction.", className="text-warning small"), no_update, "Chat instruction empty.", chat_input # Keep input | |
if not current_display_document or not current_display_type: | |
# No document currently loaded in the preview to refine | |
return html.Div("Error: No document is currently displayed to refine.", className="text-warning"), no_update, "Cannot refine: No document loaded in preview.", "" # Clear input | |
logging.info(f"Chat refinement requested for displayed document type: {current_display_type}. Instruction: '{chat_input[:100]}...'") | |
# Construct prompt for refinement | |
prompt = f"""**Objective:** Refine the following '{current_display_type}' document based *only* on the user's instruction below. | |
**Your Role:** Act as an editor making precise changes. | |
**Core Instructions:** | |
1. **Apply Instruction:** Modify the 'Original Document' solely based on the 'User Instruction'. | |
2. **Maintain Context:** Preserve the overall structure, tone, and format of the original document unless the instruction explicitly directs otherwise. | |
3. **Output Only Updated Document:** Provide *only* the complete, updated '{current_display_type}' document. Do not add any conversational text, preamble, or explanation of changes. Ensure the output format matches the original (e.g., Markdown table if original was a table). | |
**Original Document:** | |
```text | |
{current_display_document} | |
``` | |
**User Instruction:** | |
```text | |
{chat_input} | |
``` | |
**Begin Updated '{current_display_type}' Output:** | |
""" | |
try: | |
# Show loading indicator for chat refinement? (Optional, maybe use inner loading) | |
status_message = f"Refining {current_display_type} based on chat instruction..." | |
# Note: Using a separate loading indicator for chat output is possible but adds complexity. | |
# For now, rely on the main status bar. | |
response = model.generate_content(prompt) | |
# Handle potential safety blocks or empty responses (refined logic) | |
updated_document = "" | |
try: | |
if hasattr(response, 'text'): | |
updated_document = response.text | |
elif hasattr(response, 'parts') and response.parts: | |
updated_document = "".join(part.text for part in response.parts if hasattr(part, 'text')) | |
else: | |
finish_reason = getattr(response, 'prompt_feedback', {}).get('block_reason') or getattr(response, 'candidates', [{}])[0].get('finish_reason') | |
logging.warning(f"Gemini AI response for chat refinement of {current_display_type} has no text/parts. Finish Reason: {finish_reason}. Response: {response}") | |
updated_document = f"Error: AI returned no content during refinement. Possible reason: {finish_reason}. Check Gemini safety settings or instruction complexity." | |
except Exception as resp_err: | |
logging.error(f"Error extracting text from Gemini refinement response for {current_display_type}: {resp_err}", exc_info=True) | |
updated_document = f"Error: Could not parse AI response during refinement for {current_display_type}." | |
if not updated_document.strip() and not updated_document.startswith("Error:"): | |
logging.warning(f"Gemini AI returned empty text for chat refinement of {current_display_type}.") | |
updated_document = f"Error: AI returned empty content during refinement for {current_display_type}. Please try a different instruction." | |
# If refinement failed, show error in chat output, don't update preview | |
if updated_document.startswith("Error:"): | |
chat_response_display = html.Div(updated_document, className="text-danger") | |
status_message = f"Error refining {current_display_type} via chat." | |
return chat_response_display, no_update, status_message, "" # Clear input | |
# --- Successful Refinement --- | |
logging.info(f"Successfully refined {current_display_type} via chat.") | |
# CRITICAL: Update the correct underlying global variable | |
original_doc_updated = False | |
if current_display_type == "Shred": shredded_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Pink": pink_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Pink Review": pink_review_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Red": red_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Red Review": red_review_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Gold": gold_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Gold Review": gold_review_document = updated_document; original_doc_updated = True | |
elif current_display_type == "LOE": loe_document = updated_document; original_doc_updated = True | |
elif current_display_type == "Virtual Board": virtual_board_document = updated_document; original_doc_updated = True | |
if original_doc_updated: | |
logging.info(f"Updated the underlying global variable for {current_display_type} with chat refinement.") | |
else: | |
logging.warning(f"Could not map displayed type '{current_display_type}' to a specific global variable for persistent update after chat.") | |
# Update the preview display immediately | |
current_display_document = updated_document # Keep preview consistent | |
# Display confirmation in chat output area | |
chat_response_display = html.Div([ | |
html.Strong("Refinement applied successfully."), | |
html.Hr(), | |
html.Em("Preview above has been updated. The changes will be used if this document is input for subsequent steps.") | |
]) | |
status_message = f"{current_display_type} updated via chat instruction." | |
# Update the document preview itself | |
doc_preview_update = dcc.Markdown(updated_document, style={'wordWrap': 'break-word', 'overflowX': 'auto'}) | |
return chat_response_display, doc_preview_update, status_message, "" # Clear input field | |
except Exception as e: | |
logging.error(f"Error during chat refinement call for {current_display_type}: {e}", exc_info=True) | |
chat_response_display = html.Div(f"Error refining document via chat: {str(e)}", className="text-danger") | |
status_message = f"Error refining {current_display_type} via chat." | |
# Do not update the main document preview if chat refinement fails | |
return chat_response_display, no_update, status_message, "" # Clear input | |
# If no button was triggered (shouldn't normally happen with prevent_initial_call) | |
raise PreventUpdate | |
# 8. Handle Download Button Click | |
def download_generated_document(n_clicks): | |
"""Prepares the currently displayed document for download.""" | |
global current_display_document, current_display_type | |
if not n_clicks or current_display_document is None or current_display_type is None or current_display_document.startswith("Error:"): | |
# No clicks, nothing to download, or current display is an error message | |
raise PreventUpdate | |
logging.info(f"Download requested for displayed document: {current_display_type}") | |
# Sanitize filename | |
safe_filename_base = "".join(c if c.isalnum() else "_" for c in current_display_type) | |
# Determine if output should be spreadsheet (Excel) or document (Word) | |
is_spreadsheet_type = current_display_type in ["Shred", "Pink Review", "Red Review", "Gold Review", "Virtual Board", "LOE"] | |
if is_spreadsheet_type: | |
filename = f"{safe_filename_base}.xlsx" | |
logging.info(f"Attempting to format {current_display_type} as Excel.") | |
try: | |
# Use StringIO to treat the string data as a file for pandas | |
data_io = StringIO(current_display_document) | |
df = None | |
# Refined parsing logic: prioritize Markdown tables, then CSV/TSV | |
lines = [line.strip() for line in data_io.readlines() if line.strip()] | |
data_io.seek(0) # Reset pointer | |
# Attempt 1: Parse as Markdown table | |
header = [] | |
data = [] | |
header_found = False | |
separator_found = False | |
potential_header_line = -1 | |
potential_sep_line = -1 | |
for i, line in enumerate(lines): | |
if line.startswith('|') and line.endswith('|'): | |
parts = [p.strip() for p in line.strip('|').split('|')] | |
if not header_found and '---' not in line: # Potential header | |
header = parts | |
header_found = True | |
potential_header_line = i | |
elif header_found and '---' in line and potential_header_line == i - 1: # Separator line immediately follows header | |
separator_found = True | |
potential_sep_line = i | |
# Optional: Check column count match | |
if len(line.strip('|').split('|')) != len(header): | |
logging.warning("Markdown table header/separator column count mismatch detected.") | |
# Reset, might not be a valid table | |
header_found = False | |
separator_found = False | |
header = [] | |
continue | |
elif header_found and separator_found and potential_sep_line == i - 1: # Data line follows separator | |
if len(parts) == len(header): | |
data.append(parts) | |
else: | |
logging.warning(f"Markdown table row data mismatch (expected {len(header)}, got {len(parts)}): {line}") | |
# Decide whether to continue or break parsing for this row | |
else: # Doesn't fit the pattern, reset if we were in the middle of parsing a potential table | |
if header_found or separator_found: | |
logging.debug(f"Resetting Markdown parse state at line: {line}") | |
header_found = False | |
separator_found = False | |
header = [] | |
data = [] | |
if header and data: | |
df = pd.DataFrame(data, columns=header) | |
logging.info(f"Successfully parsed {current_display_type} as Markdown Table.") | |
else: | |
# Attempt 2: Try parsing as CSV/TSV | |
logging.warning(f"Could not parse {current_display_type} as Markdown Table. Attempting CSV/TSV parsing.") | |
data_io.seek(0) # Reset pointer | |
try: | |
# Read a sample to sniff delimiter | |
sniffer_sample = data_io.read(4096) # Read more data for better sniffing | |
data_io.seek(0) # Reset pointer after reading sample | |
dialect = pd.io.parsers.readers.csv.Sniffer().sniff(sniffer_sample, delimiters=',|\t') # Sniff common delimiters | |
df = pd.read_csv(data_io, sep=dialect.delimiter) | |
logging.info(f"Successfully parsed {current_display_type} using detected delimiter '{dialect.delimiter}'.") | |
except Exception as e_csv: | |
logging.warning(f"Could not parse {current_display_type} as standard CSV/TSV after Markdown attempt failed ({e_csv}). Sending as text.") | |
# Fallback: If no DataFrame could be created, send as text | |
return dict(content=current_display_document, filename=f"{safe_filename_base}.txt") | |
# If DataFrame was created, save to Excel | |
output = BytesIO() | |
# Use xlsxwriter engine for better compatibility/features | |
with pd.ExcelWriter(output, engine='xlsxwriter') as writer: | |
sheet_name = safe_filename_base[:31] # Use sanitized name, limit 31 chars | |
df.to_excel(writer, sheet_name=sheet_name, index=False) | |
# Auto-adjust column widths (optional, can be slow for large files) | |
worksheet = writer.sheets[sheet_name] | |
for idx, col in enumerate(df): # loop through columns | |
series = df[col] | |
max_len = max(( | |
series.astype(str).map(len).max(), # len of largest item | |
len(str(series.name)) # len of column name/header | |
)) + 1 # adding a little extra space | |
worksheet.set_column(idx, idx, max_len) # set column width | |
logging.info(f"Sending {filename} (Excel format)") | |
return dcc.send_bytes(output.getvalue(), filename) | |
except Exception as e_excel: | |
logging.error(f"Error creating Excel file for {current_display_type}: {e_excel}. Sending as text.", exc_info=True) | |
# Fallback to sending as a text file if any DataFrame/Excel processing fails | |
return dict(content=current_display_document, filename=f"{safe_filename_base}.txt") | |
else: # Assume DOCX for Pink, Red, Gold | |
filename = f"{safe_filename_base}.docx" | |
logging.info(f"Formatting {current_display_type} as DOCX.") | |
try: | |
doc = Document() | |
# Add paragraph by paragraph to potentially preserve some structure like line breaks | |
for paragraph_text in current_display_document.split('\n'): | |
# Add paragraph only if it contains non-whitespace characters | |
if paragraph_text.strip(): | |
doc.add_paragraph(paragraph_text) | |
else: | |
# Add an empty paragraph to represent blank lines | |
doc.add_paragraph() | |
# Save the document to an in-memory BytesIO object | |
output = BytesIO() | |
doc.save(output) | |
logging.info(f"Sending {filename} (DOCX format)") | |
return dcc.send_bytes(output.getvalue(), filename) # Use dcc.send_bytes for BytesIO | |
except Exception as e_docx: | |
logging.error(f"Error creating DOCX file for {current_display_type}: {e_docx}. Sending as text.", exc_info=True) | |
# Fallback to sending as a text file | |
return dict(content=current_display_document, filename=f"{safe_filename_base}.txt") | |
# --- Main Execution --- | |
# Always use this structure for running the app | |
if __name__ == '__main__': | |
print("Starting the Dash application...") | |
# Set debug=False for production/deployment environments like Hugging Face Spaces | |
# Set host='0.0.0.0' to make the app accessible on the network (required for Docker/Spaces) | |
# Default port 8050, using 7860 as often used for ML demos/Spaces | |
# Multi-threading for multiple simultaneous user support is handled by the deployment server (e.g., Gunicorn with workers), not directly in app.run for production. | |
# Set debug=True locally for development, False for HF Spaces | |
app.run(debug=False, host='0.0.0.0', port=7860) | |
print("Dash application has finished running.") |