# Spaces: Sleeping  (status banner captured along with this file; not part of the program)
import os
import sqlite3
import uuid
from datetime import datetime

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from flask import Flask, request, jsonify, send_file
from PIL import Image
from torchvision import transforms
app = Flask(__name__)

# Directory that receives uploaded images and holds the SQLite results file.
OUTPUT_DIR = '/tmp/results'
# exist_ok=True removes the check-then-create race: if another process
# creates the directory between the test and the call, nothing is raised.
os.makedirs(OUTPUT_DIR, exist_ok=True)
DB_PATH = os.path.join(OUTPUT_DIR, 'results.db')
def init_db(db_path=None):
    """Create the ``results`` table if it does not already exist.

    Args:
        db_path: Path of the SQLite database file. When omitted, the
            module-level ``DB_PATH`` is used.
    """
    target = DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(target)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                image_filename TEXT,
                prediction TEXT,
                confidence REAL,
                gradcam_filename TEXT,
                timestamp TEXT
            )
        """)
        conn.commit()
    finally:
        # Close on every path so a failing statement does not leak the
        # connection (and its file handle).
        conn.close()
# Make sure the results table exists before any request is handled.
init_db()

# Class labels, indexed by the model's output position.
CLASS_NAMES = ["Normal", "Early Glaucoma", "Advanced Glaucoma"]

# 1. Build the architecture with randomly initialised weights and replace
#    the classifier with one output per class. The deprecated
#    ``pretrained=False`` keyword is dropped: calling the builder with no
#    arguments already requests no pretrained weights.
model = models.densenet169()
model.classifier = nn.Linear(model.classifier.in_features, len(CLASS_NAMES))

# 2. Load the checkpoint onto the CPU.
# NOTE(review): torch.load unpickles the file, so the checkpoint must come
# from a trusted source. Consider passing weights_only=True if the installed
# torch supports it and the checkpoint holds only tensors -- TODO confirm.
checkpoint = torch.load('densenet169_seed40_best2.pt', map_location='cpu')
state_dict = checkpoint['state_dict']

# 3. Rename checkpoint keys: a leading 'features.0.' becomes 'features.';
#    every other key is kept unchanged.
_WRAPPED_PREFIX = 'features.0.'
new_state_dict = {
    ('features.' + key[len(_WRAPPED_PREFIX):]
     if key.startswith(_WRAPPED_PREFIX) else key): value
    for key, value in state_dict.items()
}

# 4. Load the remapped weights and switch the model to evaluation mode.
model.load_state_dict(new_state_dict)
model.eval()
def home():
    """Return the plain-text banner stating that the API is running."""
    # NOTE(review): no @app.route decorator is visible on this view in this
    # copy of the file -- confirm it is registered with the app.
    banner = "Glaucoma Detection Flask API (3-Class Model) is running!"
    return banner
def test_file():
    """Report whether the .pt model file is present and readable.

    Returns:
        A status string naming the file when it is found, or a
        "NOT found" message otherwise.
    """
    # Same checkpoint name that the module-level torch.load call reads;
    # checking any other file would report on a model that is not in use.
    filepath = "densenet169_seed40_best2.pt"
    # isfile + access(R_OK) verifies "present and readable", which a bare
    # existence check does not (it also accepts directories).
    if os.path.isfile(filepath) and os.access(filepath, os.R_OK):
        return f"✅ Model file found at: {filepath}"
    return "❌ Model file NOT found."
def predict():
    """Classify an uploaded image and record the outcome in SQLite.

    Reads the upload from the ``file`` field of ``request.files``, stores it
    in ``OUTPUT_DIR``, runs the 3-class model on it and inserts one row into
    the ``results`` table.

    Returns:
        A JSON response with the predicted class, its confidence, the three
        per-class probabilities and the stored image filename. On a missing
        or unnamed upload, a JSON ``error`` payload with status 400; on any
        failure while processing, a JSON ``error`` payload with status 500.
    """
    # NOTE(review): no @app.route decorator is visible on this view in this
    # copy of the file -- confirm it is registered (and restricted to POST).
    if 'file' not in request.files:
        return jsonify({'error': 'No file uploaded'}), 400
    uploaded_file = request.files['file']
    if uploaded_file.filename == '':
        return jsonify({'error': 'No file selected'}), 400

    try:
        # Save the uploaded image. The random suffix keeps two uploads that
        # arrive within the same second from overwriting each other.
        # NOTE(review): the upload is written unchanged under a .png name,
        # whatever format the client actually sent.
        timestamp = int(datetime.now().timestamp())
        uploaded_filename = f"uploaded_{timestamp}_{uuid.uuid4().hex[:8]}.png"
        uploaded_file_path = os.path.join(OUTPUT_DIR, uploaded_filename)
        uploaded_file.save(uploaded_file_path)

        # Decode inside a context manager so the file handle is released as
        # soon as the RGB copy has been made.
        with Image.open(uploaded_file_path) as raw_image:
            img = raw_image.convert('RGB')

        # NOTE(review): `transform` is not defined anywhere in the visible
        # part of this file; unless it is provided elsewhere this line raises
        # NameError and every request ends in the 500 branch below. The
        # preprocessing must match what the checkpoint was trained with.
        input_tensor = transform(img).unsqueeze(0)
        with torch.no_grad():
            output = model(input_tensor)
            probabilities = F.softmax(output, dim=1).cpu().numpy()[0]

        # Pick the most probable class.
        class_index = int(np.argmax(probabilities))
        result = CLASS_NAMES[class_index]
        confidence = float(probabilities[class_index])

        # Persist the result; close the connection even if the insert fails.
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.execute(
                """
                INSERT INTO results (image_filename, prediction, confidence, gradcam_filename, timestamp)
                VALUES (?, ?, ?, ?, ?)
                """,
                (uploaded_filename, result, confidence, '', datetime.now().isoformat()),
            )
            conn.commit()
        finally:
            conn.close()

        return jsonify({
            'prediction': result,
            'confidence': confidence,
            'normal_probability': float(probabilities[0]),
            'early_glaucoma_probability': float(probabilities[1]),
            'advanced_glaucoma_probability': float(probabilities[2]),
            'gradcam_image': '',  # Not used for now
            'image_filename': uploaded_filename,
        })
    except Exception as e:
        # Request boundary: log the traceback instead of dropping it, then
        # report the failure to the client as before.
        app.logger.exception("Prediction failed")
        return jsonify({'error': str(e)}), 500
def results(): | |
"""List all results from the SQLite database.""" | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT * FROM results ORDER BY timestamp DESC") | |
results_data = cursor.fetchall() | |
conn.close() | |
results_list = [] | |
for record in results_data: | |
results_list.append({ | |
'id': record[0], | |
'image_filename': record[1], | |
'prediction': record[2], | |
'confidence': record[3], | |
'gradcam_filename': record[4], | |
'timestamp': record[5] | |
}) | |
return jsonify(results_list) | |
def get_gradcam(filename): | |
"""Serve the Grad-CAM overlay image (no-op for now).""" | |
filepath = os.path.join(OUTPUT_DIR, filename) | |
if os.path.exists(filepath): | |
return send_file(filepath, mimetype='image/png') | |
else: | |
return jsonify({'error': 'File not found'}), 404 | |
def get_image(filename): | |
"""Serve the original uploaded image.""" | |
filepath = os.path.join(OUTPUT_DIR, filename) | |
if os.path.exists(filepath): | |
return send_file(filepath, mimetype='image/png') | |
else: | |
return jsonify({'error': 'File not found'}), 404 | |
if __name__ == '__main__':
    # Started directly as a script: listen on every interface, port 7860.
    listen_options = {'host': '0.0.0.0', 'port': 7860}
    app.run(**listen_options)