reidddd's picture
updated roboflow model to improve accuracy
b0086c4
raw
history blame
11.8 kB
from flask import Flask, request, jsonify, render_template
from PIL import Image
import io
import os
import requests
from roboflow import Roboflow
import supervision as sv
import cv2
import tempfile
import gdown
import os
import requests
import requests
import cloudinary
import model
import cloudinary.uploader
from a import main
import numpy as np
import torchvision.transforms as transforms
import pandas as pd
import nibabel as nib
import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import cv2
from PIL import Image
from sklearn.model_selection import train_test_split
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from inference_sdk import InferenceHTTPClient, InferenceConfiguration
# Initialize Flask app
app = Flask(__name__)
# Google Drive URL of the first checkpoint and the local file name that
# download_file_from_google_drive() stores it under.
GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z"
LOCAL_MODEL_PATH = "checkpoint32.pth"
# Google Drive URL of the second checkpoint; download_file_from_google_drived()
# stores it under the file name held in `da`.
d = "https://drive.google.com/uc?id=1GfrlFNoa7E4liMHyMuF73nA21yT9SNSb"
def download_file_from_google_drive():
    """Fetch the checkpoint at ``GDRIVE_MODEL_URL`` into ``LOCAL_MODEL_PATH``.

    The transfer is delegated to ``gdown`` with its progress output left on.
    """
    gdown.download(url=GDRIVE_MODEL_URL, output=LOCAL_MODEL_PATH, quiet=False)
# Local file name for the checkpoint downloaded from the URL in `d`.
da = "a.pth"
def download_file_from_google_drived():
    """Fetch the checkpoint at URL ``d`` into the file named by ``da``.

    The transfer is delegated to ``gdown`` with its progress output left on.
    """
    gdown.download(url=d, output=da, quiet=False)
def download_model():
    """Download the checkpoint to ``LOCAL_MODEL_PATH`` unless it already exists.

    The response body is streamed in chunks to a sibling ``.part`` file and
    only renamed to ``LOCAL_MODEL_PATH`` once the transfer finished, so an
    interrupted download never leaves a truncated checkpoint that a later
    call would mistake for a complete one.

    Raises:
        Exception: if the server answers with a status other than 200.
    """
    if os.path.exists(LOCAL_MODEL_PATH):
        return

    partial_path = LOCAL_MODEL_PATH + ".part"
    # The context manager releases the connection even when writing fails.
    with requests.get(GDRIVE_MODEL_URL, stream=True, timeout=60) as response:
        if response.status_code != 200:
            raise Exception(
                f"Failed to download model from Google Drive: {response.status_code}"
            )
        with open(partial_path, "wb") as f:
            # Iterate in 1 MiB chunks so the whole checkpoint is never held
            # in memory at once.
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    os.replace(partial_path, LOCAL_MODEL_PATH)
# Fetch both checkpoints at import time, i.e. before any route can be served.
download_file_from_google_drive()
download_file_from_google_drived()
@app.route("/")
def home():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
# Flat repair cost per damage class; looked up by the "class" field of each
# prediction returned by the damage model.
_REPAIR_COSTS = {
    "damage-front-windscreen": 950,
    "damaged-door": 720,
    "damaged-fender": 680,
    "damaged-front-bumper": 890,
    "damaged-head-light": 530,
    "damaged-hood": 970,
    "damaged-rear-bumper": 910,
    "damaged-rear-window": 760,
    "damaged-side-window": 780,
    "damaged-tail-light": 550,
    "quaterpanel-dent": 640,
}
_DAMAGE_MODEL_ID = "car-recognition-v4/4"
# Predictions below this confidence are never counted.
_MIN_CONFIDENCE = 0.3
# Server-side confidence thresholds (in percent) tried in order. Integers are
# used so the steps are exact; the last step equals _MIN_CONFIDENCE because
# anything the server returns below it would be filtered out anyway.
_CONFIDENCE_STEPS = (70, 60, 50, 40, 30)
# Modes Pillow can write as JPEG without conversion.
_JPEG_SAFE_MODES = ("L", "RGB", "CMYK")


