Spaces:
Paused
Paused
import base64 | |
import io | |
import os | |
import pandas as pd | |
from docx import Document | |
from io import BytesIO | |
import dash | |
import dash_bootstrap_components as dbc | |
from dash import html, dcc, Input, Output, State, callback_context | |
import google.generativeai as genai | |
from docx.shared import Pt | |
from docx.enum.style import WD_STYLE_TYPE | |
from PyPDF2 import PdfReader | |
from io import StringIO | |
# --- Application object -----------------------------------------------------
# Single Dash app instance, styled with the stock Bootstrap theme.
app = dash.Dash(
    __name__,
    external_stylesheets=[dbc.themes.BOOTSTRAP],
)

# --- Gemini client ----------------------------------------------------------
# The API key is read from the environment at import time; a missing
# GEMINI_API_KEY raises KeyError here, before the server starts.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25')

# --- Module-level state shared by the callbacks ------------------------------
# NOTE: these are process-wide globals, so every browser session sees the
# same values.
uploaded_files = {}        # filename -> text extracted by process_document
current_document = None    # text of the document currently shown in the preview
document_type = None       # key of document_types for current_document
shredded_document = None   # output of the "Shred" step, reset when uploads change
generated_documents = {}   # document type -> generated text
# Document types and their descriptions.
# Keys are the labels shown on the sidebar buttons (the layout builds one
# button per key, in this order); values are instruction text describing what
# should be produced for that type.
document_types = {
    # Requirements extraction from the uploaded PWS.
    "Shred": "Generate a requirements spreadsheet of the Project Work Statement (PWS) identified by action words like shall, will, perform etc. by pws section, requirement. Do not write as if you're responding to the proposal. Its a spreadsheet to distill the requirements, not microhealth's approach",
    # Draft / review pairs for each colour-team stage.
    "Pink": "Create a Pink Team document based on the PWS outline. Your goal is to be compliant and compelling.",
    "Pink Review": "Evaluate compliance of the Pink Team document against the requirements and output a spreadsheet of non compliant findings by pws number, the goal of that pws section, what made it non compliant and your recommendations for recovery",
    "Red": "Produce a Red Team document based on the Pink Review by pws sections. Your goal is to be compliant and compelling by recovering all the findings in Pink Review",
    "Red Review": "Evaluate compliance of the Red Team document against the requirements and output a spreadsheet of non compliant findings by pws number, the goal of that pws section, what made it non compliant and your recommendations for recovery",
    "Gold": "Create a Gold Team document based on the PWS response by pws sections. Your goal is to be compliant and compelling by recovering all the findings in Red Review",
    "Gold Review": "Perform a final compliance review against the requirements and output a spreadsheet of non compliant findings by pws number, the goal of that pws section, what made it non compliant and your recommendations for recovery",
    # Final evaluation and estimation outputs.
    "Virtual Board": "Based on the requirements and in particular the evaulation criteria, you will evaluate the proposal as if you were a contracting office and provide section by section evaluation as unsatisfactory, satisfactory, good, very good, excellent and why in a spreadsheet",
    "LOE": "Generate a Level of Effort (LOE) breakdown as a spreadsheet"
}
# Look shared by both drag-and-drop upload boxes (previously duplicated inline).
_UPLOAD_BOX_STYLE = {
    'width': '100%',
    'height': '60px',
    'lineHeight': '60px',
    'borderWidth': '1px',
    'borderStyle': 'dashed',
    'borderRadius': '5px',
    'textAlign': 'center',
    'margin': '10px 0',
}

