Intracranial hemorrhage is a relatively common condition with many causes, including trauma, stroke, aneurysm, vascular malformation, hypertension, illicit drugs, and coagulation disorders.
Intracranial hemorrhage has several subtypes: epidural, intraparenchymal, intraventricular, subarachnoid, and subdural.
The dataset we will use comes from Kaggle (https://www.kaggle.com/c/rsna-intracranial-hemorrhage-detection/data). Using this dataset, we will build an algorithm to detect the different types of intracranial hemorrhage.
Prerequisites
We need Python installed on our machine, and we will use libraries such as numpy, pandas, seaborn, matplotlib, scipy, pydicom, keras, opencv, and sklearn.
Loading the Dataset
I chose to first load the dataset with the help of the pandas library.
import os
import json
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pydicom
from keras import layers
from keras.applications import DenseNet121
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import Callback, ModelCheckpoint
from keras.initializers import Constant
from keras.models import Sequential
from keras.optimizers import Adam
from tensorflow.python.ops import array_ops
from tqdm import tqdm
from keras import backend as K
import tensorflow as tf
train_df = pd.read_csv('stage_2_train.csv')
sub_df = pd.read_csv('stage_2_sample_submission.csv')
Exploring the Dataset
train_df['filename'] = train_df['ID'].apply(lambda st: "ID_" + st.split('_')[1] + ".png")
train_df['type'] = train_df['ID'].apply(lambda st: st.split('_')[2])
sub_df['filename'] = sub_df['ID'].apply(lambda st: "ID_" + st.split('_')[1] + ".png")
sub_df['type'] = sub_df['ID'].apply(lambda st: st.split('_')[2])
print(train_df.shape)
train_df.head()
test_df = pd.DataFrame(sub_df.filename.unique(), columns=['filename'])
print(test_df.shape)
test_df.head()
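To make the ID parsing above concrete, here is a minimal sketch of how one row's ID is split into a filename and a subtype label (the ID value itself is a made-up example in the same "ID_&lt;hash&gt;_&lt;subtype&gt;" format the competition CSVs use):

```python
# Hypothetical ID string in the competition's row format
sample_id = "ID_63eb1e259_epidural"

filename = "ID_" + sample_id.split('_')[1] + ".png"  # image file this row refers to
label_type = sample_id.split('_')[2]                 # hemorrhage subtype for this row

print(filename)    # ID_63eb1e259.png
print(label_type)  # epidural
```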
We select only a portion of the full training set (4,000 randomly chosen files) and call the result sample_df.
np.random.seed(1749)
sample_files = np.random.choice(os.listdir('/stage_2_train/'), 4000)
sample_df = train_df[train_df.filename.apply(lambda x: x.replace('.png', '.dcm')).isin(sample_files)]
pivot_df simply reformats sample_df so that each label becomes its own column (this lets us use multi-label outputs in the data generators later).
pivot_df = sample_df[['Label', 'filename', 'type']].drop_duplicates().pivot(
    index='filename', columns='type', values='Label').reset_index()
print(pivot_df.shape)
pivot_df.head()
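The effect of the pivot can be seen on a tiny toy DataFrame (the filenames and labels here are invented purely for illustration):

```python
import pandas as pd

# Toy long-format data: one row per (filename, subtype) pair
long_df = pd.DataFrame({
    'filename': ['a.png', 'a.png', 'b.png', 'b.png'],
    'type':     ['any', 'epidural', 'any', 'epidural'],
    'Label':    [1, 1, 0, 0],
})

# Pivot to wide format: one row per filename, one column per subtype
wide_df = long_df.pivot(index='filename', columns='type', values='Label').reset_index()
print(wide_df)
```

Each image now occupies a single row whose subtype columns can be fed directly to a multi-output generator.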
Image Processing
Here we rescale and resize the images and save them in PNG format.
First we define the window_image function, which localizes the region of interest in an image: it takes an image and rescales it to a standard range using a min-max-scaler-style formula.
def window_image(img, window_center, window_width, intercept, slope, rescale=True):
    img = (img * slope + intercept)
    img_min = window_center - window_width // 2
    img_max = window_center + window_width // 2
    img[img < img_min] = img_min
    img[img > img_max] = img_max
    if rescale:
        # Extra rescaling to 0-1, not in the original notebook
        img = (img - img_min) / (img_max - img_min)
    return img
We pass img, window_center, window_width, and the other parameters into window_image to perform this task.
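As a sanity check, here is the same windowing arithmetic applied to a toy array with a typical brain window (center 40, width 80; slope 1 and intercept 0 are assumptions made for this example):

```python
import numpy as np

raw = np.array([-1000.0, 0.0, 40.0, 80.0, 3000.0])  # toy pixel values
slope, intercept = 1.0, 0.0
window_center, window_width = 40, 80

img = raw * slope + intercept
img_min = window_center - window_width // 2   # 0
img_max = window_center + window_width // 2   # 80
img = np.clip(img, img_min, img_max)          # same effect as the two masked assignments
img = (img - img_min) / (img_max - img_min)   # rescale to the 0-1 range
print(img)  # [0.  0.  0.5 1.  1. ]
```

Values far outside the window are saturated to 0 or 1, which is exactly what concentrates contrast on brain tissue.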
The second function, get_first_of_dicom_field_as_int, checks whether the type of x is pydicom.multival.MultiValue; if so, it returns the first element of x as an int, otherwise it returns int(x).
def get_first_of_dicom_field_as_int(x):
    # get x[0] as an int if x is a 'pydicom.multival.MultiValue', otherwise get int(x)
    if type(x) == pydicom.multival.MultiValue:
        return int(x[0])
    else:
        return int(x)
The get_windowing function extracts the window center, window width, rescale intercept, and rescale slope from the DICOM metadata.
def get_windowing(data):
    dicom_fields = [data[('0028', '1050')].value,  # window center
                    data[('0028', '1051')].value,  # window width
                    data[('0028', '1052')].value,  # intercept
                    data[('0028', '1053')].value]  # slope
    return [get_first_of_dicom_field_as_int(x) for x in dicom_fields]
Now we convert the images from DCM to PNG format. We read each path with dcmread and store the result in the dcm variable, then call get_windowing on dcm. Finally, we call window_image, store the result in img, resize the image, and save it to the output directory.
def save_and_resize(filenames, load_dir):
    save_dir = '/kaggle/tmp/'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    for filename in tqdm(filenames):
        path = load_dir + filename
        new_path = save_dir + filename.replace('.dcm', '.png')
        dcm = pydicom.dcmread(path)
        window_center, window_width, intercept, slope = get_windowing(dcm)
        img = dcm.pixel_array
        img = window_image(img, window_center, window_width, intercept, slope)
        resized = cv2.resize(img, (224, 224))
        res = cv2.imwrite(new_path, resized)
save_and_resize(filenames=sample_files, load_dir='/stage_2_train/')
save_and_resize(filenames=os.listdir('/stage_2_test/'), load_dir='/stage_2_test/')
Applying the Deep Learning Model
Now we define our own data generator with a validation split of 0.15, and then create three generators: one each for training, validation, and testing.
BATCH_SIZE = 64

def create_datagen():
    return ImageDataGenerator(validation_split=0.15)

def create_test_gen():
    return ImageDataGenerator().flow_from_dataframe(
        test_df,
        directory='/kaggle/tmp/',
        x_col='filename',
        class_mode=None,
        target_size=(224, 224),
        batch_size=BATCH_SIZE,
        shuffle=False
    )
def create_flow(datagen, subset):
    return datagen.flow_from_dataframe(
        pivot_df,
        directory='/kaggle/tmp/',
        x_col='filename',
        y_col=['any', 'epidural', 'intraparenchymal',
               'intraventricular', 'subarachnoid', 'subdural'],
        class_mode='multi_output',
        target_size=(224, 224),
        batch_size=BATCH_SIZE,
        subset=subset
    )
# Using original generator
data_generator = create_datagen()
train_gen = create_flow(data_generator, 'training')
val_gen = create_flow(data_generator, 'validation')
test_gen = create_test_gen()
The next step is to train the deep learning model, but before that we define a function called focal loss, which is useful when the data is imbalanced.
def focal_loss(prediction_tensor, target_tensor, weights=None, alpha=0.25, gamma=2):
    r"""Compute focal loss for predictions.
    Multi-labels Focal loss formula:
        FL = -alpha * (z-p)^gamma * log(p) - (1-alpha) * p^gamma * log(1-p)
    where alpha = 0.25, gamma = 2, p = sigmoid(x), z = target_tensor.
    """
    sigmoid_p = tf.nn.sigmoid(prediction_tensor)
    zeros = array_ops.zeros_like(sigmoid_p, dtype=sigmoid_p.dtype)
    # For positive predictions, only the front part of the loss matters; the back part is 0.
    # target_tensor > zeros <=> z=1, so the positive coefficient is z - p.
    pos_p_sub = array_ops.where(target_tensor > zeros, target_tensor - sigmoid_p, zeros)
    # For negative predictions, only the back part of the loss matters; the front part is 0.
    # target_tensor > zeros <=> z=1, so the negative coefficient is 0.
    neg_p_sub = array_ops.where(target_tensor > zeros, zeros, sigmoid_p)
    per_entry_cross_ent = - alpha * (pos_p_sub ** gamma) * tf.log(tf.clip_by_value(sigmoid_p, 1e-8, 1.0)) \
                          - (1 - alpha) * (neg_p_sub ** gamma) * tf.log(tf.clip_by_value(1.0 - sigmoid_p, 1e-8, 1.0))
    return tf.reduce_sum(per_entry_cross_ent)
In the function above we compute the focal loss over the multi-label columns. The focal loss formula is:
FL = -alpha * (z-p)^gamma * log(p) - (1-alpha) * p^gamma * log(1-p), where alpha = 0.25, gamma = 2, p = sigmoid(x), and z = target_tensor.
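To build intuition for why focal loss helps with class imbalance, here is a NumPy sketch of the same formula (written independently of the TensorFlow version above; it is an illustration, not the training code):

```python
import numpy as np

def focal_loss_np(logits, targets, alpha=0.25, gamma=2.0):
    """NumPy sketch of the multi-label focal loss formula."""
    p = 1.0 / (1.0 + np.exp(-logits))              # sigmoid(x)
    pos = np.where(targets > 0, targets - p, 0.0)  # z - p where z = 1
    neg = np.where(targets > 0, 0.0, p)            # p where z = 0
    eps = 1e-8
    loss = (-alpha * pos ** gamma * np.log(np.clip(p, eps, 1.0))
            - (1 - alpha) * neg ** gamma * np.log(np.clip(1.0 - p, eps, 1.0)))
    return loss.sum()

# A confident correct prediction is penalized far less than a confident wrong one,
# because the (z - p)^gamma factor shrinks the loss of easy examples toward zero.
easy = focal_loss_np(np.array([5.0]), np.array([1.0]))
hard = focal_loss_np(np.array([-5.0]), np.array([1.0]))
print(easy < hard)  # True
```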
Now we initialize the DenseNet model:
densenet = DenseNet121(
    weights='../input/densenet-keras/DenseNet-BC-121-32-no-top.h5',
    include_top=False,
    input_shape=(224, 224, 3)
)
We define our neural network architecture using Keras, with the Adam optimizer and 'categorical_crossentropy' as the loss function.
def build_model():
    model = Sequential()
    model.add(densenet)
    model.add(layers.GlobalAveragePooling2D())
    model.add(layers.Dropout(0.5))
    # model.add(layers.Dense(6, activation='sigmoid',
    #                        bias_initializer=Constant(value=-5.5)))
    model.add(layers.Dense(6, activation='sigmoid'))
    model.compile(
        # loss=focal_loss,
        loss='categorical_crossentropy',
        optimizer=Adam(lr=0.001),
        metrics=['accuracy']
    )
    return model
model = build_model()
model.summary()
Now we create a checkpoint that saves the best model as model.h5.
checkpoint = ModelCheckpoint(
    'model.h5',
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
    mode='auto'
)
total_steps = sample_files.shape[0] / BATCH_SIZE
history = model.fit_generator(
    train_gen,
    steps_per_epoch=2000,
    validation_data=val_gen,
    validation_steps=total_steps * 0.15,
    callbacks=[checkpoint],
    epochs=5
)
with open('history.json', 'w') as f:
    json.dump(history.history, f)
history_df = pd.DataFrame(history.history)
history_df[['loss', 'val_loss']].plot()
history_df[['acc', 'val_acc']].plot()
The deep learning model we built with Keras has now been fit to our training data.
Making Predictions on the Test Set
Example Python code for making predictions is shown below.
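The competition expects one submission row per (image, subtype) pair, so after calling something like model.predict_generator(test_gen) the prediction matrix must be unrolled back into the long ID format. A minimal sketch, with made-up filenames and random numbers standing in for the model's output:

```python
import numpy as np
import pandas as pd

types = ['any', 'epidural', 'intraparenchymal',
         'intraventricular', 'subarachnoid', 'subdural']
filenames = ['ID_aaa.png', 'ID_bbb.png']            # hypothetical test filenames
preds = np.random.rand(len(filenames), len(types))  # stand-in for model predictions

# Unroll the (n_images, 6) matrix into one "ID_<hash>_<subtype>" row per probability
rows = []
for fname, probs in zip(filenames, preds):
    image_id = fname.replace('.png', '')
    for t, p in zip(types, probs):
        rows.append({'ID': f'{image_id}_{t}', 'Label': p})

submission = pd.DataFrame(rows)
print(submission.head())
```

The resulting DataFrame matches the shape of stage_2_sample_submission.csv and can be written out with submission.to_csv('submission.csv', index=False).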