Spaces:
Paused
Paused
import os | |
import threading | |
import logging | |
import uuid | |
import shutil | |
import json | |
import tempfile | |
from flask import Flask, request as flask_request, make_response | |
import dash | |
from dash import dcc, html, Input, Output, State, callback_context | |
import dash_bootstrap_components as dbc | |
import openai | |
import base64 | |
import datetime | |
from werkzeug.utils import secure_filename | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(threadName)s %(message)s") | |
logger = logging.getLogger("AskTricare") | |
app_flask = Flask(__name__) | |
SESSION_DATA = {} | |
SESSION_LOCKS = {} | |
SESSION_DIR_BASE = os.path.join(tempfile.gettempdir(), "asktricare_sessions") | |
os.makedirs(SESSION_DIR_BASE, exist_ok=True) | |
openai.api_key = os.environ.get("OPENAI_API_KEY") | |
def get_session_id(): | |
sid = flask_request.cookies.get("asktricare_session_id") | |
if not sid: | |
sid = str(uuid.uuid4()) | |
return sid | |
def get_session_dir(session_id): | |
d = os.path.join(SESSION_DIR_BASE, session_id) | |
os.makedirs(d, exist_ok=True) | |
return d | |
def get_session_lock(session_id): | |
if session_id not in SESSION_LOCKS: | |
SESSION_LOCKS[session_id] = threading.Lock() | |
return SESSION_LOCKS[session_id] | |
def get_session_state(session_id): | |
if session_id not in SESSION_DATA: | |
SESSION_DATA[session_id] = { | |
"messages": [], | |
"uploads": [], | |
"created": datetime.datetime.utcnow().isoformat(), | |
"streaming": False, | |
"stream_buffer": "", | |
} | |
return SESSION_DATA[session_id] | |
def save_session_state(session_id): | |
state = get_session_state(session_id) | |
d = get_session_dir(session_id) | |
with open(os.path.join(d, "state.json"), "w") as f: | |
json.dump(state, f) | |
def load_session_state(session_id): | |
d = get_session_dir(session_id) | |
path = os.path.join(d, "state.json") | |
if os.path.exists(path): | |
with open(path, "r") as f: | |
SESSION_DATA[session_id] = json.load(f) | |
def load_system_prompt(): | |
prompt_path = os.path.join(os.getcwd(), "system_prompt.txt") | |
try: | |
with open(prompt_path, "r", encoding="utf-8") as f: | |
return f.read().strip() | |
except Exception as e: | |
logger.error(f"Failed to load system prompt: {e}") | |
return "You are Ask Tricare, a helpful assistant for TRICARE health benefits. Respond conversationally, and cite relevant sources when possible. If you do not know, say so." | |
app = dash.Dash( | |
__name__, | |
server=app_flask, | |
suppress_callback_exceptions=True, | |
external_stylesheets=[dbc.themes.BOOTSTRAP, "/assets/custom.css"], | |
update_title="Ask Tricare" | |
) | |
def chat_message_card(msg, is_user): | |
align = "end" if is_user else "start" | |
color = "primary" if is_user else "secondary" | |
avatar = "π§" if is_user else "π€" | |
return dbc.Card( | |
dbc.CardBody([ | |
html.Div([ | |
html.Span(avatar, style={"fontSize": "2rem"}), | |
html.Span(msg, style={"whiteSpace": "pre-wrap", "marginLeft": "0.75rem"}) | |
], style={"display": "flex", "alignItems": "center", "justifyContent": align}) | |
]), | |
className=f"mb-2 ms-3 me-3", | |
color=color, | |
inverse=is_user, | |
style={"maxWidth": "80%", "alignSelf": f"flex-{align}"} | |
) | |
def uploaded_file_card(filename, is_img): | |
ext = os.path.splitext(filename)[1].lower() | |
icon = "πΌοΈ" if is_img else "π" | |
return dbc.Card( | |
dbc.CardBody([ | |
html.Span(icon, style={"fontSize": "2rem", "marginRight": "0.5rem"}), | |
html.Span(filename) | |
]), | |
className="mb-2", | |
color="tertiary" | |
) | |
def disclaimer_card(): | |
return dbc.Card( | |
dbc.CardBody([ | |
html.H5("Disclaimer", className="card-title"), | |
html.P("This information is not private. Do not send PII or PHI. For official guidance visit the Tricare website.", style={"fontSize": "0.95rem"}) | |
]), | |
className="mb-2" | |
) | |
def left_navbar_static(): | |
return html.Div([ | |
html.H3("Ask Tricare", className="mb-3 mt-3", style={"fontWeight": "bold"}), | |
disclaimer_card(), | |
dcc.Upload( | |
id="file-upload", | |
children=dbc.Button("Upload Document/Image", color="secondary", className="mb-2", style={"width": "100%"}), | |
multiple=True, | |
style={"width": "100%"} | |
), | |
html.Div(id="upload-list"), | |
html.Hr(), | |
html.H5("Chat History", className="mb-2"), | |
html.Ul(id="chat-history-list", style={"listStyle": "none", "paddingLeft": "0"}), | |
], style={"padding": "1rem", "backgroundColor": "#f8f9fa", "height": "100vh", "overflowY": "auto"}) | |
def right_main_static(): | |
return html.Div([ | |
dbc.Card([ | |
dbc.CardBody([ | |
html.Div(id="chat-window", style={"minHeight": "60vh", "display": "flex", "flexDirection": "column", "justifyContent": "flex-end"}), | |
html.Div([ | |
dcc.Textarea( | |
id="user-input", | |
placeholder="Type your question...", | |
style={"width": "100%", "height": "60px", "resize": "vertical", "wordWrap": "break-word"}, | |
wrap="soft", | |
maxLength=1000 | |
), | |
dbc.Button("Send", id="send-btn", color="primary", className="mt-2", style={"float": "right", "minWidth": "100px"}), | |
], style={"marginTop": "1rem"}), | |
html.Div(id="error-message", style={"color": "#bb2124", "marginTop": "0.5rem"}), | |
]) | |
], className="mt-3"), | |
dcc.Loading(id="loading", type="default", fullscreen=False, style={"position": "absolute", "top": "5%", "left": "50%"}), | |
dcc.Interval(id="stream-interval", interval=400, n_intervals=0, disabled=True, max_intervals=1000) | |
], style={"padding": "1rem", "backgroundColor": "#fff", "height": "100vh", "overflowY": "auto"}) | |
app.layout = html.Div([ | |
dcc.Store(id="session-id", storage_type="local"), | |
dcc.Location(id="url"), | |
html.Div([ | |
html.Div(left_navbar_static(), id='left-navbar', style={"width": "30vw", "height": "100vh", "position": "fixed", "left": 0, "top": 0, "zIndex": 2, "overflowY": "auto"}), | |
html.Div(right_main_static(), id='right-main', style={"marginLeft": "30vw", "width": "70vw", "overflowY": "auto"}) | |
], style={"display": "flex"}) | |
]) | |
def _is_supported_doc(filename): | |
ext = os.path.splitext(filename)[1].lower() | |
return ext in [".txt", ".pdf", ".md", ".docx"] | |
def assign_session_id(_): | |
sid = get_session_id() | |
d = get_session_dir(sid) | |
load_session_state(sid) | |
logger.info(f"Assigned session id: {sid}") | |
return sid | |
def main_callback(session_id, send_clicks, file_contents, file_names, user_input, stream_n): | |
trigger = callback_context.triggered[0]['prop_id'].split('.')[0] if callback_context.triggered else "" | |
if not session_id: | |
session_id = get_session_id() | |
session_lock = get_session_lock(session_id) | |
with session_lock: | |
load_session_state(session_id) | |
state = get_session_state(session_id) | |
error = "" | |
start_streaming = False | |
if trigger == "file-upload" and file_contents and file_names: | |
uploads = [] | |
if not isinstance(file_contents, list): | |
file_contents = [file_contents] | |
file_names = [file_names] | |
for c, n in zip(file_contents, file_names): | |
header, data = c.split(',', 1) | |
ext = os.path.splitext(n)[1].lower() | |
is_img = ext in [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"] | |
fname = secure_filename(f"{datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{n}") | |
session_dir = get_session_dir(session_id) | |
fp = os.path.join(session_dir, fname) | |
with open(fp, "wb") as f: | |
f.write(base64.b64decode(data)) | |
uploads.append({"name": fname, "is_img": is_img, "path": fp}) | |
state["uploads"].extend(uploads) | |
save_session_state(session_id) | |
logger.info(f"Session {session_id}: Uploaded files {[u['name'] for u in uploads]}") | |
if trigger == "send-btn" and user_input and user_input.strip(): | |
state["messages"].append({"role": "user", "content": user_input}) | |
state["streaming"] = True | |
state["stream_buffer"] = "" | |
save_session_state(session_id) | |
def run_stream(session_id, messages): | |
try: | |
system_prompt = load_system_prompt() | |
msg_list = [{"role": "system", "content": system_prompt}] | |
for m in messages: | |
msg_list.append({"role": m["role"], "content": m["content"]}) | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=msg_list, | |
max_tokens=700, | |
temperature=0.2, | |
stream=True, | |
) | |
reply = "" | |
for chunk in response: | |
delta = chunk["choices"][0]["delta"] | |
content = delta.get("content", "") | |
if content: | |
reply += content | |
# Update buffer in session state | |
session_lock = get_session_lock(session_id) | |
with session_lock: | |
load_session_state(session_id) | |
state = get_session_state(session_id) | |
state["stream_buffer"] = reply | |
save_session_state(session_id) | |
# Finalize message | |
session_lock = get_session_lock(session_id) | |
with session_lock: | |
load_session_state(session_id) | |
state = get_session_state(session_id) | |
state["messages"].append({"role": "assistant", "content": reply}) | |
state["stream_buffer"] = "" | |
state["streaming"] = False | |
save_session_state(session_id) | |
logger.info(f"Session {session_id}: User: {user_input} | Assistant: {reply}") | |
except Exception as e: | |
session_lock = get_session_lock(session_id) | |
with session_lock: | |
load_session_state(session_id) | |
state = get_session_state(session_id) | |
state["streaming"] = False | |
state["stream_buffer"] = "" | |
save_session_state(session_id) | |
logger.error(f"Session {session_id}: Streaming error: {e}") | |
threading.Thread(target=run_stream, args=(session_id, list(state["messages"])), daemon=True).start() | |
start_streaming = True | |
chat_history = state.get("messages", []) | |
uploads = state.get("uploads", []) | |
upload_cards = [uploaded_file_card(os.path.basename(f["name"]), f["is_img"]) for f in uploads] | |
chat_items = [html.Li(html.Span((msg['role'] + ": " + msg['content'])[:40] + ("..." if len(msg['content']) > 40 else ""), style={"fontSize": "0.92rem"})) for msg in chat_history[-6:]] | |
chat_cards = [] | |
for i, msg in enumerate(chat_history): | |
if msg['role'] == "user": | |
chat_cards.append(chat_message_card(msg['content'], is_user=True)) | |
elif msg['role'] == "assistant": | |
chat_cards.append(chat_message_card(msg['content'], is_user=False)) | |
if state.get("streaming", False): | |
# Add a partial assistant message at the end | |
if state.get("stream_buffer", ""): | |
chat_cards.append(chat_message_card(state["stream_buffer"], is_user=False)) | |
return upload_cards, chat_items, chat_cards, error, False, 0 if not start_streaming else False, 0 | |
return upload_cards, chat_items, chat_cards, error, (not state.get("streaming", False)), 0 | |
def poll_stream(n_intervals, session_id): | |
session_lock = get_session_lock(session_id) | |
with session_lock: | |
load_session_state(session_id) | |
state = get_session_state(session_id) | |
chat_history = state.get("messages", []) | |
chat_cards = [] | |
for i, msg in enumerate(chat_history): | |
if msg['role'] == "user": | |
chat_cards.append(chat_message_card(msg['content'], is_user=True)) | |
elif msg['role'] == "assistant": | |
chat_cards.append(chat_message_card(msg['content'], is_user=False)) | |
if state.get("streaming", False): | |
if state.get("stream_buffer", ""): | |
chat_cards.append(chat_message_card(state["stream_buffer"], is_user=False)) | |
return chat_cards, False | |
return chat_cards, True | |
def set_session_cookie(resp): | |
sid = flask_request.cookies.get("asktricare_session_id") | |
if not sid: | |
sid = str(uuid.uuid4()) | |
resp.set_cookie("asktricare_session_id", sid, max_age=60*60*24*7, path="/") | |
return resp | |
def cleanup_sessions(max_age_hours=48): | |
now = datetime.datetime.utcnow() | |
for sid in os.listdir(SESSION_DIR_BASE): | |
d = os.path.join(SESSION_DIR_BASE, sid) | |
try: | |
state_path = os.path.join(d, "state.json") | |
if os.path.exists(state_path): | |
with open(state_path, "r") as f: | |
st = json.load(f) | |
created = st.get("created") | |
if created and (now - datetime.datetime.fromisoformat(created)).total_seconds() > max_age_hours * 3600: | |
shutil.rmtree(d) | |
logger.info(f"Cleaned up session {sid}") | |
except Exception as e: | |
logger.error(f"Cleanup error for {sid}: {e}") | |
try: | |
import torch | |
if torch.cuda.is_available(): | |
torch.set_default_tensor_type(torch.cuda.FloatTensor) | |
logger.info("CUDA GPU detected and configured.") | |
except Exception as e: | |
logger.warning(f"CUDA config failed: {e}") | |
if __name__ == '__main__': | |
print("Starting the Dash application...") | |
app.run(debug=True, host='0.0.0.0', port=7860, threaded=True) | |
print("Dash application has finished running.") |