# proposal-writer / app.py
# Page metadata captured with this file (not part of the program):
#   author: bluenevus
#   commit: cedcdf8 - "Update app.py via AI Editor"
#   size:   24.5 kB
import os
import base64
import io
import dash
from dash import dcc, html, Input, Output, State, callback_context
import dash_bootstrap_components as dbc
import pandas as pd
import anthropic
from threading import Thread
import logging
from docx import Document
# --- Logging ---------------------------------------------------------------
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Dash application ------------------------------------------------------
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
server = app.server

# --- Anthropic client and model settings -----------------------------------
# The key is read from the environment; an empty string is used when unset.
ANTHROPIC_KEY = os.getenv("ANTHROPIC_API_KEY", "")
anthropic_client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
CLAUDE3_SONNET_MODEL = "claude-3-7-sonnet-20250219"
CLAUDE3_MAX_CONTEXT_TOKENS = 200_000
CLAUDE3_MAX_OUTPUT_TOKENS = 64_000

# --- In-memory, process-wide state -----------------------------------------
# filename -> decoded text of uploaded RFP/SOW/PWS/RFI files
uploaded_documents = {}
# filename -> decoded text of uploaded proposal files
uploaded_proposals = {}
# filename -> raw .docx bytes produced by the app
generated_documents = {}
# Text of the most recent shred result / generated response (None until set).
shredded_document = None
generated_response = None
def decode_document(decoded_bytes):
    """Decode the raw bytes of an uploaded file into text.

    UTF-8 is tried first; a leading byte-order mark, if present, is stripped
    so it does not end up inside the document text. Bytes that are not valid
    UTF-8 are decoded as Latin-1 instead. Latin-1 maps every possible byte to
    a code point, so that fallback cannot fail and no further error handling
    is needed.

    Args:
        decoded_bytes: The file content as ``bytes``.

    Returns:
        The decoded text. (Callers that still check for ``None`` keep
        working; that value is simply never produced.)
    """
    try:
        # 'utf-8-sig' behaves like 'utf-8' but drops a leading BOM.
        content = decoded_bytes.decode('utf-8-sig')
    except UnicodeDecodeError as e_utf8:
        logging.warning(
            "Document decoded as Latin-1 due to utf-8 decode error: %s", e_utf8
        )
        return decoded_bytes.decode('latin-1')
    logging.info("Document decoded as UTF-8.")
    return content
def anthropic_stream_generate(prompt):
    """Send ``prompt`` to the configured Claude model and return the full reply.

    The reply is requested as a stream and the text pieces are concatenated.

    Args:
        prompt: The user message to send.

    Returns:
        The concatenated response text. On any failure the function does not
        raise; it returns a string of the form ``"Error during streaming: ..."``
        which callers detect by its ``"Error"`` prefix.
    """
    pieces = []
    try:
        with anthropic_client.messages.create(
            model=CLAUDE3_SONNET_MODEL,
            max_tokens=CLAUDE3_MAX_OUTPUT_TOKENS,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ) as stream:
            for event in stream:
                if event.type != "content_block_delta":
                    continue
                # Only text deltas carry a ``text`` attribute; other delta
                # kinds are skipped instead of aborting the whole response
                # with an AttributeError.
                piece = getattr(event.delta, "text", None)
                if piece is None:
                    continue
                pieces.append(piece)
                logging.debug("Streaming piece: %s", piece)
        return ''.join(pieces)
    except Exception as e:
        # Deliberate catch-all: callers rely on an error string, not a raise.
        logging.error("Error during anthropic streaming request: %s", e)
        return f"Error during streaming: {e}"
def save_shredded_as_docx(shredded_text, rfp_filename):
    """Render shredded requirements text into a .docx file held in memory.

    The document starts with a heading naming ``rfp_filename``; every
    newline-separated line of ``shredded_text`` becomes its own paragraph.

    Returns:
        The serialized document as ``bytes``.
    """
    document = Document()
    document.add_heading(f"Shredded Requirements for {rfp_filename}", 0)
    for paragraph_text in shredded_text.split('\n'):
        document.add_paragraph(paragraph_text)

    with io.BytesIO() as buffer:
        document.save(buffer)
        return buffer.getvalue()
# Prefix of the string anthropic_stream_generate returns when the request fails.
_STREAM_ERROR_PREFIX = "Error during streaming:"