# Page layout: a narrow left column (uploads + one button per document type)
# and a wide right column (status, per-document controls, preview, download,
# and the chat box).
app.layout = dbc.Container([
    dbc.Row([
        # ---- Left column: source uploads and document-type buttons ----
        dbc.Col([
            html.H4("Proposal Documents", className="mt-3 mb-4"),
            dcc.Upload(
                id='upload-document',
                children=html.Div([
                    'Drag and Drop or ',
                    html.A('Select Files')
                ]),
                style=dict(_UPLOAD_BOX_STYLE),
                multiple=True
            ),
            html.Div(id='file-list'),
            html.Hr(),
            html.Div([
                dbc.Button(
                    doc_type,
                    id=f'btn-{doc_type.lower().replace("_", "-")}',
                    color="link",
                    className="mb-2 w-100 text-left custom-button",
                    # Style keys are camelCase like every other style dict in
                    # this layout (the original used kebab-case here).
                    style={'overflow': 'hidden', 'textOverflow': 'ellipsis', 'whiteSpace': 'nowrap'}
                ) for doc_type in document_types.keys()
            ])
        ], width=3),
        # ---- Right column: status, controls, preview, download, chat ----
        dbc.Col([
            html.Div(id='status-bar', className="alert alert-info", style={'marginBottom': '20px'}),
            dcc.Loading(
                id="loading-indicator",
                type="dot",
                children=[html.Div(id="loading-output")]
            ),
            # Hidden until a callback reveals it.
            html.Div(id='document-upload-area', style={'display': 'none'}, children=[
                dcc.Upload(
                    id='upload-specific-document',
                    children=html.Div(['Drag and Drop or ', html.A('Select Document')]),
                    style=dict(_UPLOAD_BOX_STYLE),
                    multiple=False
                ),
                html.Div(id='specific-document-name')
            ]),
            # Hidden until a callback reveals it.
            html.Div(id='document-choice', style={'display': 'none'}, children=[
                dcc.RadioItems(
                    id='document-source',
                    options=[
                        {'label': 'Use Generated Document', 'value': 'generated'},
                        {'label': 'Use Uploaded Document', 'value': 'uploaded'}
                    ],
                    value='generated'
                )
            ]),
            dbc.Button("Process Document", id="btn-process-document", color="primary", className="mt-3", style={'display': 'none'}),
            html.Div(id='document-preview', className="border p-3 mb-3"),
            dbc.Button("Download Document", id="btn-download", color="success", className="mt-3"),
            dcc.Download(id="download-document"),
            html.Hr(),
            dcc.Loading(
                id="chat-loading",
                type="dot",
                children=[
                    dbc.Input(id="chat-input", type="text", placeholder="Chat with AI to update document...", className="mb-2"),
                    dbc.Button("Send", id="btn-send-chat", color="primary", className="mb-3"),
                    html.Div(id="chat-output")
                ]
            )
        ], width=9)
    ])
], fluid=True)
def process_document(contents, filename):
    """Extract plain text from an uploaded DOCX or PDF file.

    Args:
        contents: Upload payload of the form ``"<header>,<base64 data>"``.
        filename: Original file name; its extension selects the parser.

    Returns:
        The extracted text on success. On any failure (malformed payload,
        unreadable file) a string starting with ``"Error processing
        document: "``; for other extensions an "Unsupported file format"
        message. The function never raises.
    """
    try:
        # Split only on the first comma so that everything after the header
        # is treated as payload. Decoding lives inside the try so that a
        # malformed or missing payload is reported like any other failure
        # instead of escaping as an exception.
        _header, content_string = contents.split(',', 1)
        decoded = base64.b64decode(content_string)

        lowered_name = filename.lower()
        if lowered_name.endswith('.docx'):
            doc = Document(BytesIO(decoded))
            return "\n".join(para.text for para in doc.paragraphs)
        if lowered_name.endswith('.pdf'):
            pdf = PdfReader(BytesIO(decoded))
            # `or ""` keeps a page with no extractable text from aborting
            # the whole document.
            return "".join(page.extract_text() or "" for page in pdf.pages)
        return f"Unsupported file format: {filename}. Please upload a PDF or DOCX file."
    except Exception as e:
        return f"Error processing document: {str(e)}"
def update_output(list_of_contents, list_of_names, existing_files):
    """Register newly uploaded files and extend the rendered file list.

    Each upload is converted to text with ``process_document`` and stored in
    the module-level ``uploaded_files`` under its file name. Any previously
    shredded document is discarded because the inputs have changed.

    Returns:
        A ``(file_list_children, status_message)`` pair.
    """
    global uploaded_files, shredded_document

    # Nothing uploaded: leave the list as it is.
    if list_of_contents is None:
        return existing_files, "Please upload a document to begin."

    added_rows = []
    for payload, file_name in zip(list_of_contents, list_of_names):
        uploaded_files[file_name] = process_document(payload, file_name)
        remove_button = html.Button(
            '×',
            id={'type': 'remove-file', 'index': file_name},
            style={'marginRight': '5px', 'fontSize': '10px'},
        )
        added_rows.append(html.Div([remove_button, html.Span(file_name)]))

    shredded_document = None  # Reset shredded document when new files are uploaded
    previous_rows = [] if existing_files is None else existing_files
    return previous_rows + added_rows, "Document uploaded. Please select a document type to proceed."
