Spaces:
Configuration error
Configuration error
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Copyright (c) 2022 Intel Corporation | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# SPDX-License-Identifier: Apache-2.0 | |
# | |
import os | |
import shutil | |
import numpy as np | |
import pandas as pd | |
from sklearn.model_selection import train_test_split | |
from collections import defaultdict | |
def copy_files_src_to_tgt(samples, fns_dict, src_folder, tgt_folder): | |
for sample in samples: | |
files_to_copy = fns_dict.get(sample) | |
for _file in files_to_copy: | |
src_fn = os.path.join(src_folder, _file) | |
tgt_fn = os.path.join(tgt_folder, _file) | |
shutil.copy2(src_fn, tgt_fn) | |
def split_images(src_folder, tgt_folder): | |
labels = os.listdir(src_folder) | |
print("Number of labels = ", len(labels)) | |
print("Labels are: \n", labels) | |
for label in labels: | |
fns = os.listdir(os.path.join(src_folder, label)) | |
fns.sort() | |
fns_root = ['_'.join(x.split('_')[:2]) for x in fns] | |
# Convert list of tuples to dictionary value lists | |
print("\nCreating default dict for stratifying the subject in {}.".format(label)) | |
fns_dict = defaultdict(list) | |
for i, j in zip(fns_root, fns): | |
fns_dict[i].append(j) | |
train_samples, test_samples = train_test_split(list(fns_dict.keys()), test_size=0.2, random_state=100) | |
src_dir = os.path.join(src_folder, label) | |
tgt_dir = os.path.join(tgt_folder, 'train', label) | |
os.makedirs(tgt_dir, exist_ok=True) | |
copy_files_src_to_tgt(train_samples, fns_dict, src_dir, tgt_dir) | |
tgt_dir = os.path.join(tgt_folder, 'test', label) | |
os.makedirs(tgt_dir, exist_ok=True) | |
copy_files_src_to_tgt(test_samples, fns_dict, src_dir, tgt_dir) | |
print("Done splitting the files for label = {}\n".format(label)) | |
print("Done splitting the data. Output data is here: ", tgt_folder) | |
def get_subject_id(image_name): | |
image_name = image_name.split("/")[-1] | |
patient_id = "".join(image_name.split("_")[:2])[1:] | |
return patient_id | |
def create_patient_id_list(image_data_folder, folder): | |
folder_pth = os.path.join(folder, image_data_folder) | |
patient_id_list = [] | |
for fldr in os.listdir(folder_pth): | |
for f in os.listdir(os.path.join(folder_pth, fldr)): | |
patient_id_list.append(get_subject_id(f)) | |
return np.unique(patient_id_list) | |
def read_annotation_file( | |
folder, | |
file_name, | |
label_column, | |
data_column, | |
patient_id, | |
patient_id_list, | |
image_data_folder | |
): | |
df_path = os.path.join(folder, file_name) | |
df = pd.read_csv(df_path) | |
label_map, reverse_label_map = label2map(df, label_column) | |
if patient_id_list is not None: | |
df = df[df[patient_id].isin(patient_id_list)] | |
else: | |
image_name_list = [] | |
for label in os.listdir(image_data_folder): | |
image_name_list.extend(os.listdir(os.path.join(image_data_folder, label))) | |
df = df[df[patient_id].isin(np.unique([get_subject_id(i) for i in image_name_list]))] | |
df_new = pd.DataFrame(columns=[label_column, data_column, patient_id]) | |
for i in df[patient_id].unique(): | |
annotation = " ".join(df[df[patient_id].isin([i])][data_column].to_list()) | |
temp_labels = df[df[patient_id] == i][label_column].unique() | |
if len(temp_labels) == 1: | |
df_new.loc[len(df_new)] = [temp_labels[0], annotation, i] | |
else: | |
if patient_id_list is not None: | |
# this is the case only shows for inference | |
# label assigne as a place holder | |
df_new.loc[len(df_new)] = ["Normal", annotation, i] | |
else: | |
Warning("Conflict in labelling ....") | |
return df_new, label_map, reverse_label_map | |
def label2map(df, label_column): | |
label_map, reverse_label_map = {}, {} | |
for i, v in enumerate(df[label_column].unique().tolist()): | |
label_map[v] = i | |
reverse_label_map[i] = v | |
return label_map, reverse_label_map | |
def create_train_test_set(df, patient_id, patient_id_list): | |
train_label, test_label = train_test_split( | |
patient_id_list, test_size=0.33, random_state=42 | |
) | |
df_test = df[df[patient_id].isin(test_label)] | |
df_train = df[df[patient_id].isin(train_label)] | |
return df_train, df_test | |
def split_annotation(folder, file_name, image_data_folder): | |
label_column = "label" | |
data_column = "symptoms" | |
patient_id = "Patient_ID" | |
patient_id_list = None | |
df, label_map, reverse_label_map = read_annotation_file( | |
folder, | |
file_name, | |
label_column, | |
data_column, | |
patient_id, | |
patient_id_list, | |
image_data_folder | |
) | |
patient_id_list = create_patient_id_list(image_data_folder, folder) | |
df_train, df_test = create_train_test_set(df, patient_id, patient_id_list) | |
return df_train | |