""" Preprocessing script for DICOM medical images """ import os import numpy as np import pydicom import cv2 from glob import glob from tqdm import tqdm import argparse def apply_window_level(image, window_center, window_width): """ Apply windowing to the DICOM image to enhance visualization Args: image (numpy.ndarray): Input image window_center (float): Window center (level) window_width (float): Window width Returns: numpy.ndarray: Windowed image """ img_min = window_center - window_width // 2 img_max = window_center + window_width // 2 windowed = np.clip(image, img_min, img_max) windowed = (windowed - img_min) / (img_max - img_min) * 255.0 return windowed.astype(np.uint8) def process_dicom_files(input_dir, output_dir): """ Process all DICOM files in the input directory Args: input_dir (str): Directory containing DICOM files output_dir (str): Directory to save processed images """ # Create output directories os.makedirs(output_dir, exist_ok=True) images_dir = os.path.join(output_dir, "images") os.makedirs(images_dir, exist_ok=True) # Find all DICOM files dicom_files = glob(os.path.join(input_dir, "**", "*.dcm"), recursive=True) print(f"Found {len(dicom_files)} DICOM files") # Process each file for i, dicom_path in enumerate(tqdm(dicom_files, desc="Processing DICOM files")): try: # Read DICOM file dicom = pydicom.dcmread(dicom_path) # Extract image data image = dicom.pixel_array # Apply windowing if available if hasattr(dicom, 'WindowCenter') and hasattr(dicom, 'WindowWidth'): window_center = dicom.WindowCenter window_width = dicom.WindowWidth # Handle multiple window values if isinstance(window_center, pydicom.multival.MultiValue): window_center = window_center[0] if isinstance(window_width, pydicom.multival.MultiValue): window_width = window_width[0] image = apply_window_level(image, window_center, window_width) else: # Apply default windowing for CT images if dicom.Modality == "CT": image = apply_window_level(image, 40, 400) # Soft tissue window else: # Normalize to 0-255 range image = ((image - image.min()) / (image.max() - image.min() + 1e-8) * 255).astype(np.uint8) # Generate output filename patient_id = dicom.PatientID if hasattr(dicom, 'PatientID') else "unknown" series_uid = dicom.SeriesInstanceUID if hasattr(dicom, 'SeriesInstanceUID') else "unknown" instance_uid = dicom.SOPInstanceUID if hasattr(dicom, 'SOPInstanceUID') else str(i) output_filename = f"{patient_id}_{series_uid[-8:]}_{instance_uid[-8:]}.png" output_path = os.path.join(images_dir, output_filename) # Save image cv2.imwrite(output_path, image) # Save metadata with open(os.path.join(output_dir, "metadata.txt"), "a") as f: f.write(f"{output_filename},{dicom_path}\n") except Exception as e: print(f"Error processing {dicom_path}: {e}") print(f"Processing complete. Processed images saved to {images_dir}") if __name__ == "__main__": parser = argparse.ArgumentParser(description='Process DICOM files') parser.add_argument('--input', type=str, required=True, help='Input directory with DICOM files') parser.add_argument('--output', type=str, default='./processed_data', help='Output directory') args = parser.parse_args() process_dicom_files(args.input, args.output)