def remove_file(n_clicks, existing_files):
    """Remove the file whose '×' button was clicked from the upload list.

    The clicked button is identified from ``dash.callback_context``; its id
    is the dict ``{'type': 'remove-file', 'index': <filename>}`` created in
    ``update_output``.

    Args:
        n_clicks: Click counts of the remove buttons (not used directly).
        existing_files: Current children of the file list, in serialized
            form (each row is a Div holding a Button and a Span with the
            file name).

    Returns:
        A ``(remaining_file_list, status_message)`` pair.

    Raises:
        dash.exceptions.PreventUpdate: When nothing was actually clicked or
            the triggering id cannot be parsed.
    """
    global uploaded_files, shredded_document
    import json  # local import: only this callback needs it

    ctx = dash.callback_context
    if not ctx.triggered:
        raise dash.exceptions.PreventUpdate

    trigger = ctx.triggered[0]
    # A trigger without a click count (e.g. a button that was just added to
    # the page) must not remove anything.
    if not trigger.get('value'):
        raise dash.exceptions.PreventUpdate

    # prop_id looks like '{"index":"name.pdf","type":"remove-file"}.n_clicks'.
    # Parse the id part as JSON rather than slicing on ',' and ':', which left
    # the surrounding quotes in place and broke on names containing those
    # characters.
    component_id = trigger['prop_id'].rsplit('.', 1)[0]
    try:
        removed_file = json.loads(component_id)['index']
    except (ValueError, KeyError, TypeError):
        raise dash.exceptions.PreventUpdate from None

    uploaded_files.pop(removed_file, None)
    shredded_document = None  # Reset shredded document when a file is removed

    remaining = [
        row for row in (existing_files or [])
        if row['props']['children'][1]['props']['children'] != removed_file
    ]
    return remaining, "Document removed. Please upload a document to begin."
def show_document_options(*args):
    """Choose which per-document controls are visible for the clicked type.

    Returns three style dicts. For "Shred" the first two are hidden and the
    third is shown; for every other type all three are shown.

    Raises:
        dash.exceptions.PreventUpdate: When no component triggered the call.
    """
    ctx = dash.callback_context
    if not ctx.triggered:
        raise dash.exceptions.PreventUpdate

    # Recover the document-type label from the id of the triggering button.
    triggered_prop = ctx.triggered[0]['prop_id']
    button_id = triggered_prop.split('.')[0]
    selected_type = button_id.replace('btn-', '').replace('-', '_').title()

    optional_display = 'none' if selected_type == "Shred" else 'block'
    return (
        {'display': optional_display},
        {'display': optional_display},
        {'display': 'block'},
    )
def generate_document_preview(n_clicks, doc_source, uploaded_doc_name, *args):
    """Generate (or load) the selected document type and return its preview.

    Args:
        n_clicks: Click count of the triggering button (not used directly).
        doc_source: ``'generated'`` or ``'uploaded'`` (the radio selection).
        uploaded_doc_name: Name of the specifically uploaded document, if any.
        *args: Presumably the click counts of the document-type buttons, in
            ``document_types`` order — the callback decorator is not visible
            here, TODO confirm.

    Returns:
        A 3-tuple ``(preview_component, short_status, status_message)``.

    Raises:
        dash.exceptions.PreventUpdate: When no component triggered the call.
    """
    global current_document, document_type, shredded_document, generated_documents
    ctx = dash.callback_context
    if not ctx.triggered:
        raise dash.exceptions.PreventUpdate

    # Work out which document type is wanted. Prefer the button that actually
    # triggered this call; scanning for the first button with clicks > 0 (the
    # old behaviour) sticks on the first type ever clicked.
    type_names = list(document_types)
    button_to_type = {
        f'btn-{name.lower().replace("_", "-")}': name for name in type_names
    }
    triggered_id = ctx.triggered[0]['prop_id'].split('.')[0]
    if triggered_id in button_to_type:
        document_type = button_to_type[triggered_id]
    elif document_type is None:
        # Triggered by something else and nothing selected yet: fall back to
        # the original click-count scan.
        for name, clicks in zip(type_names, args):
            if clicks is not None and clicks > 0:
                document_type = name
                break

    if document_type is None:
        message = "Please select a document type first."
        return html.Div(message), "", message

    # Every step, including Shred, works from the uploaded source files.
    if not uploaded_files:
        return html.Div("Please upload a document first."), "", "Please upload a document first."

    # Types that need an earlier artefact: type -> (required key, message).
    prerequisites = {
        "Red": ("Pink Review", "Please complete Pink Review first."),
        "Gold": ("Red Review", "Please complete Red Review first."),
        "Virtual Board": ("Gold", "Please complete Gold document first."),
        "LOE": ("Gold", "Please complete Gold document first."),
    }

    try:
        if document_type == "Shred":
            file_contents = list(uploaded_files.values())
            shredded_document = generate_document(document_type, file_contents)
            generated_documents['Shred'] = shredded_document
            current_document = shredded_document
        elif doc_source == 'uploaded' and uploaded_doc_name:
            # NOTE(review): the file *name* is passed as the upload contents
            # here; the real contents are not available through this
            # signature. Left as-is — confirm against the callback wiring.
            current_document = process_document(uploaded_doc_name, uploaded_doc_name)
        else:
            # All generated documents build on the shredded requirements.
            if shredded_document is None:
                message = "Please complete Shred first."
                return html.Div(message), "", message

            sources = [shredded_document]
            if document_type in prerequisites:
                required_key, message = prerequisites[document_type]
                if required_key not in generated_documents:
                    return html.Div(message), "", message
                sources = [generated_documents[required_key], shredded_document]
            current_document = generate_document(document_type, sources)

        generated_documents[document_type] = current_document
        return dcc.Markdown(current_document), f"{document_type} generated", f"{document_type} document generated successfully."
    except Exception as e:
        print(f"Error generating document: {str(e)}")
        return html.Div(f"Error generating document: {str(e)}"), "Error", "An error occurred while generating the document."
