# Spaces: Runtime error
from flask import Flask, request, jsonify, render_template | |
from PIL import Image | |
import io | |
import os | |
import requests | |
from roboflow import Roboflow | |
import supervision as sv | |
import cv2 | |
import tempfile | |
import gdown | |
import os | |
import requests | |
import requests | |
import cloudinary | |
import model | |
import cloudinary.uploader | |
from a import main | |
import numpy as np | |
import torchvision.transforms as transforms | |
# Initialize Flask app
app = Flask(__name__)

# Google Drive source and local file name used by
# download_file_from_google_drive() and download_model().
GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z"
LOCAL_MODEL_PATH = "checkpoint32.pth"

# Google Drive source of the second file fetched at start-up; it is stored
# under the file name held in ``da``.
d = "https://drive.google.com/uc?id=1GfrlFNoa7E4liMHyMuF73nA21yT9SNSb"
def download_file_from_google_drive():
    """Download ``GDRIVE_MODEL_URL`` to ``LOCAL_MODEL_PATH`` with gdown.

    Progress output stays enabled (``quiet=False``).  No check is made for an
    existing local file before the download starts.
    """
    gdown.download(url=GDRIVE_MODEL_URL, output=LOCAL_MODEL_PATH, quiet=False)
# Local file name written by download_file_from_google_drived().
da = "a.pth"


def download_file_from_google_drived():
    """Download the Google Drive file at ``d`` to the path ``da`` with gdown.

    Progress output stays enabled (``quiet=False``).
    """
    gdown.download(url=d, output=da, quiet=False)
def download_model():
    """Download the model file to ``LOCAL_MODEL_PATH`` unless it already exists.

    The response body is streamed in fixed-size chunks to a temporary
    ``.part`` file, which is renamed to ``LOCAL_MODEL_PATH`` only after the
    transfer has finished.  An interrupted download therefore never leaves a
    truncated file that a later call would mistake for a complete model.

    Raises:
        Exception: if the server answers with a status code other than 200.
    """
    if os.path.exists(LOCAL_MODEL_PATH):
        return

    partial_path = LOCAL_MODEL_PATH + ".part"
    try:
        # ``stream=True`` only avoids buffering the whole body in memory when
        # the body is consumed through ``iter_content``; the timeout keeps a
        # stalled connection from hanging the caller forever.
        with requests.get(GDRIVE_MODEL_URL, stream=True, timeout=60) as response:
            if response.status_code != 200:
                raise Exception(
                    f"Failed to download model from Google Drive: {response.status_code}"
                )
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        os.replace(partial_path, LOCAL_MODEL_PATH)
    finally:
        # After a successful rename the partial file no longer exists; this
        # only removes leftovers from a failed or interrupted transfer.
        if os.path.exists(partial_path):
            os.remove(partial_path)
# Fetch both .pth files as soon as this module is imported.
download_file_from_google_drive()
download_file_from_google_drived()
def home():
    """Render the ``index.html`` template and return the resulting page.

    NOTE(review): no route decorator is visible for this view in the source
    as seen here -- confirm how it is registered with ``app``.
    """
    page = render_template("index.html")
    return page
def _read_submitted_image_bytes():
    """Return the raw bytes of the image submitted with the current request.

    A ``url`` form field takes precedence over an uploaded ``file``; the URL
    is fetched exactly once.  Returns ``None`` when neither field is present,
    when the request for the URL fails, or when the server does not answer
    with HTTP 200.
    """
    if "url" in request.form:
        # NOTE(security): the URL is caller-controlled and fetched from the
        # server side (SSRF risk) -- consider restricting the allowed hosts.
        try:
            response = requests.get(request.form["url"], timeout=30)
        except requests.RequestException as exc:
            print(f"Failed to download image: {exc}")
            return None
        if response.status_code != 200:
            print(f"Failed to download image. Status code: {response.status_code}")
            return None
        return response.content
    if "file" in request.files:
        return request.files["file"].read()
    return None


def fetchImage():
    """Estimate a repair cost for the car photo submitted with the request.

    The photo comes from the ``url`` form field or the uploaded ``file``.
    A damage-detection model is run on the whole photo; every detected damage
    box is cropped and passed to a parts-segmentation model.  For each part
    found in a crop the cost is
    ``(part box area / crop area) * 10 * base cost of the part``; parts that
    are missing from the cost table contribute nothing.

    All intermediate files (the normalised JPEG, the crops and the annotated
    images) live in a per-request temporary directory that is removed before
    the response is returned.

    Returns:
        ``{"total_cost": <number>}`` as JSON on success, or
        ``({"error": <text>}, 400)`` when no readable image was submitted.
    """
    image_bytes = _read_submitted_image_bytes()
    if image_bytes is None:
        return jsonify({"error": "no image could be obtained from the request"}), 400

    try:
        # JPEG cannot store alpha or palette data, so every input is
        # normalised to RGB (previously only RGBA was converted).
        picture = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except OSError:
        return jsonify({"error": "submitted data is not a readable image"}), 400

    # A private directory per request replaces the fixed
    # "downloaded_image.jpg" in the working directory, which concurrent
    # requests would overwrite and which a failed download left stale.
    with tempfile.TemporaryDirectory() as temp_dir:
        file_name = os.path.join(temp_dir, "downloaded_image.jpg")
        picture.save(file_name, "JPEG", quality=100)
        print(f"Image downloaded and saved as {file_name}")

        # Load image
        image = cv2.imread(file_name)
        if image is None:
            return jsonify({"error": "submitted data is not a readable image"}), 400

        # NOTE(security): the fallback API key is hard-coded in source; set
        # ROBOFLOW_API_KEY in the environment and rotate the embedded key.
        rf = Roboflow(
            api_key=os.environ.get("ROBOFLOW_API_KEY", "LqD8Cs4OsoK8seO3CPkf")
        )
        workspace = rf.workspace()
        model_parts = workspace.project("car-parts-segmentation").version(2).model
        model_damage = (
            workspace.project("car-damage-detection-ha5mm").version(1).model
        )

        # Run the damage detection model
        result_damage = model_damage.predict(file_name, confidence=40).json()
        detections_damage = sv.Detections.from_inference(result_damage)

        # Annotate a copy: the annotator draws onto the scene it is given,
        # and the crops taken below must come from the untouched photo.
        annotated_image_damage = sv.MaskAnnotator().annotate(
            scene=image.copy(), detections=detections_damage
        )

        # Define a repair cost dictionary (per part)
        repair_cost_dict = {
            "wheel": 100,  # Base cost for wheel
            "door": 200,  # Base cost for door
            "hood": 300,  # Base cost for hood
            "front_bumper": 250,  # Base cost for bumper
            "trunk": 200,
            "front_glass": 150,
            "back_left_door": 200,
            "left_mirror": 20,
            "back_glass": 150,
        }

        total_cost = 0
        image_height, image_width = image.shape[:2]
        part_annotator = sv.LabelAnnotator()

        # Each row of ``xyxy`` holds one damage box as (x1, y1, x2, y2).
        for i, box in enumerate(detections_damage.xyxy):
            x1, y1, x2, y2 = (int(value) for value in box)

            # Ensure the coordinates are within image bounds
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(image_width, x2), min(image_height, y2)

            # Crop the damaged region
            cropped_damage = image[y1:y2, x1:x2]
            if cropped_damage.size == 0:
                print(f"Skipping empty crop for damage region {i + 1}")
                continue

            # Save the cropped damaged area
            damage_image_path = os.path.join(temp_dir, f"damage_image_{i}.png")
            cv2.imwrite(damage_image_path, cropped_damage)

            # Run the parts detection model on the cropped damage
            result_parts = model_parts.predict(
                damage_image_path, confidence=15
            ).json()
            detections_parts = sv.Detections.from_inference(result_parts)

            # Non-zero here because empty crops were skipped above.
            cropped_area = (x2 - x1) * (y2 - y1)

            # Calculate repair cost for each detected part
            for part in result_parts["predictions"]:
                part_name = part["class"]
                damage_area = part["width"] * part["height"]
                damage_percentage = (damage_area / cropped_area) * 100

                # Default to 0 if part not in dict
                base_cost = repair_cost_dict.get(part_name, 0)
                repair_cost = (damage_percentage / 100) * 10 * base_cost
                total_cost += round(repair_cost, ndigits=1)
                print(
                    f"Damage {i + 1} - {part_name}: {damage_percentage:.2f}% damaged, Cost: ${repair_cost:.2f}"
                )

            # The crop is a view into ``image``; label a copy so the text
            # cannot leak into later, overlapping crops.
            annotated_parts_image = part_annotator.annotate(
                scene=cropped_damage.copy(), detections=detections_parts
            )
            annotated_parts_path = os.path.join(
                temp_dir, f"annotated_parts_{i}.png"
            )
            cv2.imwrite(annotated_parts_path, annotated_parts_image)

        # Save the overall annotated image
        annotated_image_path = os.path.join(temp_dir, "annotated_image_damage.png")
        cv2.imwrite(annotated_image_path, annotated_image_damage)

        # Return the total cost in the specified format
        result = {"total_cost": total_cost}
        print(result)
        return jsonify(result)
