# Page banner captured together with the source (not part of the program):
# Spaces: Paused
import os | |
import base64 | |
import io | |
import dash | |
from dash import dcc, html, Input, Output, State, callback_context | |
import dash_bootstrap_components as dbc | |
import pandas as pd | |
import logging | |
from docx import Document | |
import mimetypes | |
from threading import Thread | |
import google.generativeai as genai | |
# --- Logging -----------------------------------------------------------------
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Dash application ---------------------------------------------------------
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
server = app.server  # module-level handle on the app's underlying server object

# --- Gemini client configuration ---------------------------------------------
# The key is read from the environment; an empty string is used when it is unset.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
genai.configure(api_key=GOOGLE_API_KEY)
GEMINI_MODEL = "models/gemini-2.5-pro-preview-03-25"
MAX_INPUT_TOKENS = 1048576  # not referenced by the code visible in this file
MAX_OUTPUT_TOKENS = 65536   # passed as max_output_tokens on every generation call

# --- In-memory, process-wide state (lost on restart) --------------------------
# filename -> decoded text of the uploaded RFP/SOW/PWS/RFI
uploaded_documents = {}
# filename -> Gemini file id for the uploaded RFP/SOW/PWS/RFI
uploaded_documents_fileid = {}
# filename -> decoded text of the uploaded proposal
uploaded_proposals = {}
# filename -> Gemini file id for the uploaded proposal
uploaded_proposals_fileid = {}
# docx filename -> docx bytes of generated proposals
generated_documents = {}
# docx filename -> docx bytes of shredded requirements
shredded_documents = {}
# Text of the most recent shred / generate results.
shredded_document = None
generated_response = None
def decode_document(decoded_bytes):
    """Decode raw upload bytes to text, preferring UTF-8 with a Latin-1 fallback.

    Args:
        decoded_bytes: Raw ``bytes`` (or ``bytearray``) of the uploaded file.

    Returns:
        The decoded string, or ``None`` when ``decoded_bytes`` is not a
        bytes-like object (for example ``None``), so callers can keep using
        their existing ``text is not None`` check.
    """
    if not isinstance(decoded_bytes, (bytes, bytearray)):
        logging.error(
            "Document decode failed: expected bytes, got %s",
            type(decoded_bytes).__name__,
        )
        return None
    try:
        content = decoded_bytes.decode('utf-8')
    except UnicodeDecodeError as e_utf8:
        # Latin-1 maps every byte value to a code point, so this fallback
        # cannot raise; no further error handling is needed here.
        content = decoded_bytes.decode('latin-1')
        logging.warning("Document decoded as Latin-1 due to utf-8 decode error: %s", e_utf8)
        return content
    logging.info("Document decoded as UTF-8.")
    return content
def guess_mime_type(filename):
    """Return a MIME type for ``filename``.

    The standard ``mimetypes`` registry is consulted first. When it has no
    answer, a small table of office/PDF extensions is used, and anything else
    (including names without any extension) maps to
    ``application/octet-stream``.

    Args:
        filename: File name (or path) whose type should be guessed.

    Returns:
        The MIME type string; never ``None``.
    """
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type:
        return mime_type

    fallback_types = {
        "pdf": "application/pdf",
        "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "doc": "application/msword",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "xls": "application/vnd.ms-excel",
    }
    # rpartition reports whether a dot was present at all, so a bare name such
    # as "pdf" is not mistaken for a file with the extension "pdf".
    _, dot, ext = filename.lower().rpartition('.')
    if not dot:
        return "application/octet-stream"
    return fallback_types.get(ext, "application/octet-stream")
def upload_to_gemini_file(decoded_bytes, filename):
    """Upload raw file bytes to Gemini and return the resulting file id.

    Args:
        decoded_bytes: Raw bytes of the file to upload.
        filename: Name used as the stream name and as the display name.

    Returns:
        The ``name`` attribute of the object returned by ``genai.upload_file``,
        or ``None`` when that attribute is missing or any exception occurs
        (the failure is logged, not raised).
    """
    try:
        detected_type = guess_mime_type(filename)
        stream = io.BytesIO(decoded_bytes)
        stream.name = filename
        uploaded = genai.upload_file(stream, display_name=filename, mime_type=detected_type)
        if not hasattr(uploaded, "name"):
            logging.error(f"Gemini file upload did not return file name/id. Response: {uploaded}")
            return None
        logging.info(f"File uploaded to Gemini: {filename}, file_id: {uploaded.name}")
        return uploaded.name
    except Exception as exc:
        logging.error(f"Exception during file upload to Gemini: {exc}")
        return None