def update_specific_document_name(contents, filename):
    """Return the uploaded file's name, or an empty string if nothing was uploaded."""
    return "" if contents is None else filename
def update_document_via_chat(n_clicks, chat_input):
    """Ask the model to revise the current document per a chat instruction.

    The model's reply replaces the module-level ``current_document``.

    Returns:
        A ``(confirmation_text, markdown_preview)`` pair.

    Raises:
        dash.exceptions.PreventUpdate: When there is no instruction or no
            current document to update.
    """
    global current_document, document_type

    nothing_to_update = current_document is None or not chat_input
    if nothing_to_update:
        raise dash.exceptions.PreventUpdate

    prompt = f"""Update the following {document_type} based on this instruction: {chat_input}
Current document:
{current_document}
Instructions:
1. Provide the updated document content.
2. Maintain proper formatting and structure.
3. Incorporate the requested changes seamlessly.
Now, provide the updated {document_type}:
"""
    reply = model.generate_content(prompt)
    current_document = reply.text

    confirmation = f"Document updated based on: {chat_input}"
    return confirmation, dcc.Markdown(current_document)
def download_document(n_clicks):
    """Send the current document to the browser as an .xlsx or .docx file.

    Spreadsheet-style document types are parsed as CSV and written to an
    Excel workbook; every other type is written as a Word document with the
    text in a single paragraph.

    (An earlier, truncated copy of this function that never called
    ``df.to_excel`` and returned nothing has been removed; it was shadowed by
    this definition.)

    Args:
        n_clicks: Click count of the download button.

    Returns:
        The ``dcc.send_bytes`` payload for the download component.

    Raises:
        dash.exceptions.PreventUpdate: When the button has not been clicked
            or there is no current document.
    """
    global current_document, document_type

    # A call without a real click (n_clicks is None/0) must not start a
    # download even when a document is already held in module state.
    if not n_clicks or current_document is None:
        raise dash.exceptions.PreventUpdate

    spreadsheet_types = ("LOE", "Pink Review", "Red Review", "Gold Review", "Virtual Board", "Shred")
    output = BytesIO()

    if document_type in spreadsheet_types:
        # Create a pandas DataFrame for spreadsheet-type documents
        df = pd.read_csv(StringIO(current_document))
        # Save the DataFrame to an in-memory Excel file
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name=document_type, index=False)
        return dcc.send_bytes(output.getvalue(), f"{document_type}.xlsx")

    # Create an in-memory Word document
    doc = Document()
    doc.add_paragraph(current_document)
    doc.save(output)
    return dcc.send_bytes(output.getvalue(), f"{document_type}.docx")
if __name__ == '__main__':
    # Script entry point: serve on all interfaces, port 7860, with Dash
    # debug mode off. app.run blocks until the server stops.
    server_options = {'debug': False, 'host': '0.0.0.0', 'port': 7860}
    print("Starting the Dash application...")
    app.run(**server_options)
    print("Dash application has finished running.")