"""Flask service: car-damage repair-cost estimation, report generation, file checks.

Importing this module downloads the model checkpoint from Google Drive.
"""

import io
import os
import shutil
import tempfile

import cloudinary
import cloudinary.uploader
import cv2
import gdown
import requests
import supervision as sv
from flask import Flask, jsonify, render_template, request
from PIL import Image
from roboflow import Roboflow

import model
from a import main

# Initialize Flask app
app = Flask(__name__)

GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z"
LOCAL_MODEL_PATH = "checkpoint32.pth"
print(GDRIVE_MODEL_URL)

# Timeout (seconds) for outbound HTTP requests so a stalled peer cannot hang
# a worker forever.
REQUEST_TIMEOUT_SECONDS = 30

# NOTE(review): this key was hard-coded in the source; prefer supplying it via
# the ROBOFLOW_API_KEY environment variable and rotating the committed value.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY", "LqD8Cs4OsoK8seO3CPkf")

# Base repair cost per detected car part; parts not listed cost 0.
REPAIR_COST_BY_PART = {
    "wheel": 100,
    "door": 200,
    "hood": 300,
    "front_bumper": 250,
    "trunk": 200,
    "front_glass": 150,
    "back_left_door": 200,
    "left_mirror": 20,
    "back_glass": 150,
}

# Image modes Pillow can write as JPEG without conversion.
_JPEG_MODES = ("RGB", "L", "CMYK")
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")


def download_file_from_google_drive():
    """Download the model checkpoint to LOCAL_MODEL_PATH using gdown."""
    gdown.download(GDRIVE_MODEL_URL, LOCAL_MODEL_PATH, quiet=False)


file_id = "1fzKneepaRt_--dzamTcDBM-9d3_dLX7z"
destination = "checkpoint32.pth"


def download_model():
    """Download the checkpoint with plain HTTP unless it already exists.

    Raises:
        RuntimeError: if the server answers with a non-200 status.
    """
    if os.path.exists(LOCAL_MODEL_PATH):
        return
    with requests.get(
        GDRIVE_MODEL_URL, stream=True, timeout=REQUEST_TIMEOUT_SECONDS
    ) as response:
        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to download model from Google Drive: {response.status_code}"
            )
        # Stream to disk in chunks instead of buffering the whole checkpoint.
        with open(LOCAL_MODEL_PATH, "wb") as f:
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)


download_file_from_google_drive()


def _as_jpeg_compatible(image):
    """Return *image* in a mode that Pillow can save as JPEG."""
    if image.mode in _JPEG_MODES:
        return image
    return image.convert("RGB")


def _read_request_image_bytes():
    """Return ``(image_bytes, error)`` for the current request.

    The image comes from the ``url`` form field (downloaded once) or from the
    ``file`` upload. Exactly one element of the returned pair is ``None``.
    """
    if "url" in request.form:
        url = request.form["url"]
        # NOTE(review): the URL is caller-supplied, so this fetch can be
        # pointed at internal hosts (SSRF); consider an allow-list.
        try:
            response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            return None, f"Failed to download image: {exc}"
        if response.status_code != 200:
            return (
                None,
                f"Failed to download image. Status code: {response.status_code}",
            )
        return response.content, None
    if "file" in request.files:
        return request.files["file"].read(), None
    return None, "Provide an image via the 'url' form field or a 'file' upload"


def _repair_cost(part_name, damage_percentage):
    """Return the repair cost for a part given its damage percentage."""
    base_cost = REPAIR_COST_BY_PART.get(part_name, 0)
    return (damage_percentage / 100) * 10 * base_cost


@app.route("/")
def home():
    """Serve the landing page."""
    return render_template("index.html")