def gemini_generate_content(prompt, file_id=None, chat_input=None):
    """Send ``prompt`` (optionally preceded by an uploaded file) to Gemini.

    Args:
        prompt: Text prompt, always sent as the last content part.
        file_id: Optional Gemini file id. When it can be fetched, the file and
            a blank-line separator are placed before the prompt; when the
            fetch fails the error is logged and only the prompt is sent.
        chat_input: Accepted for call-site compatibility; not used here.

    Returns:
        The response text (or ``str(response)`` when the response has no
        ``text`` attribute). On any exception an
        ``"Error during Gemini completion: ..."`` string is returned instead
        of raising.
    """
    try:
        attachments = []
        if file_id:
            try:
                attachments.append(genai.get_file(file_id))
            except Exception as fetch_err:
                logging.error(f"Could not fetch Gemini file for id {file_id}: {fetch_err}")

        if attachments:
            contents = [*attachments, "\n\n", prompt]
        else:
            contents = [prompt]

        reply = genai.GenerativeModel(GEMINI_MODEL).generate_content(
            contents=contents,
            generation_config=genai.types.GenerationConfig(
                max_output_tokens=MAX_OUTPUT_TOKENS
            ),
        )
        if hasattr(reply, "text"):
            text = reply.text
        else:
            text = str(reply)
        logging.info("Received response from Gemini.")
        return text
    except Exception as exc:
        logging.error("Error during Gemini generate_content: %s", exc)
        return f"Error during Gemini completion: {exc}"
def save_shredded_as_docx(shredded_text, rfp_filename):
    """Build a .docx holding the shredded requirements and return its bytes.

    The document starts with a level-0 heading naming ``rfp_filename``; every
    newline-separated line of ``shredded_text`` becomes its own paragraph.
    """
    document = Document()
    document.add_heading(f"Shredded Requirements for {rfp_filename}", 0)
    for paragraph_text in shredded_text.split('\n'):
        document.add_paragraph(paragraph_text)
    buffer = io.BytesIO()
    document.save(buffer)
    # getvalue() yields the whole buffer regardless of the stream position.
    return buffer.getvalue()
def save_proposal_as_docx(proposal_text, base_filename):
    """Build a .docx holding a proposal response and return its bytes.

    The document starts with a level-0 heading naming ``base_filename``; every
    newline-separated line of ``proposal_text`` becomes its own paragraph.
    """
    document = Document()
    document.add_heading(f"Proposal Response for {base_filename}", 0)
    for paragraph_text in proposal_text.split('\n'):
        document.add_paragraph(paragraph_text)
    buffer = io.BytesIO()
    document.save(buffer)
    # getvalue() yields the whole buffer regardless of the stream position.
    return buffer.getvalue()
# File types that the upload handler sends to Gemini as files. For these the
# locally decoded "text" is just the binary container, so it is never inlined.
_GEMINI_FILE_EXTENSIONS = ('.pdf', '.docx', '.xlsx', '.xls')


def _inline_document_text(filename, content, file_id):
    """Return the document text to embed in a prompt when no Gemini file backs it."""
    if file_id or not content:
        return ""
    if filename and filename.lower().endswith(_GEMINI_FILE_EXTENSIONS):
        logging.warning(
            "No Gemini file id for %s; its content cannot be sent to the model.",
            filename,
        )
        return ""
    return content


def _read_generated_document(gen_bytes):
    """Extract text from a stored generated document (docx first, UTF-8 text second)."""
    try:
        doc = Document(io.BytesIO(gen_bytes))
        return "\n".join(para.text for para in doc.paragraphs)
    except Exception:
        try:
            return gen_bytes.decode('utf-8')
        except Exception:
            return "<Unable to decode generated document.>"


