|
""" |
|
Preprocessing script for DICOM medical images |
|
""" |
|
import os |
|
import numpy as np |
|
import pydicom |
|
import cv2 |
|
from glob import glob |
|
from tqdm import tqdm |
|
import argparse |
|
|
|
def apply_window_level(image, window_center, window_width):
    """
    Apply windowing to the DICOM image to enhance visualization.

    Intensities are clipped to the interval
    ``[window_center - window_width / 2, window_center + window_width / 2]``
    and then mapped linearly onto the 0-255 range.

    Args:
        image (numpy.ndarray): Input image
        window_center (float): Window center (level)
        window_width (float): Window width. If the resulting window has no
            extent (e.g. a width of zero or less), the window degenerates to
            a threshold: pixels above ``window_center`` become 255, all
            others 0.

    Returns:
        numpy.ndarray: Windowed image as ``uint8``, same shape as ``image``
    """
    # Work in float64: integer inputs (especially unsigned ones) would
    # otherwise wrap or raise when a negative lower bound is subtracted.
    pixels = np.asarray(image, dtype=np.float64)
    center = float(window_center)

    # True division keeps odd and fractional widths exact; floor division
    # would shrink them (and collapse widths below 2 to an empty window).
    half_width = float(window_width) / 2.0
    img_min = center - half_width
    img_max = center + half_width

    # Guard the normalisation below against a zero (or negative/NaN) span.
    if not img_max > img_min:
        return np.where(pixels > center, 255, 0).astype(np.uint8)

    windowed = np.clip(pixels, img_min, img_max)
    windowed = (windowed - img_min) / (img_max - img_min) * 255.0

    # The cast truncates toward zero, matching the original output scaling.
    return windowed.astype(np.uint8)
|
|
|
def _first_value(value):
    """Return the first entry of a multi-valued DICOM attribute, else the value itself."""
    if isinstance(value, pydicom.multival.MultiValue):
        return value[0]
    return value


def _safe_name_part(value, fallback):
    """Turn a DICOM attribute into a string that is safe inside a file name."""
    text = str(value) if value is not None else ""
    if not text:
        return fallback
    # Path separators / NUL would redirect or break the output path.
    return "".join("_" if ch in "/\\\0" else ch for ch in text)


def _to_display_image(dicom):
    """Convert a dataset's pixel data to an 8-bit image ready for saving."""
    # Float64 avoids integer overflow/wrap-around in all arithmetic below.
    image = dicom.pixel_array.astype(np.float64)

    # Window values are expressed in rescaled units (e.g. HU for CT), so the
    # linear modality rescale has to be applied before windowing.
    slope = float(getattr(dicom, "RescaleSlope", None) or 1)
    intercept = float(getattr(dicom, "RescaleIntercept", None) or 0)
    image = image * slope + intercept

    # NOTE(review): PhotometricInterpretation (e.g. MONOCHROME1 inversion) is
    # not handled here - confirm whether the input data needs it.
    window_center = _first_value(getattr(dicom, "WindowCenter", None))
    window_width = _first_value(getattr(dicom, "WindowWidth", None))

    has_window = all(v is not None and v != "" for v in (window_center, window_width))
    if has_window:
        return apply_window_level(image, float(window_center), float(window_width))

    # No usable window in the file: fixed 40/400 window for CT, otherwise
    # plain min-max normalisation.
    if getattr(dicom, "Modality", None) == "CT":
        return apply_window_level(image, 40, 400)

    lowest = image.min()
    highest = image.max()
    return ((image - lowest) / (highest - lowest + 1e-8) * 255).astype(np.uint8)


def process_dicom_files(input_dir, output_dir):
    """
    Process all DICOM files in the input directory.

    Every ``*.dcm`` file found recursively under ``input_dir`` is read,
    converted to an 8-bit image (using the file's window center/width when
    present, a 40/400 window for CT otherwise, and min-max scaling for
    everything else) and written as a PNG into ``<output_dir>/images``.
    For each image written, a ``<png name>,<source path>`` line is appended
    to ``<output_dir>/metadata.txt``.

    Files that fail to process are reported on stdout and skipped; they do
    not abort the run.

    Args:
        input_dir (str): Directory containing DICOM files
        output_dir (str): Directory to save processed images
    """
    os.makedirs(output_dir, exist_ok=True)
    images_dir = os.path.join(output_dir, "images")
    os.makedirs(images_dir, exist_ok=True)
    metadata_path = os.path.join(output_dir, "metadata.txt")

    # Sorted so that the index-based fallback name is reproducible between runs.
    dicom_files = sorted(glob(os.path.join(input_dir, "**", "*.dcm"), recursive=True))
    print(f"Found {len(dicom_files)} DICOM files")

    failed = 0
    for i, dicom_path in enumerate(tqdm(dicom_files, desc="Processing DICOM files")):
        try:
            dicom = pydicom.dcmread(dicom_path)
            image = _to_display_image(dicom)

            patient_id = _safe_name_part(getattr(dicom, "PatientID", None), "unknown")
            series_uid = _safe_name_part(getattr(dicom, "SeriesInstanceUID", None), "unknown")
            instance_uid = _safe_name_part(getattr(dicom, "SOPInstanceUID", None), str(i))

            # NOTE: only the last 8 characters of each UID are used, so two
            # files whose UIDs share those suffixes map to the same PNG name.
            output_filename = f"{patient_id}_{series_uid[-8:]}_{instance_uid[-8:]}.png"
            output_path = os.path.join(images_dir, output_filename)

            # cv2.imwrite signals failure through its return value instead of
            # raising; without this check a failed write would still be
            # recorded in the metadata file.
            if not cv2.imwrite(output_path, image):
                raise OSError(f"could not write {output_path}")

            with open(metadata_path, "a") as f:
                f.write(f"{output_filename},{dicom_path}\n")

        except Exception as e:
            # Per-file boundary: one unreadable file must not stop the batch.
            failed += 1
            print(f"Error processing {dicom_path}: {e}")

    if failed:
        print(f"Skipped {failed} file(s) due to errors")
    print(f"Processing complete. Processed images saved to {images_dir}")
|
|
|
if __name__ == "__main__":
    # Command-line entry point: read the input/output directories from the
    # arguments and run the conversion over them.
    cli = argparse.ArgumentParser(description='Process DICOM files')
    cli.add_argument(
        '--input',
        type=str,
        required=True,
        help='Input directory with DICOM files',
    )
    cli.add_argument(
        '--output',
        type=str,
        default='./processed_data',
        help='Output directory',
    )

    options = cli.parse_args()
    source_dir = options.input
    target_dir = options.output
    process_dicom_files(source_dir, target_dir)