from flask import Flask, request, jsonify, render_template from PIL import Image import io import os import requests from roboflow import Roboflow import supervision as sv import cv2 import tempfile import gdown import os import requests import requests import cloudinary import model import cloudinary.uploader from a import main import numpy as np import torchvision.transforms as transforms import pandas as pd import nibabel as nib import numpy as np import torch import torch.nn as nn import torchvision import torchvision.transforms as transforms import cv2 from PIL import Image from sklearn.model_selection import train_test_split import os import torch import torch.nn as nn import torch.optim as optim import torchvision.transforms as transforms import torchvision.models as models from torch.utils.data import Dataset, DataLoader import pandas as pd from PIL import Image import os from sklearn.model_selection import train_test_split from sklearn.preprocessing import MinMaxScaler from inference_sdk import InferenceHTTPClient, InferenceConfiguration # Initialize Flask app app = Flask(__name__) GDRIVE_MODEL_URL = "https://drive.google.com/uc?id=1fzKneepaRt_--dzamTcDBM-9d3_dLX7z" LOCAL_MODEL_PATH = "checkpoint32.pth" d = "https://drive.google.com/uc?id=1GfrlFNoa7E4liMHyMuF73nA21yT9SNSb" def download_file_from_google_drive(): gdown.download(GDRIVE_MODEL_URL, LOCAL_MODEL_PATH, quiet=False) da = "a.pth" def download_file_from_google_drived(): gdown.download(d, da, quiet=False) def download_model(): if not os.path.exists(LOCAL_MODEL_PATH): response = requests.get(GDRIVE_MODEL_URL, stream=True) if response.status_code == 200: with open(LOCAL_MODEL_PATH, "wb") as f: f.write(response.content) else: raise Exception( f"Failed to download model from Google Drive: {response.status_code}" ) download_file_from_google_drive() download_file_from_google_drived() @app.route("/") def home(): return render_template("index.html") @app.route("/fetch-image", methods=["POST"]) def fetchImage(): file = None url = "" if "url" in request.form: url = request.form["url"] response = requests.get(url) file = io.BytesIO(response.content) elif "file" in request.files: file = request.files["file"] # url = "https://firebasestorage.googleapis.com/v0/b/car-damage-detector-s34rrz.firebasestorage.app/o/users%2FYMd99dt33HaktTWpYp5MM5oYeBE3%2Fuploads%2F1737454072124000.jpg?alt=media&token=9eae79fa-4c06-41a5-9f58-236c39efaac0" # File name for saving file_name = "downloaded_image.jpg" # Download the image response = requests.get(url) # Save the image to the current directory if response.status_code == 200: file_name = "downloaded_image.jpg" image = Image.open(io.BytesIO(response.content)) if image.mode == "RGBA": image = image.convert("RGB") image.save(file_name, "JPEG", quality=100) print(f"Image downloaded and saved as {file_name}") else: print(f"Failed to download image. Status code: {response.status_code}") image = cv2.imread(file_name) rf = Roboflow(api_key="LqD8Cs4OsoK8seO3CPkf") project_parts = rf.workspace().project("car-parts-segmentation") model_parts = project_parts.version(2).model project_damage = rf.workspace().project("car-damage-detection-ha5mm") model_damage = project_damage.version(1).model # Run the damage detection model result_damage = model_damage.predict( file_name, confidence=40, ).json() # Extract detections from the result detections_damage = sv.Detections.from_inference(result_damage) # Read the input image # Annotate damaged areas of the car mask_annotator = sv.MaskAnnotator() annotated_image_damage = mask_annotator.annotate( scene=image, detections=detections_damage ) # temp_dir = tempfile.mkdtemp() # Define a repair cost dictionary (per part) repair_costs = { "Car-Damage-Detection-1KxY": 1000, # General damage assessment cost "Bodypanel-Dent": 200*2, "Front-Windscreen-Damage": 400, "Headlight-Damage": 250*2*2, "Rear-windscreen-Damage": 350, "RunningBoard-Dent": 150, "Sidemirror-Damage": 180*2, "Signlight-Damage": 120*2, "Taillight-Damage": 220*2, "back-bumper": 500, "back-glass": 400, "bonnet-dent": 300, "boot-dent": 350, "broken_lamp": 100*2*2, "crack": 250*2*2, "damaged-door": 600, "damaged-front-bumper": 550, "damaged-head-light": 270*2, "damaged-hood": 500, "damaged-rear-bumper": 520, "damaged-rear-window": 380*2, "damaged-tail-light": 230*2, "damaged-trunk": 600, "damaged-window": 280*2*2, "damaged-windscreen": 450, "dent": 200*2, "dent-or-scratch": 180*2, "door": 700, "doorouter-dent": 250, "fender-dent": 220*2, "flat_tire": 100*2*2, "front-bumper": 500, "front-bumper-dent": 450, "front-glass": 400, "headlight": 250*2, "hood": 500, "mirror": 180*2*2, "pillar-dent": 220*2, "quaterpanel-dent": 270, "rear-bumper-dent": 480, "roof-dent": 400, "scratch": 150, "shattered_glass": 500, "taillight": 220*2, "trunk": 600, "wheel": 250*2, "window": 300, } total_cost = 0 # coordinates = list(map(int, detections_damage.xyxy.flatten())) # num_damages = ( # len(coordinates) // 4 # ) # Each damage has 4 coordinates (x1, y1, x2, y2) # Iterate through damages # for i in range(num_damages): # x1, y1, x2, y2 = coordinates[i * 4: (i + 1) * 4] # # Ensure the coordinates are within image bounds # x1, y1 = max(0, x1), max(0, y1) # x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2) # # Crop the damaged region # cropped_damage = image[y1:y2, x1:x2] # # Check if the cropped region is valid # if cropped_damage.size == 0: # print(f"Skipping empty crop for damage region {i + 1}") # continue # # Save the cropped damaged area # damage_image_path = os.path.join(temp_dir, f"damage_image_{i}.png") # cv2.imwrite(damage_image_path, cropped_damage) # # Run the parts detection model on the cropped damage # result_parts = model_parts.predict( # damage_image_path, confidence=15).json() # detections_parts = sv.Detections.from_inference(result_parts) # # Calculate repair cost for each detected part # for part in result_parts["predictions"]: # part_name = part["class"] # damage_area = part["width"] * part["height"] # cropped_area = (x2 - x1) * (y2 - y1) # damage_percentage = (damage_area / cropped_area) * 100 # # Lookup cost and add to total # base_cost = repair_cost_dict.get( # part_name, 0 # ) # Default to 0 if part not in dict # repair_cost = (damage_percentage / 100) * 10 * base_cost # total_cost += round(repair_cost, ndigits=1) # print( # f"Damage {i + 1} - {part_name}: {damage_percentage:.2f}% damaged, Cost: ${repair_cost:.2f}" # ) # # Annotate and save the result # part_annotator = sv.LabelAnnotator() # annotated_parts_image = part_annotator.annotate( # scene=cropped_damage, detections=detections_parts # ) # annotated_parts_path = os.path.join( # temp_dir, f"annotated_parts_{i}.png") # cv2.imwrite(annotated_parts_path, annotated_parts_image) # # Save the overall annotated image # annotated_image_path = os.path.join(temp_dir, "annotated_image_damage.png") # cv2.imwrite(annotated_image_path, annotated_image_damage) # # Return the total cost in the specified format CLIENT = InferenceHTTPClient( api_url="https://detect.roboflow.com", api_key="LqD8Cs4OsoK8seO3CPkf" ) print(file_name) # Set confidence threshold to 50% custom_configuration = InferenceConfiguration(confidence_threshold=0.7) with CLIENT.use_configuration(custom_configuration): result = CLIENT.infer( file_name, model_id="car-damage-detection-krsix/1") print(result) threshold = 0.3 # Adjust this as needed filtered_labels = [item for item in result["predictions"] if item["confidence"] >= threshold] print(filtered_labels) labels = filtered_labels print(labels) for class_ in labels: total_cost += repair_costs.get(class_["class"], 0) result = {"total_cost": round(total_cost, 2)} print(result) return jsonify(result) @app.route("/generate-report", methods=["POST"]) def generate_report(): file = None if "report_url" in request.form: report_url = request.form["report_url"] insurance_url = request.form["insurance_url"] url = main(report_url, insurance_url, "output.pdf") result = {"url": url} return jsonify(result), 200 elif "file" in request.files: file = request.files["file"] with open("uploaded_report.pdf", "wb") as f: f.write(file.read()) return jsonify({"message": "Something happened!."}), 404 @app.route("/ms-detection", methods=["POST"]) def predict(): file = request.files["file"] if not file: return jsonify({"error": "file not uploaded"}), 400 # Save file temporarily temp_path = os.path.join(tempfile.gettempdir(), file.filename) file.save(temp_path) transform = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), ]) if file.filename.lower().endswith((".png", ".jpg", ".jpeg")): image = Image.open(temp_path) image_save_path = os.path.join( tempfile.gettempdir(), file.filename.lower()) image.save(image_save_path) def is_mri_image(image_path): img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) if img is None: return False # Invalid image # Apply Canny edge detection edges = cv2.Canny(img, 50, 150) # Calculate edge density (MRI images have high edge presence) edge_density = np.sum(edges > 0) / edges.size print(edge_density) return edge_density > 0.05 if (is_mri_image(temp_path)): return jsonify({"message": "Not an mri image", "confidence": 0.95, "saved_path": image_save_path}) a, b = model.check_file(temp_path) class ResNetRegression(nn.Module): def __init__(self): super(ResNetRegression, self).__init__() self.model = models.resnet34(pretrained=True) in_features = self.model.fc.in_features # Change output layer for regression self.model.fc = nn.Linear(in_features, 1) def forward(self, x): return self.model(x) # Initialize Model, Loss, and Optimizer model_new = ResNetRegression() checkpoint = torch.load( "/home/user/app/a.pth", weights_only=False, map_location=torch.device('cpu')) def remove_module_from_checkpoint(checkpoint): new_state_dict = {} print(checkpoint.keys()) for key, value in checkpoint.items(): new_key = key.replace("module.", "") new_state_dict[new_key] = value checkpoint = new_state_dict return checkpoint checkpoint = remove_module_from_checkpoint(checkpoint) model_new.load_state_dict(checkpoint) image = Image.open(temp_path).convert("RGB") output = model_new(transform(image).unsqueeze(0)) stage = output.item() if not a == "No ms detected": if stage <= 2.0: stage = "Mild" elif stage >= 2.0 and stage <= 3.2: stage = "Moderate" else: stage = "Severe" else: stage = "No ms detected" return jsonify({"message": a, "confidence": b, "stage": stage, "saved_path": image_save_path}) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)