# Placeholder replies for actions that have no implementation yet.
_UNIMPLEMENTED_MESSAGES = {
    'compliance': "Compliance checking not implemented yet.",
    'recover': "Recovery not implemented yet.",
    'board': "Virtual board not implemented yet.",
    'loe': "LOE estimation not implemented yet.",
}

_SHRED_INSTRUCTIONS = (
    "Analyze the following RFP/PWS/SOW/RFI and generate a requirements spreadsheet. "
    "Identify requirements by action words like 'shall', 'will', 'perform', etc. Organize by PWS section and requirement. "
    "Do not write as if responding to the proposal.\n"
)

_GENERATE_INSTRUCTIONS = (
    "Create a highly detailed proposal response based on the following PWS requirements. "
    "Be compliant and compelling. Focus on describing the approach, steps, workflow, people, processes, and technology. "
    "Refer to research that validates the approach and cite sources with measurable outcomes.\n"
)


def _pick_document(documents, preferred_name):
    """Return ``(name, content)`` for ``preferred_name``, else the first stored entry.

    When ``documents`` is empty the name is returned unchanged with ``None`` content.
    """
    if preferred_name and preferred_name in documents:
        return preferred_name, documents[preferred_name]
    if documents:
        first_name = next(iter(documents))
        return first_name, documents[first_name]
    return preferred_name, None


def _build_prompt(instructions, chat_input, filename, body):
    """Assemble instructions, optional user instructions, file name and body into one prompt."""
    prompt = instructions
    if chat_input:
        prompt += f"User additional instructions: {chat_input}\n"
    return prompt + f"\nFile Name: {filename}\n\n{body}"


def process_document(action, selected_filename=None, chat_input=None, selected_proposal=None):
    """Run one of the assistant's actions and return its result.

    Args:
        action: ``'shred'``, ``'generate'``, ``'proposal'``, or one of the
            not-yet-implemented actions (``'compliance'``, ``'recover'``,
            ``'board'``, ``'loe'``).
        selected_filename: Name of the uploaded RFP document to use. When it
            is missing or unknown, the first uploaded document is used.
        chat_input: Optional extra instructions appended to the prompt.
        selected_proposal: Name of the uploaded proposal (``'proposal'`` only);
            falls back to the first uploaded proposal.

    Returns:
        A 3-tuple ``(content, docx_bytes, docx_name)``. ``content`` is a
        message/result string (or a ``dcc.Markdown`` component for
        ``'proposal'``). ``docx_bytes``/``docx_name`` are only set after a
        successful shred and are ``None`` otherwise.

    Side effects:
        Updates the module-level ``shredded_document`` and
        ``generated_response``.
    """
    global shredded_document, generated_response
    logging.info(f"Process document called with action: {action}")

    if action == 'shred':
        selected_filename, doc_content = _pick_document(uploaded_documents, selected_filename)
        if not doc_content:
            logging.warning("No uploaded document found for shredding.")
            return "No document uploaded.", None, None

        prompt = _build_prompt(_SHRED_INSTRUCTIONS, chat_input, selected_filename, doc_content)
        # The request is made directly: the caller has to wait for the result
        # anyway, so starting a thread and joining it immediately adds nothing.
        logging.info("Starting streaming shredding operation with Anthropics.")
        result = anthropic_stream_generate(prompt)

        if result.startswith(_STREAM_ERROR_PREFIX):
            # Never keep an error message as "requirements": a later
            # 'generate' would feed it to the model, and exporting it would
            # produce a .docx that only contains the error text.
            shredded_document = None
            return result, None, None

        shredded_document = result
        logging.info("Document shredded successfully.")
        try:
            docx_bytes = save_shredded_as_docx(result, selected_filename)
        except Exception as e:
            # Only the export failed; the shredded text above stays usable.
            logging.error("Error during shredding: %s", e)
            return f"Error during shredding: {e}", None, None
        docx_name = f"{os.path.splitext(selected_filename)[0]}_shredded.docx"
        return result, docx_bytes, docx_name

    if action == 'generate':
        # Only the file name is needed here; it labels the prompt.
        selected_filename, _ = _pick_document(uploaded_documents, selected_filename)
        if not shredded_document:
            logging.warning("No shredded document found when generating response.")
            return "Shredded document not available.", None, None

        prompt = _build_prompt(_GENERATE_INSTRUCTIONS, chat_input, selected_filename, shredded_document)
        logging.info("Starting streaming generation operation with Anthropics.")
        result = anthropic_stream_generate(prompt)
        generated_response = result
        if not result.startswith(_STREAM_ERROR_PREFIX):
            logging.info("Proposal response generated successfully.")
        return result, None, None

    if action == 'proposal':
        _, doc_content = _pick_document(uploaded_proposals, selected_proposal)
        if not doc_content:
            return "No proposal document uploaded.", None, None
        return dcc.Markdown(doc_content, style={"whiteSpace": "pre-wrap", "wordWrap": "break-word"}), None, None

    return _UNIMPLEMENTED_MESSAGES.get(action, "Action not implemented yet."), None, None
