ask-tricare / app.py
bluenevus's picture
Update app.py via AI Editor
f7e1ed4
raw
history blame
31.9 kB
import os
import threading
import logging
import uuid
import shutil
import json
import tempfile
import glob
from flask import Flask, request as flask_request, make_response
import dash
from dash import dcc, html, Input, Output, State, callback_context, no_update
import dash_bootstrap_components as dbc
import openai
import base64
import datetime
from werkzeug.utils import secure_filename
import numpy as np
import io
import PyPDF2
import docx
import openpyxl
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(threadName)s %(message)s")
logger = logging.getLogger("AskTricare")
app_flask = Flask(__name__)
SESSION_DATA = {}
SESSION_LOCKS = {}
SESSION_DIR_BASE = os.path.join(tempfile.gettempdir(), "asktricare_sessions")
os.makedirs(SESSION_DIR_BASE, exist_ok=True)
openai.api_key = os.environ.get("OPENAI_API_KEY")
EMBEDDING_INDEX = {}
EMBEDDING_TEXTS = {}
EMBEDDING_MODEL = "text-embedding-ada-002"
def get_session_id():
sid = flask_request.cookies.get("asktricare_session_id")
if not sid:
sid = str(uuid.uuid4())
return sid
def get_session_dir(session_id):
d = os.path.join(SESSION_DIR_BASE, session_id)
os.makedirs(d, exist_ok=True)
return d
def get_session_lock(session_id):
if session_id not in SESSION_LOCKS:
SESSION_LOCKS[session_id] = threading.Lock()
return SESSION_LOCKS[session_id]
def get_session_state(session_id):
if session_id not in SESSION_DATA:
SESSION_DATA[session_id] = {
"messages": [],
"uploads": [],
"created": datetime.datetime.utcnow().isoformat(),
"streaming": False,
"stream_buffer": ""
}
return SESSION_DATA[session_id]
def save_session_state(session_id):
state = get_session_state(session_id)
d = get_session_dir(session_id)
with open(os.path.join(d, "state.json"), "w") as f:
json.dump(state, f)
def load_session_state(session_id):
d = get_session_dir(session_id)
path = os.path.join(d, "state.json")
if os.path.exists(path):
with open(path, "r") as f:
SESSION_DATA[session_id] = json.load(f)
def load_system_prompt():
prompt_path = os.path.join(os.getcwd(), "system_prompt.txt")
try:
with open(prompt_path, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception as e:
logger.error(f"Failed to load system prompt: {e}")
return "You are Ask Tricare, a helpful assistant for TRICARE health benefits. Respond conversationally, and cite relevant sources when possible. If you do not know, say so."
def embed_docs_folder():
global EMBEDDING_INDEX, EMBEDDING_TEXTS
docs_folder = os.path.join(os.getcwd(), "docs")
if not os.path.isdir(docs_folder):
logger.warning(f"Docs folder '{docs_folder}' does not exist. Skipping embedding.")
return
doc_files = []
for ext in ("*.txt", "*.md", "*.pdf"):
doc_files.extend(glob.glob(os.path.join(docs_folder, ext)))
for doc_path in doc_files:
fname = os.path.basename(doc_path)
if fname in EMBEDDING_INDEX:
continue
try:
with open(doc_path, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
if not text.strip():
continue
chunk = text[:4000]
response = openai.Embedding.create(
input=[chunk],
model=EMBEDDING_MODEL
)
embedding = response['data'][0]['embedding']
EMBEDDING_INDEX[fname] = embedding
EMBEDDING_TEXTS[fname] = chunk
logger.info(f"Embedded doc: {fname}")
except Exception as e:
logger.error(f"Embedding failed for {fname}: {e}")
embed_docs_folder()
def embed_user_doc(session_id, filename, text):
session_dir = get_session_dir(session_id)
if not text.strip():
return
try:
chunk = text[:4000]
response = openai.Embedding.create(
input=[chunk],
model=EMBEDDING_MODEL
)
embedding = response['data'][0]['embedding']
user_embeds_path = os.path.join(session_dir, "user_embeds.json")
if os.path.exists(user_embeds_path):
with open(user_embeds_path, "r") as f:
user_embeds = json.load(f)
else:
user_embeds = {"embeddings": [], "texts": [], "filenames": []}
user_embeds["embeddings"].append(embedding)
user_embeds["texts"].append(chunk)
user_embeds["filenames"].append(filename)
with open(user_embeds_path, "w") as f:
json.dump(user_embeds, f)
logger.info(f"Session {session_id}: Embedded user doc {filename}")
except Exception as e:
logger.error(f"Session {session_id}: Failed to embed user doc {filename}: {e}")
def get_user_embeddings(session_id):
session_dir = get_session_dir(session_id)
user_embeds_path = os.path.join(session_dir, "user_embeds.json")
if os.path.exists(user_embeds_path):
with open(user_embeds_path, "r") as f:
d = json.load(f)
embeds = np.array(d.get("embeddings", []))
texts = d.get("texts", [])
filenames = d.get("filenames", [])
return embeds, texts, filenames
return np.array([]), [], []
def semantic_search(query, embed_matrix, texts, filenames, top_k=2):
if len(embed_matrix) == 0:
return []
try:
q_embed = openai.Embedding.create(input=[query], model=EMBEDDING_MODEL)["data"][0]["embedding"]
q_embed = np.array(q_embed)
embed_matrix = np.array(embed_matrix)
scores = np.dot(embed_matrix, q_embed) / (np.linalg.norm(embed_matrix, axis=1) * np.linalg.norm(q_embed) + 1e-8)
idx = np.argsort(scores)[::-1][:top_k]
results = []
for i in idx:
results.append({"filename": filenames[i], "text": texts[i], "score": float(scores[i])})
return results
except Exception as e:
logger.error(f"Semantic search error: {e}")
return []
app = dash.Dash(
__name__,
server=app_flask,
suppress_callback_exceptions=True,
external_stylesheets=[dbc.themes.BOOTSTRAP, "/assets/custom.css"],
update_title="Ask Tricare"
)
def chat_message_card(msg, is_user):
align = "flex-end" if is_user else "flex-start"
color = "primary" if is_user else "secondary"
avatar = "🧑" if is_user else "🤖"
return html.Div(
dbc.Card(
dbc.CardBody([
html.Div([
html.Span(avatar, style={"fontSize": "2rem"}),
html.Span(msg, style={"whiteSpace": "pre-wrap", "marginLeft": "0.75rem", "overflowWrap": "break-word", "wordBreak": "break-word"})
], style={"display": "flex", "alignItems": "center"})
]),
className=f"mb-2 ms-3 me-3",
color=color,
inverse=is_user,
style={"maxWidth": "80%"}
),
style={"display": "flex", "justifyContent": align, "width": "100%"}
)
def uploaded_file_card(filename, is_img):
ext = os.path.splitext(filename)[1].lower()
icon = "🖼️" if is_img else "📄"
return dbc.Card(
dbc.CardBody([
html.Span(icon, style={"fontSize": "2rem", "marginRight": "0.5rem"}),
html.Span(filename)
]),
className="mb-2",
color="tertiary"
)
def disclaimer_card():
return dbc.Card(
dbc.CardBody([
html.H5("Disclaimer", className="card-title"),
html.P("This information is not private. Do not send PII or PHI. For official guidance visit the Tricare website.", style={"fontSize": "0.95rem"})
]),
className="mb-2"
)
def left_navbar_static():
return html.Div([
html.H3("Ask Tricare", className="mb-3 mt-3", style={"fontWeight": "bold"}),
disclaimer_card(),
dcc.Upload(
id="file-upload",
children=dbc.Button("Upload Document/Image", color="secondary", className="mb-2", style={"width": "100%"}),
multiple=True,
style={"width": "100%"}
),
html.Div(id="upload-list"),
html.Hr()
], style={"padding": "1rem", "backgroundColor": "#f8f9fa", "height": "100vh", "overflowY": "auto"})
def chat_box_card():
return dbc.Card(
dbc.CardBody([
html.Div(
id="chat-window-container",
children=[
html.Div(id="chat-window", style={"width": "100%"})
],
style={
"height": "70vh",
"overflowY": "auto",
"overflowX": "hidden",
"backgroundColor": "#fff",
"padding": "0.5rem",
"borderRadius": "0.5rem"
}
)
]),
className="mt-3",
style={
"height": "72vh",
"overflowY": "hidden",
"overflowX": "hidden"
}
)
def user_input_card():
return dbc.Card(
dbc.CardBody([
html.Div([
dcc.Textarea(
id="user-input",
placeholder="Type your question...",
style={"width": "100%", "height": "60px", "resize": "vertical", "wordWrap": "break-word"},
wrap="soft",
maxLength=1000,
n_blur=0,
),
dcc.Store(id="enter-triggered", data=False),
html.Div([
dbc.Button("Send", id="send-btn", color="primary", className="mt-2 me-2", style={"minWidth": "100px"}),
], style={"float": "right", "display": "flex", "gap": "0.5rem"}),
dcc.Store(id="user-input-store", data="", storage_type="session"),
html.Button(id='hidden-send', style={'display': 'none'})
], style={"marginTop": "1rem"}),
html.Div(id="error-message", style={"color": "#bb2124", "marginTop": "0.5rem"}),
dcc.Store(id="should-clear-input", data=False)
])
)
def right_main_static():
return html.Div([
chat_box_card(),
user_input_card(),
dcc.Loading(id="loading", type="default", fullscreen=False, style={"position": "absolute", "top": "5%", "left": "50%"}),
dcc.Interval(id="stream-interval", interval=400, n_intervals=0, disabled=True, max_intervals=1000),
dcc.Store(id="client-question", data="")
], style={"padding": "1rem", "backgroundColor": "#fff", "height": "100vh", "overflowY": "auto"})
app.layout = html.Div([
dcc.Store(id="session-id", storage_type="local"),
dcc.Location(id="url"),
html.Div([
html.Div(left_navbar_static(), id='left-navbar', style={"width": "30vw", "height": "100vh", "position": "fixed", "left": 0, "top": 0, "zIndex": 2, "overflowY": "auto"}),
html.Div(right_main_static(), id='right-main', style={"marginLeft": "30vw", "width": "70vw", "overflowY": "auto"})
], style={"display": "flex"}),
dcc.Store(id="clear-input", data=False),
dcc.Store(id="scroll-bottom", data=0),
dcc.Store(id="enter-pressed", data=False)
])
app.clientside_callback(
"""
function(n, value) {
var ta = document.getElementById('user-input');
if (!ta) return window.dash_clientside.no_update;
if (!window._asktricare_enter_handler) {
ta.addEventListener('keydown', function(e) {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
var btn = document.getElementById('hidden-send');
if (btn) btn.click();
}
});
window._asktricare_enter_handler = true;
}
return window.dash_clientside.no_update;
}
""",
Output('enter-pressed', 'data'),
Input('user-input', 'n_blur'),
State('user-input', 'value')
)
# Clientside callback to scroll chat window to bottom when scroll-bottom is incremented
app.clientside_callback(
"""
function(scrollIndex) {
var chatContainer = document.getElementById('chat-window-container');
if (chatContainer) {
chatContainer.scrollTop = chatContainer.scrollHeight;
}
return null;
}
""",
Output('clear-input', 'data'), # dummy output
Input('scroll-bottom', 'data')
)
def _is_supported_doc(filename):
ext = os.path.splitext(filename)[1].lower()
return ext in [".txt", ".pdf", ".md", ".docx", ".xlsx"]
def _extract_text_from_upload(filepath, ext):
try:
if ext in [".txt", ".md"]:
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
text = f.read()
return text
elif ext == ".pdf":
try:
text = ""
with open(filepath, "rb") as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
return text
except Exception as e:
logger.error(f"Error reading PDF {filepath}: {e}")
return ""
elif ext == ".docx":
try:
doc = docx.Document(filepath)
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
return "\n".join(paragraphs)
except Exception as e:
logger.error(f"Error reading DOCX {filepath}: {e}")
return ""
elif ext == ".xlsx":
try:
wb = openpyxl.load_workbook(filepath, read_only=True, data_only=True)
text_rows = []
for ws in wb.worksheets:
for row in ws.iter_rows(values_only=True):
row_strs = [str(cell) for cell in row if cell is not None]
if any(row_strs):
text_rows.append("\t".join(row_strs))
return "\n".join(text_rows)
except Exception as e:
logger.error(f"Error reading XLSX {filepath}: {e}")
return ""
else:
return ""
except Exception as e:
logger.error(f"Error extracting text from {filepath}: {e}")
return ""
@app.callback(
Output("session-id", "data"),
Input("url", "href"),
prevent_initial_call=False
)
def assign_session_id(_):
sid = get_session_id()
d = get_session_dir(sid)
load_session_state(sid)
logger.info(f"Assigned session id: {sid}")
return sid
@app.callback(
Output("upload-list", "children"),
Output("chat-window", "children"),
Output("error-message", "children"),
Output("stream-interval", "disabled"),
Output("stream-interval", "n_intervals"),
Output("user-input", "value"),
Output("scroll-bottom", "data"),
Input("session-id", "data"),
Input("send-btn", "n_clicks"),
Input("file-upload", "contents"),
Input("stream-interval", "n_intervals"),
Input('hidden-send', 'n_clicks'),
State("file-upload", "filename"),
State("user-input", "value"),
State("scroll-bottom", "data"),
prevent_initial_call=False
)
def main_callback(session_id, send_clicks, file_contents, stream_n, hidden_send_clicks, file_names, user_input, scroll_bottom):
trigger = callback_context.triggered[0]['prop_id'].split('.')[0] if callback_context.triggered else ""
session_id = session_id or get_session_id()
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
error = ""
start_streaming = False
uploads = state.get("uploads", [])
file_was_uploaded_and_sent = False
file_upload_message = None
doc_texts_to_send = []
if trigger == "file-upload" and file_contents and file_names:
uploads = []
file_upload_messages = []
if not isinstance(file_contents, list):
file_contents = [file_contents]
file_names = [file_names]
for c, n in zip(file_contents, file_names):
header, data = c.split(',', 1)
ext = os.path.splitext(n)[1].lower()
is_img = ext in [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"]
fname = secure_filename(f"{datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{n}")
session_dir = get_session_dir(session_id)
fp = os.path.join(session_dir, fname)
with open(fp, "wb") as f:
f.write(base64.b64decode(data))
uploads.append({"name": fname, "is_img": is_img, "path": fp})
if _is_supported_doc(n) and not is_img:
text = _extract_text_from_upload(fp, ext)
if text.strip():
embed_user_doc(session_id, fname, text)
logger.info(f"Session {session_id}: Uploaded doc '{n}' embedded for user vector store")
preview = text[:1000]
file_upload_messages.append({
"role": "user",
"content": f"[Document uploaded: {n}]\n{preview if preview.strip() else '[No text extracted]'}"
})
doc_texts_to_send.append(text.strip())
else:
file_upload_messages.append({
"role": "user",
"content": f"[Document uploaded: {n}]\n[No text extracted]"
})
elif is_img:
file_upload_messages.append({
"role": "user",
"content": f"[Image uploaded: {n}]"
})
else:
file_upload_messages.append({
"role": "user",
"content": f"[File uploaded: {n}]"
})
state["uploads"].extend(uploads)
for msg in file_upload_messages:
state["messages"].append(msg)
save_session_state(session_id)
logger.info(f"Session {session_id}: Uploaded files {[u['name'] for u in uploads]}")
if doc_texts_to_send:
doc_question = "\n\n".join(doc_texts_to_send)
state["messages"].append({"role": "user", "content": doc_question})
state["streaming"] = True
state["stream_buffer"] = ""
save_session_state(session_id)
def run_stream_for_doc(session_id, messages, doc_question):
try:
system_prompt = load_system_prompt()
rag_chunks = []
try:
global_embeds = []
global_texts = []
global_fnames = []
for fname, emb in EMBEDDING_INDEX.items():
global_embeds.append(emb)
global_texts.append(EMBEDDING_TEXTS[fname])
global_fnames.append(fname)
global_rag = semantic_search(doc_question, global_embeds, global_texts, global_fnames, top_k=2)
if global_rag:
for r in global_rag:
rag_chunks.append(f"Global doc [{r['filename']}]:\n{r['text'][:1000]}")
user_embeds, user_texts, user_fnames = get_user_embeddings(session_id)
user_rag = semantic_search(doc_question, user_embeds, user_texts, user_fnames, top_k=2)
if user_rag:
for r in user_rag:
rag_chunks.append(f"User upload [{r['filename']}]:\n{r['text'][:1000]}")
except Exception as e:
logger.error(f"Session {session_id}: RAG error (doc upload): {e}")
context_block = ""
if rag_chunks:
context_block = "The following sources may help answer the question:\n\n" + "\n\n".join(rag_chunks) + "\n\n"
msg_list = [{"role": "system", "content": system_prompt}]
if context_block:
msg_list.append({"role": "system", "content": context_block})
for m in messages:
msg_list.append({"role": m["role"], "content": m["content"]})
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=msg_list,
max_tokens=700,
temperature=0.2,
stream=True,
)
reply = ""
for chunk in response:
delta = chunk["choices"][0]["delta"]
content = delta.get("content", "")
if content:
reply += content
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["stream_buffer"] = reply
save_session_state(session_id)
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["messages"].append({"role": "assistant", "content": reply})
state["stream_buffer"] = ""
state["streaming"] = False
save_session_state(session_id)
logger.info(f"Session {session_id}: Assistant responded to doc upload")
except Exception as e:
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["streaming"] = False
state["stream_buffer"] = ""
save_session_state(session_id)
logger.error(f"Session {session_id}: Streaming error for doc upload: {e}")
threading.Thread(target=run_stream_for_doc, args=(session_id, list(state["messages"]), doc_question), daemon=True).start()
start_streaming = True
chat_history = state.get("messages", [])
uploads = state.get("uploads", [])
upload_cards = [uploaded_file_card(os.path.basename(f["name"]), f["is_img"]) for f in uploads]
chat_cards = []
for msg in chat_history:
chat_cards.append(chat_message_card(msg['content'], is_user=(msg['role'] == "user")))
return upload_cards, chat_cards, error, (not state.get("streaming", False)), 0, no_update, scroll_bottom+1
send_triggered = False
if trigger == "send-btn" or trigger == "hidden-send":
send_triggered = True
if send_triggered and user_input and user_input.strip():
question = user_input.strip()
state["messages"].append({"role": "user", "content": question})
state["streaming"] = True
state["stream_buffer"] = ""
save_session_state(session_id)
def run_stream(session_id, messages, question):
try:
system_prompt = load_system_prompt()
rag_chunks = []
try:
global_embeds = []
global_texts = []
global_fnames = []
for fname, emb in EMBEDDING_INDEX.items():
global_embeds.append(emb)
global_texts.append(EMBEDDING_TEXTS[fname])
global_fnames.append(fname)
global_rag = semantic_search(question, global_embeds, global_texts, global_fnames, top_k=2)
if global_rag:
for r in global_rag:
rag_chunks.append(f"Global doc [{r['filename']}]:\n{r['text'][:1000]}")
user_embeds, user_texts, user_fnames = get_user_embeddings(session_id)
user_rag = semantic_search(question, user_embeds, user_texts, user_fnames, top_k=2)
if user_rag:
for r in user_rag:
rag_chunks.append(f"User upload [{r['filename']}]:\n{r['text'][:1000]}")
except Exception as e:
logger.error(f"Session {session_id}: RAG error: {e}")
context_block = ""
if rag_chunks:
context_block = "The following sources may help answer the question:\n\n" + "\n\n".join(rag_chunks) + "\n\n"
msg_list = [{"role": "system", "content": system_prompt}]
if context_block:
msg_list.append({"role": "system", "content": context_block})
for m in messages:
msg_list.append({"role": m["role"], "content": m["content"]})
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=msg_list,
max_tokens=700,
temperature=0.2,
stream=True,
)
reply = ""
for chunk in response:
delta = chunk["choices"][0]["delta"]
content = delta.get("content", "")
if content:
reply += content
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["stream_buffer"] = reply
save_session_state(session_id)
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["messages"].append({"role": "assistant", "content": reply})
state["stream_buffer"] = ""
state["streaming"] = False
save_session_state(session_id)
logger.info(f"Session {session_id}: User: {question} | Assistant: {reply}")
except Exception as e:
session_lock = get_session_lock(session_id)
with session_lock:
load_session_state(session_id)
state = get_session_state(session_id)
state["streaming"] = False
state["stream_buffer"] = ""
save_session_state(session_id)
logger.error(f"Session {session_id}: Streaming error: {e}")
threading.Thread(target=run_stream, args=(session_id, list(state["messages"]), question), daemon=True).start()
start_streaming = True
if trigger == "stream-interval":
chat_history = state.get("messages", [])
chat_cards = []
for msg in chat_history:
chat_cards.append(chat_message_card(msg['content'], is_user=(msg['role'] == "user")))
if state.get("streaming", False):
if state.get("stream_buffer", ""):
chat_cards.append(chat_message_card(state["stream_buffer"], is_user=False))
upload_cards = [uploaded_file_card(os.path.basename(f["name"]), f["is_img"]) for f in state.get("uploads", [])]
return (
upload_cards,
chat_cards,
"",
False,
stream_n+1,
no_update,
scroll_bottom+1
)
else:
chat_cards = []
for msg in state.get("messages", []):
chat_cards.append(chat_message_card(msg['content'], is_user=(msg['role'] == "user")))
upload_cards = [uploaded_file_card(os.path.basename(f["name"]), f["is_img"]) for f in state.get("uploads", [])]
return (
upload_cards,
chat_cards,
"",
True,
0,
no_update,
scroll_bottom+1
)
chat_history = state.get("messages", [])
uploads = state.get("uploads", [])
upload_cards = [uploaded_file_card(os.path.basename(f["name"]), f["is_img"]) for f in uploads]
chat_cards = []
for msg in chat_history:
chat_cards.append(chat_message_card(msg['content'], is_user=(msg['role'] == "user")))
if trigger == "send-btn" or trigger == "hidden-send":
return upload_cards, chat_cards, error, (not state.get("streaming", False)), 0, "", scroll_bottom+1
elif trigger == "file-upload":
return upload_cards, chat_cards, error, (not state.get("streaming", False)), 0, no_update, scroll_bottom+1
else:
return upload_cards, chat_cards, error, (not state.get("streaming", False)), 0, no_update, scroll_bottom
@app_flask.after_request
def set_session_cookie(resp):
sid = flask_request.cookies.get("asktricare_session_id")
if not sid:
sid = str(uuid.uuid4())
resp.set_cookie("asktricare_session_id", sid, max_age=60*60*24*7, path="/")
return resp
def cleanup_sessions(max_age_hours=48):
now = datetime.datetime.utcnow()
for sid in os.listdir(SESSION_DIR_BASE):
d = os.path.join(SESSION_DIR_BASE, sid)
try:
state_path = os.path.join(d, "state.json")
if os.path.exists(state_path):
with open(state_path, "r") as f:
st = json.load(f)
created = st.get("created")
if created and (now - datetime.datetime.fromisoformat(created)).total_seconds() > max_age_hours * 3600:
shutil.rmtree(d)
logger.info(f"Cleaned up session {sid}")
except Exception as e:
logger.error(f"Cleanup error for {sid}: {e}")
try:
import torch
if torch.cuda.is_available():
torch.set_default_tensor_type(torch.cuda.FloatTensor)
logger.info("CUDA GPU detected and configured.")
except Exception as e:
logger.warning(f"CUDA config failed: {e}")
if __name__ == '__main__':
print("Starting the Dash application...")
app.run(debug=True, host='0.0.0.0', port=7860, threaded=True)
print("Dash application has finished running.")