These are my notes condensing other people's material into my own knowledge; they only need to make sense to me, since some things are hard to put into words well.
# Imports as in the keras_cv source (module paths assumed from recent
# keras_cv releases).
import copy
import math

from keras_cv.api_export import keras_cv_export
from keras_cv.backend import keras
from keras_cv.models import utils
from keras_cv.models.backbones.backbone import Backbone
from keras_cv.models.backbones.efficientnet_v2.efficientnet_v2_backbone_presets import (  # noqa: E501
    backbone_presets,
    backbone_presets_with_weights,
)
from keras_cv.utils.python_utils import classproperty

# He-style initializer for conv kernels: variance scaled by fan_out, drawn
# from a truncated normal distribution.
def conv_kernel_initializer(scale=2.0):
    return keras.initializers.VarianceScaling(
        scale=scale, mode="fan_out", distribution="truncated_normal"
    )
# Scale a channel count by width_coefficient and round it to the nearest
# multiple of depth_divisor, never dropping below the minimum depth.
def round_filters(filters, width_coefficient, min_depth, depth_divisor):
    filters *= width_coefficient
    minimum_depth = min_depth or depth_divisor
    new_filters = max(
        minimum_depth,
        int(filters + depth_divisor / 2) // depth_divisor * depth_divisor,
    )
    return int(new_filters)
# Scale the number of block repeats by depth_coefficient, rounding up.
def round_repeats(repeats, depth_coefficient):
    return int(math.ceil(depth_coefficient * repeats))
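A quick sanity check of the two rounding helpers (the numbers are made up for illustration):

# 32 * 1.1 = 35.2, rounded to the nearest multiple of 8 -> 32 (still >= min_depth)
print(round_filters(32, width_coefficient=1.1, min_depth=8, depth_divisor=8))  # 32
# ceil(4 * 1.4) = 6
print(round_repeats(4, depth_coefficient=1.4))  # 6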
BN_AXIS = 3  # channel axis for channels_last data
CONV_KERNEL_INITIALIZER = {  # dict (serialized) form of the kernel initializer
    "class_name": "VarianceScaling",
"config": {
"scale": 2.0,
"mode": "fan_out",
"distribution": "truncated_normal",
},
}
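This dict is just the serialized form of conv_kernel_initializer() above; Keras can deserialize it directly (a small check, assuming a standard Keras install):

init = keras.initializers.get(CONV_KERNEL_INITIALIZER)
print(type(init).__name__)  # VarianceScaling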
# Fused MBConv block: the 1x1 expansion conv and the depthwise conv are fused
# into a single ordinary conv.
@keras_cv_export("keras_cv.layers.FusedMBConvBlock")
class FusedMBConvBlock(keras.layers.Layer):
def __init__(
self,
input_filters: int,
output_filters: int,
expand_ratio=1,
kernel_size=3,
strides=1,
se_ratio=0.0,
bn_momentum=0.9,
activation="swish",
survival_probability: float = 0.8,
**kwargs
):
super().__init__(**kwargs)
self.input_filters = input_filters
self.output_filters = output_filters
self.expand_ratio = expand_ratio
self.kernel_size = kernel_size
self.strides = strides
self.se_ratio = se_ratio
self.bn_momentum = bn_momentum
self.activation = activation
self.survival_probability = survival_probability
        self.filters = self.input_filters * self.expand_ratio  # expanded channel count
        self.filters_se = max(1, int(input_filters * se_ratio))  # squeezed channel count
        # A conv with the given kernel size and strides, rather than a 1x1
        # expansion conv: this is what makes the block "fused".
self.conv1 = keras.layers.Conv2D(
filters=self.filters,
kernel_size=kernel_size,
strides=strides,
kernel_initializer=CONV_KERNEL_INITIALIZER,
padding="same",
data_format="channels_last",
use_bias=False,
name=self.name + "expand_conv",
)
self.bn1 = keras.layers.BatchNormalization(
axis=BN_AXIS,
momentum=self.bn_momentum,
name=self.name + "expand_bn",
)
self.act = keras.layers.Activation(
self.activation, name=self.name + "expand_activation"
)
self.bn2 = keras.layers.BatchNormalization(
axis=BN_AXIS, momentum=self.bn_momentum, name=self.name + "bn"
)
        # SE squeeze: 1x1 conv that reduces the channels
self.se_conv1 = keras.layers.Conv2D(
self.filters_se,
1,
padding="same",
activation=self.activation,
kernel_initializer=CONV_KERNEL_INITIALIZER,
name=self.name + "se_reduce",
)
        # SE excite: 1x1 conv back to the expanded width; the sigmoid scores each channel
self.se_conv2 = keras.layers.Conv2D(
self.filters,
1,
padding="same",
activation="sigmoid",
kernel_initializer=CONV_KERNEL_INITIALIZER,
name=self.name + "se_expand",
)
        # Projection conv: if expand_ratio != 1 the expansion conv already
        # applied the full kernel, so project with a 1x1 conv; otherwise this
        # is the block's only conv and it uses the full kernel size.
self.output_conv = keras.layers.Conv2D(
filters=self.output_filters,
kernel_size=1 if expand_ratio != 1 else kernel_size,
strides=1,
kernel_initializer=CONV_KERNEL_INITIALIZER,
padding="same",
data_format="channels_last",
use_bias=False,
name=self.name + "project_conv",
)
self.bn3 = keras.layers.BatchNormalization(
axis=BN_AXIS,
momentum=self.bn_momentum,
name=self.name + "project_bn",
)
        # Stochastic depth: despite the name, survival_probability is passed to
        # Dropout as the drop rate; noise_shape=(None, 1, 1, 1) drops the whole
        # residual branch per sample.
        if self.survival_probability:
self.dropout = keras.layers.Dropout(
self.survival_probability,
noise_shape=(None, 1, 1, 1),
name=self.name + "drop",
)
def build(self, input_shape):
if self.name is None:
self.name = keras.backend.get_uid("block0")
def call(self, inputs):
        # Expansion phase: if expand_ratio == 1 the input passes through
        # unchanged; otherwise a conv with the full kernel size and strides
        # takes the place of the usual 1x1 expansion conv.
if self.expand_ratio != 1:
x = self.conv1(inputs)
x = self.bn1(x)
x = self.act(x)
else:
x = inputs
        # Squeeze-and-excitation block (only applied when 0 < se_ratio <= 1)
if 0 < self.se_ratio <= 1:
            # Global average pooling: one descriptor per channel
se = keras.layers.GlobalAveragePooling2D(
name=self.name + "se_squeeze"
)(x)
if BN_AXIS == 1:
se_shape = (self.filters, 1, 1)
else:
se_shape = (1, 1, self.filters)
            # Reshape so the descriptor broadcasts against the feature map
se = keras.layers.Reshape(se_shape, name=self.name + "se_reshape")(
se
)
            # Squeeze, then excite: the sigmoid scores each channel in [0, 1]
se = self.se_conv1(se)
se = self.se_conv2(se)
            # Reweight the channels by the SE scores
x = keras.layers.multiply([x, se], name=self.name + "se_excite")
        # Output phase: project with the output conv and batch-normalize.
        # When expand_ratio == 1 no expansion activation was applied, so the
        # activation runs after the output conv instead.
x = self.output_conv(x)
x = self.bn3(x)
if self.expand_ratio == 1:
x = self.act(x)
        # Residual connection: requires strides == 1 and input_filters == output_filters
if self.strides == 1 and self.input_filters == self.output_filters:
if self.survival_probability:
x = self.dropout(x)
x = keras.layers.Add(name=self.name + "add")([x, inputs])
return x
    # Serialization config
def get_config(self):
config = {
"input_filters": self.input_filters,
"output_filters": self.output_filters,
"expand_ratio": self.expand_ratio,
"kernel_size": self.kernel_size,
"strides": self.strides,
"se_ratio": self.se_ratio,
"bn_momentum": self.bn_momentum,
"activation": self.activation,
"survival_probability": self.survival_probability,
}
base_config = super().get_config()
return dict(list(base_config.items()) + list(config.items()))
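A minimal smoke test of the fused block (assuming keras_cv and TensorFlow are installed; the input shape and filter counts are arbitrary):

import tensorflow as tf

fused = FusedMBConvBlock(
    input_filters=24, output_filters=24, expand_ratio=4, strides=1, se_ratio=0.25
)
out = fused(tf.random.normal((2, 56, 56, 24)))
# (2, 56, 56, 24): strides == 1 and matching filters, so the residual path is active
print(out.shape)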
# MBConv blocks are a staple of mobile-oriented, efficient architectures such
# as MobileNet, EfficientNet, and MaxViT. They follow a narrow-wide-narrow
# structure: expand with a 1x1 conv, apply a depthwise conv, then project back
# down with a 1x1 conv, which is more efficient than the traditional
# wide-narrow-wide layout. Since these blocks frequently appear in models
# deployed to edge devices, they are implemented here as a reusable layer.
# Unfused MBConv block
@keras_cv_export("keras_cv.layers.MBConvBlock")
class MBConvBlock(keras.layers.Layer):
def __init__(
self,
input_filters: int,
output_filters: int,
expand_ratio=1,
kernel_size=3,
strides=1,
se_ratio=0.0,
bn_momentum=0.9,
activation="swish",
survival_probability: float = 0.8,
**kwargs
):
super().__init__(**kwargs)
self.input_filters = input_filters
self.output_filters = output_filters
self.expand_ratio = expand_ratio
self.kernel_size = kernel_size
self.strides = strides
self.se_ratio = se_ratio
self.bn_momentum = bn_momentum
self.activation = activation
self.survival_probability = survival_probability
        self.filters = self.input_filters * self.expand_ratio  # expanded (inner) channel count
        self.filters_se = max(1, int(input_filters * se_ratio))  # squeezed channel count
        # Expansion 1x1 conv
self.conv1 = keras.layers.Conv2D(
filters=self.filters,
kernel_size=1,
strides=1,
kernel_initializer=CONV_KERNEL_INITIALIZER,
padding="same",
data_format="channels_last",
use_bias=False,
name=self.name + "expand_conv",
)
        # Batch normalization
self.bn1 = keras.layers.BatchNormalization(
axis=BN_AXIS,
momentum=self.bn_momentum,
name=self.name + "expand_bn",
)
        self.act = keras.layers.Activation(  # activation function
self.activation, name=self.name + "activation"
)
        self.depthwise = keras.layers.DepthwiseConv2D(  # depthwise conv
kernel_size=self.kernel_size,
strides=self.strides,
depthwise_initializer=CONV_KERNEL_INITIALIZER,
padding="same",
data_format="channels_last",
use_bias=False,
name=self.name + "dwconv2",
)
self.bn2 = keras.layers.BatchNormalization(
axis=BN_AXIS, momentum=self.bn_momentum, name=self.name + "bn"
)
        # SE squeeze: 1x1 conv that reduces the channels
self.se_conv1 = keras.layers.Conv2D(
self.filters_se,
1,
padding="same",
activation=self.activation,
kernel_initializer=CONV_KERNEL_INITIALIZER,
name=self.name + "se_reduce",
)
        # SE excite: 1x1 conv back to the expanded width; the sigmoid scores each channel individually
self.se_conv2 = keras.layers.Conv2D(
self.filters,
1,
padding="same",
activation="sigmoid",
kernel_initializer=CONV_KERNEL_INITIALIZER,
name=self.name + "se_expand",
)
self.output_conv = keras.layers.Conv2D(
filters=self.output_filters,
            # When expand_ratio == 1 there is no expansion conv, so the
            # projection uses the full kernel size (an ordinary conv);
            # otherwise it is a 1x1 conv.
kernel_size=1 if expand_ratio != 1 else kernel_size,
strides=1,
kernel_initializer=CONV_KERNEL_INITIALIZER,
padding="same",
data_format="channels_last",
use_bias=False,
name=self.name + "project_conv",
)
self.bn3 = keras.layers.BatchNormalization(
axis=BN_AXIS,
momentum=self.bn_momentum,
name=self.name + "project_bn",
)
        # Stochastic depth: noise_shape=(None, 1, 1, 1) drops the whole residual branch per sample
if self.survival_probability:
self.dropout = keras.layers.Dropout(
self.survival_probability,
noise_shape=(None, 1, 1, 1),
name=self.name + "drop",
)
def build(self, input_shape):
if self.name is None:
self.name = keras.backend.get_uid("block0")
    def call(self, inputs):  # inputs: (B, H, W, C)
        # Expansion phase: apply the 1x1 expansion conv unless expand_ratio == 1
if self.expand_ratio != 1:
            # Expansion conv block
x = self.conv1(inputs)
x = self.bn1(x)
x = self.act(x)
else:
x = inputs
        # Depthwise conv block
x = self.depthwise(x)
x = self.bn2(x)
x = self.act(x)
        # SE block: global average pooling yields one descriptor per channel,
        # which is squeezed, excited, and used to reweight the channels
if 0 < self.se_ratio <= 1:
se = keras.layers.GlobalAveragePooling2D(
name=self.name + "se_squeeze"
)(x)
if BN_AXIS == 1:
se_shape = (self.filters, 1, 1)
else:
se_shape = (1, 1, self.filters)
se = keras.layers.Reshape(se_shape, name=self.name + "se_reshape")(
se
)
se = self.se_conv1(se)
se = self.se_conv2(se)
x = keras.layers.multiply([x, se], name=self.name + "se_excite")
        # Output phase: 1x1 projection conv plus batch norm
x = self.output_conv(x)
x = self.bn3(x)
        # Residual connection: requires stride 1 and matching input/output channels
if self.strides == 1 and self.input_filters == self.output_filters:
if self.survival_probability:
x = self.dropout(x)
x = keras.layers.Add(name=self.name + "add")([x, inputs])
return x
def get_config(self):
        # Config specific to this subclass
config = {
"input_filters": self.input_filters,
"output_filters": self.output_filters,
"expand_ratio": self.expand_ratio,
"kernel_size": self.kernel_size,
"strides": self.strides,
"se_ratio": self.se_ratio,
"bn_momentum": self.bn_momentum,
"activation": self.activation,
"survival_probability": self.survival_probability,
}
        # Merge the parent config with the subclass config and return it
        base_config = super().get_config()
        return dict(list(base_config.items()) + list(config.items()))
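The unfused block is the narrow-wide-narrow pattern described above: with input_filters=24 and expand_ratio=4 the channels go 24 -> 96 (1x1 expand) -> 96 (depthwise) -> 24 (1x1 project). A minimal sketch (shapes are illustrative):

import tensorflow as tf

mbconv = MBConvBlock(input_filters=24, output_filters=24, expand_ratio=4, se_ratio=0.25)
out = mbconv(tf.random.normal((2, 32, 32, 24)))
print(out.shape)  # (2, 32, 32, 24)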
# Return the conv block constructor: "unfused" -> MBConvBlock,
# "fused" -> FusedMBConvBlock
def get_conv_constructor(conv_type):
    if conv_type == "unfused":  # 1x1 expansion + depthwise conv; used in the later stages
return MBConvBlock
    elif conv_type == "fused":  # a single ordinary conv; used in the early stages
return FusedMBConvBlock
else:
raise ValueError(
"Expected `conv_type` to be "
"one of 'unfused', 'fused', but got "
f"`conv_type={conv_type}`"
)
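In the stock EfficientNetV2 configurations the early stacks are fused and the later ones unfused, so a stackwise_conv_types list looks roughly like this (illustrative, modeled on the six-stack B0-style layout):

stackwise_conv_types = ["fused", "fused", "fused", "unfused", "unfused", "unfused"]
print([get_conv_constructor(t).__name__ for t in stackwise_conv_types])
# ['FusedMBConvBlock', 'FusedMBConvBlock', 'FusedMBConvBlock',
#  'MBConvBlock', 'MBConvBlock', 'MBConvBlock']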
# The decorator registers the public import path under which this class is exposed.
@keras_cv_export("keras_cv.models.EfficientNetV2Backbone")
class EfficientNetV2Backbone(Backbone):
def __init__(
self,
*,
include_rescaling,
width_coefficient,
depth_coefficient,
stackwise_kernel_sizes,
stackwise_num_repeats,
stackwise_input_filters,
stackwise_output_filters,
stackwise_expansion_ratios,
stackwise_squeeze_and_excite_ratios,
stackwise_strides,
stackwise_conv_types,
skip_connection_dropout=0.2,
depth_divisor=8,
min_depth=8,
activation="swish",
input_shape=(None, None, 3),
input_tensor=None,
**kwargs,
):
        # Resolve the model input (a new tensor or a user-provided one)
img_input = utils.parse_model_inputs(input_shape, input_tensor)
x = img_input
if include_rescaling:
            x = keras.layers.Rescaling(scale=1 / 255.0)(x)  # rescale pixels to [0, 1]
        # Round the stem channel count (typically to a multiple of 8)
stem_filters = round_filters(
filters=stackwise_input_filters[0],
width_coefficient=width_coefficient,
min_depth=min_depth,
depth_divisor=depth_divisor,
)
        # Stem: 3x3 stride-2 conv, the first downsampling step (e.g. 224x224 -> 112x112)
x = keras.layers.Conv2D(
filters=stem_filters,
kernel_size=3,
strides=2,
kernel_initializer=conv_kernel_initializer(),
padding="same",
use_bias=False,
name="stem_conv",
)(x)
x = keras.layers.BatchNormalization(
momentum=0.9,
name="stem_bn",
)(x)
x = keras.layers.Activation(activation, name="stem_activation")(x)
        # Global index of the current block across all stacks
block_id = 0
        blocks = float(  # total number of blocks across all stacks
sum(num_repeats for num_repeats in stackwise_num_repeats)
)
        # Layer names of the feature maps feeding each pyramid level
pyramid_level_inputs = []
        # Iterate over the stacks (levels)
        for i in range(len(stackwise_kernel_sizes)):
            num_repeats = stackwise_num_repeats[i]  # block repeats in this stack
            input_filters = stackwise_input_filters[i]  # conv input channels
            output_filters = stackwise_output_filters[i]  # conv output channels
            # Round the input/output channel counts
input_filters = round_filters(
filters=input_filters,
width_coefficient=width_coefficient,
min_depth=min_depth,
depth_divisor=depth_divisor,
)
output_filters = round_filters(
filters=output_filters,
width_coefficient=width_coefficient,
min_depth=min_depth,
depth_divisor=depth_divisor,
)
            # Round the repeat count by the depth coefficient
repeats = round_repeats(
repeats=num_repeats,
depth_coefficient=depth_coefficient,
)
            strides = stackwise_strides[i]
            squeeze_and_excite_ratio = stackwise_squeeze_and_excite_ratios[i]  # SE ratio
            # Iterate over the blocks within this stack
            for j in range(repeats):
                # After the first block, the stride drops to 1 and the
                # input/output channel counts match
                if j > 0:
                    strides = 1
                    input_filters = output_filters
                # A stride > 1 downsamples, so the incoming feature map is the
                # last at its resolution; record it as a pyramid-level input
                if strides != 1:
                    pyramid_level_inputs.append(utils.get_tensor_input_name(x))
                # Per-block suffix letter: a, b, c, ...
                letter_identifier = chr(j + 97)
block = get_conv_constructor(stackwise_conv_types[i])(
input_filters=input_filters,
output_filters=output_filters,
                    expand_ratio=stackwise_expansion_ratios[i],  # expansion ratio
kernel_size=stackwise_kernel_sizes[i],
strides=strides,
                    se_ratio=squeeze_and_excite_ratio,  # squeeze-and-excite ratio
activation=activation,
                    # Stochastic-depth drop rate, scaled linearly from 0 toward
                    # skip_connection_dropout as block_id approaches the total
                    # block count (despite the name, the blocks pass this value
                    # to Dropout as the drop rate)
                    survival_probability=skip_connection_dropout
                    * block_id
                    / blocks,
bn_momentum=0.9,
name="block{}{}_".format(i + 1, letter_identifier),
)
                # Run the block
                x = block(x)
                block_id += 1  # advance the global block counter
        # Round the channel count of the top (head) conv
top_filters = round_filters(
filters=1280,
            width_coefficient=width_coefficient,
            min_depth=min_depth,
            depth_divisor=depth_divisor,
        )
        # 1x1 conv up to the head channel count
x = keras.layers.Conv2D(
filters=top_filters,
kernel_size=1,
strides=1,
kernel_initializer=conv_kernel_initializer(),
padding="same", # 填充
data_format="channels_last",
use_bias=False,
name="top_conv",
)(x)
x = keras.layers.BatchNormalization(
momentum=0.9,
name="top_bn",
)(x)
x = keras.layers.Activation(
activation=activation, name="top_activation"
)(x)
        # Record the final feature map as the last pyramid level
pyramid_level_inputs.append(utils.get_tensor_input_name(x))
# Create model.
super().__init__(inputs=img_input, outputs=x, **kwargs)
        # Store the constructor arguments as instance attributes
self.include_rescaling = include_rescaling
self.width_coefficient = width_coefficient
self.depth_coefficient = depth_coefficient
self.skip_connection_dropout = skip_connection_dropout
self.depth_divisor = depth_divisor
self.min_depth = min_depth
self.activation = activation
self.input_tensor = input_tensor
        # Pyramid-level inputs: level name ("P1", "P2", ...) -> layer name
self.pyramid_level_inputs = {
f"P{i + 1}": name for i, name in enumerate(pyramid_level_inputs)
}
        # Per-stack configuration
self.stackwise_kernel_sizes = stackwise_kernel_sizes
self.stackwise_num_repeats = stackwise_num_repeats
self.stackwise_input_filters = stackwise_input_filters
self.stackwise_output_filters = stackwise_output_filters
self.stackwise_expansion_ratios = stackwise_expansion_ratios
self.stackwise_squeeze_and_excite_ratios = (
stackwise_squeeze_and_excite_ratios
)
self.stackwise_strides = stackwise_strides
self.stackwise_conv_types = stackwise_conv_types
    # Config for serialization and deserialization
    def get_config(self):
        # Parent config as a dict
        config = super().get_config()
        config.update(  # add the subclass-specific settings
{
"include_rescaling": self.include_rescaling,
"width_coefficient": self.width_coefficient,
"depth_coefficient": self.depth_coefficient,
"skip_connection_dropout": self.skip_connection_dropout,
"depth_divisor": self.depth_divisor,
"min_depth": self.min_depth,
"activation": self.activation,
"input_shape": self.input_shape[1:],
"input_tensor": self.input_tensor,
"stackwise_kernel_sizes": self.stackwise_kernel_sizes,
"stackwise_num_repeats": self.stackwise_num_repeats,
"stackwise_input_filters": self.stackwise_input_filters,
"stackwise_output_filters": self.stackwise_output_filters,
"stackwise_expansion_ratios": self.stackwise_expansion_ratios,
"stackwise_squeeze_and_excite_ratios": self.stackwise_squeeze_and_excite_ratios, # noqa: E501
"stackwise_strides": self.stackwise_strides,
"stackwise_conv_types": self.stackwise_conv_types,
}
)
return config
    # presets is a class property holding the predefined configurations
@classproperty
def presets(cls):
return copy.deepcopy(backbone_presets)
    # Class property: the presets that ship with pretrained weights
@classproperty
def presets_with_weights(cls):
return copy.deepcopy(backbone_presets_with_weights)
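In practice the backbone is usually built from a preset rather than the long constructor. A sketch (the preset name is an assumption; check the preset list of the keras_cv release you have installed):

import tensorflow as tf

backbone = EfficientNetV2Backbone.from_preset("efficientnetv2_s")
feats = backbone(tf.random.normal((1, 224, 224, 3)))
print(feats.shape)
print(backbone.pyramid_level_inputs)  # {'P1': <layer name>, ..., 'P5': <layer name>}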