def _build_doc_list(docdict, empty_message, button_type, group):
    """Build the list widget shared by the three document panels.

    Args:
        docdict: Mapping whose keys are the file names to list.
        empty_message: Text shown when ``docdict`` is empty.
        button_type: ``type`` field of each Delete button's pattern-matching id.
        group: ``group`` field of each Delete button's pattern-matching id.

    Returns:
        An ``html.Div`` with ``empty_message`` when there is nothing to list,
        otherwise a ``dbc.ListGroup`` with one row (name + Delete button) per
        file name.
    """
    if not docdict:
        return html.Div(empty_message, style={"wordWrap": "break-word"})
    rows = [
        dbc.ListGroupItem(
            [
                html.Span(filename, style={"wordWrap": "break-word"}),
                dbc.Button(
                    "Delete",
                    id={'type': button_type, 'index': filename, 'group': group},
                    size="sm",
                    color="danger",
                    className="float-end ms-2",
                ),
            ],
            className="d-flex justify-content-between align-items-center",
        )
        for filename in docdict
    ]
    return dbc.ListGroup(rows, flush=True)


def get_uploaded_doc_list(docdict):
    """Return the list widget for uploaded RFP/SOW/PWS/RFI documents."""
    return _build_doc_list(docdict, "No documents uploaded.", 'delete-doc-btn', 'rfp')


def get_uploaded_proposal_list(docdict):
    """Return the list widget for uploaded proposal documents."""
    return _build_doc_list(docdict, "No proposal documents uploaded.", 'delete-proposal-btn', 'proposal')


def get_generated_doc_list(docdict):
    """Return the list widget for documents generated by the app."""
    return _build_doc_list(docdict, "No generated documents yet.", 'delete-generated-btn', 'generated')
# Appearance of the drag-and-drop upload areas.
_UPLOAD_BOX_STYLE = {
    'width': '100%',
    'height': '60px',
    'lineHeight': '60px',
    'borderWidth': '1px',
    'borderStyle': 'dashed',
    'borderRadius': '5px',
    'textAlign': 'center',
    'margin': '10px',
}


def _document_card(title, doc_list, list_id, dropdown_id, placeholder, docdict, upload_id=None):
    """Build one side-panel card: document list, selection dropdown, optional upload box."""
    body = [
        html.Div(doc_list, id=list_id),
        dcc.Dropdown(
            id=dropdown_id,
            options=[{'label': fn, 'value': fn} for fn in docdict.keys()],
            placeholder=placeholder,
            value=next(iter(docdict), None),
            style={"marginBottom": "10px"},
        ),
    ]
    if upload_id is not None:
        body.append(
            dcc.Upload(
                id=upload_id,
                children=html.Div([
                    'Drag and Drop or ',
                    html.A('Select Files'),
                ]),
                style=dict(_UPLOAD_BOX_STYLE),
                multiple=False,
            )
        )
    return dbc.Card(
        [dbc.CardHeader(html.H5(title)), dbc.CardBody(body)],
        className="mb-3",
    )


def serve_layout():
    """Build the page layout from the current in-memory document state.

    Assigned to ``app.layout`` as a function so it is evaluated when a page
    is loaded. A layout object built once at import time would always show
    the (empty) state from start-up, even when documents are already stored.
    """
    side_panel = dbc.Col(
        [
            _document_card(
                "RFP/SOW/PWS/RFI",
                get_uploaded_doc_list(uploaded_documents),
                'uploaded-doc-list',
                'select-document-dropdown',
                "Select a document to work with",
                uploaded_documents,
                upload_id='upload-document',
            ),
            _document_card(
                "Proposal",
                get_uploaded_proposal_list(uploaded_proposals),
                'uploaded-proposal-list',
                'select-proposal-dropdown',
                "Select a proposal document",
                uploaded_proposals,
                upload_id='upload-proposal',
            ),
            _document_card(
                "Generated Documents",
                get_generated_doc_list(generated_documents),
                'generated-doc-list',
                'select-generated-dropdown',
                "Select a generated document",
                generated_documents,
            ),
        ],
        style={'minWidth': '260px', 'width': '30vw', 'maxWidth': '30vw'},
        width=3,
    )

    action_buttons = html.Div(
        [
            dbc.Button("Shred", id="shred-action-btn", className="me-3 mb-2 btn-primary"),
            dbc.Button("Proposal", id="generate-action-btn", className="me-3 mb-2 btn-secondary"),
            dbc.Button("Compliance", id="compliance-action-btn", className="me-3 mb-2 btn-tertiary"),
            dbc.Button("Recover", id="recover-action-btn", className="me-3 mb-2 btn-tertiary"),
            dbc.Button("Virtual Board", id="board-action-btn", className="me-3 mb-2 btn-tertiary"),
            dbc.Button("LOE", id="loe-action-btn", className="mb-2 btn-tertiary"),
        ],
        className="mt-3 mb-3 d-flex flex-wrap",
    )

    main_panel = dbc.Col(
        [
            dbc.Card(
                [
                    dbc.CardHeader(html.H2("RFP Proposal Assistant", style={'wordWrap': 'break-word'})),
                    dbc.CardBody([
                        dbc.Form([
                            dbc.Textarea(
                                id="chat-input",
                                placeholder="Enter additional instructions...",
                                style={"width": "100%", "wordWrap": "break-word"},
                                className="mb-2",
                            ),
                        ]),
                        action_buttons,
                        dcc.Loading(
                            id="loading",
                            type="default",
                            children=html.Div(id="output-data-upload"),
                            style={"textAlign": "center"},
                        ),
                    ]),
                ],
                style={'backgroundColor': 'white'},
            )
        ],
        style={'width': '70vw', 'maxWidth': '70vw'},
        width=9,
    )

    return dbc.Container(
        [dbc.Row([side_panel, main_panel], style={'marginTop': '20px'})],
        fluid=True,
    )


app.layout = serve_layout
def _store_upload(contents, filename, store, kind):
    """Decode one ``dcc.Upload`` payload and save its text in ``store[filename]``.

    ``contents`` is expected to look like ``"<header>,<base64 payload>"``.
    ``kind`` ("document" or "proposal") is only used in log messages.
    Malformed payloads are logged and skipped instead of raising.
    """
    if contents is None or not filename:
        return
    _header, separator, payload = contents.partition(',')
    if not separator:
        logging.error(f"Failed to decode uploaded {kind}: {filename}")
        return
    try:
        raw = base64.b64decode(payload)
    except ValueError:  # binascii.Error (invalid base64) is a ValueError
        logging.error(f"Failed to decode uploaded {kind}: {filename}")
        return
    text = decode_document(raw)
    if text is None:
        logging.error(f"Failed to decode uploaded {kind}: {filename}")
        return
    store[filename] = text
    logging.info(f"{kind.capitalize()} uploaded: {filename}")


def _options_and_value(store, selected):
    """Return dropdown ``(options, value)`` for ``store``.

    ``selected`` is kept when it still exists; otherwise the first stored
    name is used, or ``None`` when the store is empty.
    """
    options = [{'label': fn, 'value': fn} for fn in store.keys()]
    value = selected if selected in store else next(iter(store), None)
    return options, value


@app.callback(
    Output('uploaded-doc-list', 'children'),
    Output('select-document-dropdown', 'options'),
    Output('select-document-dropdown', 'value'),
    Output('select-proposal-dropdown', 'options'),
    Output('select-proposal-dropdown', 'value'),
    Output('uploaded-proposal-list', 'children'),
    Output('generated-doc-list', 'children'),
    Output('select-generated-dropdown', 'options'),
    Output('select-generated-dropdown', 'value'),
    Input('upload-document', 'contents'),
    State('upload-document', 'filename'),
    Input({'type': 'delete-doc-btn', 'index': dash.ALL, 'group': 'rfp'}, 'n_clicks'),
    State('select-document-dropdown', 'value'),
    Input('upload-proposal', 'contents'),
    State('upload-proposal', 'filename'),
    Input({'type': 'delete-proposal-btn', 'index': dash.ALL, 'group': 'proposal'}, 'n_clicks'),
    State('select-proposal-dropdown', 'value'),
    Input({'type': 'delete-generated-btn', 'index': dash.ALL, 'group': 'generated'}, 'n_clicks'),
    State('select-generated-dropdown', 'value'),
    State('select-generated-dropdown', 'options'),
    prevent_initial_call=True
)
def update_uploaded_docs(
    rfp_content, rfp_filename, rfp_delete_clicks, selected_doc,
    proposal_content, proposal_filename, proposal_delete_clicks, selected_proposal,
    generated_delete_clicks, selected_generated, generated_options
):
    """Apply an upload or a delete click, then refresh all three document panels.

    Only the input(s) that actually triggered the callback are acted on:

    * Upload contents stay set on the component after an upload. Re-reading
      them on every call would re-add a file the user has just deleted.
    * Delete buttons are identified through the triggering component id.
      ``ctx.inputs_list`` holds only ``Input`` entries (five here), so
      positional indices that also count ``State`` entries do not address
      the delete-button lists.

    Returns:
        The nine output values in decorator order: RFP list, RFP dropdown
        options/value, proposal dropdown options/value, proposal list,
        generated list, generated dropdown options/value.
    """
    ctx = callback_context

    # 'type' of a delete button id -> (store to delete from, label for logs)
    delete_targets = {
        'delete-doc-btn': (uploaded_documents, "Document"),
        'delete-proposal-btn': (uploaded_proposals, "Proposal"),
        'delete-generated-btn': (generated_documents, "Generated doc"),
    }
    triggered_values = {entry['prop_id']: entry['value'] for entry in ctx.triggered}

    for prop_id, component_id in ctx.triggered_prop_ids.items():
        if component_id == 'upload-document':
            _store_upload(rfp_content, rfp_filename, uploaded_documents, "document")
        elif component_id == 'upload-proposal':
            _store_upload(proposal_content, proposal_filename, uploaded_proposals, "proposal")
        elif isinstance(component_id, dict):
            # Freshly rendered buttons fire with n_clicks=None; only a real
            # click (truthy n_clicks) may delete anything.
            if not triggered_values.get(prop_id):
                continue
            store, label = delete_targets.get(component_id.get('type'), (None, None))
            del_filename = component_id.get('index')
            if store is not None and del_filename in store:
                del store[del_filename]
                logging.info(f"{label} deleted: {del_filename}")

    doc_options, doc_value = _options_and_value(uploaded_documents, selected_doc)
    proposal_options, proposal_value = _options_and_value(uploaded_proposals, selected_proposal)
    generated_doc_options, generated_doc_value = _options_and_value(generated_documents, selected_generated)

    return (
        get_uploaded_doc_list(uploaded_documents),
        doc_options,
        doc_value,
        proposal_options,
        proposal_value,
        get_uploaded_proposal_list(uploaded_proposals),
        get_generated_doc_list(generated_documents),
        generated_doc_options,
        generated_doc_value,
    )
# Button id -> action name understood by process_document.
_BUTTON_ACTIONS = {
    'shred-action-btn': 'shred',
    'generate-action-btn': 'generate',
    'compliance-action-btn': 'compliance',
    'recover-action-btn': 'recover',
    'board-action-btn': 'board',
    'loe-action-btn': 'loe',
}

# Status messages that are shown as plain text rather than rendered as Markdown.
_PLAIN_TEXT_MARKERS = (
    "not implemented",
    "No document uploaded",
    "Shredding in progress",
    "Generating response",
    "Shredded document not available",
)