def process_document(action, selected_filename=None, chat_input=None, selected_proposal=None, selected_generated=None):
    """Run one assistant action and return its result.

    Args:
        action: ``'shred'``, ``'generate'``, ``'proposal'``, or one of the
            placeholder actions ``'compliance'``, ``'recover'``, ``'board'``,
            ``'loe'``. Anything else yields a generic "not implemented" text.
        selected_filename: Name of the uploaded RFP/SOW/PWS/RFI to work on.
            For ``'shred'``/``'generate'`` the first uploaded document is used
            when this name is missing or unknown.
        chat_input: Optional extra user instructions appended to the prompt.
        selected_proposal: Accepted for call-site compatibility; not used.
        selected_generated: Name of the generated document used by
            ``'proposal'``.

    Returns:
        A ``(text, docx_bytes, docx_name)`` tuple. ``docx_bytes`` and
        ``docx_name`` are ``None`` whenever no document was produced
        (validation failure, error, or placeholder action).

    Side effects:
        ``'shred'`` updates the module-level ``shredded_document`` and
        ``'generate'`` updates ``generated_response``.

    Notes:
        * Documents without a Gemini file id (plain-text uploads) are embedded
          in the prompt; previously their content never reached the model.
        * The Gemini call runs inline. The earlier code started a thread and
          joined it immediately, which blocked the caller just the same.
        * Log message texts are kept unchanged so existing log searches match.
    """
    global shredded_document, generated_response
    logging.info(f"Process document called with action: {action}")

    doc_content = None
    doc_fileid = None
    if action in ("shred", "generate"):
        if selected_filename and selected_filename in uploaded_documents:
            doc_content = uploaded_documents[selected_filename]
            doc_fileid = uploaded_documents_fileid.get(selected_filename)
        elif uploaded_documents:
            # Fall back to the first uploaded document.
            selected_filename = next(iter(uploaded_documents))
            doc_content = uploaded_documents[selected_filename]
            doc_fileid = uploaded_documents_fileid.get(selected_filename)

    if action == 'shred':
        if not doc_content:
            logging.warning("No uploaded document found for shredding.")
            return "No document uploaded.", None, None
        prompt = (
            "Analyze the following RFP/PWS/SOW/RFI and generate a requirements spreadsheet. "
            "Identify requirements by action words like 'shall', 'will', 'perform', etc. Organize by PWS section and requirement. "
            "Do not write as if responding to the proposal.\n"
        )
        if chat_input:
            prompt += f"User additional instructions: {chat_input}\n"
        prompt += f"\nFile Name: {selected_filename}\n\n"
        prompt += _inline_document_text(selected_filename, doc_content, doc_fileid)

        shredded_document = ""
        try:
            logging.info("Starting Gemini generate_content for shredding.")
            result = gemini_generate_content(prompt, file_id=doc_fileid, chat_input=chat_input)
            shredded_document = result
            logging.info("Document shredded successfully.")
            docx_bytes = save_shredded_as_docx(result, selected_filename)
            docx_name = f"{os.path.splitext(selected_filename)[0]}_shredded.docx"
        except Exception as e:
            shredded_document = f"Error during shredding: {e}"
            logging.error("Error in thread_shred: %s", e)
            return shredded_document, None, None
        return result, docx_bytes, docx_name

    if action == 'generate':
        if not shredded_document:
            logging.warning("No shredded document found when generating response.")
            return "Shredded document not available.", None, None
        prompt = (
            "Create a highly detailed proposal response based on the following PWS requirements. "
            "Be compliant and compelling. Focus on describing the approach, steps, workflow, people, processes, and technology. "
            "Refer to research that validates the approach and cite sources with measurable outcomes.\n"
        )
        if chat_input:
            prompt += f"User additional instructions: {chat_input}\n"
        prompt += f"\nFile Name: {selected_filename}\n\n{shredded_document}"

        generated_response = ""
        try:
            logging.info("Starting Gemini generate_content for generation.")
            result = gemini_generate_content(prompt, file_id=doc_fileid, chat_input=chat_input)
            generated_response = result
            logging.info("Proposal response generated successfully.")
            docx_bytes = save_proposal_as_docx(result, selected_filename)
            docx_name = f"{os.path.splitext(selected_filename)[0]}_proposal.docx"
        except Exception as e:
            generated_response = f"Error during generation: {e}"
            logging.error("Error in thread_generate: %s", e)
            return generated_response, None, None
        return result, docx_bytes, docx_name

    if action == 'proposal':
        rfp_filename = selected_filename
        generated_docname = selected_generated
        rfp_fileid = uploaded_documents_fileid.get(selected_filename)
        if not (selected_filename and selected_filename in uploaded_documents):
            logging.warning("No RFP/SOW/PWS/RFI document selected for proposal action.")
            return "No RFP/SOW/PWS/RFI document selected.", None, None
        if not (selected_generated and selected_generated in generated_documents):
            logging.warning("No generated document selected for proposal action.")
            return "No generated document selected.", None, None
        rfp_content = uploaded_documents[selected_filename]
        generated_doc_content = _read_generated_document(generated_documents[selected_generated])

        prompt = (
            "Respond to the following RFP/SOW/PWS/RFI provided and focus on the Generated Document provided by creating a highly detailed proposal response that follows each section and subsection header. "
            "The response to each section and subsection will be compliant and compelling, focusing on describing the approach, and how the labor category uses a specific industry standard process in a workflow described in steps, and how technology is used. "
            "You must show innovation in the approach. Refer to research that validates the approach and cite sources with measurable outcome. "
            "Be sure to respond in paragraph format only, never use numbering, or bullets, only paragraph describing in detail.\n"
        )
        if chat_input:
            prompt += f"User additional instructions: {chat_input}\n"
        prompt += f"\n---\nRFP/SOW/PWS/RFI ({rfp_filename}):\n"
        rfp_inline = _inline_document_text(rfp_filename, rfp_content, rfp_fileid)
        if rfp_inline:
            prompt += f"{rfp_inline}\n"
        prompt += f"\n---\nGenerated Document ({generated_docname}):\n{generated_doc_content}\n"
        logging.info(f"Sending proposal prompt to Gemini. RFP: {rfp_filename}, Generated Doc: {generated_docname}")

        try:
            logging.info("Connecting to Gemini for proposal.")
            response = gemini_generate_content(prompt, file_id=rfp_fileid, chat_input=chat_input)
            logging.info("Received proposal results from Gemini.")
            docx_bytes = save_proposal_as_docx(response, rfp_filename)
            docx_name = f"{os.path.splitext(rfp_filename)[0]}_{os.path.splitext(generated_docname)[0]}_proposal.docx"
        except Exception as e:
            logging.error("Error during Gemini proposal request: %s", e)
            return f"Error during Gemini completion: {e}", None, None
        return response, docx_bytes, docx_name

    # Actions that exist in the UI but have no implementation yet.
    placeholder_messages = {
        'compliance': "Compliance checking not implemented yet.",
        'recover': "Recovery not implemented yet.",
        'board': "Virtual board not implemented yet.",
        'loe': "LOE estimation not implemented yet.",
    }
    return placeholder_messages.get(action, "Action not implemented yet."), None, None
