In essence, the deep residual shrinkage network (DRSN) is a convolutional neural network, and a variant of the deep residual network (ResNet). Its core idea is that, when a deep network learns features, removing redundant information matters; and soft thresholding is a very flexible way of removing that redundant information.
1. Deep Residual Networks
Introductions to the deep residual shrinkage network usually start from the deep residual network. The figure below shows the basic ResNet building block, consisting of several non-linear layers (the residual path) and a cross-layer identity shortcut. In equation form, the block computes y = x + F(x), where F denotes the stacked non-linear layers. The identity shortcut is the core of ResNet and a key reason for its excellent performance.
2. Deep Residual Shrinkage Networks
The deep residual shrinkage network is a deep residual network whose residual paths are "shrunk". Here, "shrinkage" means soft thresholding.
Soft thresholding is the core step of many signal denoising methods. It sets features close to zero (that is, with absolute value below some threshold τ) to 0; in other words, every feature in the interval [-τ, τ] becomes 0, while the remaining features, farther from zero, are also shrunk toward 0.
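As a minimal illustration (plain NumPy, independent of the network code in Section 3), soft thresholding can be written as:

import numpy as np

def soft_threshold(x, tau):
    # Zero out features in [-tau, tau]; shrink everything else toward 0 by tau.
    return np.sign(x) * np.maximum(np.abs(x) - tau, 0.0)

x = np.array([-2.0, -0.3, 0.1, 0.8, 3.0])
print(soft_threshold(x, tau=0.5))   # [-1.5 -0.   0.   0.3  2.5]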
Viewed together with the bias b of the preceding convolutional layer, the zeroed interval becomes [-τ+b, τ+b]. Since both τ and b are learned automatically, soft thresholding can in effect zero out features in an arbitrary interval. It is thus a more flexible way of deleting features within a certain value range, and can also be understood as a more flexible non-linear mapping.
From another perspective, the two convolutional layers, two batch normalizations, and two activation functions that precede the soft thresholding transform the features of redundant information into values close to zero, and the useful features into values far from zero. A set of thresholds is then learned automatically, and soft thresholding removes the redundant features while keeping the useful ones.
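To make the threshold-learning step concrete, here is a minimal NumPy sketch of how each block in the code below derives its channel-wise thresholds. The random matrices W1 and W2 are hypothetical stand-ins for the block's two learned Dense layers (batch normalization is omitted for brevity):

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

rng = np.random.RandomState(0)
H, W, C = 7, 7, 8                           # one sample's feature map
x = rng.randn(H, W, C)
W1, W2 = rng.randn(C, C), rng.randn(C, C)   # stand-ins for the two Dense layers

abs_mean = np.abs(x).mean(axis=(0, 1))              # global average of |x| per channel
alpha = sigmoid(np.maximum(abs_mean @ W1, 0) @ W2)  # learned scaling coefficients in (0, 1)
tau = alpha * abs_mean                              # channel-wise thresholds
shrunk = np.sign(x) * np.maximum(np.abs(x) - tau, 0)  # soft thresholding

Because alpha lies in (0, 1), each threshold is guaranteed to be positive but smaller than the channel's average absolute feature value, so not everything gets zeroed out.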
Stacking a number of these basic blocks yields the complete deep residual shrinkage network (an input convolutional layer, a stack of residual shrinkage blocks, batch normalization, ReLU, global average pooling, and a fully connected output layer, as in the code in Section 3), shown in the figure below:
3. Image Recognition with Keras
Although the deep residual shrinkage network was originally proposed for vibration-based fault diagnosis, it is in fact a generic feature-learning method, and should be useful in many tasks (computer vision, speech, text).
Below is an MNIST handwritten-digit recognition program based on the deep residual shrinkage network (the program is deliberately simple and for reference only):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Dec 28 23:24:05 2019
Implemented using TensorFlow 1.0.1 and Keras 2.2.1

M. Zhao, S. Zhong, X. Fu, et al., Deep Residual Shrinkage Networks for Fault Diagnosis,
IEEE Transactions on Industrial Informatics, 2019, DOI: 10.1109/TII.2019.2943898
@author: me
"""
from __future__ import print_function
import keras
import numpy as np
from keras.datasets import mnist
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model
from keras.layers.core import Lambda
K.set_learning_phase(1)

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Add noise to the data (noise shape follows the chosen data layout)
x_train = x_train.astype('float32') / 255. + 0.5*np.random.random(x_train.shape)
x_test = x_test.astype('float32') / 255. + 0.5*np.random.random(x_test.shape)
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)

def abs_backend(inputs):
    return K.abs(inputs)

def expand_dim_backend(inputs):
    # (batch, C) -> (batch, 1, 1, C) so the scales broadcast over H and W
    return K.expand_dims(K.expand_dims(inputs, 1), 1)

def sign_backend(inputs):
    return K.sign(inputs)

def pad_backend(inputs, in_channels, out_channels):
    # Zero-pad the channel axis; expand to 5D first so that
    # spatial_3d_padding can pad the last (channel) dimension, then squeeze back.
    pad_dim = (out_channels - in_channels) // 2
    inputs = K.expand_dims(inputs, -1)
    inputs = K.spatial_3d_padding(inputs, ((0, 0), (0, 0), (pad_dim, pad_dim)), 'channels_last')
    return K.squeeze(inputs, -1)

# Residual Shrinkage Block
def residual_shrinkage_block(incoming, nb_blocks, out_channels, downsample=False,
                             downsample_strides=2):

    residual = incoming
    in_channels = incoming.get_shape().as_list()[-1]

    for i in range(nb_blocks):

        identity = residual

        if not downsample:
            downsample_strides = 1

        residual = BatchNormalization()(residual)
        residual = Activation('relu')(residual)
        residual = Conv2D(out_channels, 3, strides=(downsample_strides, downsample_strides),
                          padding='same', kernel_initializer='he_normal',
                          kernel_regularizer=l2(1e-4))(residual)

        residual = BatchNormalization()(residual)
        residual = Activation('relu')(residual)
        residual = Conv2D(out_channels, 3, padding='same', kernel_initializer='he_normal',
                          kernel_regularizer=l2(1e-4))(residual)

        # Calculate global means of the absolute feature maps
        residual_abs = Lambda(abs_backend)(residual)
        abs_mean = GlobalAveragePooling2D()(residual_abs)

        # Calculate scaling coefficients in (0, 1)
        scales = Dense(out_channels, activation=None, kernel_initializer='he_normal',
                       kernel_regularizer=l2(1e-4))(abs_mean)
        scales = BatchNormalization()(scales)
        scales = Activation('relu')(scales)
        scales = Dense(out_channels, activation='sigmoid', kernel_regularizer=l2(1e-4))(scales)
        scales = Lambda(expand_dim_backend)(scales)

        # Calculate the channel-wise thresholds
        thres = keras.layers.multiply([abs_mean, scales])

        # Soft thresholding: sign(x) * max(|x| - thres, 0)
        sub = keras.layers.subtract([residual_abs, thres])
        zeros = keras.layers.subtract([sub, sub])
        n_sub = keras.layers.maximum([sub, zeros])
        residual = keras.layers.multiply([Lambda(sign_backend)(residual), n_sub])

        # Downsampling (it is important to use a pool size of (1, 1))
        if downsample_strides > 1:
            identity = AveragePooling2D(pool_size=(1, 1), strides=(2, 2))(identity)

        # Zero-padding to match channels (it is important to use zero padding
        # rather than a 1x1 convolution)
        if in_channels != out_channels:
            identity = Lambda(pad_backend, arguments={'in_channels': in_channels,
                              'out_channels': out_channels})(identity)

        residual = keras.layers.add([residual, identity])

    return residual

# Define and train a model
inputs = Input(shape=input_shape)
net = Conv2D(8, 3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
net = residual_shrinkage_block(net, 1, 8, downsample=True)
net = BatchNormalization()(net)
net = Activation('relu')(net)
net = GlobalAveragePooling2D()(net)
outputs = Dense(10, activation='softmax', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
model = Model(inputs=inputs, outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=100, epochs=5, verbose=1, validation_data=(x_test, y_test))

# Get results
K.set_learning_phase(0)
DRSN_train_score = model.evaluate(x_train, y_train, batch_size=100, verbose=0)
print('Train loss:', DRSN_train_score[0])
print('Train accuracy:', DRSN_train_score[1])
DRSN_test_score = model.evaluate(x_test, y_test, batch_size=100, verbose=0)
print('Test loss:', DRSN_test_score[0])
print('Test accuracy:', DRSN_test_score[1])
For comparison, the code for a plain deep residual network is as follows:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Dec 28 23:19:03 2019
Implemented using TensorFlow 1.0 and Keras 2.2.1

K. He, X. Zhang, S. Ren, J. Sun, Deep Residual Learning for Image Recognition, CVPR, 2016.
@author: me
"""
from __future__ import print_function
import numpy as np
import keras
from keras.datasets import mnist
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model
from keras.layers.core import Lambda
K.set_learning_phase(1)

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Add noise to the data (noise shape follows the chosen data layout)
x_train = x_train.astype('float32') / 255. + 0.5*np.random.random(x_train.shape)
x_test = x_test.astype('float32') / 255. + 0.5*np.random.random(x_test.shape)
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)

def pad_backend(inputs, in_channels, out_channels):
    # Zero-pad the channel axis; expand to 5D first so that
    # spatial_3d_padding can pad the last (channel) dimension, then squeeze back.
    pad_dim = (out_channels - in_channels) // 2
    inputs = K.expand_dims(inputs, -1)
    inputs = K.spatial_3d_padding(inputs, ((0, 0), (0, 0), (pad_dim, pad_dim)), 'channels_last')
    return K.squeeze(inputs, -1)

# Residual Block
def residual_block(incoming, nb_blocks, out_channels, downsample=False,
                   downsample_strides=2):

    residual = incoming
    in_channels = incoming.get_shape().as_list()[-1]

    for i in range(nb_blocks):

        identity = residual

        if not downsample:
            downsample_strides = 1

        residual = BatchNormalization()(residual)
        residual = Activation('relu')(residual)
        residual = Conv2D(out_channels, 3, strides=(downsample_strides, downsample_strides),
                          padding='same', kernel_initializer='he_normal',
                          kernel_regularizer=l2(1e-4))(residual)

        residual = BatchNormalization()(residual)
        residual = Activation('relu')(residual)
        residual = Conv2D(out_channels, 3, padding='same', kernel_initializer='he_normal',
                          kernel_regularizer=l2(1e-4))(residual)

        # Downsampling (it is important to use a pool size of (1, 1))
        if downsample_strides > 1:
            identity = AveragePooling2D(pool_size=(1, 1), strides=(2, 2))(identity)

        # Zero-padding to match channels (it is important to use zero padding
        # rather than a 1x1 convolution)
        if in_channels != out_channels:
            identity = Lambda(pad_backend, arguments={'in_channels': in_channels,
                              'out_channels': out_channels})(identity)

        residual = keras.layers.add([residual, identity])

    return residual

# Define and train a model
inputs = Input(shape=input_shape)
net = Conv2D(8, 3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
net = residual_block(net, 1, 8, downsample=True)
net = BatchNormalization()(net)
net = Activation('relu')(net)
net = GlobalAveragePooling2D()(net)
outputs = Dense(10, activation='softmax', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
model = Model(inputs=inputs, outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=100, epochs=5, verbose=1, validation_data=(x_test, y_test))

# Get results
K.set_learning_phase(0)
resnet_train_score = model.evaluate(x_train, y_train, batch_size=100, verbose=0)
print('Train loss:', resnet_train_score[0])
print('Train accuracy:', resnet_train_score[1])
resnet_test_score = model.evaluate(x_test, y_test, batch_size=100, verbose=0)
print('Test loss:', resnet_test_score[0])
print('Test accuracy:', resnet_test_score[1])
Notes:
(1) The deep residual shrinkage network has a more complex structure than an ordinary deep residual network, and may be harder to train.
(2) The program uses only one basic block; on more complex datasets, more blocks can be added as appropriate (see the sketch after these notes).
(3) If you encounter TypeError: softmax() got an unexpected keyword argument 'axis', open tensorflow_backend.py and change the first axis in return tf.nn.softmax(x, axis=axis) to dim.
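As an example of note (2), here is a hypothetical, untested deeper variant: the model-definition lines of the DRSN script above could be replaced with a stack of shrinkage blocks with a growing channel count (one block per call, so the channel zero-padding in the shortcut works as intended):

inputs = Input(shape=input_shape)
net = Conv2D(8, 3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
net = residual_shrinkage_block(net, 1, 8, downsample=True)
net = residual_shrinkage_block(net, 1, 16, downsample=True)  # doubles channels, halves resolution
net = BatchNormalization()(net)
net = Activation('relu')(net)
net = GlobalAveragePooling2D()(net)
outputs = Dense(10, activation='softmax', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)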
References:
M. Zhao, S. Zhong, X. Fu, et al., Deep residual shrinkage networks for fault diagnosis, IEEE Transactions on Industrial Informatics, 2019, DOI: 10.1109/TII.2019.2943898