# Every output here is also written by another callback in this file
# (the generated-documents panel by the upload/delete callback, the main
# output area by the generated-document viewer). Dash rejects duplicate
# outputs unless they are marked allow_duplicate=True, which in turn
# requires prevent_initial_call=True.
@app.callback(
    Output('output-data-upload', 'children', allow_duplicate=True),
    Output('generated-doc-list', 'children', allow_duplicate=True),
    Output('select-generated-dropdown', 'options', allow_duplicate=True),
    Output('select-generated-dropdown', 'value', allow_duplicate=True),
    [
        Input('shred-action-btn', 'n_clicks'),
        Input('generate-action-btn', 'n_clicks'),
        Input('compliance-action-btn', 'n_clicks'),
        Input('recover-action-btn', 'n_clicks'),
        Input('board-action-btn', 'n_clicks'),
        Input('loe-action-btn', 'n_clicks'),
    ],
    State('chat-input', 'value'),
    State('select-document-dropdown', 'value'),
    State('select-proposal-dropdown', 'value'),
    State('select-generated-dropdown', 'value'),
    prevent_initial_call=True
)
def handle_actions(
    shred_clicks, generate_clicks, compliance_clicks, recover_clicks, board_clicks, loe_clicks,
    chat_input, selected_filename, selected_proposal, selected_generated
):
    """Run the action of the clicked button and refresh the generated-documents panel.

    Returns:
        ``(main output component, generated list, generated dropdown options,
        generated dropdown value)``. After a successful shred the new .docx
        is stored in ``generated_documents`` and becomes the selected value.
    """
    ctx = callback_context

    def respond(content, selected):
        """Bundle the main output with the current generated-documents panel state."""
        return (
            content,
            get_generated_doc_list(generated_documents),
            [{'label': fn, 'value': fn} for fn in generated_documents.keys()],
            selected,
        )

    if not ctx.triggered:
        logging.info("No action triggered yet.")
        return respond(html.Div("No action taken yet.", style={"wordWrap": "break-word"}), selected_generated)

    button_id = ctx.triggered[0]['prop_id'].split('.')[0]
    logging.info(f"Button pressed: {button_id}")

    new_selected_generated = selected_generated
    action = _BUTTON_ACTIONS.get(button_id)
    if action is None:
        result = "Action not implemented yet."
    else:
        result, generated_docx_bytes, generated_docx_name = process_document(action, selected_filename, chat_input)
        failed = isinstance(result, str) and result.strip().startswith("Error")
        # Keep the export only for a successful shred, so an error message is
        # never offered for download as if it were a requirements document.
        if action == 'shred' and not failed and generated_docx_bytes and generated_docx_name:
            generated_documents[generated_docx_name] = generated_docx_bytes
            logging.info(f"Generated docx saved: {generated_docx_name}")
            new_selected_generated = generated_docx_name

    if isinstance(result, str) and (
        result.strip().startswith("Error")
        or any(marker in result for marker in _PLAIN_TEXT_MARKERS)
    ):
        return respond(html.Div(result, style={"wordWrap": "break-word"}), new_selected_generated)

    return respond(
        dcc.Markdown(result, style={"whiteSpace": "pre-wrap", "wordWrap": "break-word"}),
        new_selected_generated,
    )
# 'output-data-upload.children' is also written by the action-button callback.
# Dash rejects a second callback on the same output unless it is marked
# allow_duplicate=True (which requires prevent_initial_call=True).
@app.callback(
    Output('output-data-upload', 'children', allow_duplicate=True),
    Input('select-generated-dropdown', 'value'),
    prevent_initial_call=True
)
def display_generated_doc(selected_generated):
    """Show a download link for the generated document chosen in the dropdown.

    Args:
        selected_generated: Key into ``generated_documents`` (a file name),
            or ``None`` when nothing is selected.

    Returns:
        A notice when the selection is empty or unknown; otherwise a
        container with a link that embeds the .docx bytes as a base64
        ``data:`` URI, plus a note that no preview is available.
    """
    if not selected_generated or selected_generated not in generated_documents:
        return html.Div("No generated document selected.", style={"wordWrap": "break-word"})

    docx_bytes = generated_documents[selected_generated]
    b64 = base64.b64encode(docx_bytes).decode('utf-8')
    download_link = html.A(
        f"Download {selected_generated}",
        href=f"data:application/vnd.openxmlformats-officedocument.wordprocessingml.document;base64,{b64}",
        download=selected_generated,
        target="_blank",
        style={"wordWrap": "break-word"},
    )
    return html.Div([
        html.Div(download_link, style={"marginBottom": "15px"}),
        html.Div("Preview not available for docx. Download to view.", style={"wordWrap": "break-word"}),
    ])
if __name__ == '__main__':
    # The server binds to all interfaces. Debug mode enables an interactive
    # in-browser debugger that can execute code, so it is opt-in through the
    # DASH_DEBUG environment variable instead of being always on.
    debug_enabled = os.environ.get("DASH_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    print("Starting the Dash application...")
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860, threaded=True)
    print("Dash application has finished running.")