def get_uploaded_doc_list(docdict):
    """Render uploaded RFP documents as a list group with one delete button each.

    Returns a placeholder ``html.Div`` when ``docdict`` is empty; otherwise a
    flush ``dbc.ListGroup`` with one row per filename (dict key).
    """
    if not docdict:
        return html.Div("No documents uploaded.", style={"wordWrap": "break-word"})

    def _row(name):
        delete_button = dbc.Button(
            "Delete",
            id={'type': 'delete-doc-btn', 'index': name, 'group': 'rfp'},
            size="sm",
            color="danger",
            className="float-end ms-2",
        )
        return dbc.ListGroupItem(
            [html.Span(name, style={"wordWrap": "break-word"}), delete_button],
            className="d-flex justify-content-between align-items-center",
        )

    return dbc.ListGroup([_row(name) for name in docdict], flush=True)
def get_shredded_doc_list(shreddict):
    """Render shredded-requirement documents with download links and delete buttons.

    ``shreddict`` maps a docx filename to its bytes. Each row shows the name,
    a base64 ``data:`` download link for the docx, and a delete button. An
    empty mapping yields a placeholder ``html.Div``.
    """
    if not shreddict:
        return html.Div("No shredded requirements yet.", style={"wordWrap": "break-word"})

    def _row(name, payload):
        encoded = base64.b64encode(payload).decode('utf-8')
        link = html.A(
            f"Download {name}",
            href=f"data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64,{encoded}",
            download=name,
            target="_blank",
            style={"wordWrap": "break-word", "marginRight": "10px"},
        )
        delete_button = dbc.Button(
            "Delete",
            id={'type': 'delete-shredded-btn', 'index': name, 'group': 'shredded'},
            size="sm",
            color="danger",
            className="float-end ms-2",
        )
        return dbc.ListGroupItem(
            [
                html.Span(name, style={"wordWrap": "break-word", "marginRight": "10px"}),
                link,
                delete_button,
            ],
            className="d-flex justify-content-between align-items-center",
        )

    return dbc.ListGroup([_row(name, shreddict[name]) for name in shreddict], flush=True)
def get_uploaded_proposal_list(docdict):
    """Render uploaded proposal documents as a list group with delete buttons.

    Returns a placeholder ``html.Div`` when ``docdict`` is empty; otherwise a
    flush ``dbc.ListGroup`` with one row per filename (dict key).
    """
    if not docdict:
        return html.Div("No proposal documents uploaded.", style={"wordWrap": "break-word"})

    def _row(name):
        delete_button = dbc.Button(
            "Delete",
            id={'type': 'delete-proposal-btn', 'index': name, 'group': 'proposal'},
            size="sm",
            color="danger",
            className="float-end ms-2",
        )
        return dbc.ListGroupItem(
            [html.Span(name, style={"wordWrap": "break-word"}), delete_button],
            className="d-flex justify-content-between align-items-center",
        )

    return dbc.ListGroup([_row(name) for name in docdict], flush=True)