def _detect_damage_labels(client, image_path):
    """Return the damage predictions for ``image_path``.

    Inference starts at a strict confidence threshold and is retried with a
    progressively lower one until at least one prediction with confidence
    >= ``_MIN_CONFIDENCE`` is found. Returns an empty list when even the
    lowest threshold yields nothing, so the search always terminates.
    """
    for percent in _CONFIDENCE_STEPS:
        configuration = InferenceConfiguration(confidence_threshold=percent / 100)
        with client.use_configuration(configuration):
            result = client.infer(image_path, model_id=_DAMAGE_MODEL_ID)
        labels = [
            item
            for item in result["predictions"]
            if item["confidence"] >= _MIN_CONFIDENCE
        ]
        if labels:
            return labels
    return []


@app.route("/fetch-image", methods=["POST"])
def fetchImage():
    """Estimate the repair cost for a photo of a damaged car.

    The image is taken from the form field ``url`` (downloaded server-side)
    or, when that is absent, from the uploaded ``file``. Every damage class
    detected by the Roboflow model contributes its entry from
    ``_REPAIR_COSTS``; classes without an entry contribute 0.

    Returns:
        JSON ``{"total_cost": <number>}`` on success.
        JSON ``{"error": <message>}`` with status 400 when no usable image
        was supplied, or 502 when the image URL could not be fetched.
    """
    if "url" in request.form:
        # NOTE(security): the URL is caller-supplied and fetched from the
        # server; restrict allowed hosts if this endpoint is publicly exposed.
        try:
            response = requests.get(request.form["url"], timeout=30)
        except requests.RequestException as exc:
            return jsonify({"error": f"Failed to download image: {exc}"}), 502
        if response.status_code != 200:
            return jsonify(
                {
                    "error": "Failed to download image. "
                    f"Status code: {response.status_code}"
                }
            ), 502
        image_bytes = response.content
    elif "file" in request.files:
        image_bytes = request.files["file"].read()
    else:
        return jsonify({"error": "Provide an image via 'url' or 'file'."}), 400

    try:
        image = Image.open(io.BytesIO(image_bytes))
        image.load()
    except OSError:
        # Pillow raises an OSError subclass for data it cannot decode.
        return jsonify({"error": "The supplied data is not a readable image."}), 400
    if image.mode not in _JPEG_SAFE_MODES:
        # e.g. RGBA or palette images cannot be written as JPEG directly.
        image = image.convert("RGB")

    # NOTE(security): an API key is committed in source as the fallback value;
    # rotate it and supply the real key through ROBOFLOW_API_KEY.
    api_key = os.environ.get("ROBOFLOW_API_KEY", "LqD8Cs4OsoK8seO3CPkf")
    client = InferenceHTTPClient(
        api_url="https://detect.roboflow.com",
        api_key=api_key,
    )

    # A per-request temporary file keeps concurrent requests from overwriting
    # each other's image.
    fd, image_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    try:
        image.save(image_path, "JPEG", quality=100)
        labels = _detect_damage_labels(client, image_path)
    finally:
        os.remove(image_path)

    app.logger.info("Detected damage labels: %s", labels)
    total_cost = sum(_REPAIR_COSTS.get(label["class"], 0) for label in labels)
    result = {"total_cost": round(total_cost, 2)}
    return jsonify(result)
@app.route("/generate-report", methods=["POST"])
def generate_report():
    """Build a report from a report URL and an insurance URL.

    When the form carries ``report_url`` (together with ``insurance_url``),
    both are handed to ``main`` with the output name ``output.pdf`` and the
    value it returns is sent back as ``{"url": ...}`` with status 200.

    Otherwise an uploaded ``file``, if present, is written to
    ``uploaded_report.pdf`` and the request is answered with the generic
    404 payload.
    """
    form = request.form
    if "report_url" in form:
        generated_url = main(form["report_url"], form["insurance_url"], "output.pdf")
        return jsonify({"url": generated_url}), 200

    if "file" in request.files:
        upload = request.files["file"]
        with open("uploaded_report.pdf", "wb") as destination:
            destination.write(upload.read())

    # NOTE(review): the upload branch stores the file but still answers 404;
    # confirm whether that is intended.
    return jsonify({"message": "Something happened!."}), 404