@app.route("/fetch-image", methods=["POST"])
def fetchImage():
    """Estimate the total repair cost for a car image.

    Accepts either a ``url`` form field or a ``file`` upload. Runs the damage
    model on the image, crops each detected damage box, runs the parts model
    on every crop and sums the per-part repair costs.

    Returns:
        JSON ``{"total_cost": <number>}`` on success, or ``{"error": ...}``
        with status 400 when no usable image was supplied.
    """
    image_bytes, error = _read_request_image_bytes()
    if error is not None:
        print(error)
        return jsonify({"error": error}), 400

    # Per-request working directory, so concurrent requests never overwrite
    # each other's files. NOTE(review): it is left on disk (as the original
    # output directory was) for inspection; nothing here removes it.
    temp_dir = tempfile.mkdtemp()
    file_name = os.path.join(temp_dir, "downloaded_image.jpg")
    try:
        with Image.open(io.BytesIO(image_bytes)) as source_image:
            _as_jpeg_compatible(source_image).save(file_name, "JPEG", quality=100)
    except OSError as exc:  # includes PIL.UnidentifiedImageError
        shutil.rmtree(temp_dir, ignore_errors=True)
        message = f"Invalid image data: {exc}"
        print(message)
        return jsonify({"error": message}), 400
    print(f"Image downloaded and saved as {file_name}")

    # Load image
    image = cv2.imread(file_name)
    if image is None:
        shutil.rmtree(temp_dir, ignore_errors=True)
        return jsonify({"error": "Could not decode image"}), 400

    workspace = Roboflow(api_key=ROBOFLOW_API_KEY).workspace()
    model_parts = workspace.project("car-parts-segmentation").version(2).model
    model_damage = workspace.project("car-damage-detection-ha5mm").version(1).model

    # Run the damage detection model
    result_damage = model_damage.predict(file_name, confidence=40).json()
    detections_damage = sv.Detections.from_inference(result_damage)

    # Annotate a copy: the annotator may draw into the array it is given, and
    # the crops below must come from the untouched image.
    annotated_image_damage = sv.MaskAnnotator().annotate(
        scene=image.copy(), detections=detections_damage
    )

    image_height, image_width = image.shape[:2]
    part_annotator = sv.LabelAnnotator()
    total_cost = 0

    # Each row of xyxy is one damage box (x1, y1, x2, y2).
    for i, box in enumerate(detections_damage.xyxy):
        x1, y1, x2, y2 = (int(value) for value in box)

        # Ensure the coordinates are within image bounds
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(image_width, x2), min(image_height, y2)

        cropped_damage = image[y1:y2, x1:x2]
        if cropped_damage.size == 0:
            print(f"Skipping empty crop for damage region {i + 1}")
            continue

        damage_image_path = os.path.join(temp_dir, f"damage_image_{i}.png")
        cv2.imwrite(damage_image_path, cropped_damage)

        # Run the parts detection model on the cropped damage
        result_parts = model_parts.predict(damage_image_path, confidence=15).json()
        detections_parts = sv.Detections.from_inference(result_parts)

        # Non-zero here: an empty crop was skipped above.
        cropped_area = (x2 - x1) * (y2 - y1)
        for part in result_parts["predictions"]:
            part_name = part["class"]
            damage_area = part["width"] * part["height"]
            damage_percentage = (damage_area / cropped_area) * 100
            repair_cost = _repair_cost(part_name, damage_percentage)
            total_cost += round(repair_cost, ndigits=1)
            print(
                f"Damage {i + 1} - {part_name}: {damage_percentage:.2f}% damaged, Cost: ${repair_cost:.2f}"
            )

        # The crop is a view into `image`; annotate a copy so labels never
        # leak into later crops.
        annotated_parts_image = part_annotator.annotate(
            scene=cropped_damage.copy(), detections=detections_parts
        )
        annotated_parts_path = os.path.join(temp_dir, f"annotated_parts_{i}.png")
        cv2.imwrite(annotated_parts_path, annotated_parts_image)

    # Save the overall annotated image
    annotated_image_path = os.path.join(temp_dir, "annotated_image_damage.png")
    cv2.imwrite(annotated_image_path, annotated_image_damage)

    # Round once more so float accumulation noise does not reach the client.
    result = {"total_cost": round(total_cost, 1)}
    print(result)
    return jsonify(result)


@app.route("/generate-report", methods=["POST"])
def generate_report():
    """Generate a report from ``report_url`` and ``insurance_url`` form fields.

    Returns:
        JSON ``{"url": ...}`` with status 200 when ``report_url`` is supplied;
        otherwise a JSON message with status 404.
    """
    if "report_url" in request.form:
        report_url = request.form["report_url"]
        insurance_url = request.form["insurance_url"]
        url = main(report_url, insurance_url, "output.pdf")
        return jsonify({"url": url}), 200

    if "file" in request.files:
        # NOTE(review): the upload is stored but the request still answers
        # 404 below, exactly as before — confirm whether that is intended.
        request.files["file"].save("uploaded_report.pdf")

    return jsonify({"message": "Something happened!."}), 404


@app.route("/ms-detection", methods=["POST"])
def predict():
    """Store the uploaded file and return the result of ``model.check_file``.

    Image uploads (.png/.jpg/.jpeg) are additionally re-saved as
    ``saved_image.jpg`` in the system temp directory.

    Returns:
        JSON with ``message`` (the ``model.check_file`` result) and
        ``saved_path`` (the JPEG copy, or ``None`` for non-image uploads);
        ``{"error": ...}`` with status 400 when no file was uploaded.
    """
    file = request.files.get("file")
    # Strip directory components so a crafted filename cannot escape the
    # temp directory.
    safe_name = os.path.basename(file.filename or "") if file else ""
    if safe_name in ("", ".", ".."):
        return jsonify({"error": "file not uploaded"}), 400

    # Save file temporarily
    temp_path = os.path.join(tempfile.gettempdir(), safe_name)
    file.save(temp_path)

    image_save_path = None
    if safe_name.lower().endswith(_IMAGE_SUFFIXES):
        image_save_path = os.path.join(tempfile.gettempdir(), "saved_image.jpg")
        with Image.open(temp_path) as image:
            # Read the pixels up front: the upload may itself be named
            # saved_image.jpg, making source and destination the same file.
            image.load()
            _as_jpeg_compatible(image).save(image_save_path)

    return jsonify(
        {"message": model.check_file(temp_path), "saved_path": image_save_path}
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)