def get_generated_doc_list(docdict):
    """Render generated documents as a list group with one delete button each.

    Returns a placeholder ``html.Div`` when ``docdict`` is empty; otherwise a
    flush ``dbc.ListGroup`` with one row per filename (dict key).
    """
    if not docdict:
        return html.Div("No generated documents yet.", style={"wordWrap": "break-word"})

    def _row(name):
        delete_button = dbc.Button(
            "Delete",
            id={'type': 'delete-generated-btn', 'index': name, 'group': 'generated'},
            size="sm",
            color="danger",
            className="float-end ms-2",
        )
        return dbc.ListGroupItem(
            [html.Span(name, style={"wordWrap": "break-word"}), delete_button],
            className="d-flex justify-content-between align-items-center",
        )

    return dbc.ListGroup([_row(name) for name in docdict], flush=True)
# Page layout: a narrow left column with the three document cards and a wide
# right column with the instruction box, action buttons and output area.
app.layout = dbc.Container([
    dbc.Row([
        # ---- Left column: document management ----
        dbc.Col([
            # Card 1: source RFP/SOW/PWS/RFI documents plus shredded results.
            dbc.Card([
                dbc.CardHeader(html.H5("RFP/SOW/PWS/RFI")),
                dbc.CardBody([
                    html.Div(get_uploaded_doc_list(uploaded_documents), id='uploaded-doc-list'),
                    dcc.Dropdown(
                        id='select-document-dropdown',
                        options=[{'label': fn, 'value': fn} for fn in uploaded_documents],
                        placeholder="Select a document to work with",
                        value=next(iter(uploaded_documents), None),
                        style={"marginBottom": "10px"},
                    ),
                    dcc.Upload(
                        id='upload-document',
                        children=html.Div([
                            'Drag and Drop or ',
                            html.A('Select Files'),
                        ]),
                        style={
                            'width': '100%',
                            'height': '60px',
                            'lineHeight': '60px',
                            'borderWidth': '1px',
                            'borderStyle': 'dashed',
                            'borderRadius': '5px',
                            'textAlign': 'center',
                            'margin': '10px',
                        },
                        multiple=False,
                    ),
                    html.Hr(),
                    html.H6("Shredded Requirements"),
                    html.Div(get_shredded_doc_list(shredded_documents), id='shredded-doc-list'),
                ]),
            ], className="mb-3"),
            # Card 2: uploaded proposal documents.
            dbc.Card([
                dbc.CardHeader(html.H5("Proposal")),
                dbc.CardBody([
                    html.Div(get_uploaded_proposal_list(uploaded_proposals), id='uploaded-proposal-list'),
                    dcc.Dropdown(
                        id='select-proposal-dropdown',
                        options=[{'label': fn, 'value': fn} for fn in uploaded_proposals],
                        placeholder="Select a proposal document",
                        value=next(iter(uploaded_proposals), None),
                        style={"marginBottom": "10px"},
                    ),
                    dcc.Upload(
                        id='upload-proposal',
                        children=html.Div([
                            'Drag and Drop or ',
                            html.A('Select Files'),
                        ]),
                        style={
                            'width': '100%',
                            'height': '60px',
                            'lineHeight': '60px',
                            'borderWidth': '1px',
                            'borderStyle': 'dashed',
                            'borderRadius': '5px',
                            'textAlign': 'center',
                            'margin': '10px',
                        },
                        multiple=False,
                    ),
                ]),
            ], className="mb-3"),
            # Card 3: documents produced by the assistant.
            dbc.Card([
                dbc.CardHeader(html.H5("Generated Documents")),
                dbc.CardBody([
                    html.Div(get_generated_doc_list(generated_documents), id='generated-doc-list'),
                    dcc.Dropdown(
                        id='select-generated-dropdown',
                        options=[{'label': fn, 'value': fn} for fn in generated_documents],
                        placeholder="Select a generated document",
                        value=next(iter(generated_documents), None),
                        style={"marginBottom": "10px"},
                    ),
                ]),
            ], className="mb-3"),
        ], style={'minWidth': '260px', 'width':'30vw','maxWidth':'30vw'}, width=3),
        # ---- Right column: instructions, actions and output ----
        dbc.Col([
            dbc.Card([
                dbc.CardHeader(html.H2("RFP Proposal Assistant", style={'wordWrap': 'break-word'})),
                dbc.CardBody([
                    dbc.Form([
                        dbc.Textarea(
                            id="chat-input",
                            placeholder="Enter additional instructions...",
                            style={"width":"100%", "wordWrap": "break-word"},
                            className="mb-2",
                        ),
                    ]),
                    html.Div([
                        dbc.Button("Shred", id="shred-action-btn", className="me-3 mb-2 btn-primary"),
                        dbc.Button("Proposal", id="proposal-action-btn", className="me-3 mb-2 btn-secondary"),
                        dbc.Button("Compliance", id="compliance-action-btn", className="me-3 mb-2 btn-tertiary"),
                        dbc.Button("Recover", id="recover-action-btn", className="me-3 mb-2 btn-tertiary"),
                        dbc.Button("Virtual Board", id="board-action-btn", className="me-3 mb-2 btn-tertiary"),
                        dbc.Button("LOE", id="loe-action-btn", className="mb-2 btn-tertiary"),
                    ], className="mt-3 mb-3 d-flex flex-wrap"),
                    dcc.Loading(
                        id="loading",
                        type="default",
                        children=html.Div(id="output-data-upload"),
                        style={"textAlign": "center"},
                    ),
                ]),
            ], style={'backgroundColor': 'white'}),
        ], style={'width':'70vw','maxWidth':'70vw'}, width=9),
    ], style={'marginTop':'20px'}),
], fluid=True)
def master_callback(
    rfp_content, rfp_filename, rfp_delete_clicks, selected_doc,
    proposal_content, proposal_filename, proposal_delete_clicks, selected_proposal,
    generated_delete_clicks, selected_generated, generated_options,
    shredded_delete_clicks, shredded_doc_children,
    shred_clicks, proposal_clicks, compliance_clicks, recover_clicks, board_clicks, loe_clicks,
    selected_generated_dropdown,
    chat_input, selected_filename, selected_proposal_dropdown, selected_generated_dropdown_state
):
    """Handle uploads, deletions, action buttons and generated-document selection.

    The function inspects ``callback_context`` to find what triggered it,
    mutates the module-level document dictionaries accordingly, and rebuilds
    every list/dropdown from those dictionaries.

    NOTE(review): no ``@app.callback`` registration for this function is
    visible in this file, so the mapping of these parameters to Dash
    Inputs/States -- and therefore the ``ctx.inputs_list`` indices 2, 6, 10
    and 12 used below -- cannot be confirmed from here. Verify them against
    the actual registration.

    NOTE(review): the captured source had lost its indentation; the nesting
    below is reconstructed from the syntax and should be confirmed.

    Returns:
        An 11-tuple, in order: uploaded-document list, document dropdown
        options, document dropdown value, proposal dropdown options, proposal
        dropdown value, uploaded-proposal list, generated-document list,
        generated dropdown options, generated dropdown value,
        shredded-document list, and the main output component.
    """
    ctx = callback_context
    # Component id part of the first triggering prop ("<id>.<prop>"), or None.
    triggered_id = ctx.triggered[0]['prop_id'].split('.')[0] if ctx.triggered else None
    upload_triggered = False
    # --- RFP upload: decode, optionally push to Gemini, store by filename ---
    if triggered_id == 'upload-document' and rfp_content is not None and rfp_filename:
        # The upload contents are "<header>,<base64 payload>".
        content_type, content_string = rfp_content.split(',')
        decoded = base64.b64decode(content_string)
        text = decode_document(decoded)
        fileid = None
        # Only these file types are sent to Gemini as files.
        if rfp_filename.lower().endswith(('.pdf', '.docx', '.xlsx', '.xls')):
            fileid = upload_to_gemini_file(decoded, rfp_filename)
        if text is not None:
            uploaded_documents[rfp_filename] = text
            if fileid:
                uploaded_documents_fileid[rfp_filename] = fileid
            logging.info(f"Document uploaded: {rfp_filename}")
        else:
            logging.error(f"Failed to decode uploaded document: {rfp_filename}")
        upload_triggered = True
    # --- Proposal upload: same steps, stored in the proposal dictionaries ---
    if triggered_id == 'upload-proposal' and proposal_content is not None and proposal_filename:
        content_type, content_string = proposal_content.split(',')
        decoded = base64.b64decode(content_string)
        text = decode_document(decoded)
        fileid = None
        if proposal_filename.lower().endswith(('.pdf', '.docx', '.xlsx', '.xls')):
            fileid = upload_to_gemini_file(decoded, proposal_filename)
        if text is not None:
            uploaded_proposals[proposal_filename] = text
            if fileid:
                uploaded_proposals_fileid[proposal_filename] = fileid
            logging.info(f"Proposal uploaded: {proposal_filename}")
        else:
            logging.error(f"Failed to decode uploaded proposal: {proposal_filename}")
        upload_triggered = True
    # --- RFP delete buttons: act on the first button with a truthy click count ---
    # NOTE(review): index 2 presumably addresses the RFP delete-button inputs; confirm.
    if triggered_id and isinstance(ctx.inputs_list[2], list):
        for i, n_click in enumerate(rfp_delete_clicks):
            if n_click:
                btn_id = ctx.inputs_list[2][i]['id']
                del_filename = btn_id['index']
                if del_filename in uploaded_documents:
                    del uploaded_documents[del_filename]
                    if del_filename in uploaded_documents_fileid:
                        # Remote deletion is best effort; the local entry is removed regardless.
                        try:
                            genai.delete_file(uploaded_documents_fileid[del_filename])
                        except Exception as e:
                            logging.warning(f"Failed to delete Gemini file {del_filename}: {e}")
                        del uploaded_documents_fileid[del_filename]
                    logging.info(f"Document deleted: {del_filename}")
                    # Move the selection to the first remaining document, if any.
                    if selected_doc == del_filename:
                        selected_doc = next(iter(uploaded_documents), None)
                    upload_triggered = True
                break
    # --- Proposal delete buttons ---
    # NOTE(review): index 6 presumably addresses the proposal delete-button inputs; confirm.
    if triggered_id and isinstance(ctx.inputs_list[6], list):
        for i, n_click in enumerate(proposal_delete_clicks):
            if n_click:
                btn_id = ctx.inputs_list[6][i]['id']
                del_filename = btn_id['index']
                if del_filename in uploaded_proposals:
                    del uploaded_proposals[del_filename]
                    if del_filename in uploaded_proposals_fileid:
                        try:
                            genai.delete_file(uploaded_proposals_fileid[del_filename])
                        except Exception as e:
                            logging.warning(f"Failed to delete Gemini proposal file {del_filename}: {e}")
                        del uploaded_proposals_fileid[del_filename]
                    logging.info(f"Proposal deleted: {del_filename}")
                    if selected_proposal == del_filename:
                        selected_proposal = next(iter(uploaded_proposals), None)
                    upload_triggered = True
                break
    # --- Generated-document delete buttons ---
    # NOTE(review): index 10 presumably addresses the generated delete-button inputs; confirm.
    if triggered_id and isinstance(ctx.inputs_list[10], list):
        for i, n_click in enumerate(generated_delete_clicks):
            if n_click:
                btn_id = ctx.inputs_list[10][i]['id']
                del_filename = btn_id['index']
                if del_filename in generated_documents:
                    del generated_documents[del_filename]
                    logging.info(f"Generated doc deleted: {del_filename}")
                    if selected_generated == del_filename:
                        selected_generated = next(iter(generated_documents), None)
                    upload_triggered = True
                break
    # --- Shredded-document delete buttons ---
    # NOTE(review): index 12 presumably addresses the shredded delete-button inputs; confirm.
    if triggered_id and isinstance(ctx.inputs_list[12], list):
        for i, n_click in enumerate(shredded_delete_clicks):
            if n_click:
                btn_id = ctx.inputs_list[12][i]['id']
                del_filename = btn_id['index']
                if del_filename in shredded_documents:
                    del shredded_documents[del_filename]
                    logging.info(f"Shredded doc deleted: {del_filename}")
                    upload_triggered = True
                break
    # --- Rebuild dropdown options/values and lists from the current state ---
    # Each value keeps the current selection when it still exists, else the first entry (or None).
    doc_options = [{'label': fn, 'value': fn} for fn in uploaded_documents.keys()]
    doc_value = selected_doc if selected_doc in uploaded_documents else (next(iter(uploaded_documents), None) if uploaded_documents else None)
    proposal_options = [{'label': fn, 'value': fn} for fn in uploaded_proposals.keys()]
    proposal_value = selected_proposal if selected_proposal in uploaded_proposals else (next(iter(uploaded_proposals), None) if uploaded_proposals else None)
    generated_doc_options = [{'label': fn, 'value': fn} for fn in generated_documents.keys()]
    generated_doc_value = selected_generated if selected_generated in generated_documents else (next(iter(generated_documents), None) if generated_documents else None)
    shredded_doc_list_items = get_shredded_doc_list(shredded_documents)
    uploaded_doc_list = get_uploaded_doc_list(uploaded_documents)
    uploaded_proposal_list = get_uploaded_proposal_list(uploaded_proposals)
    generated_doc_list = get_generated_doc_list(generated_documents)
    # Default output; replaced below when an action, selection or upload/delete happened.
    output_data_upload = html.Div("No action taken yet.", style={"wordWrap": "break-word"})
    action_buttons = [
        'shred-action-btn', 'proposal-action-btn', 'compliance-action-btn',
        'recover-action-btn', 'board-action-btn', 'loe-action-btn'
    ]
    if triggered_id in action_buttons:
        result = ""
        generated_docx_bytes = None
        generated_docx_name = None
        new_selected_generated = generated_doc_value
        if triggered_id == 'shred-action-btn':
            result, shredded_docx_bytes, shredded_docx_name = process_document('shred', selected_filename, chat_input)
            # Keep the produced docx so it shows up in the shredded list.
            if shredded_docx_bytes and shredded_docx_name:
                shredded_documents[shredded_docx_name] = shredded_docx_bytes
                logging.info(f"Shredded docx saved: {shredded_docx_name}")
            shredded_doc_list_items = get_shredded_doc_list(shredded_documents)
            output_data_upload = dcc.Markdown(result, style={"whiteSpace": "pre-wrap", "wordWrap": "break-word"})
        elif triggered_id == 'proposal-action-btn':
            rfp_doc = selected_filename
            gen_doc = selected_generated_dropdown_state
            logging.info(f"Starting proposal streaming with RFP: {rfp_doc}, Generated Doc: {gen_doc}")
            result, generated_docx_bytes, generated_docx_name = process_document(
                'proposal',
                rfp_doc,
                chat_input,
                None,
                gen_doc
            )
            # Store the new proposal docx and make it the selected generated document.
            if generated_docx_bytes and generated_docx_name:
                generated_documents[generated_docx_name] = generated_docx_bytes
                logging.info(f"Generated proposal docx saved: {generated_docx_name}")
                new_selected_generated = generated_docx_name
            generated_doc_options = [{'label': fn, 'value': fn} for fn in generated_documents.keys()]
            generated_doc_value = new_selected_generated if new_selected_generated in generated_documents else (next(iter(generated_documents), None) if generated_documents else None)
            generated_doc_list = get_generated_doc_list(generated_documents)
            output_data_upload = dcc.Markdown(result, style={"whiteSpace": "pre-wrap", "wordWrap": "break-word"})
        elif triggered_id == 'compliance-action-btn':
            result, _, _ = process_document('compliance', selected_filename, chat_input)
            output_data_upload = html.Div(result, style={"wordWrap": "break-word"})
        elif triggered_id == 'recover-action-btn':
            result, _, _ = process_document('recover', selected_filename, chat_input)
            output_data_upload = html.Div(result, style={"wordWrap": "break-word"})
        elif triggered_id == 'board-action-btn':
            result, _, _ = process_document('board', selected_filename, chat_input)
            output_data_upload = html.Div(result, style={"wordWrap": "break-word"})
        elif triggered_id == 'loe-action-btn':
            result, _, _ = process_document('loe', selected_filename, chat_input)
            output_data_upload = html.Div(result, style={"wordWrap": "break-word"})
        else:
            result = "Action not implemented yet."
            output_data_upload = html.Div(result, style={"wordWrap": "break-word"})
    elif triggered_id == 'select-generated-dropdown':
        # Selecting a generated document shows a download link for it.
        sel_gen = selected_generated_dropdown
        if not sel_gen or sel_gen not in generated_documents:
            output_data_upload = html.Div("No generated document selected.", style={"wordWrap": "break-word"})
        else:
            docx_bytes = generated_documents[sel_gen]
            b64 = base64.b64encode(docx_bytes).decode('utf-8')
            download_link = html.A(
                f"Download {sel_gen}",
                href=f"data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64,{b64}",
                download=sel_gen,
                target="_blank",
                style={"wordWrap": "break-word"}
            )
            output_data_upload = html.Div([
                html.Div(download_link, style={"marginBottom": "15px"}),
                html.Div("Preview not available for docx. Download to view.", style={"wordWrap": "break-word"})
            ])
    elif upload_triggered:
        output_data_upload = html.Div("Upload/Delete completed.", style={"wordWrap": "break-word"})
    return (
        uploaded_doc_list,
        doc_options,
        doc_value,
        proposal_options,
        proposal_value,
        uploaded_proposal_list,
        generated_doc_list,
        generated_doc_options,
        generated_doc_value,
        shredded_doc_list_items,
        output_data_upload
    )
if __name__ == '__main__':
    # The server binds to every interface (0.0.0.0). Debug mode enables an
    # interactive debugger that can execute arbitrary code, so it must not be
    # on by default; set DASH_DEBUG=1 (or true/yes/on) to opt in locally.
    debug_enabled = os.environ.get("DASH_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    print("Starting the Dash application...")
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860, threaded=True)
    print("Dash application has finished running.")