_ALLOWED_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")
# NOTE(review): absolute path of the staging checkpoint; presumably the same
# "a.pth" that is downloaded at start-up - confirm the working directory.
_STAGE_CHECKPOINT_PATH = "/home/user/app/a.pth"
# Input preprocessing for the staging network.
_STAGE_TRANSFORM = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])
# Lazily created staging network, shared by all requests.
_stage_model = None


class ResNetRegression(nn.Module):
    """ResNet-34 whose final layer is replaced by a single regression output."""

    def __init__(self):
        super().__init__()
        # No pretrained download: every weight is overwritten by the
        # checkpoint, which load_state_dict() checks strictly.
        self.model = models.resnet34(pretrained=False)
        in_features = self.model.fc.in_features
        # Change output layer for regression
        self.model.fc = nn.Linear(in_features, 1)

    def forward(self, x):
        return self.model(x)


def _get_stage_model():
    """Return the staging network, loading its checkpoint on first use."""
    global _stage_model
    if _stage_model is None:
        network = ResNetRegression()
        # NOTE(security): weights_only=False unpickles arbitrary objects; only
        # load checkpoints from a trusted source.
        checkpoint = torch.load(
            _STAGE_CHECKPOINT_PATH,
            weights_only=False,
            map_location=torch.device("cpu"),
        )
        # Drop the "module." marker from parameter names so the keys match
        # this network's own parameter names.
        state_dict = {
            key.replace("module.", ""): value for key, value in checkpoint.items()
        }
        network.load_state_dict(state_dict)
        # Inference mode: BatchNorm must use its stored running statistics.
        network.eval()
        _stage_model = network
    return _stage_model


def _has_high_edge_density(image_path, limit=0.05):
    """Return True when more than ``limit`` of the pixels are Canny edges.

    Unreadable images yield False. The caller rejects high-edge-density
    uploads as non-MRI images.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        return False  # Invalid image
    edges = cv2.Canny(img, 50, 150)
    edge_density = np.sum(edges > 0) / edges.size
    return edge_density > limit


def _stage_label(score):
    """Map the regression score to its stage name."""
    if score <= 2.0:
        return "Mild"
    if score <= 3.2:
        return "Moderate"
    return "Severe"


@app.route("/ms-detection", methods=["POST"])
def predict():
    """Classify an uploaded scan and, when MS is detected, grade its stage.

    Expects a PNG/JPEG upload in the ``file`` field.

    Returns:
        JSON with ``message`` and ``confidence`` from ``model.check_file``,
        plus ``stage`` and ``saved_path``. Uploads with a high edge density
        are answered with ``"Not an mri image"`` instead. Missing, unnamed,
        unsupported or unreadable uploads get ``{"error": ...}`` with
        status 400.
    """
    file = request.files.get("file")
    if not file:
        return jsonify({"error": "file not uploaded"}), 400

    # Keep only the final path component so a crafted file name cannot
    # escape the temporary directory.
    filename = os.path.basename(file.filename)
    if not filename.lower().endswith(_ALLOWED_IMAGE_SUFFIXES):
        return jsonify({"error": "unsupported file type"}), 400

    # Save file temporarily
    temp_path = os.path.join(tempfile.gettempdir(), filename)
    file.save(temp_path)
    image_save_path = os.path.join(tempfile.gettempdir(), filename.lower())
    try:
        with Image.open(temp_path) as uploaded:
            uploaded.save(image_save_path)
    except OSError:
        return jsonify({"error": "file is not a readable image"}), 400

    if _has_high_edge_density(temp_path):
        return jsonify({"message": "Not an mri image", "confidence": 0.95, "saved_path": image_save_path})

    a, b = model.check_file(temp_path)
    if a == "No ms detected":
        stage = "No ms detected"
    else:
        image = Image.open(temp_path).convert("RGB")
        with torch.no_grad():
            output = _get_stage_model()(_STAGE_TRANSFORM(image).unsqueeze(0))
        stage = _stage_label(output.item())
    return jsonify({"message": a, "confidence": b, "stage": stage, "saved_path": image_save_path})
if __name__ == "__main__":
    # Start Flask's built-in server, listening on every interface.
    app.run(port=7860, host="0.0.0.0")