def generate_report():
    """Build a report from two submitted URLs, or store an uploaded report.

    With a ``report_url`` form field, ``main`` is called with that URL, the
    ``insurance_url`` form field and the output name ``"output.pdf"``; its
    return value is sent back as ``{"url": ...}`` with status 200.

    Otherwise an uploaded ``file`` (if any) is written to
    ``uploaded_report.pdf``.  Every path other than the first ends with the
    404 response at the bottom.

    NOTE(review): a successful upload is therefore also answered with 404 and
    "Something happened!." -- confirm whether that is intended.
    """
    form = request.form
    if "report_url" in form:
        report_url = form["report_url"]
        insurance_url = form["insurance_url"]
        generated_url = main(report_url, insurance_url, "output.pdf")
        return jsonify({"url": generated_url}), 200

    if "file" in request.files:
        upload = request.files["file"]
        with open("uploaded_report.pdf", "wb") as f:
            f.write(upload.read())

    return jsonify({"message": "Something happened!."}), 404
def _is_mri_image(image_path):
    """Return True when the Canny edge density of the image exceeds 0.05.

    The image is read as grayscale; an unreadable file yields ``False``.
    The measured density is printed.

    NOTE(review): the caller answers "Not an mri image" when this returns
    True, which contradicts the function name -- confirm which of the two is
    meant before relying on either.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        return False  # Invalid image

    # Apply Canny edge detection
    edges = cv2.Canny(img, 50, 150)

    # Fraction of pixels that Canny marked as an edge.
    edge_density = np.count_nonzero(edges) / edges.size
    print(edge_density)
    return edge_density > 0.05


def predict():
    """Score the uploaded ``file`` and report a severity stage.

    The upload is saved in the system temp directory.  Files with a
    ``.png``/``.jpg``/``.jpeg`` extension are additionally re-saved under the
    lower-cased file name, and that path is reported as ``saved_path``
    (``None`` for any other extension).

    Returns:
        JSON with ``message``, ``confidence``, ``stage`` and ``saved_path``;
        JSON without ``stage`` when the edge-density check rejects the file;
        ``({"error": ...}, 400)`` when no usable file was uploaded.
    """
    # ``.get`` lets the explicit check below produce the JSON error; indexing
    # would raise before that check could run.
    upload = request.files.get("file")
    if not upload:
        return jsonify({"error": "file not uploaded"}), 400

    # Keep only the final path component so a crafted name (absolute path or
    # "../" segments) cannot place the file outside the temp directory.
    filename = os.path.basename(upload.filename)
    if not filename:
        return jsonify({"error": "file not uploaded"}), 400

    # Save file temporarily
    temp_path = os.path.join(tempfile.gettempdir(), filename)
    upload.save(temp_path)

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])

    # Bound up front: the responses below reference it for every extension.
    image_save_path = None
    if filename.lower().endswith((".png", ".jpg", ".jpeg")):
        image = Image.open(temp_path)
        image_save_path = os.path.join(tempfile.gettempdir(), filename.lower())
        image.save(image_save_path)

    if _is_mri_image(temp_path):
        return jsonify({"message": "Not an mri image", "confidence": 0.95, "saved_path": image_save_path})

    a, b = model.check_file(temp_path)
    image = Image.open(temp_path).convert("RGB")
    # NOTE(review): ``model`` is the module imported at the top of the file
    # and ``device`` is not defined in the code visible here -- confirm which
    # network object and device this call is meant to use.
    output = model(transform(image).unsqueeze(0).to(device))
    stage = output.item()

    # Thresholds are checked in ascending order, so each branch only needs
    # its upper bound.
    if stage <= 2.0:
        stage = "Mild"
    elif stage <= 3.2:
        stage = "Moderate"
    else:
        stage = "Severe"
    return jsonify({"message": a, "confidence": b, "stage": stage, "saved_path": image_save_path})
if __name__ == "__main__":
    # Run the development server on all interfaces, port 7860, when this
    # file is executed as a script.
    app.run(port=7860, host="0.0.0.0")