def autopad(k, p=None, d=1): # kernel, padding, dilation
# Pad to 'same' shape outputs
if d > 1:
k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size
if p is None:
p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad
return p
autopad
函数用于计算卷积层自动填充(padding)大小的辅助函数,目的是为了在使用卷积层时保持输出特征图的尺寸不变(即所谓的'same' padding)。
参数:
k
:卷积核的大小。可以是一个整数或一个整数列表(针对不同维度)。p
:填充大小。默认为None
,这时候会自动计算填充。d
:膨胀系数。默认为1,表示不膨胀。膨胀卷积调整:
d
大于1,函数会根据膨胀系数和卷积核大小重新计算实际的卷积核大小。k = d * (k - 1) + 1
。这是标准的膨胀卷积核尺寸计算公式。自动计算填充:
p
(填充大小)没有指定,函数会根据卷积核大小自动计算填充,以确保输出尺寸不变。p = k // 2
,即卷积核大小的一半(向下取整)。这是因为在'same'填充模式下,填充的总大小大致等于卷积核大小减1。返回值:
p
。class Conv(nn.Module):
# Standard convolution with args(ch_in, ch_out, kernel, stride, padding, groups, dilation, activation)
default_act = nn.SiLU() # default activation
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
super().__init__()
self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False)
self.bn = nn.BatchNorm2d(c2)
self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
def forward(self, x):
return self.act(self.bn(self.conv(x)))
def forward_fuse(self, x):
return self.act(self.conv(x))
这个Conv
类是一个封装好的标准卷积层,用于构建卷积神经网络。它继承自PyTorch的nn.Module
,并在其基础上添加了卷积操作、批归一化(Batch Normalization)和激活函数。
初始化方法 (__init__
):
c1
:输入通道数。c2
:输出通道数。k
:卷积核大小,默认为1。s
:步长(stride),默认为1。p
:填充(padding),默认为None
,这时会使用autopad
函数自动计算填充。g
:组数(groups),默认为1。分组卷积可以减少参数数量。d
:膨胀系数(dilation),默认为1。act
:激活函数,默认为SiLU
(也称为Swish)。如果act
为True
,使用默认的激活函数;如果为False
,则不使用激活函数(即使用nn.Identity()
)。构造卷积层:
nn.Conv2d
创建二维卷积层。其参数包括输入通道数、输出通道数、卷积核大小、步长、自动计算的填充、分组和膨胀系数。nn.BatchNorm2d
,用于稳定训练过程并加速收敛。act
参数选择激活函数。前向传播方法 (forward
):
x
首先通过卷积层,然后通过批归一化层,最后通过激活函数。融合前向传播方法 (forward_fuse
):
class DWConv(Conv):
# Depth-wise convolution
def __init__(self, c1, c2, k=1, s=1, d=1, act=True): # ch_in, ch_out, kernel, stride, dilation, activation
super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), d=d, act=act)
class DWConvTranspose2d(nn.ConvTranspose2d):
# Depth-wise transpose convolution
def __init__(self, c1, c2, k=1, s=1, p1=0, p2=0): # ch_in, ch_out, kernel, stride, padding, padding_out
super().__init__(c1, c2, k, s, p1, p2, groups=math.gcd(c1, c2))
DWConv
类:
Conv
类的子类,专门用于实现深度可分离卷积。__init__
方法中的参数与Conv
类似,但在创建Conv
对象时,它将groups
参数设置为c1
和c2
的最大公约数(math.gcd(c1, c2)
)。这样做使得卷积在每个输入通道上独立进行,实现深度可分离的效果。k
、步长s
、膨胀系数d
和激活函数act
与标准卷积一致。DWConvTranspose2d
类:
nn.ConvTranspose2d
,用于实现深度可分离的转置卷积。c1
、c2
,卷积核大小k
,步长s
,输入和输出的填充p1
和p2
。DWConv
一样,它通过设置groups
参数为c1
和c2
的最大公约数来实现深度可分离效果。class TransformerLayer(nn.Module):
# Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance)
def __init__(self, c, num_heads):
super().__init__()
self.q = nn.Linear(c, c, bias=False)
self.k = nn.Linear(c, c, bias=False)
self.v = nn.Linear(c, c, bias=False)
self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads)
self.fc1 = nn.Linear(c, c, bias=False)
self.fc2 = nn.Linear(c, c, bias=False)
def forward(self, x):
x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x
x = self.fc2(self.fc1(x)) + x
return x
这个 TransformerLayer
类是一个实现了 Transformer 架构中的标准层的 PyTorch 模块。
初始化方法 (__init__
):
c
:特征的维度。这个参数决定了查询(Q)、键(K)和值(V)的线性变换的维度。num_heads
:多头注意力的头数。在多头注意力机制中,输入会被分成多个头,每个头独立进行注意力运算,提高了模型对不同位置信息的捕获能力。self.q
、self.k
、self.v
)用于生成查询(Q)、键(K)和值(V)。nn.MultiheadAttention
层,用于执行多头注意力运算。前向传播 (forward
):
x
通过查询(Q)、键(K)和值(V)的线性变换。nn.MultiheadAttention
输出一个元组,其中第一个元素是注意力操作的输出,第二个元素是注意力权重(在这里未使用)。x
相加,实现残差连接。残差连接有助于缓解梯度消失问题,提高深层网络的训练效率。self.fc1
和self.fc2
)传递结果,并再次应用残差连接。注意事项:
这个实现中没有使用层归一化(LayerNorm),通常 Transformer 层中会包含层归一化以稳定训练过程并加速收敛。根据注释,这里为了更好的性能移除了层归一化,这可能是基于特定应用场景的决定。class TransformerBlock(nn.Module):
# Vision Transformer https://arxiv.org/abs/2010.11929
def __init__(self, c1, c2, num_heads, num_layers):
super().__init__()
self.conv = None
if c1 != c2:
self.conv = Conv(c1, c2)
self.linear = nn.Linear(c2, c2) # learnable position embedding
self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
self.c2 = c2
def forward(self, x):
if self.conv is not None:
x = self.conv(x)
b, _, w, h = x.shape
p = x.flatten(2).permute(2, 0, 1)
return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h)
这个 TransformerBlock
类是一个用于图像处理的 Transformer 模块,通常被称为 Vision Transformer (ViT)。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。如果 c1
不等于 c2
,则会应用一个卷积层 (Conv
),以使输入和输出通道数匹配。num_heads
:多头注意力的头数。用于 TransformerLayer
的初始化。num_layers
:Transformer 层的数量。用于创建 num_layers
个 TransformerLayer
的序列。前向传播 (forward
):
c1
不等于 c2
,会应用一个卷积层来进行通道数的匹配。这个卷积层由 Conv
类定义。w
)和高度(h
),并将图像展平为一个序列,以便输入到 Transformer 中。这一步通常被称为位置嵌入 (Position Embedding)。self.linear
) 学习位置嵌入,将位置信息融入输入序列。num_layers
个 TransformerLayer
来处理输入序列。每个 TransformerLayer
会应用多头注意力和残差连接来处理序列数据。class Bottleneck(nn.Module):
# Standard bottleneck
def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): # ch_in, ch_out, shortcut, groups, expansion
super().__init__()
c_ = int(c2 * e) # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_, c2, 3, 1, g=g)
self.add = shortcut and c1 == c2
def forward(self, x):
return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))
这个 Bottleneck
类定义了一个标准的瓶颈块,通常用于深度卷积神经网络(CNNs)中。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定瓶颈块中隐藏通道的数量。通常是输出通道数的一部分。前向传播 (forward
):
self.cv1
) 对输入进行降维,将输入通道数从 c1
减少到 c_
,其中 c_
是输出通道数的扩展部分。self.cv2
) 对降维后的特征进行卷积操作。这一步通常会增加非线性特征。shortcut
为 True
且输入通道数等于输出通道数(c1
等于 c2
),则将输入与经过 self.cv2
和 self.cv1
的处理结果相加,形成一个快捷连接(残差连接)。否则,只返回 self.cv2(self.cv1(x))
的结果。class BottleneckCSP(nn.Module):
# CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
super().__init__()
c_ = int(c2 * e) # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False)
self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False)
self.cv4 = Conv(2 * c_, c2, 1, 1)
self.bn = nn.BatchNorm2d(2 * c_) # applied to cat(cv2, cv3)
self.act = nn.SiLU()
self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
def forward(self, x):
y1 = self.cv3(self.m(self.cv1(x)))
y2 = self.cv2(x)
return self.cv4(self.act(self.bn(torch.cat((y1, y2), 1))))
这个 BottleneckCSP
类代表了 CSP Bottleneck 结构。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:表示要在内部重复多少次 Bottleneck
块。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定瓶颈块中隐藏通道的数量。通常是输出通道数的一部分。前向传播 (forward
):
self.cv1
) 对输入进行降维,将输入通道数从 c1
减少到 c_
,其中 c_
是输出通道数的扩展部分。self.cv2
) 对输入进行降维,将输入通道数从 c1
减少到 c_
。self.cv3
) 对经过 self.cv1
处理后的特征进行降维,将通道数从 c_
减少到 c_
。self.cv3
和 self.cv2
处理后的特征连接起来,并通过一个批量归一化层 (self.bn
) 和激活函数进行处理。self.m
),其中包含多个 Bottleneck
块的重复。这些块会进一步处理特征。self.cv4
) 进行升维,将通道数从 2 * c_
增加到 c2
,并返回结果。class CrossConv(nn.Module):
# Cross Convolution Downsample
def __init__(self, c1, c2, k=3, s=1, g=1, e=1.0, shortcut=False):
# ch_in, ch_out, kernel, stride, groups, expansion, shortcut
super().__init__()
c_ = int(c2 * e) # hidden channels
self.cv1 = Conv(c1, c_, (1, k), (1, s))
self.cv2 = Conv(c_, c2, (k, 1), (s, 1), g=g)
self.add = shortcut and c1 == c2
def forward(self, x):
return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))
?
这个 CrossConv
类代表了交叉卷积下采样模块。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。k
:卷积核的大小,是一个元组,包括水平和垂直方向的大小。s
:卷积的步幅,也是一个元组,包括水平和垂直方向的步幅。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。前向传播 (forward
):
self.cv1
) 对输入进行卷积操作,其中 k
是在水平方向上执行卷积的大小。self.cv2
) 对经过 self.cv1
处理后的特征进行卷积操作,其中 k
是在垂直方向上执行卷积的大小。shortcut
为 True
并且输入通道数等于输出通道数,则将原始输入与卷积后的结果相加,以添加快捷连接。class C2(nn.Module):
# CSP Bottleneck with 3 convolutions
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
super().__init__()
c_ = int(c2 * e) # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c1, c_, 1, 1)
self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2)
self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
def forward(self, x):
return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1))
C2
类代表了一个包含3个卷积操作的CSP(Cross Stage Partial) Bottleneck 模块。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:要重复的Bottleneck块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。self.m
),该序列包含多个 Bottleneck
块,这些块用于进一步处理特征。self.m(self.cv1(x))
和 self.cv2(x)
的结果再次拼接,然后通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class C3(nn.Module):
# CSP Bottleneck with 3 convolutions
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
super().__init__()
c_ = int(c2 * e) # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c1, c_, 1, 1)
self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2)
self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n)))
def forward(self, x):
return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1))
?
C3
类与 C2
类非常相似,它也代表了一个包含3个卷积操作的CSP(Cross Stage Partial) Bottleneck 模块。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:要重复的Bottleneck块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。self.m
),该序列包含多个 Bottleneck
块,这些块用于进一步处理特征。self.m(self.cv1(x))
和 self.cv2(x)
的结果再次拼接,然后通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class C3x(C3):
# C3 module with cross-convolutions
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = nn.Sequential(*(CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n)))
?
C3x
类是基于 C3
类的扩展版本,它包含了交叉卷积 (Cross Convolution) 操作。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:要重复的C3块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
C3
类类似,首先,使用一个 1x1 的卷积层 (self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。C3x
类将 self.m
定义为一个序列,其中包含多个 CrossConv
块,而不是 Bottleneck
块。每个 CrossConv
块包括了交叉卷积操作,以增强特征的表示能力。self.m
中所有块的结果进行拼接,然后通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class C3TR(C3):
# C3 module with TransformerBlock()
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = TransformerBlock(c_, c_, 4, n)
?
C3TR
类是基于 C3
类的扩展版本,它包含了 TransformerBlock
操作。以下是该类的详细解释:
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:要重复的C3块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
C3
类类似,C3T
使用一个 1x1 的卷积层 (self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。C3TR
类将 self.m
定义为一个 TransformerBlock
,并指定了 num_layers
参数为 4。这意味着在 self.m
中会有 4 个 Transformer 层堆叠在一起,用于对特征进行自注意力操作。self.m
的结果通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class C3SPP(C3):
# C3 module with SPP()
def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e)
self.m = SPP(c_, c_, k)
?
C3SPP
类是基于 C3
类的扩展版本,它包含了 SPP
(Spatial Pyramid Pooling)操作。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。k
:SPP 池化层的池化核大小,是一个包含池化核尺寸的元组或列表。例如,k=(5, 9, 13)
表示使用三个不同尺寸的池化核进行 SPP。n
:要重复的C3块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
C3
类类似,C3SPP
使用一个 1x1 的卷积层 (self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。C3SPP
类将 self.m
定义为一个 SPP
层,并指定了 k
参数,以便进行 Spatial Pyramid Pooling 操作。这将在不同尺寸的池化核下对特征图进行池化,以捕获不同尺度的信息。self.m
的结果通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class C3Ghost(C3):
# C3 module with GhostBottleneck()
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5):
super().__init__(c1, c2, n, shortcut, g, e)
c_ = int(c2 * e) # hidden channels
self.m = nn.Sequential(*(GhostBottleneck(c_, c_) for _ in range(n)))
C3Ghost
类是基于 C3
类的扩展版本,它包含了 GhostBottleneck
操作。
初始化方法 (__init__
):
c1
和 c2
:输入和输出通道数。n
:要重复的C3块的数量。shortcut
:一个布尔值,表示是否使用快捷连接(残差连接)。如果为 True
,则会添加一个快捷连接,用于跳过某些层,以便保留原始输入信息。g
:分组卷积的组数。默认为 1,表示标准卷积。分组卷积将输入通道分成几个组,并在每个组内执行卷积操作。e
:扩展因子,用于确定卷积中的隐藏通道数量。通常是输出通道数的一部分。前向传播 (forward
):
C3
类类似,C3Ghost
使用一个 1x1 的卷积层 (self.cv1
) 对输入进行卷积操作,将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv2
) 对输入进行卷积操作,也将通道数从 c1
缩减到 c_
(c2
的一半)。self.cv1(x)
和 self.cv2(x)
的结果进行拼接,得到一个具有 2 * c_
个通道的特征图。C3Ghost
类将 self.m
定义为一个 GhostBottleneck
层,并使用 nn.Sequential
将多个 GhostBottleneck
堆叠在一起。每个 GhostBottleneck
层都有自己的一系列卷积操作,其中包括主要的卷积操作和影子卷积操作。self.m
的结果通过一个 1x1 的卷积层 (self.cv3
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class SPP(nn.Module):
# Spatial Pyramid Pooling (SPP) layer https://arxiv.org/abs/1406.4729
def __init__(self, c1, c2, k=(5, 9, 13)):
super().__init__()
c_ = c1 // 2 # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1)
self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k])
def forward(self, x):
x = self.cv1(x)
with warnings.catch_warnings():
warnings.simplefilter('ignore') # suppress torch 1.9.0 max_pool2d() warning
return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1))
class SPPF(nn.Module):
# Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher
def __init__(self, c1, c2, k=5): # equivalent to SPP(k=(5, 9, 13))
super().__init__()
c_ = c1 // 2 # hidden channels
self.cv1 = Conv(c1, c_, 1, 1)
self.cv2 = Conv(c_ * 4, c2, 1, 1)
self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)
def forward(self, x):
x = self.cv1(x)
with warnings.catch_warnings():
warnings.simplefilter('ignore') # suppress torch 1.9.0 max_pool2d() warning
y1 = self.m(x)
y2 = self.m(y1)
return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1))
SPP
和 SPPF
类都是与空间金字塔池化(Spatial Pyramid Pooling, SPP)相关的层,用于提取输入特征的不同尺度的信息。以下是这两个类的详细解释:
SPP (Spatial Pyramid Pooling) 层:
__init__
):
c1
和 c2
:输入和输出通道数。k
:一个元组,包含不同大小的池化核的大小。默认为 (5, 9, 13)
,这意味着将使用3种不同大小的池化核进行池化操作。forward
):
self.cv1
) 对输入进行卷积操作,将通道数减半,得到 c_
个通道。nn.MaxPool2d
) 对输入进行不同尺度的池化操作。池化核的大小由 k
参数确定。self.cv2
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。SPPF (Spatial Pyramid Pooling - Fast) 层:
__init__
):
c1
和 c2
:输入和输出通道数。k
:一个整数,表示使用的池化核的大小。默认为 5
,这意味着将使用一个大小为 5x5
的池化核。SPPF
是 SPP
的快速版本,等效于 SPP
中 k=(5, 9, 13)
的情况。forward
):
SPP
类似,首先使用一个 1x1 的卷积层 (self.cv1
) 对输入进行卷积操作,将通道数减半,得到 c_
个通道。kxk
大小的最大池化层 (nn.MaxPool2d
) 对输入进行池化操作。self.cv2
) 进行卷积操作,将通道数调整为 c2
,并返回卷积后的结果。class Focus(nn.Module):
# Focus wh information into c-space
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): # ch_in, ch_out, kernel, stride, padding, groups
super().__init__()
self.conv = Conv(c1 * 4, c2, k, s, p, g, act=act)
# self.contract = Contract(gain=2)
def forward(self, x): # x(b,c,w,h) -> y(b,4c,w/2,h/2)
return self.conv(torch.cat((x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]), 1))
# return self.conv(self.contract(x))
Focus
类是一个用于将输入的 wh
信息(宽度和高度信息)聚合到通道维度 c-space
的模块。
__init__
):
c1
和 c2
:输入和输出通道数。k
:卷积核的大小,默认为 1
。s
:卷积的步幅,默认为 1
。p
:卷积的填充大小,默认为 None
,表示自动填充。g
:卷积的分组数,默认为 1
。act
:是否使用激活函数,默认为 True
,表示使用激活函数。forward
):
x
的形状为 (b, c, w, h)
,其中 b
是批次大小,c
是通道数,w
是宽度,h
是高度。Focus
模块首先将输入 x
沿通道维度进行分割成4个子张量,然后将这4个子张量按特定规则合并成一个张量。具体来说,它将输入张量 x
分为四个子张量:
self.conv
进行卷积操作,将 wh
信息聚合到通道维度,得到输出。class GhostConv(nn.Module):
# Ghost Convolution https://github.com/huawei-noah/ghostnet
def __init__(self, c1, c2, k=1, s=1, g=1, act=True): # ch_in, ch_out, kernel, stride, groups
super().__init__()
c_ = c2 // 2 # hidden channels
self.cv1 = Conv(c1, c_, k, s, None, g, act=act)
self.cv2 = Conv(c_, c_, 5, 1, None, c_, act=act)
def forward(self, x):
y = self.cv1(x)
return torch.cat((y, self.cv2(y)), 1)
class GhostBottleneck(nn.Module):
# Ghost Bottleneck https://github.com/huawei-noah/ghostnet
def __init__(self, c1, c2, k=3, s=1): # ch_in, ch_out, kernel, stride
super().__init__()
c_ = c2 // 2
self.conv = nn.Sequential(
GhostConv(c1, c_, 1, 1), # pw
DWConv(c_, c_, k, s, act=False) if s == 2 else nn.Identity(), # dw
GhostConv(c_, c2, 1, 1, act=False)) # pw-linear
self.shortcut = nn.Sequential(DWConv(c1, c1, k, s, act=False), Conv(c1, c2, 1, 1,
act=False)) if s == 2 else nn.Identity()
def forward(self, x):
return self.conv(x) + self.shortcut(x)
?
GhostConv
和 GhostBottleneck
类实现了幽灵卷积(Ghost Convolution)和幽灵瓶颈(Ghost Bottleneck)的概念,这些技术用于减少卷积的计算成本,同时保持模型性能。这些技术是在 GhostNet 架构中引入的。
以下是对每个类的说明:
GhostConv
是一个卷积层,由两个部分组成。self.cv1
)是一个标准卷积层,它将输入通道数(c1
)减少到一半(c_
),如果 act=True
,则应用激活函数。self.cv2
)是一个深度卷积层,卷积核大小为 5x5,进一步处理减少通道数的特征。如果 act=True
,则也应用激活函数。GhostBottleneck
是卷积神经网络中常用的瓶颈块。self.conv
)和快捷连接(self.shortcut
)。GhostConv
层进行第一次卷积(pw
- 点卷积),然后如果步幅 s
为 2(以降低空间维度),则进行深度卷积(dw
)。最后,再次应用 GhostConv
层进行第二次点卷积(pw-linear
)。s
为 2,则应用深度卷积,然后是标准卷积,以调整快捷连接的维度,以使其与主要卷积部分的维度匹配。?
class Contract(nn.Module):
# Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40)
def __init__(self, gain=2):
super().__init__()
self.gain = gain
def forward(self, x):
b, c, h, w = x.size() # assert (h / s == 0) and (W / s == 0), 'Indivisible gain'
s = self.gain
x = x.view(b, c, h // s, s, w // s, s) # x(1,64,40,2,40,2)
x = x.permute(0, 3, 5, 1, 2, 4).contiguous() # x(1,2,2,64,40,40)
return x.view(b, c * s * s, h // s, w // s) # x(1,256,40,40)
class Expand(nn.Module):
# Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160)
def __init__(self, gain=2):
super().__init__()
self.gain = gain
def forward(self, x):
b, c, h, w = x.size() # assert C / s ** 2 == 0, 'Indivisible gain'
s = self.gain
x = x.view(b, s, s, c // s ** 2, h, w) # x(1,2,2,16,80,80)
x = x.permute(0, 3, 4, 1, 5, 2).contiguous() # x(1,16,80,2,80,2)
return x.view(b, c // s ** 2, h * s, w * s) # x(1,16,160,160)
class Concat(nn.Module):
# Concatenate a list of tensors along dimension
def __init__(self, dimension=1):
super().__init__()
self.d = dimension
def forward(self, x):
return torch.cat(x, self.d)
Contract
模块Contract
模块的作用是将输入张量的宽度和高度合并成通道数。例如,将输入张量从维度 (1, 64, 80, 80)
转换为维度 (1, 256, 40, 40)
。gain
参数用于控制合并的倍数,默认值为 2。gain
整除。gain
倍数进行合并,然后重新排列维度。Expand
模块Expand
模块的作用与 Contract
模块相反,它将通道数扩展为宽度和高度。例如,将输入张量从维度 (1, 64, 80, 80)
转换为维度 (1, 16, 160, 160)
。gain
参数用于控制扩展的倍数,默认值为 2。gain
整除。gain
倍数进行扩展,然后重新排列维度。Concat
模块Concat
模块用于将一组张量沿着指定维度进行拼接。dimension
参数指定了要拼接的维度,默认为 1。class DetectMultiBackend(nn.Module):
# YOLOv5 MultiBackend class for python inference on various backends
def __init__(self, weights='yolov5s.pt', device=torch.device('cpu'), dnn=False, data=None, fp16=False, fuse=True):
# Usage:
# PyTorch: weights = *.pt
# TorchScript: *.torchscript
# ONNX Runtime: *.onnx
# ONNX OpenCV DNN: *.onnx --dnn
# OpenVINO: *_openvino_model
# CoreML: *.mlmodel
# TensorRT: *.engine
# TensorFlow SavedModel: *_saved_model
# TensorFlow GraphDef: *.pb
# TensorFlow Lite: *.tflite
# TensorFlow Edge TPU: *_edgetpu.tflite
# PaddlePaddle: *_paddle_model
from models.experimental import attempt_download, attempt_load # scoped to avoid circular import
super().__init__()
w = str(weights[0] if isinstance(weights, list) else weights)
pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle, triton = self._model_type(w)
fp16 &= pt or jit or onnx or engine or triton # FP16
nhwc = coreml or saved_model or pb or tflite or edgetpu # BHWC formats (vs torch BCWH)
stride = 32 # default stride
cuda = torch.cuda.is_available() and device.type != 'cpu' # use CUDA
if not (pt or triton):
w = attempt_download(w) # download if not local
if pt: # PyTorch
model = attempt_load(weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse)
stride = max(int(model.stride.max()), 32) # model stride
names = model.module.names if hasattr(model, 'module') else model.names # get class names
model.half() if fp16 else model.float()
self.model = model # explicitly assign for to(), cpu(), cuda(), half()
elif jit: # TorchScript
LOGGER.info(f'Loading {w} for TorchScript inference...')
extra_files = {'config.txt': ''} # model metadata
model = torch.jit.load(w, _extra_files=extra_files, map_location=device)
model.half() if fp16 else model.float()
if extra_files['config.txt']: # load metadata dict
d = json.loads(extra_files['config.txt'],
object_hook=lambda d: {
int(k) if k.isdigit() else k: v
for k, v in d.items()})
stride, names = int(d['stride']), d['names']
elif dnn: # ONNX OpenCV DNN
LOGGER.info(f'Loading {w} for ONNX OpenCV DNN inference...')
check_requirements('opencv-python>=4.5.4')
net = cv2.dnn.readNetFromONNX(w)
elif onnx: # ONNX Runtime
LOGGER.info(f'Loading {w} for ONNX Runtime inference...')
check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime'))
import onnxruntime
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider']
session = onnxruntime.InferenceSession(w, providers=providers)
output_names = [x.name for x in session.get_outputs()]
meta = session.get_modelmeta().custom_metadata_map # metadata
if 'stride' in meta:
stride, names = int(meta['stride']), eval(meta['names'])
elif xml: # OpenVINO
LOGGER.info(f'Loading {w} for OpenVINO inference...')
check_requirements('openvino>=2023.0') # requires openvino-dev: https://pypi.org/project/openvino-dev/
from openvino.runtime import Core, Layout, get_batch
core = Core()
if not Path(w).is_file(): # if not *.xml
w = next(Path(w).glob('*.xml')) # get *.xml file from *_openvino_model dir
ov_model = core.read_model(model=w, weights=Path(w).with_suffix('.bin'))
if ov_model.get_parameters()[0].get_layout().empty:
ov_model.get_parameters()[0].set_layout(Layout('NCHW'))
batch_dim = get_batch(ov_model)
if batch_dim.is_static:
batch_size = batch_dim.get_length()
ov_compiled_model = core.compile_model(ov_model, device_name='AUTO') # AUTO selects best available device
stride, names = self._load_metadata(Path(w).with_suffix('.yaml')) # load metadata
elif engine: # TensorRT
LOGGER.info(f'Loading {w} for TensorRT inference...')
import tensorrt as trt # https://developer.nvidia.com/nvidia-tensorrt-download
check_version(trt.__version__, '7.0.0', hard=True) # require tensorrt>=7.0.0
if device.type == 'cpu':
device = torch.device('cuda:0')
Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
logger = trt.Logger(trt.Logger.INFO)
with open(w, 'rb') as f, trt.Runtime(logger) as runtime:
model = runtime.deserialize_cuda_engine(f.read())
context = model.create_execution_context()
bindings = OrderedDict()
output_names = []
fp16 = False # default updated below
dynamic = False
for i in range(model.num_bindings):
name = model.get_binding_name(i)
dtype = trt.nptype(model.get_binding_dtype(i))
if model.binding_is_input(i):
if -1 in tuple(model.get_binding_shape(i)): # dynamic
dynamic = True
context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[2]))
if dtype == np.float16:
fp16 = True
else: # output
output_names.append(name)
shape = tuple(context.get_binding_shape(i))
im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)
bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))
binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
batch_size = bindings['images'].shape[0] # if dynamic, this is instead max batch size
elif coreml: # CoreML
LOGGER.info(f'Loading {w} for CoreML inference...')
import coremltools as ct
model = ct.models.MLModel(w)
elif saved_model: # TF SavedModel
LOGGER.info(f'Loading {w} for TensorFlow SavedModel inference...')
import tensorflow as tf
keras = False # assume TF1 saved_model
model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)
elif pb: # GraphDef https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
LOGGER.info(f'Loading {w} for TensorFlow GraphDef inference...')
import tensorflow as tf
def wrap_frozen_graph(gd, inputs, outputs):
x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=''), []) # wrapped
ge = x.graph.as_graph_element
return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))
def gd_outputs(gd):
name_list, input_list = [], []
for node in gd.node: # tensorflow.core.framework.node_def_pb2.NodeDef
name_list.append(node.name)
input_list.extend(node.input)
return sorted(f'{x}:0' for x in list(set(name_list) - set(input_list)) if not x.startswith('NoOp'))
gd = tf.Graph().as_graph_def() # TF GraphDef
with open(w, 'rb') as f:
gd.ParseFromString(f.read())
frozen_func = wrap_frozen_graph(gd, inputs='x:0', outputs=gd_outputs(gd))
elif tflite or edgetpu: # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
try: # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
from tflite_runtime.interpreter import Interpreter, load_delegate
except ImportError:
import tensorflow as tf
Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate,
if edgetpu: # TF Edge TPU https://coral.ai/software/#edgetpu-runtime
LOGGER.info(f'Loading {w} for TensorFlow Lite Edge TPU inference...')
delegate = {
'Linux': 'libedgetpu.so.1',
'Darwin': 'libedgetpu.1.dylib',
'Windows': 'edgetpu.dll'}[platform.system()]
interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)])
else: # TFLite
LOGGER.info(f'Loading {w} for TensorFlow Lite inference...')
interpreter = Interpreter(model_path=w) # load TFLite model
interpreter.allocate_tensors() # allocate
input_details = interpreter.get_input_details() # inputs
output_details = interpreter.get_output_details() # outputs
# load metadata
with contextlib.suppress(zipfile.BadZipFile):
with zipfile.ZipFile(w, 'r') as model:
meta_file = model.namelist()[0]
meta = ast.literal_eval(model.read(meta_file).decode('utf-8'))
stride, names = int(meta['stride']), meta['names']
elif tfjs: # TF.js
raise NotImplementedError('ERROR: YOLOv5 TF.js inference is not supported')
elif paddle: # PaddlePaddle
LOGGER.info(f'Loading {w} for PaddlePaddle inference...')
check_requirements('paddlepaddle-gpu' if cuda else 'paddlepaddle')
import paddle.inference as pdi
if not Path(w).is_file(): # if not *.pdmodel
w = next(Path(w).rglob('*.pdmodel')) # get *.pdmodel file from *_paddle_model dir
weights = Path(w).with_suffix('.pdiparams')
config = pdi.Config(str(w), str(weights))
if cuda:
config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0)
predictor = pdi.create_predictor(config)
input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
output_names = predictor.get_output_names()
elif triton: # NVIDIA Triton Inference Server
LOGGER.info(f'Using {w} as Triton Inference Server...')
check_requirements('tritonclient[all]')
from utils.triton import TritonRemoteModel
model = TritonRemoteModel(url=w)
nhwc = model.runtime.startswith('tensorflow')
else:
raise NotImplementedError(f'ERROR: {w} is not a supported format')
# class names
if 'names' not in locals():
names = yaml_load(data)['names'] if data else {i: f'class{i}' for i in range(999)}
if names[0] == 'n01440764' and len(names) == 1000: # ImageNet
names = yaml_load(ROOT / 'data/ImageNet.yaml')['names'] # human-readable names
self.__dict__.update(locals()) # assign all variables to self
def forward(self, im, augment=False, visualize=False):
# YOLOv5 MultiBackend inference
b, ch, h, w = im.shape # batch, channel, height, width
if self.fp16 and im.dtype != torch.float16:
im = im.half() # to FP16
if self.nhwc:
im = im.permute(0, 2, 3, 1) # torch BCHW to numpy BHWC shape(1,320,192,3)
if self.pt: # PyTorch
y = self.model(im, augment=augment, visualize=visualize) if augment or visualize else self.model(im)
elif self.jit: # TorchScript
y = self.model(im)
elif self.dnn: # ONNX OpenCV DNN
im = im.cpu().numpy() # torch to numpy
self.net.setInput(im)
y = self.net.forward()
elif self.onnx: # ONNX Runtime
im = im.cpu().numpy() # torch to numpy
y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
elif self.xml: # OpenVINO
im = im.cpu().numpy() # FP32
y = list(self.ov_compiled_model(im).values())
elif self.engine: # TensorRT
if self.dynamic and im.shape != self.bindings['images'].shape:
i = self.model.get_binding_index('images')
self.context.set_binding_shape(i, im.shape) # reshape if dynamic
self.bindings['images'] = self.bindings['images']._replace(shape=im.shape)
for name in self.output_names:
i = self.model.get_binding_index(name)
self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))
s = self.bindings['images'].shape
assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
self.binding_addrs['images'] = int(im.data_ptr())
self.context.execute_v2(list(self.binding_addrs.values()))
y = [self.bindings[x].data for x in sorted(self.output_names)]
elif self.coreml: # CoreML
im = im.cpu().numpy()
im = Image.fromarray((im[0] * 255).astype('uint8'))
# im = im.resize((192, 320), Image.BILINEAR)
y = self.model.predict({'image': im}) # coordinates are xywh normalized
if 'confidence' in y:
box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]]) # xyxy pixels
conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float)
y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
else:
y = list(reversed(y.values())) # reversed for segmentation models (pred, proto)
elif self.paddle: # PaddlePaddle
im = im.cpu().numpy().astype(np.float32)
self.input_handle.copy_from_cpu(im)
self.predictor.run()
y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]
elif self.triton: # NVIDIA Triton Inference Server
y = self.model(im)
else: # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
im = im.cpu().numpy()
if self.saved_model: # SavedModel
y = self.model(im, training=False) if self.keras else self.model(im)
elif self.pb: # GraphDef
y = self.frozen_func(x=self.tf.constant(im))
else: # Lite or Edge TPU
input = self.input_details[0]
int8 = input['dtype'] == np.uint8 # is TFLite quantized uint8 model
if int8:
scale, zero_point = input['quantization']
im = (im / scale + zero_point).astype(np.uint8) # de-scale
self.interpreter.set_tensor(input['index'], im)
self.interpreter.invoke()
y = []
for output in self.output_details:
x = self.interpreter.get_tensor(output['index'])
if int8:
scale, zero_point = output['quantization']
x = (x.astype(np.float32) - zero_point) * scale # re-scale
y.append(x)
y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]
y[0][..., :4] *= [w, h, w, h] # xywh normalized to pixels
if isinstance(y, (list, tuple)):
return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
else:
return self.from_numpy(y)
def from_numpy(self, x):
return torch.from_numpy(x).to(self.device) if isinstance(x, np.ndarray) else x
def warmup(self, imgsz=(1, 3, 640, 640)):
# Warmup model by running inference once
warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton
if any(warmup_types) and (self.device.type != 'cpu' or self.triton):
im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device) # input
for _ in range(2 if self.jit else 1): #
self.forward(im) # warmup
@staticmethod
def _model_type(p='path/to/model.pt'):
# Return model type from model path, i.e. path='path/to/model.onnx' -> type=onnx
# types = [pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle]
from export import export_formats
from utils.downloads import is_url
sf = list(export_formats().Suffix) # export suffixes
if not is_url(p, check=False):
check_suffix(p, sf) # checks
url = urlparse(p) # if url may be Triton inference server
types = [s in Path(p).name for s in sf]
types[8] &= not types[9] # tflite &= not edgetpu
triton = not any(types) and all([any(s in url.scheme for s in ['http', 'grpc']), url.netloc])
return types + [triton]
@staticmethod
def _load_metadata(f=Path('path/to/meta.yaml')):
# Load metadata from meta.yaml if it exists
if f.exists():
d = yaml_load(f)
return d['stride'], d['names'] # assign stride, names
return None, None
DetectMultiBackend
?模块,用于在不同的后端上执行 YOLOv5 目标检测模型推理。这个模块支持多种推理后端,包括 PyTorch、TorchScript、ONNX Runtime、OpenVINO、TensorRT、CoreML、TensorFlow SavedModel、TensorFlow GraphDef、TensorFlow Lite、TensorFlow Edge TPU、PaddlePaddle 和 NVIDIA Triton Inference Server。以下是主要功能和用法的简要说明:
weights
参数用于指定模型权重的路径或名称,可以是不同后端支持的多种格式。例如,PyTorch 模型可以是 *.pt
文件,ONNX 模型可以是 *.onnx
文件,以此类推。device
参数用于指定模型推理运行的设备,可以是 CPU 或 GPU。dnn
参数用于指定是否使用 OpenCV DNN 推理后端。data
参数用于指定模型的元数据信息,例如模型的类别名称等。fp16
参数用于指定是否使用 FP16 数据类型进行推理。fuse
参数用于指定是否在模型中启用融合操作。class AutoShape(nn.Module):
# YOLOv5 input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS
conf = 0.25 # NMS confidence threshold
iou = 0.45 # NMS IoU threshold
agnostic = False # NMS class-agnostic
multi_label = False # NMS multiple labels per box
classes = None # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs
max_det = 1000 # maximum number of detections per image
amp = False # Automatic Mixed Precision (AMP) inference
def __init__(self, model, verbose=True):
super().__init__()
if verbose:
LOGGER.info('Adding AutoShape... ')
copy_attr(self, model, include=('yaml', 'nc', 'hyp', 'names', 'stride', 'abc'), exclude=()) # copy attributes
self.dmb = isinstance(model, DetectMultiBackend) # DetectMultiBackend() instance
self.pt = not self.dmb or model.pt # PyTorch model
self.model = model.eval()
if self.pt:
m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect()
m.inplace = False # Detect.inplace=False for safe multithread inference
m.export = True # do not output loss values
def _apply(self, fn):
# Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers
self = super()._apply(fn)
if self.pt:
m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect()
m.stride = fn(m.stride)
m.grid = list(map(fn, m.grid))
if isinstance(m.anchor_grid, list):
m.anchor_grid = list(map(fn, m.anchor_grid))
return self
@smart_inference_mode()
def forward(self, ims, size=640, augment=False, profile=False):
# Inference from various sources. For size(height=640, width=1280), RGB images example inputs are:
# file: ims = 'data/images/zidane.jpg' # str or PosixPath
# URI: = 'https://ultralytics.com/images/zidane.jpg'
# OpenCV: = cv2.imread('image.jpg')[:,:,::-1] # HWC BGR to RGB x(640,1280,3)
# PIL: = Image.open('image.jpg') or ImageGrab.grab() # HWC x(640,1280,3)
# numpy: = np.zeros((640,1280,3)) # HWC
# torch: = torch.zeros(16,3,320,640) # BCHW (scaled to size=640, 0-1 values)
# multiple: = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...] # list of images
dt = (Profile(), Profile(), Profile())
with dt[0]:
if isinstance(size, int): # expand
size = (size, size)
p = next(self.model.parameters()) if self.pt else torch.empty(1, device=self.model.device) # param
autocast = self.amp and (p.device.type != 'cpu') # Automatic Mixed Precision (AMP) inference
if isinstance(ims, torch.Tensor): # torch
with amp.autocast(autocast):
return self.model(ims.to(p.device).type_as(p), augment=augment) # inference
# Pre-process
n, ims = (len(ims), list(ims)) if isinstance(ims, (list, tuple)) else (1, [ims]) # number, list of images
shape0, shape1, files = [], [], [] # image and inference shapes, filenames
for i, im in enumerate(ims):
f = f'image{i}' # filename
if isinstance(im, (str, Path)): # filename or uri
im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im), im
im = np.asarray(exif_transpose(im))
elif isinstance(im, Image.Image): # PIL Image
im, f = np.asarray(exif_transpose(im)), getattr(im, 'filename', f) or f
files.append(Path(f).with_suffix('.jpg').name)
if im.shape[0] < 5: # image in CHW
im = im.transpose((1, 2, 0)) # reverse dataloader .transpose(2, 0, 1)
im = im[..., :3] if im.ndim == 3 else cv2.cvtColor(im, cv2.COLOR_GRAY2BGR) # enforce 3ch input
s = im.shape[:2] # HWC
shape0.append(s) # image shape
g = max(size) / max(s) # gain
shape1.append([int(y * g) for y in s])
ims[i] = im if im.data.contiguous else np.ascontiguousarray(im) # update
shape1 = [make_divisible(x, self.stride) for x in np.array(shape1).max(0)] # inf shape
x = [letterbox(im, shape1, auto=False)[0] for im in ims] # pad
x = np.ascontiguousarray(np.array(x).transpose((0, 3, 1, 2))) # stack and BHWC to BCHW
x = torch.from_numpy(x).to(p.device).type_as(p) / 255 # uint8 to fp16/32
with amp.autocast(autocast):
# Inference
with dt[1]:
y = self.model(x, augment=augment) # forward
# Post-process
with dt[2]:
y = non_max_suppression(y if self.dmb else y[0],
self.conf,
self.iou,
self.classes,
self.agnostic,
self.multi_label,
max_det=self.max_det) # NMS
for i in range(n):
scale_boxes(shape1, y[i][:, :4], shape0[i])
return Detections(ims, y, files, dt, self.names, x.shape)
?
AutoShape
类,它是YOLOv5模型的一个包装器,用于处理不同类型的输入数据并进行模型推理和后处理。
这个类包括了一些配置参数,如NMS(非极大值抑制)的置信度阈值、IoU(交并比)阈值、是否类别无关等。这些参数用于在模型推理后执行NMS以获得最终的检测结果。
__init__
方法用于初始化AutoShape
类的实例。它接受一个模型作为参数,并可以选择是否启用详细输出(verbose)。在初始化过程中,它复制了模型的一些属性,并将模型设为评估模式(eval)。
_apply
方法用于将函数应用于模型张量,以便在不影响参数或注册缓冲区的情况下应用函数。它还处理了一些模型的属性,如步幅(stride)等。
forward
方法是模型的前向推理方法。它接受输入数据ims
,可以是各种来源的图像数据,包括文件、URI、OpenCV、PIL、numpy数组和torch张量等。该方法首先对输入数据进行预处理,然后使用模型进行推理,最后执行NMS以获得检测结果。
class Detections:
# YOLOv5 detections class for inference results
def __init__(self, ims, pred, files, times=(0, 0, 0), names=None, shape=None):
super().__init__()
d = pred[0].device # device
gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in ims] # normalizations
self.ims = ims # list of images as numpy arrays
self.pred = pred # list of tensors pred[0] = (xyxy, conf, cls)
self.names = names # class names
self.files = files # image filenames
self.times = times # profiling times
self.xyxy = pred # xyxy pixels
self.xywh = [xyxy2xywh(x) for x in pred] # xywh pixels
self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)] # xyxy normalized
self.xywhn = [x / g for x, g in zip(self.xywh, gn)] # xywh normalized
self.n = len(self.pred) # number of images (batch size)
self.t = tuple(x.t / self.n * 1E3 for x in times) # timestamps (ms)
self.s = tuple(shape) # inference BCHW shape
def _run(self, pprint=False, show=False, save=False, crop=False, render=False, labels=True, save_dir=Path('')):
s, crops = '', []
for i, (im, pred) in enumerate(zip(self.ims, self.pred)):
s += f'\nimage {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} ' # string
if pred.shape[0]:
for c in pred[:, -1].unique():
n = (pred[:, -1] == c).sum() # detections per class
s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, " # add to string
s = s.rstrip(', ')
if show or save or render or crop:
annotator = Annotator(im, example=str(self.names))
for *box, conf, cls in reversed(pred): # xyxy, confidence, class
label = f'{self.names[int(cls)]} {conf:.2f}'
if crop:
file = save_dir / 'crops' / self.names[int(cls)] / self.files[i] if save else None
crops.append({
'box': box,
'conf': conf,
'cls': cls,
'label': label,
'im': save_one_box(box, im, file=file, save=save)})
else: # all others
annotator.box_label(box, label if labels else '', color=colors(cls))
im = annotator.im
else:
s += '(no detections)'
im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im # from np
if show:
if is_jupyter():
from IPython.display import display
display(im)
else:
im.show(self.files[i])
if save:
f = self.files[i]
im.save(save_dir / f) # save
if i == self.n - 1:
LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}")
if render:
self.ims[i] = np.asarray(im)
if pprint:
s = s.lstrip('\n')
return f'{s}\nSpeed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {self.s}' % self.t
if crop:
if save:
LOGGER.info(f'Saved results to {save_dir}\n')
return crops
@TryExcept('Showing images is not supported in this environment')
def show(self, labels=True):
self._run(show=True, labels=labels) # show results
def save(self, labels=True, save_dir='runs/detect/exp', exist_ok=False):
save_dir = increment_path(save_dir, exist_ok, mkdir=True) # increment save_dir
self._run(save=True, labels=labels, save_dir=save_dir) # save results
def crop(self, save=True, save_dir='runs/detect/exp', exist_ok=False):
save_dir = increment_path(save_dir, exist_ok, mkdir=True) if save else None
return self._run(crop=True, save=save, save_dir=save_dir) # crop results
def render(self, labels=True):
self._run(render=True, labels=labels) # render results
return self.ims
def pandas(self):
# return detections as pandas DataFrames, i.e. print(results.pandas().xyxy[0])
new = copy(self) # return copy
ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name' # xyxy columns
cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name' # xywh columns
for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]):
a = [[x[:5] + [int(x[5]), self.names[int(x[5])]] for x in x.tolist()] for x in getattr(self, k)] # update
setattr(new, k, [pd.DataFrame(x, columns=c) for x in a])
return new
def tolist(self):
# return a list of Detections objects, i.e. 'for result in results.tolist():'
r = range(self.n) # iterable
x = [Detections([self.ims[i]], [self.pred[i]], [self.files[i]], self.times, self.names, self.s) for i in r]
# for d in x:
# for k in ['ims', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']:
# setattr(d, k, getattr(d, k)[0]) # pop out of list
return x
def print(self):
LOGGER.info(self.__str__())
def __len__(self): # override len(results)
return self.n
def __str__(self): # override print(results)
return self._run(pprint=True) # print results
def __repr__(self):
return f'YOLOv5 {self.__class__} instance\n' + self.__str__()
Detections
的类,用于处理YOLOv5模型的推理结果。
__init__
方法用于初始化Detections
类的实例。它接受多个参数,包括输入的图像(ims)、模型的预测结果(pred)、图像文件名(files)、推理时间(times)、类别名称(names)和推理的形状(shape)等。这个类的主要目的是将这些信息组织起来,以便后续的处理和展示。
show
方法用于展示推理结果,可以选择是否显示类别标签(labels)。
save
方法用于将推理结果保存到指定的目录,可以选择是否保存类别标签(labels)。
crop
方法用于裁剪检测结果的区域,并可以选择是否保存裁剪的结果。
render
方法用于渲染检测结果,可以选择是否显示类别标签(labels)。
pandas
方法返回检测结果的Pandas DataFrame格式,可以方便地进行数据分析和处理。
tolist
方法返回一个包含多个Detections
对象的列表,每个对象代表一个检测结果。
print
方法用于打印检测结果的摘要信息。
__len__
方法返回检测结果的数量。
__str__
方法用于生成检测结果的字符串表示,包括推理时间和检测结果的统计信息。
__repr__
方法返回该类的描述信息。
class Proto(nn.Module):
# YOLOv5 mask Proto module for segmentation models
def __init__(self, c1, c_=256, c2=32): # ch_in, number of protos, number of masks
super().__init__()
self.cv1 = Conv(c1, c_, k=3)
self.upsample = nn.Upsample(scale_factor=2, mode='nearest')
self.cv2 = Conv(c_, c_, k=3)
self.cv3 = Conv(c_, c2)
def forward(self, x):
return self.cv3(self.cv2(self.upsample(self.cv1(x))))
class Classify(nn.Module):
# YOLOv5 classification head, i.e. x(b,c1,20,20) to x(b,c2)
def __init__(self,
c1,
c2,
k=1,
s=1,
p=None,
g=1,
dropout_p=0.0): # ch_in, ch_out, kernel, stride, padding, groups, dropout probability
super().__init__()
c_ = 1280 # efficientnet_b0 size
self.conv = Conv(c1, c_, k, s, autopad(k, p), g)
self.pool = nn.AdaptiveAvgPool2d(1) # to x(b,c_,1,1)
self.drop = nn.Dropout(p=dropout_p, inplace=True)
self.linear = nn.Linear(c_, c2) # to x(b,c2)
def forward(self, x):
if isinstance(x, list):
x = torch.cat(x, 1)
return self.linear(self.drop(self.pool(self.conv(x)).flatten(1)))
Proto
类:
Classify
类: