基于我们常见的副本复制机制,HDFS 会将每个replica复制到多个rack和host上。 复制(replication)提供了一种简单而强大的冗余形式,可以防止大多数故障情况。这种副本策略简单而直接:即然我们怕数据丢失,那就把相同的数据复制多份,放到不同的地方。
但是这种通过复制来实现数据安全的方式显然会带来额外的存储空间开销。比如,HDFS默认的3副本存储策略,会带来200%的额外存储开销,在写数据的时候也增加了额外的200%的写流量。而这部分额外的开销仅仅是在极少发生的某个副本丢失的情况下才会发生价值。多副本的放置策略带来的数据本地访问的收益我认为也非常有限和牵强,很多情况下,读取数据的客户端和数据根本不运行在一个集群,即使在一个集群(比如NM和DN伴随部署),假如集群规模很大,客户端和数据块在一起的概率也不大。
一个自然的改进是使用纠删码(Easure Coding)代替复制,它使用更少的存储空间,同时仍然提供相同级别的容错能力。 在典型配置下,与 3x 复制相比,EC 可以将存储成本降低约 50%。 纠删码是Cloudera 和英特尔的工程师与更广泛的 Apache Hadoop 社区合作,推动了 HDFS-7285下的 HDFS-EC 项目。 目前,Hadoop 3.0 开始支持纠删吗。
但EC也绝非完胜Replication,因为本质上EC是用计算换存储,即,虽然实现了更小的存储空间,但是需要用更多的计算资源,比如数据写入的时候的额外计算,以及数据恢复时候的额外计算。
在阅读跟块管理相关的代码时,一个典型的感觉是混乱:
连续布局 | 条带布局 | |
---|---|---|
Block(块) | 三个相同物理块的副本集合 | 一个逻辑块,包含了多个不同的物理块 |
Replica(副本) | 存储在某台机器的单个物理副本 | 一个物理块(也叫Internal Block或者Physical Block),或者是数据物理块data block,或者是校验物理块parity block |
Logical Block(逻辑块) | 逻辑块和物理块含义相同 | 一个逻辑块,包含了多个不同的物理块 |
Internal/Physical Block(物理块/内部块) | 逻辑块和物理块含义相同 | 一个物理块,有可能是数据物理块data block,有可能是校验物理块parity block |
本文依然按照从上到下的原则,先讲解纠删码的基本原理,再讲解纠删码在Hadoop中的设计和实现。
这篇文章目前还处于草稿状态,没怎么校对,有时间会校对然后不断修改更新。
在Cloudera的文章Introduction to HDFS Erasure Coding in Apache Hadoop中对纠删码有很详细的上层介绍。本文有一小部分内容来源于该文章,如有侵权,我会立刻删除。
在我们比较不同的数据存储方案的时候,有两个考虑因素,数据持久性(这种存储方案可以容忍怎样的数据副本失效而保证数据依然不丢失),数据的使用效率(去重以后的有效数据占总存储数据的比值)。比如,HDFS的默认3副本存储方案,其数据持久是可以容忍最多两副本的丢失,同时,数据的使用效率是1/3,即虽然存储了三副本,但是有两份副本都作为备份。
基于纠删码的存储策略,就是为了在保证数据持久性不变的情况下,提高了数据的使用效率。但是也如上文介绍,引入了更多的CPU开销。
最简单形式的纠删码基于异或 (XOR)运算,如表-1 所示的亦或关系表
X | Y | Z |
---|---|---|
0 | 1 | 1 |
0 | 0 | 0 |
1 | 0 | 1 |
1 | 1 | 0 |
亦或运算可交换的,比如X ⊕ Y = Y ⊕ X。亦或运算也是可任意组合的: X ⊕ Y ⊕ Z = (X ⊕ Y) ⊕ Z。 这意味着 XOR 可以生成 1 来自任意数量的数据位的奇偶校验位。 例如,1 ⊕ 0 ⊕ 1 ⊕ 1 = 1,当第三位丢失时,可以通过对剩余数据位{1,0,1}和奇偶校验位1进行异或来恢复。而异或可以取任意数量的数据单元作为输入,它是非常有限的,因为它最多只能产生一个奇偶校验单元。 因此,基于 XOR 的编码最多可以容忍一个数据丢失,数据的使用效率为 n-1/n(一组 n 个总单元有 n-1 个数据单元),但对于像 HDFS 这样需要容忍多个单元的丢失的系统来说是不够的。
EC 的另一种形式 Reed-Solomon (RS) 解决了这一限制。 RS 使用复杂的线性代数运算来生成多个奇偶校验单元,因此可以容忍每组多个故障。 这使其成为生产存储系统的常见选择。 RS 可通过两个参数进行配置:k 和 m。 如图 1 所示,RS(k,m) 的工作原理是将 k 个数据单元的向量与生成矩阵 (GT) 相乘,以生成具有 k 个数据单元和 m 个奇偶校验单元的扩展码字向量。 只要 (k + m) 个单元中的 k 个可用,就可以通过将幸存的单元(数据单元和校验单元)乘以 GT 的转置矩阵来恢复存储故障。 (GT 中对应于故障单元的行应在取其倒数之前删除。)这意味着该组可以容忍任何 m 个单元的故障。
通过Reed-Solomon,用户可以通过选择不同的k和m值来灵活调整数据持久性和存储成本。 奇偶校验单元的数量 (m) 决定了可以容忍的同时存储故障的数量。 数据单元与奇偶校验单元的比率决定了存储效率, 即 k / k + m
从图-2可以看到,我们和最常用的基于副本复制的3副本存储策略对比,RS(6,3)和RS(10,4)均表现出更好的数据持久性和存储效率,但是这是以更高的CPU消耗为代价的。我们在讲Hadoop的具体实现的时候会详细讲解。
EC 长期以来一直用于本地存储系统,特别是以 RAID-5 和 RAID-6 的形式。 RAID-5 通常使用 XOR 编码,因为它只需要容忍单个磁盘故障,而 RAID-6 使用带有两个奇偶校验单元的 Reed-Solomon 来容忍最多两次故障。 单元大小通常是可配置的,每个磁盘上具有相同偏移量的单元形成的纠删码组。
和我们的磁盘用扇区去划分存储结构一样,为了管理大小不一的各种文件,分布式存储系统通常将文件划分为固定大小的逻辑字节范围(称为逻辑块),然后这些逻辑块被映射到集群上的存储块,这反映了集群上数据的物理布局。
逻辑块和存储块之间最简单的映射是连续块布局,它将每个逻辑块一对一地映射到存储块。 读取具有连续块布局的文件就像按顺序线性读取每个存储块一样简单。因此,对于连续块布局,我们甚至都不需要区分逻辑块和物理块,他们都代表一个文件的一块连续字节范围。
相比之下,条带块布局将逻辑块分解为更小的存储单元(通常称为Cell),并在一组存储块中循环写入单元条带(Stripe)。 读取具有条带布局的文件需要查询逻辑块的存储块集合,然后从该存储块集合读取单元条带。
块布局方案(连续与条带)和块冗余形式(复制与 纠删码)是两个正交维度,形成了四种排列组合形式。尽管在HDFS的使用场景下,连续布局被用在基于复制的块冗余形式,而条带布局用在了纠删码中,但是,他们其实可以随意正交。
些系统(包括 Ceph 和 QFS)支持在每个目录或每个文件的基础上配置布局和/或冗余。
本节讨论如何在两种块布局上支持 EC。
下图显示了HDFS常用了基于复制的冗余方式下,使用连续布局:
下图显示了HDFS常用了基于复制的冗余方式下,使用条带布局:
下图显示了纠删码冗余形式下的连续布局和条带布局:
对于 HDFS-EC,应该选择哪种布局方式? 连续布局更容易实现,因为读写路径仍然与当前具有复制的系统非常相似。 然而,它仅适用于文件非常大的情况,因为只有在写入完整条带时才能实现全部成本节省。 例如,对于 RS (10,4),只有一个 128MB 数据块的条带最终仍会写入四个 128MB 奇偶校验块,存储开销为 400%(比 3 路复制更糟糕)。 连续布局方式下,客户端不适合进行EC校验码的计算,因为这需要将GB级别的校验块load到内存计算校验码。
另一方面,采用条带布局的纠删码可以实现小文件和大文件的存储节省,因为单元(Cell)大小要小得多(通常为 64KB 或 1MB, HDFS默认为1MB)。 这种整体较小的组大小还支持在线 EC,其中客户端直接写入纠删码数据,因为计算奇偶校验信息只需要几兆字节的缓冲。 但是也有一个缺点,一些对本地化很敏感的工作场景会收到影响,因为一个文件的连续部分总是会分散到不同的机器上取,而不是向连续布局一样,一个大的Block只会写到一台机器上。 为了更好地服务此类工作负载,可以将条带文件转换为连续布局,但这几乎需要重写整个文件。
基于以上考虑,HDFS中基于复制的块冗余策略使用了连续布局,基于纠删码的块冗余策略使用了条带布局,而不允许两种冗余策略和两种布局方式可以随意正交。
连续块布局的假设广泛而深入地嵌入到 HDFS 内部逻辑中,为了在这种情况下让HDFS支持基于纠删码的条带布局的块冗余形式,需要定义一个统一的块的概念,即严格讲逻辑块和物理块分开 。 为了支持条带布局,逻辑块的概念必须与存储块的概念分开。 前者表示文件中的逻辑字节范围,后者是DataNode上存储的数据块的基本单位。 图 -7 演示了逻辑块和存储块的概念。 在示例中,文件 /tmp/foo 在逻辑上分为 13 个条带单元(cell_0 到 cell_12)。 逻辑块0代表单元0~8的逻辑字节范围,逻辑块1代表单元9~12。 Cell 0、3、6 形成一个物理存储块,它将作为单个数据块存储在 DataNode 上。 为了简洁起见,该图不包括奇偶校验块/单元:
支持这种泛化的一个简单机制是 HDFS NameNode 监视其块映射中的每个存储块,该块映射从块 ID 映射到相应的块,然后使用另一个映射从逻辑块到其成员存储块。 然而,这意味着小文件将在 NameNode 上产生大量内存开销,因为条带化会产生比复制更多的存储块。
为了减少这种开销,我们引入了一种新的分层块命名协议。 目前HDFS根据块创建时间顺序分配块ID。 相反,该协议将每个块 ID 分为 2~3 个部分,如图 7 所示。每个块 ID 以指示其布局的标志开头(连续 = 0,条带 = 1)。 对于条带块来说,ID的其余部分由两部分组成:中间部分是逻辑块的ID,尾部分表示逻辑块中存储块的索引。 这允许 NameNode 将逻辑块作为其存储块的摘要进行管理。 存储块ID可以通过屏蔽索引映射到其逻辑块; 当NameNode处理DataNode块报告时,这是必需的。
在讲纠删码的时候,基于复制的块冗余都使用3副本场景来举例,而纠删码都用RS(6,2)来举例。
在讲解纠删码的具体实现以前,我们先讲解HDFS中怎么维护Block、Replica、DataNode之间复杂的关联关系的。基于对这些信息的理解,我们再去理解纠删码的相关内容。
从基本直觉出发,我们可以想到,hadoop的块管理,需要管理以下的信息和提供以下的功能:
BlockInfo封装的是一个Block的基本信息,比如这个Block属于哪个BlockPool,副本数多少(只有连续布局方式有副本数的概念,条带布局没有这个参数),还有下文将要讲到的BlockUnderConstructionFeature,代表这个Block当前正处于构造(正在被写入)的临时状态,以及最重要的,这个Block的所有Replica信息,这个信息存放在一个叫做triplets的数组中。其基本结构如下图所示。从triplets的名字可以看出来(triplet在英文中是三胞胎/三联体的意思),triplets是一个三元组的集合,这里的每个三元组代表了一个replica的相关信息,对于第i个replica,triplets[3 * i]
存放的是这个replica i
的DatanodeStorageInfo信息,triplets[3*i + 1]
存放的是这个replica所在的DatanodeStorageInfo的Replica链表中的前一个Block的BlockInfo,triplets[3*i + 2]
存放的是这个replica所在的DatanodeStorageInfo的Replica链表中的后一个Block的BlockInfo。
请注意这里的前和后的含义,这里的前和后,不是指这个Replica所在的Block的前一个Replica或者后一个Replica,也不是指整个HDFS集群中存在一个Replica的全局链表的前一个或者后一个Block,而是指这个Replica所在的DatanodeStorageInfo对应的Replica链表的前一个或者后一个replica。我发现很多HDFS的书籍上都并未对这一点说清楚,导致读者不看代码根本不知道其真实含义,只是人云亦云地知道是前和后。
所以这个triplets数组中的每一个triplet有三个元素,整个triplets数组其实是一个一维数组,所以整个triplets的大小就是3 * baseSize,这个baseSize对于连续布局来说,就是副本数量,比如系统配置的文件副本数是5,那么某个BlockInfo.triplets[]数组的长度就是3 * 5 = 15
,对于纠删码来说,就是一个BlockGroup中的internal block数量, 比如对于RS(6,2),就是8,tripelets[]数组的长度就是3 * 8 = 24
。
通过这样的一个3 * baseSize的数组,实际上形成了连接BlockInfo的一个双向链表。
public abstract class BlockInfo extends Block
implements LightWeightGSet.LinkedElement {
private short replication; // 副本数量
private volatile long bcId; // 这个block所属于的block collection,比如一个INodeFile就是一个BlockCollection
private LightWeightGSet.LinkedElement nextLinkedElement;
protected Object[] triplets; // 存放replica信息的三元组
下图是这个triplets数组的示意图。为了方便起见,我们将一维的triplets
数组变成一个baseSize * 3
的二维数组:
在NameNode端,使用DatanodeDescriptor来表示一个DataNode的相关信息,包括一些静态信息(静态信息存放在它的父类DatanodeInfo中)和一些动态信息比如不断变化的块信息。
DatanodeStorageInfo是NameNode端用来代表DataNode端的一块数据存储路径(比如某一个Volume),显然,这个存储介质具有特定的唯一ID,存储介质类型,容量,当前的使用率等等信息。在DataNode端与之对等的类是DatanodeStorage
。显然,DatanodeStorageInfo和DatanodeDescriptor是多对一的关系。类关系图如下:
----------------------------------------------DatanodeStorageInfo----------------------------------------------
// 一个DataNodeStorageInfo指的是某一台DN的某个storage
private final DatanodeDescriptor dn;
private final String storageID;
private StorageType storageType;
private State state;
private long capacity;
private long dfsUsed;
private long nonDfsUsed;
private volatile long remaining;
private long blockPoolUsed;
// 这个虽然叫blockList,但是由于并不是用Java的Collection集合,而是自己维护的数组,因此blockList只是这个数组的header
private volatile BlockInfo blockList = null;
为了表达一个DatanodeStorage上挂载的Replica的信息,DatanodeStorage使用blockList保存了这个链表的head节点。链表之间的链接关系通过BlockInfo内部的对象引用来实现。我们下文讲BlockInfo的时候会讲到。
总之, DatanodeStorageInfo保存了这个 Storage上的Replica链表的头结点,用以维护这个Datanode上所有的Replica信息,方便遍历。比如,我们需要将一个DataNode进入maintenance状态,这意味着这个DataNode上的所有DatanodeStorageInfo上的所有的replica都需要进行进入maintenance前的处理,比如通过复制保证副本数等。
class BlockIterator implements Iterator<BlockInfo> {
private BlockInfo current;
BlockIterator(BlockInfo head) {
this.current = head;
}
public boolean hasNext() {
return current != null;
}
public BlockInfo next() {
BlockInfo res = current;
current =
current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
return res;
}
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
}
我们从next()方法可以看到,寻找当前的BlockInfo的下一个节点是通过调用当前节点的BlockInfo的findStorageInfo来实现的。
如果我们需要遍历一个Datanode上的所有Replica呢?其实只需要遍历这个 DatanodeDescriptor上的所有DatanodeStorageInfo,然后根据DatanodeStorageInfo.BlockIterator来进行遍历就行了。因此DatanodeDescriptor.BlockIterator是对DatanodeStorageInfo.BlockIterator的封装。这里不再赘述。
这里的块信息其实并不准确,应该是Replica的信息的维护。块的整体的增删是发生在客户端写文件或者删除文件导致的块的增加和删除。
这里的Replica信息的变化发生在DataNode 的blockReport时,NameNode会根据BlockReport的信息,对Replica或者Block进行同步的增删等操作。必须再次强调,这里的BlockReport,其实表达并不准确,DataNode汇报的并不是 Block,而是Replica。
NameNode收到新增Replica的 blockReport,会通过BlockManager.addStoredBlock()来将这个Replica添加到其属于的Block的BlockInfo中去。而这个BlockInfo对应的整个Block,是在客户端写数据过程中在NameNode端创建好的,只不过当时这个Block还没有收到任何的DataNode的块汇报。
这是通过调用DatanodeStorageInfo的insertToList方法来实现的。
--------------------------------------------------DatanodeStorageInfo-------------------------------------------------
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
.......
// 把BlockInfo b 添加到this(DataNodeStorageInfo中去)
b.addStorage(this, reportedBlock); // 先更新这个Replica的Storage信息
insertToList(b);// 设置这个block在this(当前的这个DataNodeStorageInfo)中的信息
......
}
从上面的代码看到,会首先通过b.addStore()将这个Replica添加到其所属的Block对应的BlockInfo中去,代表这个Block增加了一个Replica。
addStorage()的代码如下所示,显然, addStorage的职责仅仅是将这个Block新汇报的 Replica放到BlockInfo的triplets中去,并设置了这个Replica的DatanodeStorageInfo信息,但是,还没有设置任何的前向(pre)或者后向(next)的任何信息:
----------------------------------------------BlockInfo---------------------------------------------------------------
// reportedBlock是要进行插入的replica
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
// find the last null node
int lastNode = ensureCapacity(1); // 可插入位置的三元组的第一个位置,这个位置存放这个block的DatanodeStorageInfo信息
setStorageInfo(lastNode, storage); // 把lastnode设置为这个Storage,代表我设置了当前这个BlockInfoContinuous的存储信息
setNext(lastNode, null); // 先暂时不设置previous和next的信息
setPrevious(lastNode, null);
return true;
}
在BlockInfo中设置好了replica的位置信息以后,就要维护DatanodeStorageInfo中的BlockInfo的链表信息,这是通过调用DatanodeStorageInfo.insertToList()实现的:
--------------------------------------------------DatanodeStorageInfo-------------------------------------------------
/**
* this指的是当前的DatanodeStorageInfo, b是新的replica所对应的BlockInfo, blockList是this.blockList,即
* 当前这个DatanodeStorageInfo上的BlockInfo链表的头结点
* @param b
*/
public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this); // 把当前节点b插入到DatanodeStorageInfo(this)的头结点blockList所代表的链表的头部
numBlocks++;
}
insertToList()
会将当前新汇报的这个Replica
所属的Block
的BlockInfo
插入到DatanodeStorageInfo
的链表的头部,因此原来的头结点blockList
将会更新为当前新插入的节点(参数b
)。这是通过调用BlockInfo.listInsert()来实现的:
----------------------------------------------BlockInfo---------------------------------------------------------------
/*
* 根据当前的这个this(block)所在的DataNodeBlockInfo中维护的BlockInfoContinuous链表的头节点head,
* 把this(block)的pre和next插入进去。显然,this(block)会成为旧的head的pre节点,旧的head是this(block)的next
*/
BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
// 在前面已经通过setStorage设置了这个BlockInfo的对应的Replica的storage了,因此肯定可以找到这个DatanodeStorageInfo
int dnIndex = this.findStorageInfo(storage);
this.setPrevious(dnIndex, null);
this.setNext(dnIndex, head); // next节点就是head, head的pre节点就是自己
if (head != null) {
head.setPrevious(head.findStorageInfo(storage), this);// 把head节点的pre设置为当前的BlockInfoContinous节点
}
return this;
}
// 将BlockInfo to设置为当前的BlockInfo(this)的pre节点
BlockInfo setPrevious(int index, BlockInfo to) {
BlockInfo info = (BlockInfo) triplets[index*3+1];
triplets[index*3+1] = to;
return info;
}
// 将BlockInfo to设置为当前的BlockInfo(this)的next节点
BlockInfo setNext(int index, BlockInfo to) {
BlockInfo info = (BlockInfo) triplets[index*3+2];
triplets[index*3+2] = to;
return info;
}
其基本流程如下图所示:
至于删除一个Replica,道理相同,本质上也是通过每一个DatanodeStorageInfo挂载的BlockInfo以及更新每一个BlockInfo的triplets[]数组来表达的双向链表关系,来维护前向和后缀指针,这里不做赘述。
BlocksMap是一个对象,其成员变量blocks是一个GSet,这个GSet中的元素是一个个的BlockInfo, GSet其一个基于链地址法处理hash冲突的一个hash表,用来基于hash的方式查找BlockInfo。
----------------------------------------------------BlocksMap-----------------------------------------------------
class BlocksMap {
......
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
private GSet<Block, BlockInfo> blocks; // 这是一个Set,一个BlockInfo的HashSet,不是一个map
----------------------------------------------------BlocksMap-----------------------------------------------------
/** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
}
DatanodeStorageInfo
信息,其实就是根据这个Block查找到对应的BlockInfo,然后在这个BlockInfo的triplets[]中逐个遍历replica的DatanodeStorageInfo信息----------------------------------------------------BlocksMap-----------------------------------------------------
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
return new StorageIterator(storedBlock);
}
};
}
public static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private final BlockInfo blockInfo;
private int nextIdx = 0;
StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@Override
public boolean hasNext() {
if (blockInfo == null) {
return false;
}
while (nextIdx < blockInfo.getCapacity() &&
blockInfo.getDatanode(nextIdx) == null) {
// note that for striped blocks there may be null in the triplets
nextIdx++;
}
return nextIdx < blockInfo.getCapacity();
}
这里的添加block显然不是添加Replica,而是添加一个Block的元数据信息,它不是datanode的块汇报导致的,而是客户端在写文件的过程中添加一个block或者客户端删除一个文件导致block被删除导致的。
添加Block是通过addBlockCollection()
来进行的,因为一个Block
肯定是属于一个BlockCollection
的:
----------------------------------------------------BlocksMap-----------------------------------------------------
BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
BlockInfo info = blocks.get(b);
if (info != b) { // 如果找不到(info==null)或者不一致
info = b;
blocks.put(info);
incrementBlockStat(info);
}
info.setBlockCollectionId(bc.getId());
return info;
}
删除Block
是通过removeBlock()
来进行的,除了要将BlocksMap中Block的信息删除,还要检查这个Block的所有Replica对应的 DatanodeStorageInfo,将这些replica从对应的DatanodeStorageInfo的块链表中清除:
----------------------------------------------------BlocksMap-----------------------------------------------------
void removeBlock(BlockInfo block) {
BlockInfo blockInfo = blocks.remove(block); // 从BlocksMap中删除
final int size = blockInfo.isStriped() ?
blockInfo.getCapacity() : blockInfo.numNodes();
for(int idx = size - 1; idx >= 0; idx--) { //将这个block的每一个replica从其DatanodeStorageInfo中清除
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
if (dn != null) {
removeBlock(dn, blockInfo); // remove from the list and wipe the location
}
}
}
在Hadoop中,纠删码的Policy定义为ErasureCodingPolicy类:
public final class ErasureCodingPolicy implements Serializable {
private final String name; // policy的名字
private final ECSchema schema; // 纠删码的模式定义,包含了纠删码的名字,
private final int cellSize; // cell的大小
private final byte id;
public final class ECSchema implements Serializable
/**
* The erasure codec name associated.
*/
private final String codecName; // schema的名字,比如基于RS的schema的名字是rs,基于xor的schema的名字是xor
/**
* Number of source data units coded
*/
private final int numDataUnits; // 数据部分的单元数量,比如RS(6,2), numDataUnits=6, numParityUnits=2
/**
* Number of parity units generated in a coding
*/
private final int numParityUnits; // 校验部分的数量
HDFS在启动的时候,会默认加载多个已经默认支持的ECPolicy,这些Policy定义在SystemErasureCodingPolicies
中:
RS-3-2-1024k
, RS-6-3-1024k
, RS-10-4-1024k
, RS-LEGACY-6-3-1024k
, XOR-2-1-1024k
从Policy的名字就能看出其含义:{编码方式,比如基于RS还是XOR}-{data block的数量}-{parity block的数量}-{一个Cell的大小}
。用户通过官方文档https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html去进行EC相关的操作,比如
显然,由于纠删码本身的特殊块布局方式,导致有些情况无法再像基于复制的方式一样支持:
早期没有条带布局方式时,Logical Block和Internal Block是一一对应,NameNode端的BlockID很好分配,自增即可。但是有了条带布局,Logical Block和Internal Block不再一一对应,因此,为了对这两种情况进行统一管理,HDFS统一了BlockID的分配方式, 一种新的分层块命名协议。 目前HDFS根据块创建时间顺序分配块ID。 相反,该协议将每个块 ID 分为 2~3 个部分,如图 7 所示。每个块 ID 以指示其布局的标志开头(连续 = 0,条带 = 1)。 对于条带块来说,ID的其余部分由两部分组成:中间部分是逻辑块的ID,尾部分表示逻辑块中存储块的索引。 这允许 NameNode 将逻辑块作为其存储块的摘要进行管理。 存储块ID可以通过屏蔽索引映射到其逻辑块; 当NameNode处理DataNode块报告时,这是必需的。
在HDFS中,BlockIdManager负责生成对应的BlockID,它委托SequentialBlockIdGenerator来生成基于复制的连续块布局的BlockID,而SequentialBlockGroupIdGenerator来生成基于条带的连续块布局的BlockID(实际上这是一个BlockGroupID,即按照上面将的分层的块命名协议,低四位为0,即还没有将BlockGroup切分为Internal Block)。下文会讲,NameNode会将BlockGroup进行切分,并把每一个切分成的InternalBlock分配到对应的DataNode上。
------------------------------------- BlockIdManager -------------------------------------
long nextBlockId(BlockType blockType) {
switch(blockType) {
case CONTIGUOUS: return blockIdGenerator.nextValue(); //使用SequentialBlockIdGenerator为连续块布局方式分配Block ID
case STRIPED: return blockGroupIdGenerator.nextValue(); // 使用SequentialBlockGroupIdGenerator为条带布局方式分配Block ID
}
纠删码的写入过程与普通的连续布局的写入过程的基本区别是:
基于条带的块布局方式改变了基本的数据存储结构,因此,对于条带布局和连续布局文件的读写方式发生了很大变化,但是基本的写流程没有发生变化:
客户端通过调用DFSClient.create()创建对应文件
客户端收到服务器端返回的包含文件元数据信息的HdfsFileStatus对象,基于该对象,构建DFSOutputStream对象的实现
客户端开始进行writeChunk操作,并根据需要,调用addBlock()接口开始申请block
创建DataStreamer对象,负责Block的数据写入。在三副本情况下,DataStreamer只会和第一个节点建立Socket连接,然后第一个节点会把收到的packet转发给剩余的两个节点。
protected void computePacketChunkSize(int psize, int csize) {
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket;
DataStream所代表的异步线程负责从自己的dataQueue中poll数据,并发送给pipeline,并将这个数据包从dataQueue中移除,放到ackQueue中,代表这个packet正等待ack
响应处理器ResponseProcessor从数据节点接收确认。 当从所有数据节点接收到数据包的成功确认时,响应处理器将从确认队列中删除相应的数据包。
由于块布局方式的变化,基于纠删码的条带布局为了尽量减少对已有的基于复制的块布局代码的侵入,采用的继承的方式,比如,StripedDataStreamer继承了DataStreamer, 而DFSStripedOutputStream继承了DFSOutputStream,LocatedStripedBlock继承了LocatedBlock。
在具体写方式上,条带布局方式沿用了连续布局方式的异步写的整体架构,比如:
但是由于条带布局的逻辑块和物理块不再一一对应,写的逻辑也发生了根本变化。在概念上变化的基本基本原则是,HDFS基于原有的连续布局方式的block到了stripe布局中对应block group,而block group中的具体物理块,在hadoop中叫做internal block。
写方式不同的地方主要包括:
BlockPlacementPolicyDefault
会将第一个replica放在一个rack的某台机器上,另外两个replica会放在另外一个rack的两台不同机器上。这种策略显然不适用条带布局,因为条带布局情况下不希望任何情况下损失多于一个replica,因此使用BlockPlacementPolicyRackFaultTolerant策略,这个策略的基本原则是,将条带布局下的所有replica分配到不同的基架。StripedDataStreamer
依然会负责新的BlockGroup中对应的index的internal block的写入操作。-------------------------------------DFSStripedOutputStream.java ------------------------------------------
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
....
ecPolicy = stat.getErasureCodingPolicy();
final int numParityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
.....
coordinator = new Coordinator(numAllBlocks);
cellBuffers = new CellBuffers(numParityBlocks);// 虽然初始化是用的numParityBlocks, 其实内部放的是datablock 和 parity block
streamers = new ArrayList<>(numAllBlocks);// 并行写一个logical block中的所有internal block,但是串型写不同的logical block
for (short i = 0; i < numAllBlocks; i++) { // 每一个physical block会有一个DataStreamer与之对应,即StrippedDataStreamer对象,用来与对应的DN进行流数据写操作
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
currentPackets = new DFSPacket[streamers.size()];
datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
setCurrentStreamer(0); // 肯定从第一个streamer开始写
}
static class Coordinator {
/**
* followingBlocks是一个List<BlockingQueue<T>> queues,每一个BlockingQueue代表index=i的多个LocatedBlock
*/
private final MultipleBlockingQueue<LocatedBlock> followingBlocks; // 指的是block group的下一个internal block, 每一个index对应了一系列block的list
下面的代码显示了在写文件过程中,是以chunk为单位计算checksum和写数据到Packet。
-------------------------------------DFSStripedOutputStream.java ------------------------------------------
/**
* chunk是计算checksum的单位,以chunk为单位计算checksum,以packet为单位发送数据
* 将bytes中[offset, offset+len]的数据作为一个chunk写入packet(从调用者的代码可以看到,这个数据只可能小于等于一个chunk的长度)
* 通过currentPacket和currentStream来确定是由哪个streamer来负责写这个数据
*
* @throws IOException
*/
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final int pos = cellBuffers.addTo(index, bytes, offset, len); // 往data block中写入数据,返回更新以后的position
final boolean cellFull = pos == cellSize; // 根据不同的Stripe Policy确定的,默认是1MB
// 如果是第一个block,或者当前的block group写完了,需要写到一个新的block group,那么就创建一个新的
if (currentBlockGroup == null || shouldEndBlockGroup()) {
// the incoming data should belong to a new block. Allocate a new block.
allocateNewBlock(); // 对于一个刚刚创建的currentBlockGroup, numBytes=0
}
currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);// 在写入过程中,numbers逐渐增加
// note: the current streamer can be refreshed after allocating a new block
final StripedDataStreamer current = getCurrentStreamer(); // 获取当前的streamer
// 将当前的chunk写入到Packet中(不代表packet需要enqueue了)
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
.....
if (cellFull) { // cell写满了,需要将stream切到下一个,同时,需要写parity block了
int next = index + 1;
// 如果不是最后一个data block, 是不需要写parity chunk的
if (next == numDataBlocks) { // 刚刚写的是这个logic group中的最后一个data block的chunk,意味着下一个chunk是写parity block的chunk
cellBuffers.flipDataBuffers();
writeParityCells(); // 在写parity的过程中,stream也是在往后切换的
....
setCurrentStreamer(next);// 将当前stream切换到下一个,因为当前cell写满了。不写满不切换stream
}
}
private boolean shouldEndBlockGroup() {
return currentBlockGroup != null &&
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}
下面的代码可以看到,拿到LocatedStripedBlock以后,客户端会根据LocatedStripedBlock中携带的信息,将这个返回的Group切分成一个一个的Internal Block,每个Internal Block也用LocatedStripedBlock表示,同时,创建对应的DataStreamer,用来和这个Internal Block所分配的机器进行socket通信已实现Block的写操作。
-------------------------------------DFSStripedOutputStream.java ------------------------------------------
private void allocateNewBlock() throws IOException {
if (currentBlockGroup != null) {
for (int i = 0; i < numAllBlocks; i++) {
// sync all the healthy streamers before writing to the new block
waitEndBlocks(i);
}
DatanodeInfo[] excludedNodes = getExcludedNodes();
//......
final LocatedBlock lb;
try {
// 向NameNode申请Block.对于Stripped的场景,是申请一个BlockGroup(Logic block)
// 这个addBlock是调用的super class的addBlock(), 即DFSStrippedOutputStream和DFSOutputStream都是一样的addBlock()逻辑
// 从代码来看,这里并没有blocksize的信息
lb = addBlock(excludedNodes, dfsClient, src,
prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
// assign the new block to the current block group
currentBlockGroup = lb.getBlock();
blockGroupIndex++;
// 从申请到的LocatedBlock(对应了一个Block Group)切分出Internal block
// 这里的blocks的索引已经是跟internal block在group中的索引一致了,每一个internal block的大小也从Block group中拆分了
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) lb, cellSize, numDataBlocks,
numAllBlocks - numDataBlocks);
for (int i = 0; i < blocks.length; i++) {
// ......
// 把这个Block group里面的所有的internal block分配给每一个streamer
coordinator.getFollowingBlocks().offer(i, blocks[i]);
}
服务端收到create()请求以后,会做以下事情:
文件创建完成以后,就可以开始进行写操作了,写操作在任何时刻一定是处在某一个Logical block中,因此,刚开始写,或者写完一个 block,都需要向NameNode申请Block。有了条带存储以后,这个addBlock()的block归一化为Logical Block, 即申请一个逻辑块。但是, NameNode在处理申请的时候,创建了一个逻辑块,还会根据分配的机器将这个逻辑块切分成物理块,然后返回给客户端。
连接文件的写过程,必须了解文件在写入过程中的各种状态,NameNode将块写入过程中的信息封装在BlockUnderConstructionFeature对象中。
在NameNode端,每一个块都有一个BlockInfo对象,每一个BlockInfo对象都有一个BlockUnderConstructionFeature uc,BlockUnderConstructionFeature中维护了跟块写入过程相关的信息,最重要的是当前正在写入的块的状态(BlockUCState)。一个块正在被写入的时候,uc是不为null的,当这个块写入完成(NameNode收到了这个块的足够数量的DataNode 的汇报),进入COMPLETE状态,uc会被置为空,代表这个块处于UnderConstruction(UC)的状态。块写入过程中的状态被表示称为BlockUCState:
enum BlockUCState {
UNDER_CONSTRUCTION, //这个block应该正处于被写的状态
COMMITTED // 客户端已经写完了,但是还没有DataNode收到确认的汇报,这个状态较COMPLETE要更早
COMPLETE, // block的构建完成,block的FINALIZED的副本数量已经达到了最小副本数量,这是block的最后的稳定状态
UNDER_RECOVERY,// block写入失败,需要进行recovvery操作
}
UNDER_CONSTRUCTION : 这个块处于正常地被写入状态
COMMITTED: 这个块已经被写入到Data,但是目前NameNode还没有收到任何一个或者还没有收到足够的DataNode的汇报。
COMPLETE: NameNode已经收到了关于这个Block的汇报,汇报数量已经达到的要求的最小块数量,并且这个块不会再被修改。
dfs.namenode.replication.min
所配置的数量的DataNode上(即NameNode已经收到了这么多个DataNode汇报了这个replica),就认为是COMPLETE。默认情况下,dfs.namenode.replication.min为1,即只要这个replica对应的target有一个汇报上来,NameNode端的Block的状态就是COMPLETE。从上面的描述看到,COMPLETE状态表达的是一个Block已经到了immutatable的状态了,但是很显然,在COMPLETE状态的副本只要丢失任何一个replica,就会造成数据丢失。
UNDER_RECOVERY: 块写入过程中失败,因此正处在恢复之中。
下文将会讲到的块的reconstruction(重构)都是指的处于COMPLETE状态的replica由于出现一些特殊情况道导致块的副本数量无法满足要求因此进行重构。而Recover(恢复)是指处于UNDER_CONSTRUCTION状态下的块由于异常情况(客户端中断,DataNode崩溃)导致副本出现不一致状态而进行的修复。
服务器端收到了addBlock()
请求以后,会做以下事情:
相关校验,通过validateAddBlock()方法对addBlock()操作进行校验,把校验结果封装在ValidateAddBlockResult中,校验的内容包括:
租约是否匹配(只有拿到文件租约的客户端才能写文件)
static ValidateAddBlockResult validateAddBlock(){
........
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
........
}
private static FileState analyzeFileState(
FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
...........
final INodeFile file = fsn.checkLease(iip, clientName, fileId);
.......
上一个block是否已经彻底完成复制。只要新添加的block不是文件的第一个block,那么在基于副本复制的块布局方式下,必须确认上一个block已经完成了副本复制。
final INodeFile pendingFile = fileState.inode; // 校验文件块的状态,比如倒数第二个文件块是否已经complete或者committed了
if (!fsn.checkFileProgress(src, pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
无论是Continuous Block还是 Striped Block,在addBlock()或者close()的时候,都会通过checkFileProgress()方法对该文件的block进行校验,校验的规则是:
客户端通过配置dfs.namenode.file.close.num-committed-allowed
代表文件在关闭或者添加新的block的时候,最多允许多少个block仅仅处于COMMITTED的状态而还未到达COMPLETE的状态
DFSOutputStream.close()
,那么要求:
block数量不可以超过单个文件最大的副本数量
if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + pendingFile.getBlocks().length + " >= "
+ fsn.maxBlocksPerFile);
}
将校验结果,包括文件所需要的副本数量(对于连续布局,target数量就是该文件的副本数,对于条带布局,target数量就是ECSchema中定义的dataUnit + parityUnit的数量,以及blockSize)和blocksize封装到ValidateAddBlockResult中,供后续创建block使用
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
clientMachine, blockType, ecPolicy);
为这个addBlock()请求分配对应的DataNode。 在addBlock()的时候,通过制定的excludedNode,以及favoredNode,和上一步确定的所需要的机器数量,交给对应的PlacementPolicy,交由BlockManager.chooseTarget4NewBlock()进行机器的分配。
BlockPlacementPolicies类负责对PlacementPolicy进行管理,在NameNode启动的时候,就会根据配置文件,决定不同的布局方式所应该使用的PlacementPolicy。通过dfs.block.replicator.classname
配置基于复制的连续块布局的副本放置策略,默认是BlockPlacementPolicyDefault,典型的3场景,会将第一个replica放在一个rack,剩下两个replica放在另外一个rack的不同host。通过dfs.block.placement.ec.classname
配置纠删码的PlacementPolicy,不配置的默认策略是BlockPlacementPolicyRackFaultTolerant
,一种尽量将所有块放置到不同Rack上的策略。
-------------------------------------BlockPlacementPolicies-------------------------------------
public class BlockPlacementPolicies{
public BlockPlacementPolicy getPolicy(BlockType blockType){
switch (blockType) {
case CONTIGUOUS: return replicationPolicy; // 基于复制的布局方式的PlacementPolicy
case STRIPED: return ecPolicy; // 基于条带的布局方式的PlacementPolicy
......
}
为这个addBlock()请求分配对应的BlockID。如果是StripedBlock,这个ID仅仅是BlockGroup ID(低四位为0,还没有分配具体的Internal Block)
分配Internal BlockID,创建Block和文件的对应关系并创建replica和targets之间的对应关系
对于StripedBlock, 这里的BlockID其实是BlockGroup ID,上文已经讲过BlockID的具体格式和分配方式。
将创建的Block挂在到这个文件的INodeFile上,构建对应关系。显然,假如是一个大文件,那么这个INodeFile对应的Block数量不止一个,因此INodeFile通过一个 BlockInfo[] blocks;来维护自己的所有的Block
构建Replica和Datanode的对应关系。这是因为刚刚只是委托PlacementPolicy分配了指定数量的DataNode,基于复制的连续布局很简单,所有的DataNode存储相同的replica,但是在条带布局下,这个申请的Block其实是一个BlockGroup,那么这个Group中的每一个Internal Block 该放到哪个DataNode上去呢?这就是这里需要确定的。
BlockManager.newLocatedBlock()
就是将对应的replica(Internal Block)具体分配到对应的DataNode上面去,并返回一个LocatedStripedBlock对象。LocatedStripedBlock是LocatedBlock针对StripedBlock 的具体实现,最重要的是它有了一个叫做blockIndices的成员,blockIndices[i]的含义是第i个DataNode存放的InternalBlock的block index,而DatanodeInfoWithStorage[] locs
中locs[i]则存放了对应位置i的DataNode信息,联合blockIndices和locs,就知道了任何一个Internal Block的存储位置。
public class LocatedStripedBlock extends LocatedBlock {
// blockIndices[i]代表了replicas[i]对应的replica在block group中的索引值,
// 查看BlockUnderConstructionFeature.setExpectedLocations L105
private final byte[] blockIndices; // physical block在block group中的索引
NameNode 构建完成LocatedStripedBlock信息,就完成了整个StripedBlock的创建过程,包括Block Group ID(上面讲了block id的分层方式,Block Group ID的低四位是0),分配的节点,以及这个BlockGroup 中的每一个InternalBlock 和节点之间的对应关系。在客户端拿到NameNode返回的LocatedStripedBlock以后,就通过方法constructInternalBlock中按照要求将LocatedStripedBlock拆分成一个一个的Internal block(也是一个LocatedBlock对象,由于是Internal Block 那么di四位就是internal block在group中的index),然后将这些internal block绑定给对应的DataStreamer,然后开始数据写操作。
public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int dataBlkNum,
int idxInBlockGroup) {
final ExtendedBlock blk = constructInternalBlock(
bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
.....
locatedBlock = new LocatedBlock(blk, // 每一个internal都从block group中切分属于自己的location, storage id, storage type
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},// 这个internal block对应的DataNode info
new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, // 这个internal block对应的storage id
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, // 这个internal block对应的storage type
bg.getStartOffset(), bg.isCorrupt(), null);
return locatedBlock;
故障块的重构都是基于已经处于BlockUCState.COMPLETE
状态的,通过COMPLETE的定义我们知道,意味着重构(Reconstruct)针对的是已经写完成(汇报上来的块副本达到了要求比如最小允许副本数),但是后来块的状态发生问题的那些块。
本章节讲解块重构过程中的块状态,这些状态是StoredReplicaState的状态。StoredReplicaState是块已经完成写操作以后的状态阶段中的不同状态,而不是块在写入过程中的状态BlockUCState。
BlockManager通过方法checkReplicaOnStorage()来统一收集并计算副本状态,并返回一个收集了副本状态信息的对象NumberReplicas。 从NumberReplicas的名字听起来似乎这个类是统计副本的总数量,但是其实这个类是统计在各个状态下的副本数量的分类统计信息。那么,副本会有哪些状态呢?
所有的副本状态被StoredReplicaState表述。顾名思义,StoredReplicaState所表述的副本状态是已经存储下来的副本的整个状态,这些层面的状态一方面是为了给用户的诸如fsck的命令返回副本的统计信息,更重要的,这些状态的统计信息,将用来决定下一步对副本是否需要进行重构的策略,比如副本数是否太低,太低的副本需要进行重构,副本数是否太高,副本数太高的副本需要进行部分的删除。
public enum StoredReplicaState {
// live replicas. for a striped block, this value excludes redundant
// replicas for the same internal block
LIVE,
READONLY,
// decommissioning replicas. for a striped block ,this value excludes
// redundant and live replicas for the same internal block.
DECOMMISSIONING,
DECOMMISSIONED,
// We need live ENTERING_MAINTENANCE nodes to continue
// to serve read request while it is being transitioned to live
// IN_MAINTENANCE if these are the only replicas left.
// MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
// Live ENTERING_MAINTENANCE.
// 当 一个节点处于ENTERING_MAINTENANCE中(还没到达最终的IN_MAINTENANCE), 这个节点上的internal block如果没有其它副本,
// 那么这个node还是会接着serve 这个replica 的读请求。显然,当节点进入到IN_MAINTENANCE中的时候,读请求就不会过来了,
// 因此进入MAINTENANCE_NOT_FOR_READ
MAINTENANCE_NOT_FOR_READ,
// Live ENTERING_MAINTENANCE nodes to serve read requests.
MAINTENANCE_FOR_READ,
CORRUPT,
// excess replicas already tracked by blockmanager's excess map
EXCESS, // 副本数量超过了要求,这些超过要求的replica 最终会被删除
STALESTORAGE,
// for striped blocks only. number of redundant internal block replicas
// that have not been tracked by blockmanager yet (i.e., not in excess)
REDUNDANT
}
副本的状态StoredReplicaState是对连续布局和条带布局统一而言的,并不是专指某种布局方式。但是,某些特定状态在两种布局模式下的含义稍有不同。副本每一个状态的具体含义,我们可以从BlockManager构造NumberReplicas对象的方法checkReplicaOnStorage()
清楚地看到:
private StoredReplicaState checkReplicaOnStorage(NumberReplicas counters,
BlockInfo b, DatanodeStorageInfo storage,
Collection<DatanodeDescriptor> nodesCorrupt, boolean inStartupSafeMode) {
final StoredReplicaState s;
if (storage.getState() == State.NORMAL) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
s = StoredReplicaState.CORRUPT;
} else if (inStartupSafeMode) {
s = StoredReplicaState.LIVE;
counters.add(s, 1);
return s;
} else if (node.isDecommissionInProgress()) {
s = StoredReplicaState.DECOMMISSIONING;
} else if (node.isDecommissioned()) {
s = StoredReplicaState.DECOMMISSIONED;
} else if (node.isMaintenance()) {
if (node.isInMaintenance() || !node.isAlive()) {
s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
} else {
s = StoredReplicaState.MAINTENANCE_FOR_READ;
}
} else if (isExcess(node, b)) {
s = StoredReplicaState.EXCESS;
} else {
s = StoredReplicaState.LIVE;
}
counters.add(s, 1);
// //如果这个Storage是stale storage,那么,认为这个replica是stale状态,直到收到对应DN的heartbeat
if (storage.areBlockContentsStale()) {
counters.add(StoredReplicaState.STALESTORAGE, 1);
}
} else if (!inStartupSafeMode &&
storage.getState() == State.READ_ONLY_SHARED) {
s = StoredReplicaState.READONLY;
counters.add(s, 1);
副本的大部分状态是由这个副本所在的存储状态决定的:
BlockManager.countLiveAndDecommissioningReplicas()
。BlockManager.countLiveAndDecommissioningReplicas()
。public class CorruptReplicasMap{
/** The corruption reason code */
public enum Reason {
NONE, // not specified.
ANY, // wildcard reason
GENSTAMP_MISMATCH, // mismatch in generation stamps
SIZE_MISMATCH, // mismatch in sizes
INVALID_STATE, // invalid state
CORRUPTION_REPORTED // client or datanode reported the corruption
}
// 存放了所有corrupt的replica
private final Map<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new HashMap<Block, Map<DatanodeDescriptor, Reason>>();
在BlockManager中维护了CorruptReplicasMap的引用,所有corrupt replica都是通过BlockManager.markReplicaAsCorrupt()
来添加到corruptReplicas
中的,主要有以下情况:
一个Block的replica的数量处于不正常的状态,比如,有多余的副本,或者低于预期的副本等,都属于副本不正常的情况。注意,副本不正常的情况都是指已经处于COMPLETE状态的副本,即不可变的块。可变的、正在写的块不在重构的处理范围内,而是块恢复(Recover)的职责。
NameNode会通过各种方式不断检测每个副本,确定这个副本是否属于错误副本,如果属于错误副本,则进行进一步处理
enum MisReplicationResult {
/** The block should be invalidated since it belongs to a deleted file. */
INVALID, // 这个块属于一个被删除的文件,副本可以删除了
/** The block is currently under-replicated. */
UNDER_REPLICATED, // 这个块的副本数不足
/** The block is currently over-replicated. */
OVER_REPLICATED,
/** A decision can't currently be made about this block. */
POSTPONE,
/** The block is under construction, so should be ignored. */
UNDER_CONSTRUCTION,
/** The block is properly replicated. */
OK
}
BlockManager.run()通过调用processMisReplicatedBlock() 来扫描当前replica的位置有问题(放置不正确,或者缺少replica)的所有block, 如果这个Replica的确需要重构,那么就放入到neededConstruction中。 processMisReplicatedBlock() 的扫描操作会在以下情况下被触发:
BlockManager使用postponedMisreplicatedBlocks
保存了一些需要延迟处理的、放置(MisReplicationResult中表述的比如UNDER_REPLICATED,OVER_REPLICATED)状态不正常的Block。
/**
* After a failover, over-replicated blocks may not be handled
* until all of the replicas have done a block report to the
* new active. This is to make sure that this NameNode has been
* notified of all block deletions that might have been pending
* when the failover happened.
*/
private final Set<Block> postponedMisreplicatedBlocks =
new LinkedHashSet<Block>();
延迟处理是因为,系统刚刚才启动,或者,Failover刚发生(当前的这个Active NameNode是刚刚才从Standby到Active状态的),因此,还没有收到一部分DataNode的第一次block report,即目前NameNode关于Block中副本的状态是过时(Stale)的,因此需要延迟对这个misReplicatedBlock进行进一步处理。
本质上,这是为了解决新的NameNode所看到的集群状态其实并非集群最新状态的特殊情况。这种特殊情况在HDFS特指需要删除副本数过多的Block的情况:
新的NameNode发现这个Block的副本超过了要求的副本数量(over-replicated),因此需要将某一个或者多个replica 加入到invalidateBlocks中,随后被删掉。但是,NameNode也发现这个Block有一个副本是在Stale DataNode上,这时候问题出现了:由于NameNode当前掌握的某个replica-1的DataNode(DN-1)的状态是过期(Stale)的,很有可能之前的Active NameNode已经发现了这个over-replicated的Block并且已经让这个Stale DataNode删掉了副本,但是当前的新的Active NameNode还不知道。如果这时候Active NameNode直接指示其它DataNode(DN-2)删掉副本,当之前的DN-1汇报说我已经把副本replica-1已经删除,这时候就造成Block的副本数不足。因此最好等待这个Block中位于Stale DataNode的heartbeat上来再对这个Invalidate操作进行处理。
在NameNode刚刚切换到 Active状态的时候,会将所有的DataNodeStorageInfo的状态置为Stale,直到第一次收到这个DataNode对应的heartbeat:
// NameNode刚启动,会将所有DataNode标记为Stale
void markStaleAfterFailover() {
heartbeatedSinceFailover = false;
blockContentsStale = true;
}
// 收到heartbeat
void receivedHeartbeat(StorageReport report) {
updateState(report);
heartbeatedSinceFailover = true;
}
//收到heart
void receivedBlockReport() {
if (heartbeatedSinceFailover) { // Failover以后已经收到了heartbeat
blockContentsStale = false; // 将DataNodeStorageInfo置为非Stale状态
}
blockReportCount++;
}
上面讲过,postponedMisreplicatedBlocks中的所有Block都是那种1) replica数量超过预期值并且 2) 有Replica在stale DataNode上。
大概有下面的这些情况,NameNode会往postponedMisreplicatedBlocks中添加Block:
RedundancyMonitor这个Daemon线程负责重新扫描postponedMisreplicatedBlocks中的每一个Block,用来对这里的Block的状态进行重新的确认。重新扫描的逻辑发生在方法rescanPostponedMisreplicatedBlocks()中:
显然,postponedMisreplicatedBlocks中的Block会发生以下的各种情况:
BlockManager中维护了一个PendingReconstructionBlocks pendingReconstruction对象,用来监控那些尚且需要获取更多已经存储的副本的Block:
class PendingReconstructionBlocks {
private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
static class PendingBlockInfo {
private long timeStamp;
private final List<DatanodeStorageInfo> targets; // 这个block需要复制到的位置
从PendingReconstructionBlocks的定义可以看到,pendingReconstructions是一个以block作为key、以PendingBlockInfo作为value的map,代表这个block对应的PendingBlockInfo里面的target尚未向NameNode进行汇报。我们都知道客户端在写数据并且addBlock()的时候,NameNode会通过PlacementPolicy为这个Block寻找合适的target location并且返回给客户端,在这个block commit了以后, NameNode就会把这个block和对应的 target添加到PendingReconstructionBlocks中,意思是:这个block会写入到这些targets,但是我目前还没有收到这些targets对这个block的汇报,所以就把这个Block和它对应的targets 记录下来,当收到了DataNode的汇报以后,就把这个DataNode从这个block的记录里删除,如果这个blocks的所有targets都汇报了这个block,就可以把这个block从pendingReconstructions中彻底删除。
PendingReconstructionBlocks.increment()
方法是向PendingReconstructionBlocks对象中添加targets记录的方法,我们跟踪这个方法的调用者,可以看到什么时候NameNode会往pendingReconstruction中添加block -> targets 记录:
与PendingReconstructionBlocks.increment()
相反,PendingReconstructionBlocks.decrement()
会将一个block -> datanode的对应关系从pendingReconstruction中删除,这发生在DataNode在通过DatanodeProtocol.blockReceivedAndDeleted 接口进行增量块汇报的时候,会将这个block -> datanode的对应关系从pendingReconstruction中删除。同时,如果这个block对应的所有targets都已经完成了汇报,就把block从这个 pendingReconstruction中删除。
上面讲过,PendingReconstructionBlocks就像Block的守护者一样,NameNode通过它可以知道目前还有哪些block在等待Datanode向上汇报的信息。正常情况下,一个block存放到在PendingReconstructionBlocks,当所有的targets完成块汇报,很快就会从pendingReconstruction中删除。但是,分布式系统中,任何异常状态都有可能发生,如果pendingReconstruction 中的某个Block所预期的DataNode始终没有全部汇报上来,应该怎么样呢?
原来,为了识别这种很长时间依然没有删除的block, PendingReconstructionBlocks通过一个单独的线程PendingReconstructionMonitor去监控所有添加进来的block并为每个添加进来的block记时,超过指定时间,就会被标记为超时是block。在RedundancyMonitor线程的每一轮运行中,都会通过方法processPendingReconstructions()尝试去处理pendingReconstruction中超时的block:
private class RedundancyMonitor implements Runnable {
@Override
public void run() {
while (namesystem.isRunning()) { // 这是一个无限循环,只要是active namenode,就不断运行
try {
// Process recovery work only when active NN is out of safe mode.
if (isPopulatingReplQueues()) {
computeDatanodeWork(); // 选择并且将需要re-construct的节点发送给对应的Node 去执行
processPendingReconstructions(); // 看看pendingReconstruction 中有哪些block需要加入到neededConstruction 中去
rescanPostponedMisreplicatedBlocks();
/**
* If there were any reconstruction requests that timed out, reap them
* and put them back into the neededReconstruction queue
*/
void processPendingReconstructions() {
// pendingReconstruction主要来自与文件最后close的时候的最后一个block,在这里,如果pendingReconstruction
// 中的block在指定时间内依然没有完成construction,那么就需要放到neededConstruction中去
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
// .......
for (int i = 0; i < timedOutItems.length; i++) {
BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReconstruction(bi, num)) { //这个pendingReconstruction 中的block的确需要re-construction
neededReconstruction.add(bi, num.liveReplicas(),
num.readOnlyReplicas(), num.outOfServiceReplicas(),
getExpectedRedundancyNum(bi));
}
}
} finally {
namesystem.writeUnlock();
如上面代码所示,RedundancyMonitor线程会从pendingReconstruction取出超时的blocks,如果发现这些block需要进行重构,则将这需要重构的block加入到neededReconstruction中。下一轮循环过来,就会通过computeDatanodeWork()对needConstruction中需要重构的block进行处理。而一个block是否需要重构的判断入下代码所示:
boolean isNeededReconstruction(BlockInfo storedBlock,
NumberReplicas numberReplicas, int pending) {
return storedBlock.isComplete() &&
!hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
}
// 如果 live + pending replicas 的数量不小于所需要的replica数量,并且(还有pending的replica,或者没有pending的并且已经满足放置策略)
// 这里的含义是,如果有pending的,那么我们先不用考虑是否满足placement policy
boolean hasEnoughEffectiveReplicas(BlockInfo block,
NumberReplicas numReplicas, int pendingReplicaNum) {
int required = getExpectedLiveRedundancyNum(block, numReplicas); // 先看看需要多少个live的副本
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
return (numEffectiveReplicas >= required) &&
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
}
// 可容忍的最少的live replica的数量
public short getExpectedLiveRedundancyNum(BlockInfo block,
NumberReplicas numberReplicas) {
// 对于striped block,expectedRedundancy 数量指的是data block + parity block的数量,
// 对于replication,expectedRedundancy指的是replication factor
final short expectedRedundancy = getExpectedRedundancyNum(block);
// 假如当前我的block配置的副本是5, 有2个副本是处于maintenance,那么我期待的live 副本数量是5-2=3
return (short)Math.max(expectedRedundancy -
numberReplicas.maintenanceReplicas(), // 处于maintenance中的replica也算是live,这本身就是maintenance的目的,让replica短时间可以容忍丢失,但是处于decommission的不能算live
// 最小的maintenance 副本数量。对于striped block,最小的maintenance副本数量就是data block 的数量,说明对于
// stripe block, 不需与data block丢失
getMinMaintenanceStorageNum(block));
}
public short getExpectedRedundancyNum(BlockInfo block) {
return block.isStriped() ?
((BlockInfoStriped) block).getRealTotalBlockNum() : // 比如RS(6,2), realTotalBlockNum 就是 6 + 2 = 8
block.getReplication(); // 连续布局情况下,就是配置的块副本数
}
// 能够允许进入maintenance状态
private short getMinMaintenanceStorageNum(BlockInfo block) {
if (block.isStriped()) {
return ((BlockInfoStriped) block).getRealDataBlockNum(); // stripe block实际占用数据的块的数量
} else {
return (short) Math.min(minReplicationToBeInMaintenance,
block.getReplication()); // 或者是replication factor, 或者是最小允许的进入MAINTENANCE状态live replica的数量
}
}
可以看到,对于一个Block是否需要进行reconstruction,是基于已经统计好的Replica的各种状态信息的统计(统一存放在NumberReplicas对象中),就是:
COMPLETE
状态并且有足够多的有效Replica(参考isNeededReconstruction())hasEnoughEffectiveReplicas()
的代码 int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
,即存活状态的副本数量再加上待定状态的副本数量。存活状态的副本是有效副本很容易理解,但是为什么待定的副本也算作有效副本呢?因为待定的副本是正常状态下还缺失的待汇报的数量,这种缺失是很正常的,正常情况下只需要再等等就可以,因此,待定的副本也算作有效。getMinMaintenanceStorageNum()
可以看到,对于纠删码,如果进入ENTERING_MAINTENANCE状态以后,依然最少有dataBlockNum个副本存活,那么这个replica就可以进入Maintenance状态。对于3副本,这个默认值是1,即ENTERING_MAINTENANCE中如果一个Block有一个存活的副本,那么这个副本就可以正式进入MAINTENANCE状态。这就是为什么StoredReplicaState在进入正式的MAINTENANCE状态以前,有一个ENTERING_MAINTENANCE状态,在ENTERING_MAINTENANCE状态时,DataNode会有对应的Monitor确认所有的Block的live replica数量都大于getMinMaintenanceStorageNum(),才会从ENTERING_MAINTENANCE进入到MAINTENANCE状态,避免进入到MAINTENANCE状态以后导致数据副本缺失。getExpectedLiveRedundancyNum()
方法的return (short)Math.max(expectedRedundancy - numberReplicas.maintenanceReplicas(),getMinMaintenanceStorageNum(block));
代码可以看到,不考虑特殊情况,对于连续布局,期望的副本数量就是配置的副本数,对于纠删码,期待的副本数量就是dataBlocksNum + parityBlocksNum, 比如RS(6,2)中等于8。但是,由于有些Block的部分副本有可能处于maintenance状态,这些副本虽然暂时不可用,但是并没有丢失,因此,期待的副本数量应该减去这些处于Maintenance状态的副本。比如RS(6,2)中,有3个副本都处于maintenance状态,那么我期待的存活副本数量是8-3=5。同时期望的存活副本数量应该不小于允许进入maintenance的最小存活副本数量。BlockManager中有一个LowRedundancyBlocks neededReconstruction
变量,记录了当前需要进行reconstruction的所有block。
由于不同情况的恢复优先级不同(比如,3副本情况下丢失1个副本和丢失2个副本的紧急程度显然不同,RS(6,2)的情况下丢失1个replica和丢失2个replica的紧急程度显然不同),LowRedundancyBlocks对象根据紧急程度定义了优先级,每个优先级都有一个对应的block队列,代表了这个block需要进行该优先级的re-construct。
class LowRedundancyBlocks implements Iterable<BlockInfo> {
static final int QUEUE_HIGHEST_PRIORITY = 0;
static final int QUEUE_VERY_LOW_REDUNDANCY = 1;
static final int QUEUE_LOW_REDUNDANCY = 2;
static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
显然,对于连续布局和条带布局,确定优先级的基本思想一致,但是计算方式并不相同。我们通过getPriorityContiguous()方法和getPriorityStriped()方法可以清晰地看到对于两种布局方式确定其优先级的计算逻辑。
先抛开布局方式的差异,一般来讲,各种优先级的含义是:
/**
* @param curReplicas 当前live的replica
* @param readOnlyReplicas 处在READONLY状态的replica,
* @param outOfServiceReplicas 指的是状态处在MAINTENANCE_NOT_FOR_READ || MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica
* @param expectedReplicas 预期的replica数量,比如我们配置的文件副本数量为3
*/
private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
int outOfServiceReplicas, int expectedReplicas) {
if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
// some out of service replicas, then assign them highest priority
if (outOfServiceReplicas > 0) {
return QUEUE_HIGHEST_PRIORITY;
}
if (readOnlyReplicas > 0) {
// only has read-only replicas, highest risk
// since the read-only replicas may go down all together.
return QUEUE_HIGHEST_PRIORITY;
}
//all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) {
// only one replica, highest risk of loss
// highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas * 3) < expectedReplicas) {
//can only afford one replica loss 还能再承受一个replica loss,即如果还有两个replica loss,数据就丢失了,block就corrupt了
//this is considered very insufficiently redundant blocks.
return QUEUE_VERY_LOW_REDUNDANCY;
} else {
//add to the normal queue for insufficiently redundant blocks
return QUEUE_LOW_REDUNDANCY;
}
}
/**
* @param curReplicas 当前live的replica的数量
* @param outOfServiceReplicas 指的是状态处在MAINTENANCE_NOT_FOR_READ || MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica
* @param dataBlkNum 配置的预期的数据块的数量,比如RS(6,2)中,dataBlkNum=6
* @param parityBlkNum 配置的预期的校验块的数量,比如RS(6,2)中,parityBlkNum=2
*/
private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,
short dataBlkNum, short parityBlkNum) {
if (curReplicas < dataBlkNum) {
// There are some replicas on decommissioned nodes so it's not corrupted
if (curReplicas + outOfServiceReplicas >= dataBlkNum) { //有一部分replica在decommissiong 节点上,因此它还没有corrupt,但是必须立刻恢复了
return QUEUE_HIGHEST_PRIORITY;
}
return QUEUE_WITH_CORRUPT_BLOCKS; // 已经corrupt了,无法恢复了
} else if (curReplicas == dataBlkNum) { // 再丢失一个replica数据就无法恢复了
// highest risk of loss, highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
// can only afford one replica loss
// this is considered very insufficiently redundant blocks.
return QUEUE_VERY_LOW_REDUNDANCY; // 再丢失一个replica就corrupt了
} else {
// add to the normal queue for insufficiently redundant blocks.
return QUEUE_LOW_REDUNDANCY;// 一般优先级
}
}
LowRedundancyBlocks会通过getPriorityContiguous()获取优先级,然后将对应的需要进行reconstruct的block放入到对应的优先级队列中:
private boolean add(BlockInfo blockInfo, int priLevel, int expectedReplicas) {
if (priorityQueues.get(priLevel).add(blockInfo)) {
return true;
}
return false;
}
然后,RedundancyMonitor对应的Daemon 线程会根据LowRedundancyBlocks neededReconstruction中维护的优先级队列,按照优先级选择Block, 构造对应的ReconstructionTask。通过blocksToProcess控制每轮循环最多需要进行re-construct的block的数量,每次循环都通过bookmark的方式从上一轮循环处理的位置接着进行:
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
int blocksToProcess, boolean resetIterators) {
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
int count = 0;
int priority = 0;
HashSet<BlockInfo> toRemove = new HashSet<>();
// 依次遍历每一个LEVEL的queue,形成一个需要进行re-construct的List<List>, 外层list就是优先级,内层list就是这个优先级下面的需要进行re-construct的block
for (; count < blocksToProcess && priority < LEVEL; priority++) {
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
// 这个PriorityQueue的bookmark,每次循环都从上一次循环的位置开始,而不是重新开始
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
if (!inCorruptLevel) { // 因为corrupt block已经没有恢复的必要了
blocksToReconstruct.add(blocks);
}
for(; count < blocksToProcess && i.hasNext(); count++) {
BlockInfo block = i.next();
if (block.isDeleted()) { // 这个block在neededReconstruction中,但是其实已经被NN删除了,因此必须要构造re-construct了
toRemove.add(block);
continue;
}
if (!inCorruptLevel) {
blocks.add(block);
}
}
for (BlockInfo bInfo : toRemove) {
remove(bInfo, priority); // 这个Block已经被系统删除了(比如,文件删除了),因此已经不需要reconstruct了
}
toRemove.clear();
}
....
return blocksToReconstruct;
}
块的重构是由一个持续运行的Daemon线程RedundancyMonitor负责调度的,由于待重构的Block信息逗存放在BlockManager.neededReconstruction(重构队列)中,因此RedundancyMonitor主要功能就是从重构队列中取出Block,构造重构任务,调度出去。同时,它还负责从待定队列中取出那些超时的Block,以及从延迟队列中取出已经不需要继续延迟(Stale状态结束)的节点,如果这些节点需要重构,则创建重构任务和调度重构任务。
本节将会讲解故障块的重构(Reconstruction)的基本流程。下一节会讲解故障块的恢复(Recovery)过程。
故障块的恢复(Recovery)指的是写数据过程中,由于某些非正常原因,造成最终的块文件没有达到最后的COMPLETE状态,比如,写过程客户端的突然离开,写过程中DataNode的突然当机。
而故障块的重构(Reconstruction)发生在写完成以后,由于系统情况的改变,比如,DataNode宕机,DataNode汇报上来的存储故障,用户手动修改了副本数导致副本数量过多,用户手动进行decommission或者maintenance导致副本的数量变化,都需要进行副本相关的重构。
private class RedundancyMonitor implements Runnable {
@Override
public void run() {
while (namesystem.isRunning()) { // 这是一个无限循环,只要是active namenode,就不断运行
try {
// Process recovery work only when active NN is out of safe mode.
if (isPopulatingReplQueues()) {
computeDatanodeWork(); // 从重构队列中取出Block, 选择并且将需要re-construct的节点发送给对应的Node 去执行
processPendingReconstructions(); // 从待定队列中取出Block,,如果需要reconstruct,加入到neededReconstruction中
rescanPostponedMisreplicatedBlocks(); // 从延迟队列中取出Block,如果需要reconstruct,加入到neededReconstruction中
}
RedundancyMonitor的computeDatanodeWork()负责创建块重建的任务并调度出去。
我们在上文讲解过,重构队列(neededReconstruction)是一个优先级队列,重构任务的创建和调度肯定是按照优先级从高到底进行。在LowRedundancyBlocks.chooseLowRedundancyBlocks()中可以看到基于优先级从重构队列中获取Block的逻辑:
int computeBlockReconstructionWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
........
// Choose the blocks to be reconstructed
blocksToReconstruct = neededReconstruction // blocksToProcess存储了每次最多可以处理的blocks的数量
.chooseLowRedundancyBlocks(blocksToProcess, reset); // 每次最多处理blocksToProcess个,然后每次都是从上次的bookmark的位置接着进行
......
// 根据挑选的 需要进行reconstruct的block,对他们进行重新构建
return computeReconstructionWorkForBlocks(blocksToReconstruct);
}
// 选择那些处于风险中的块,按照优先级构成一个2层list。注意,这时候只是选出了需要进行重构的Block,至于怎么重构,需要computeReconstructionWorkForBlocks()来确定
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
int blocksToProcess, boolean resetIterators) {
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
int count = 0;
int priority = 0;
HashSet<BlockInfo> toRemove = new HashSet<>();
// 依次遍历每一个LEVEL的queue,形成一个需要进行re-construct的List<List>, 外层list就是优先级,内层list就是这个优先级下面的需要进行re-construct的block
for (; count < blocksToProcess && priority < LEVEL; priority++) {
// Go through all blocks that need reconstructions with current priority.
// Set the iterator to the first unprocessed block at this priority level
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
// to look for deleted blocks if any.
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
// 这个PriorityQueue的bookmark,每次循环都从上一次循环的位置开始,而不是重新开始
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
if (!inCorruptLevel) { // 因为corrupt block已经没有恢复的必要了
blocksToReconstruct.add(blocks);
}
.......
return blocksToReconstruct;
}
}
可以看到,chooseLowRedundancyBlocks每次都按照优先级从高到低的顺序,最多选取blocksToProcess
个Block进行重构。请注意,整个分布式系统是一个不断变化的系统,从重构队列中取出的Block很可能已经不需要再重构了,比如(文件已经删除,由于DataNode恢复因此块状态已经完全恢复)等,所以从重构队列中取出的块都会时刻进行状态的重新判断,只有确定这个块的确需要重构,才会放入到blocksToReconstruct。
在上一节选定了需要进行重构的Block(基于复制或者基于纠删码),下面,就开始对这个块的信息进行分析,比如,这个Block是基于复制还是基于基于纠删码,这个Block的每一个replica在哪里(比如EC编码中每一个Internal Block的Storage信息,或者基于复制的Block的每一个Replica的信息),每一个replica的状态是什么样的(LIVE, DECOMMISSIONGING, MAINTENANCE_FOR_READ…),目前丢失因此需要重构的physical block有几个,是哪些(比如EC编码中用internal block index表示,而在复制方式中则只需要关心还缺几个副本)。
由于重构的读取过程显然会带来对应source节点的负载,假如有多种选择方案(比如,ReplicationWork中,有两个副本可用,只需要选择一个source replica拷贝到第三个节点,选哪个更好?),那么基本原则是选择负载最轻的、对系统运行影响最小的节点。
// 为chooseLowRedundancyBlocks()返回的blocks构造BlockReconstructionWork,并调度出去
int computeReconstructionWorkForBlocks(
List<List<BlockInfo>> blocksToReconstruct) { // 按照优先级进行排列的,每一个List<BlockInfo>都是这个对应优先级的block的列表
List<BlockReconstructionWork> reconWork = new ArrayList<>();
....
// 对这些block的重构任务进行分类,构造BlockReconstructionWork的具体实现,ErasureCodingWork或者ReplicationWork
for (int priority = 0; priority < blocksToReconstruct
.size(); priority++) { // 优先级从高到低,0的优先级最高
for (BlockInfo block : blocksToReconstruct.get(priority)) {
// 创建对应的ReconstructionWork,但是还没有到挑选节点的阶段
BlockReconstructionWork rw = scheduleReconstruction(block,
priority);
if (rw != null) {
reconWork.add(rw);
}
......
选择Source节点的逻辑在方法chooseSourceDataNodes()
中,其基本思路为:
dfs.namenode.replication.max-streams-hard-limit
,默认值为4), 即使是优先级最高的重构任务,也不会将该节点作为source节点LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
),不管该节点目前是LIVE,或者DECOMMISSIONING或者MAINTENANCE_FOR_READ, 只要该节点当前正在运行和准备运行的replica任务数量没有超过hard limit(dfs.namenode.replication.max-streams-hard-limit
),那么依然会作为source nodes。dfs.namenode.replication.max-streams
),这个节点就可以作为source下面是chooseSourceDatanodes()的代码,请注意,这些参数比如containingNodes, nodesContainingLiveReplicas,numReplicas,liveBlockIndices,liveBusyBlockIndices都是将这个Block对以的状态信息从方法里面带出方法外,供chooseSourceDatanodes()的调用者scheduleReconstruction()进行重构的下一步调度。
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas, // 传到这里的numReplicas是一个刚刚初始化、空的统计信息
List<Byte> liveBlockIndices,
List<Byte> liveBusyBlockIndices, int priority) {
if (isStriped) {
int blockNum = ((BlockInfoStriped) block).getTotalBlockNum(); // data unit + parity unit
liveBitSet = new BitSet(blockNum);
decommissioningBitSet = new BitSet(blockNum);
}
for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { // 对于存储这个Block的每一个 DatanodeStorageInfo
final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);//
final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block, // 获取这个节点上的 replica的状态
storage, corruptReplicas.getNodes(block), false);
if (state == StoredReplicaState.LIVE) { //存活状态
if (storage.getStorageType() == StorageType.PROVIDED) {
storage = new DatanodeStorageInfo(node, storage.getStorageID(),
storage.getStorageType(), storage.getState());
}
nodesContainingLiveReplicas.add(storage); // 加入存活列表
}
containingNodes.add(node);
// do not select the replica if it is corrupt or excess
if (state == StoredReplicaState.CORRUPT ||
state == StoredReplicaState.EXCESS) { // CORRUPT和EXCESS状态不考虑
continue;
}
// Never use maintenance node not suitable for read
// or unknown state replicas.
if (state == null
|| state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
continue;// 不可读状态不考虑
}
if (state == StoredReplicaState.DECOMMISSIONED) { // 已经decommissioned,这个replica不考虑
if (decommissionedSrc == null ||
ThreadLocalRandom.current().nextBoolean()) {
decommissionedSrc = node;
}
continue;
}
byte blockIndex = -1;
if (isStriped) {
blockIndex = ((BlockInfoStriped) block)
.getStorageBlockIndex(storage); // 这个节点所存储的internal block的block index
countLiveAndDecommissioningReplicas(numReplicas, state,
liveBitSet, decommissioningBitSet, blockIndex);
}
if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
&& (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { // maxReplicationStreams默认值是2
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
}
continue; // already reached replication limit 已经超过限制,查看下一个replica
}
if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { // replicationStreamsHardLimit默认值是4
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
}
continue;
}
if(isStriped || srcNodes.isEmpty()) {
srcNodes.add(node);
if (isStriped) {
liveBlockIndices.add(blockIndex); // 加入到healthy block中
}
continue;
}
.....
} //循环结束
// 对于基于复制的布局方式,如果没有一个存活的节点包含replica,还没有找到一个srcNodes,但是发现了一个已经decommissioned节点包含,那么,依然准备使用这个 decommissioned节点
// Pick a live decommissioned replica, if nothing else is available.
if (!isStriped && nodesContainingLiveReplicas.isEmpty() &&
srcNodes.isEmpty() && decommissionedSrc != null) {
srcNodes.add(decommissionedSrc);
}
// 如果是基于复制,那么srcNodes肯定只有一个元素
return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
}
scheduleReconstruction()
通过chooseSourceDatanodes()
选定Source节点以后,就个妞刚刚选择节点过程中获取的Block的source 节点的一些统计信息比如containingNodes, nodesContainingLiveReplicas,liveBlockIndices,liveBusyBlockIndices,计算还需要多少个replica需要重构,将这些综合信息封装成BlockReconstructionWork
,然后开始进行重构任务的调度。
块的重构细节都被封装在对应的BlockReconstructionWork
对象中,根据块布局方式的不同,分为ErasureCodingWork
和ReplicationWork
两种实现。显然,ReplicationWork不需要block index的这类信息的,但是ErasureCodingWork需要。在方法的,两个子类都重写chooseTargets()方法和addTaskToDataNode()方法,因为在选择目标节点上,不同的块布局有不同的PlacementPolicy,同时,在将任务添加到DataNode的时候,也有不同的逻辑。下文将会详细讲解。
class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndicies;
private final byte[] liveBusyBlockIndicies;
private final String blockPoolId;
public ErasureCodingWork(String blockPoolId,
BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes,// srcNodes.size()和liveBlockIndicies.size()一样,并且liveBlockIndicies相同位置的值就是对应的internal block在group中的位置
List<DatanodeDescriptor> containingNodes, // 包含这个Block的所有的节点,包括DECOMMISSION和MAINTENANCE节点
List<DatanodeStorageInfo> liveReplicaStorages, // 包含这个Block的Live节点,因此不包含DECOMMISSION和MAINTENANCE
int additionalReplRequired, // 还缺少个replica
int priority,
byte[] liveBlockIndicies,// 只针对条带布局,所有live状态的replica的internal block index
byte[] liveBusyBlockIndicies) { // 只针对条带布局,所有处于LIVE或者DECOMMISSIONING并且这个节点上目前已经存在的复制任务超过一定数量
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndicies = liveBlockIndicies;
this.liveBusyBlockIndicies = liveBusyBlockIndicies;
}
class ReplicationWork extends BlockReconstructionWork {
public ReplicationWork(BlockInfo block,
BlockCollection bc,
DatanodeDescriptor[] srcNodes, // 源节点,对于ReplicationWork, srcNodes的大小一定是1
List<DatanodeDescriptor> containingNodes,// 包含这个Block的所有的节点,包括DECOMMISSION和MAINTENANCE节点
List<DatanodeStorageInfo> liveReplicaStorages, // 包含这个Block的Live节点,因此不包含DECOMMISSION和MAINTENANCE
int additionalReplRequired, // 还需要多少个副本
int priority) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
// 构造ReplicationWork的时候,还没有确定target节点,但是先进行计数
getSrcNodes()[0].incrementPendingReplicationWithoutTargets();
scheduleReconstruction()方法的具体代码如下,其基本逻辑为:
先通过调用getExpectedLiveRedundancyNum()看看我当前期望这个Block有多少个副本存活。上文在讲getExpectedLiveRedundancyNum()
方法的时候已经详细讲解了期望的存活副本数量(Expected Live Redundancy)的具体含义。
基于当前的期望副本数,和这个副本当前的实际情况(有多少replica是正常的LIVE状态,有多少个replica是pending状态(即pendingReconstructions中的副本,pendingReconstructions的含义上文已经讲过)),我还额外需要多少个副本呢?这是通过下面的代码计算的:
if (numReplicas.liveReplicas() < requiredRedundancy) { // 当前的live replica还不够requiredRedundancy的数量
additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
- pendingNum;
}
即使在数量上我看起来不再需要副本(当前Block的实际存活的副本数已经不小于期望的存活副本数量(Expected Live Redundancy)),那有可能副本的位置并不满足PlacementPolicy的需求,这也是需要进行重构的,代码如下:
else { // 尽管副本总数量够了,但是有可能按照Placement policy,副本的位置不符合要求
// Violates placement policy. Needed on a new rack or domain etc.
BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
additionalReplRequired = placementStatus.getAdditionalReplicasRequired();
}
对于条带布局,在确定targets的数量的时候,还去掉了正处在DECOMMISSIONING和处在MAINTENANCE_FOR_READ的Replica,因为这些replica目前算作LIVE的replica(查看chooseSourceDatanodes()在确定liveBusyBlockIndicies的基本逻辑),而这些Replica即将不可读(因为即将进入DECOMMISSIONED和MAINTENANCE_NOT_FOR_READ状态),因此必须尽快利用这些节点上的Replica进行其它Replica的重构。
在按照上述逻辑确定了targets的数量以后,构造ErasureCodingWork或者ReplicationWork对象,开始选择目标节点。
// 虽然创建了ErasureCodingWork但是有可能是进行replica work,具体调度出去的时候会进行具体分析
return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, newIndices, busyIndices);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority);
}
目标节点是重构的时候需要写入重构数据的节点。显然,这刚好是PlacementPolicy做的事情。代码如下:
// 为每一个reconstruction work挑选target节点
for (BlockReconstructionWork rw : reconWork) {
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());
// Exclude all nodes which already exists as targets for the block
List<DatanodeStorageInfo> targets = // 排除掉这个Block当前所在的targets
pendingReconstruction.getTargets(rw.getBlock());
if (targets != null) {
for (DatanodeStorageInfo dn : targets) {
excludedNodes.add(dn.getDatanodeDescriptor());
}
}
//根据布局方式(连续布局还是条带布局),选择合适的块放置策略
final BlockPlacementPolicy placementPolicy =
placementPolicies.getPolicy(rw.getBlock().getBlockType());
rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);// 最终会调BlockPlacementPolicy.chooseTarget()
}
从上面的代码可以看到,选择目标节点是通过调用BlockReconstructionWork.chooseTargets()
方法实现的,这个方法在父类BlockReconstructionWork上是抽象方法,需要子类去实现。两个子类对这个方法的实现方式大致一致,都是选择PlacementPolicy.chooseTargets()去调度,这里不贴二者实现的代码。
在上文讲条带布局的写过程时我们说过,连续布局的默认块放置策略BlockPlacementPolicyDefault
会将第一个replica放在一个rack的某台机器上,另外两个replica会放在另外一个rack的两台不同机器上。而条带布局的默认块放置策略是BlockPlacementPolicyRackFaultTolerant
,倾向于讲所有的internal block放到不同的基架上去。
我们看一下BlockPlacementPolicy接口,就知道它需要哪些信息:
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath, // 这个块对应的文件的path
int numOfReplicas, // 额外需要的节点数量
Node writer, // 哪个节点负责写数据,这种情况下,如果这个节点的确是一个DataNode,并且这个DataNode不在excludedNodes里面,那么第一个replica会写到这个节点上去
List<DatanodeStorageInfo> chosen, // 已经被选择作为targets的节点
boolean returnChosenNodes, // 是否返回被选择的节点
Set<Node> excludedNodes, // 需要排除的节点
long blocksize, // 一个group中的data block的大小的总和
BlockStoragePolicy storagePolicy, // 这个节点应该放在哪种类型(StorageType比如SSD,DISK等)的介质上
EnumSet<AddBlockFlag> flags //分配block的时候的一些hint(暗示)的flag信息以修改这个PlacementPolicy的默认行为,比如默认会将第一个replica放到writer所在的DataNode上,但是如果指定了NO_LOCAL_WRITER,就不会把第一个replica放到跟writer在一个host上
);
在调用BlockPlacementPolicy.chooseTarget()
进行目标节点选择的时候,基本思路是,我告诉你 1) 已经选择作为target的节点(chosen) 2) 需要排除的节点,即不能作为目标的节点,那么,请根据你的BlockPlacementPolicy
的实现,为我选择numOfReplicas
个节点作为target节点。
到这一阶段,我们已经构建了BlockReconstructionWork的具体实现,ErasureCodingWork或者ReplicationWork,包含了srcNodes(这个Block的每一个replica所在的并且满足一定条件比如节点状态,当前上面已经有的replication的task的数量),containingNodes(这个Block的每一个replica所在的节点,无论什么状态,无论上面已经有多少replication的task),targets(重构的目标节点),liveBlockIndicies(处于LIVE状态或者其他状态但是满足要求的replica的internal block index,只针对条带布局),liveReplicaStorages(处于LIVE状态的DataNodeStorage),现在可以将任务调度出去了。调度的主要逻辑在方法validateReconstructionWork()
中(方法名字似乎跟它做的事情并不一致。。。。)
// 将task调度给对应的DN,进行重构
namesystem.writeLock();
try { // 遍历每一个reconstruction work, 然后调度出去
for (BlockReconstructionWork rw : reconWork) {
.......
synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) { //validateReconstructionWork会将任务调度出去,即attach到对应的DataNode,等待收到心跳,就把attach到DataNode的任务作为response发送给DataNode
....
}
.....
return scheduledWork;
validateReconstructionWork()做了以下事情
NumberReplicas numReplicas = countNodes(block);// 再次统计这个block当前的状态统计信息
final short requiredRedundancy =
getExpectedLiveRedundancyNum(block, numReplicas);
final int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { // 算上pending的,已经有足够的replica,不需要进行re-construct了
neededReconstruction.remove(block, priority);
.......
return false;
}
if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
(!placementStatus.isPlacementPolicySatisfied())) { // 如果replica数量足够,仅仅是分布方式不满足放置策略
BlockPlacementStatus newPlacementStatus =
getBlockPlacementStatus(block, targets); // 当添加了targets以后,获取对应的status
// 节点添加进来以后,依然没有满足当前的放置策略,并且,即使把target加进来,还需要额外的节点数量大于不加入target的时候还需要的节点数量,那么,加入这些target没有任何价值
// 也就是说,如果这些targets加进来以后,放置策略被满足,或者,虽然依然不满足,但是至少让所需要的节点数量减少了,那么,把这些target加进来就有意义
if (!newPlacementStatus.isPlacementPolicySatisfied() &&
(newPlacementStatus.getAdditionalReplicasRequired() >=
placementStatus.getAdditionalReplicasRequired())) { // 分布不满足分布策略,
// If the new targets do not meet the placement policy, or at least
// reduce the number of replicas needed, then no use continuing.
return false;
}
// mark that the reconstruction work is to replicate internal block to a
// new rack.
rw.setNotEnoughRack(); // 仅仅需要把replica放到一个新的rack上
}
每一个BlockPlacementPolicy都必须实现 verifyBlockPlacement()
方法,它返回一个BlockPlacementStatus
对象,封装了当前的副本放置是否满足要求、还需要几个副本等等信息,供调用者进行决策。 public abstract BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numOfReplicas);
public interface BlockPlacementStatus {
public boolean isPlacementPolicySatisfied(); // 当前的放置状态是否满足了放置策略
public String getErrorDescription();// 当前的放置状态没有满足放置策略的一些错误或者描述信息
int getAdditionalReplicasRequired(); // 还需要多少个额外副本才能保证放置策略得到满足
}
可以看到,如果副本数量足够但是verifyBlockPlacement()返回的BlockPlacementStatus显示Block当前的Replica放置不满足要求,那么就会调用BlockReconstructionWork.setNotEnoughRack(),将boolean notEnoughRack置为True,这个变量将会影响到向DataNode添加任务的逻辑,因为如果notEnoughRack=True,那么即使是ErasureCodingWork,这个任务也不再是进行encode/decode类型的计算,而是通过将某个internal block进行拷贝来买满足PlacementPolicy的要求。下文会详细介绍向DataNode添加任务的流程。
3. 将任务Attach到DataNode
在BlockReconstructionWorkd的类图中可以看到,addTaskToDataNode()
方法是一个抽象方法,因此ErasureCodingWork和ReplicationWork对这个方法都有自己的实现。
将任务attach到DataNode是通过DatanodeDescriptor.addBlockToBeReplicated()和DatanodeDescriptor.addBlockToBeErasureCoded()方法进行的。我们从下面的DatanodeDescriptor的类图可以看到,DatanodeDescriptor上保存了NameNode即将发送给该DataNode的各种类型的副本操作信息,这些信息将会在下一次DataNode到来的时候作为repsonse发送给DataNode.
从下面的代码可以看到,addBlockToBeReplicated()和addBlockToBeErasureCoded()就是将对应的任务添加到该DatanodeDescriptor的对应的任务队列replicaBlocks和erasurecodeBlocks中去。
---------------------------------------------DatanodeDescriptor----------------------------------------------------
public void addBlockToBeReplicated(Block block, // 这个block是internal block
DatanodeStorageInfo[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(new BlockTargetPair(block, targets));
}
/**
* Store block erasure coding work.
*/
void addBlockToBeErasureCoded(ExtendedBlock block,
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
assert (block != null && sources != null && sources.length > 0);
// 构造这个BlockECReconstructionInfo的block是一个block group,即我还不知道需要对哪个internal block进行重算
BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
sources, targets, liveBlockIndices, ecPolicy);
erasurecodeBlocks.offer(task);
}
当NameNode 收到了DataNode发过来的信息,会在handleHeartbeat()方法中查看当前需要回复给这个DataNode的命令,其中就包括需要该DataNode复制replica到别的节点的命令,或者进行纠删码的encode/decode以恢复纠删码内部块的命令。
所有的命令封装在DatanodeCommand中的对应具体实现类中,比如,对Block进行基于复制的重构new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList)
, 对Block进行基于纠删码编解码的重构new BlockECReconstructionCommand( DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)
, 删除一个Block的Replica的命令new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)
等等。几个重要的DatanodeProtocol:
final static int DNA_TRANSFER = 1; // 将副本从一个节点传输到另一个节点
final static int DNA_INVALIDATE = 2; // 删除一个replica
final static int DNA_SHUTDOWN = 3; // 关闭节点
final static int DNA_REGISTER = 4; // 重新注册
final static int DNA_RECOVERBLOCK = 6; // 恢复一个eblock
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // 更新balancer的带宽
final static int DNA_CACHE = 9; // 缓存一个block
final static int DNA_UNCACHE = 10; // 将一个block从缓存中拿掉
final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // 基于纠删码的重构命令
在DataNode端,DataNode会为每一个NameNode(每一个NameService的每一个NameNode,Active或者Standby)构建一个BPOfferService,用来处理这个NameNode发过来的命令。最终,会在BPOfferService.processCommandFromActive()处理来自Active NameNode的命令
----------------------------------------------BPOfferService------------------------------------------------
boolean processCommandFromActor(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
.......
try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
private boolean processCommandFromActive(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
.......
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER: // 处理replica复制任务非常简单,就是根据副本信息,将副本复制到指定的远程节点
// Send a copy of a block to another datanode
dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
bcmd.getTargets(), bcmd.getTargetStorageTypes(),
bcmd.getTargetStorageIDs());
break;
case DatanodeProtocol.DNA_INVALIDATE:
......
case DatanodeProtocol.DNA_CACHE:
.....
case DatanodeProtocol.DNA_UNCACHE:
.......
case DatanodeProtocol.DNA_SHUTDOWN:
......
case DatanodeProtocol.DNA_FINALIZE:
......
case DatanodeProtocol.DNA_RECOVERBLOCK:
.......
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
.......
// 对于continuous block的恢复,参考DatanodeProtocol.DNA_TRANSFER
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
Collection<BlockECReconstructionInfo> ecTasks =
((BlockECReconstructionCommand) cmd).getECTasks();
dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
.......
}
return true;
}
与NameNode
对应,DatanodeProtocol.DNA_TRANSFER
用来进行副本的复制,而DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION
用来进行纠删码的块重构。基于副本复制的重构方式很简单,我们不做赘述。主要讲解纠删码的块重构。
DataNode会构造一个全局的叫做ErasureCodingWorker的对象负责进行纠删码的块重构任务(注意和NameNode端的ErasureCodingWork类区分开)。DataNode启动的时候会初始化ErasureCodingWorker,在ErasureCodingWorker构造时,会用一个线程池专门执行块的重构任务
public ErasureCodingWorker(Configuration conf, DataNode datanode) {
......
initializeStripedBlkReconstructionThreadPool(conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT));
}
private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
LOG.debug("Using striped block reconstruction; pool threads={}",
numThreads);
stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(numThreads,
numThreads, 60, new LinkedBlockingQueue<>(),
"StripedBlockReconstruction-", false);
stripedReconstructionPool.allowCoreThreadTimeOut(true);
}
其实就是创建了一个corePoolSize=8, maxPoolSize=8, keepAliveTime=60s, 任务队列是一个阻塞的无界队列LinkedBlockingQueue的线程池。我们都知道,这个在线程池的划分上叫做叫做固定线程数量的线程池(可以参考文章《Java的5中线程池》)。线程池中的每一个线程负责一个Block(注意不是replica)的重构。
------------------------------------------------ErasureCodingWorker---------------------------------------------------
public void processErasureCodingTasks(
Collection<BlockECReconstructionInfo> ecTasks) {
// 对于每一个重构的task
for (BlockECReconstructionInfo reconInfo : ecTasks) {
try {
StripedReconstructionInfo stripedReconInfo =
new StripedReconstructionInfo(
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
reconInfo.getTargetStorageIDs());
final StripedBlockReconstructor task =
new StripedBlockReconstructor(this, stripedReconInfo);
if (task.hasValidTargets()) {
stripedReconstructionPool.submit(task);
可以看到,ErasureCodingWorker将块的重构信息封装到StripedReconstructionInfo
中,然后封装为一个StripedBlockReconstructor
,提交给ErasureCodingWorker在初始化的时候构造的固定线程数的线程池stripedReconstructionPool
。由此可见,StripedBlockReconstructor
肯定是一个Runnable,它的run()
方法结构非常清晰,做的事情一目了然:
--------------------------------------------StripedBlockReconstructor----------------------------------------------
class StripedBlockReconstructor extends StripedReconstructor
implements Runnable {
private StripedWriter stripedWriter;
......
@Override
public void run() {
try {
initDecoderIfNecessary(); // 根据ECPolicy创建对应的decoder
initDecodingValidatorIfNecessary(); // 根据ECPolicy创建对应的decoder的validator
getStripedReader().init();//初始化条带块的读取
stripedWriter.init(); //初始化条带块的写入
reconstruct(); // 重构
stripedWriter.endTargetBlocks(); // 结束目标块的写入,重构完成
.....
从上面的代码可以看到,StripedBlockReconstructor会首先初始化Decoder和DecodingValidator,然后初始化了Reader(getStripedReader().init())和Writer(stripedWriter.init()),然后进行重构,最后进行一些关闭操作。
具体架构如下图所示:
关于Decoder和DecodingValidator:
关于Decoder和DecodingValidator
Decoder就是通过已有的replica计算出缺失的block,比如RS(6,2),index=(3,6)的块丢失,因此需要通过Decode的方式重算丢失块。DecodingValidator的过程就是将已经Decode的结果重新作为Input进行Decode,看看生成的块与当前的块是否匹配。
比如,通过decode(d0,d1,d2,d4,p0,p1) 恢复出 (d0, d5), 然后校验过程就可以是decode(d1, d2, d3,d4,d5, p0) 计算出d0’,然后对比d0’和d0,如果一致则校验成功。
关于Reader
显然,纠删码的重构需要从多个Source读取internal block,才能进行重构。比如RS(6,2),应该至少需要从6台DataNode上读取6个internal block(待会儿会讲到文件最后的一个Group可能不需要读这么多的block),才能重构丢失的internal block,和每一个DataNode的通信都需要建立独立的连接,需要多个reader。StripedBlockReconstructor将这些reader的管理交付给StripedReader负责,它管理了StripedBlockReader的List, 每一个StripedBlockReader负责一个DataNode上Replica的读取。
StripeReader的构造方法如下:
StripedReader(StripedReconstructor reconstructor, DataNode datanode,
Configuration conf, StripedReconstructionInfo stripedReconInfo) {
......
dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)
/ stripedReconInfo.getEcPolicy().getCellSize() + 1);
minRequiredSources = Math.min(cellsNum, dataBlkNum); // minRequiredSources可能等于data block的数量,有可能等于当前这个stripe的cell数量
if (minRequiredSources < dataBlkNum) { // cellNum小于dataBlkNum,说明这个block group的总的数据量甚至都没有一个stripe那么多,这往往是一个文件的最后一个Block Group
int zeroStripNum = dataBlkNum - minRequiredSources;
zeroStripeBuffers = new ByteBuffer[zeroStripNum];
zeroStripeIndices = new short[zeroStripNum];
}
......
readers = new ArrayList<>(sources.length);
readService = reconstructor.createReadService(); //用来进行并发读取的StripedBlockReader的线程池
}
```
```java
CompletionService<BlockReadStats> createReadService() {
return new ExecutorCompletionService<>(stripedReadPool);
}
// StripedReader的初始化
void init() throws IOException {
initReaders();
initBufferSize();
initZeroStrip();
}
从刚上面的代码可以看到,在构造的时候主要做了以下3件事情:
以RS(6,2),cell大小为1MB为例,为了重构内部物理块,难道不都是需要至少6个内部块吗?答案是不一定。特殊情况发生在文件的最后一个Block Group,假如这个Block Group的大小小于等于(6 - 1) * 1M = 6M,那么就至少有一个internla block是全0的,HDFS对于这种小文件,尽管NameNode会照常分配6+2=8个物理块,但是客户端也只会写有有效数据的块,根本不会写没有任何有效数据的块。这个结果我们在本文的实验验证部分已经验证了。下面的StripedReader的构造方法就是确定最少需要多少个source,即需要构造多少个实际读取数据的reader。剩下的空的 internalblock 也会构建对应的Buffer,但是数据全是0。
根据上面的计算方式,我们以RS-6-2-1024k
为例:
如果这个BlockGroup的大小是3.5 * 1024k = 1536kB,由于一个cell大小为1024KB, 那么这个Block Group只会有 (3584 - 1)/1024 + 1 = 4个internal block,剩下两个internal Block不会生成。
如果这个BlockGroup的大小是8 * 1024 = 8192KB,由于一个cell大小为1024KB,那么(8192 - 1)/1024 + 1 = 8,与最多的6取最小值,因此是需要6个internal block。
两种情况的纠删码的条带布局如下图所示:
可以看到,这里是构造了一个线程池stripedReadPool,corePoolSize=0, maxPoolSize=Integet.MAX_VALUE, keepAliveTime=60s, 任务队列是一个阻塞的无界队列SynchronousQueue的线程池。我们都知道,这个在线程池的划分上叫做叫做缓存线程池(Cached Thread Pool,可以参考文章[《Java的5中线程池》]。SynchronousQueue一般用在无界的缓存线程池中,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
请把这个线程池与上文提到的ErasureCodingWorker中构造的线程池stripedReconstructionPool区分开,stripedReconstructionPool中的每一个task是一个StripedBlockReconstructor,每一个task负责一个Block的reconstruction,这里的线程池stripedReadPool的每一个task则负责一个Block下的Replica的读取。
// 构造线程池
private void initializeStripedReadThreadPool() {
// Essentially, this is a cachedThreadPool.
stripedReadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new Daemon.DaemonFactory() {
.....
},
new ThreadPoolExecutor.CallerRunsPolicy() {
.......
});
stripedReadPool.allowCoreThreadTimeOut(true);
}
// 将线程池封装为ExecutorCompletionService
CompletionService<BlockReadStats> createReadService() {
return new ExecutorCompletionService<>(stripedReadPool);
}
刚刚说过,在StripedBlockReconstructor.run()中,会通过调用getStripedReader().init()来初始化Reader。
我们可以看到,StripedReader初始化主要是初始化每一个Replica的StripedBlockReader,以及初始化buffer和zeroStripe(上面讲到的空的internal block,这些block没有有效数据,decode运算的时候需要假设上面的数据是0)。
从下面的代码可以看到,一共初始化了minRequiredSources个reader,每一个reader会负责与一个DataNode通信拉去对应的replica,所有的reader放在一个readers数组中。
void init() throws IOException {
initReaders();
initBufferSize();
initZeroStrip();
}
private void initReaders() throws IOException {
// 初始化一个success list, 代表可以进行internal block读取的DN,由于每次读取的过程中可能发现有些DN不好用,
// 因此会更新这个list
successList = new int[minRequiredSources];
StripedBlockReader reader;
int nSuccess = 0; // 我们有sources.length个reader,但是实际上我们只需要minRequiredSources个reader
for (int i = 0; i < sources.length && nSuccess < minRequiredSources; i++) {
reader = createReader(i, 0); // offset都是0,因为每一个reader都是从自己负责的replica的0的位置开始读取
readers.add(reader);
if (reader.getBlockReader() != null) { // 连接已经建立好了
initOrVerifyChecksum(reader);
successList[nSuccess++] = i; //successList[nSuccess++]存放了readers数组的索引
}
}
}
/**
* 注意,idxInSources只是在liveIndices数组中的索引,而liveIndices[idxInSources]才是internal block在block group中的索引
*/
StripedBlockReader createReader(int idxInSources, long offsetInBlock) {
return new StripedBlockReader(this, datanode,
conf, liveIndices[idxInSources], // liveIndices[idxInSources] 存的是internal block在group中的index
reconstructor.getBlock(liveIndices[idxInSources]),
sources[idxInSources], offsetInBlock);
}
initBufferSize()
的代码如下,buffer的大小通过dfs.datanode.ec.reconstruction.stripedread.buffer.size
配置,默认64KB。每次构造的时候,minRequiredSources中的每一个StripedBlockReader都会读取bufferSize的数据,进行decode操作,然后写入到target中。
private void initBufferSize() {
int bytesPerChecksum = checksum.getBytesPerChecksum(); // 默认一个chunk是512B,算上4B的checksum,是516B
// The bufferSize is flat to divide bytesPerChecksum
int readBufferSize = stripedReadBufferSize; // 默认64kb
// 假如用户配置的stripedReadBufferSize是9KB, bytesPerChecksum是4KB,那么真正的bufferSize只需要8KB就行,不需要9KB
bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
readBufferSize - readBufferSize % bytesPerChecksum;
}
initZeroStrip()
方法主要是为上文讲的空block构造buffer。空的internal block中虽然没有有效数据,却需要置0来进行统一decode运算。这里不做详细讲解。
同StripedReader在逻辑上相似,StripedWriter也是封装了一个或者多个StripedBlockWriter,每一个StripedBlockWriter负责一个target replica的写操作。
StripedWriter(StripedReconstructor reconstructor, DataNode datanode,
Configuration conf, StripedReconstructionInfo stripedReconInfo) {
........
/**
* 有多个writer,可以看到一个StripedWriter负责一个BlockGroup 的一个或者多个internal block的恢复
* 一个StripedBlockWriter负责一个 replica的生成操作
*/
writers = new StripedBlockWriter[targets.length];
targetIndices = new short[targets.length];//targetIndices数组中的每一个元素的值代表了这个target在Block Index中的索引
Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
"Too much missed striped blocks.");
initTargetIndices();
long maxTargetLength = 0L;
for (short targetIndex : targetIndices) { // 遍历每一个internal block,获取最大的internal block的长度
maxTargetLength = Math.max(maxTargetLength,
reconstructor.getBlockLen(targetIndex));
}
reconstructor.setMaxTargetLength(maxTargetLength);
}
// 初始化targetIndices数组,targetIndices数组的每一个元素代表了需要恢复的block在Block Group中的index
private void initTargetIndices() {
BitSet bitset = reconstructor.getLiveBitSet(); // 从StripedReconstructor构造函数可以看到,liveBitSet中存放了liveIndices中的每一个value,即每一个存活的(也包含decommissioning和maintenance_for_read,参考chooseSourceDataNodes()方法)的internal block的index
int m = 0;
hasValidTargets = false;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (!bitset.get(i)) { // 缺数据
if (reconstructor.getBlockLen(i) > 0) { // 的确需要数据
if (m < targets.length) { // 最多构造m个target
targetIndices[m++] = (short)i;
hasValidTargets = true;
}
.......
}
StripedWriter的构造方法主要做了以下几件事情:
刚刚说过,在StripedBlockReconstructor.run()
中,会通过调用stripedWriter.init()
来初始化writer。
StripedWriter的初始化主要是初始化了写数据过程中的data buffer和checksum buffer。一个data buffer用来存放一个packet,一个packet是有头部信息和多个chunk组成。这里不做赘述
void init() throws IOException {
DataChecksum checksum = reconstructor.getChecksum();
checksumSize = checksum.getChecksumSize();// 4B
bytesPerChecksum = checksum.getBytesPerChecksum(); // 512B
int chunkSize = bytesPerChecksum + checksumSize; // 516B
maxChunksPerPacket = Math.max( // WRITE_PACKET_SIZE是64KB,需要包含PKT_MAX_HEADER_LEN的header
(WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1);
int maxPacketSize = chunkSize * maxChunksPerPacket
+ PacketHeader.PKT_MAX_HEADER_LEN; // 最大的packet的大小
packetBuf = new byte[maxPacketSize]; // 数据的buffer
int tmpLen = checksumSize *
(reconstructor.getBufferSize() / bytesPerChecksum);
checksumBuf = new byte[tmpLen]; // checksum的buffer的长度
}
上面讲过,方法reconstruct()是重构的具体过程。抛开细节,整个重构是一轮一轮进行,一轮重构是指读取一个Stripe中的source的数据(通过上面的线程池stripedReadPool并发进行,但是显然,在进行decode之前必须等所有task成功结束),进行decode,然后发送给一个或者多个targets (注意,多个targets的发送并没有使用线程池的方式并发进行,为什么?)
void reconstruct() throws IOException {
// 在internal block中的位置依然小于最大的target internal block的长度, 意味着还需要接着读数据
while (getPositionInBlock() < getMaxTargetLength()) { // 当前的position还小于最大的internal block的长度
DataNodeFaultInjector.get().stripedBlockReconstruction();
long remaining = getMaxTargetLength() - getPositionInBlock();
final int toReconstructLen = // 最多只能读取buffer size大小的数据(从StripedReader的构造函数可以看到,buffer size肯定是chunk的整数倍)
(int) Math.min(getStripedReader().getBufferSize(), remaining);
// step1: read from minimum source DNs required for reconstruction.
// The returned success list is the source DNs we do real read from
getStripedReader().readMinimumSources(toReconstructLen);// 从远程最多读取toReconstructLen byte的数据
// step2: decode to reconstruct targets
reconstructTargets(toReconstructLen); // 重算,构造数据
// step3: transfer data
// 把数据发送给target
if (stripedWriter.transferData2Targets() == 0) { // 将数据发送给 target
String error = "Transfer failed for all targets.";
throw new IOException(error);
}
updatePositionInBlock(toReconstructLen);
clearBuffers();
}
读取的逻辑很简单,根据需要读取的reconstructLength,并发提交minRequiredSources个任务进行读取操作,每一个任务是一个StripedBlockReader,负责和远程的DataNode通信以读取一个Internal Block,每一个submit都会返回一个Future句柄,供调用者查询结果。提交以后,开始轮询所有的Future。
正常情况下,minRequiredSources个reader都成功,则这一轮读取结束,所有读取的结果都存放在各自的StripedBlockReader的缓存中,供待会儿Decode使用。
如果失败或者超市,则会尝试更换另外一个DataNode读取。显然,只有在minRequiredSources小于liveSources的时候,还有可以更换的DataNode,否则,重构失败。
int[] doReadMinimumSources(int reconstructLength,
CorruptedBlocks corruptedBlocks)
throws IOException {
int nSuccess = 0;
int[] newSuccess = new int[minRequiredSources];
BitSet usedFlag = new BitSet(sources.length);
/*
* Read from minimum source DNs required, the success list contains
* source DNs which we think best.
*/
// 只需要真正的从minRequiredSources个reader中读取数据,剩下的则直接向buffer中填充0
for (int i = 0; i < minRequiredSources; i++) { // 最小的reader数量中的每一个reader都要去读数据
StripedBlockReader reader = readers.get(successList[i]);// successList[i]代表对应的需要从中拉取数据的block,readers.get将获取这个block对应的StripedBlockReader
int toRead = getReadLength(liveIndices[successList[i]],
reconstructLength);
.....
Callable<BlockReadStats> readCallable =
reader.readFromBlock(toRead, corruptedBlocks);
Future<BlockReadStats> f = readService.submit(readCallable);
futures.put(f, successList[i]);
.....
}
while (!futures.isEmpty()) { // 一直循环等待futures清空
try {
StripingChunkReadResult result =
StripedBlockUtil.getNextCompletedStripedRead(
readService, futures, stripedReadTimeoutInMills); // 每次取出一个成功的Future,即一个Replica的读取结果,这个结果有可能是成功,有可能是超时,有可能是失败
int resultIndex = -1;
if (result.state == StripingChunkReadResult.SUCCESSFUL) {
resultIndex = result.index;
} else if (result.state == StripingChunkReadResult.FAILED) { // 和下面的Timeout的处理策略一样,换一个DataNode读取(因为有可能minRequiredSources小于sources的数量,即还有剩余的DataNode可以尝试 )
StripedBlockReader failedReader = readers.get(result.index);
failedReader.closeBlockReader();
resultIndex = scheduleNewRead(usedFlag,
reconstructLength, corruptedBlocks);
} else if (result.state == StripingChunkReadResult.TIMEOUT) {
resultIndex = scheduleNewRead(usedFlag,
reconstructLength, corruptedBlocks); // 和上面的Failed的处理策略一样,换一个DataNode读取(因为有可能minRequiredSources小于sources的数量,即还有剩余的DataNode可以尝试 )
}
if (resultIndex >= 0) {
newSuccess[nSuccess++] = resultIndex; // 记录这个成功的读取,如果成功数量足够了,则这一轮结束
if (nSuccess >= minRequiredSources) { // 足够了,剩下的不用再读了
cancelReads(futures.keySet());
clearFuturesAndService();
break; // 成功数量足够了,这一轮读取结束
......
}
在minRequiredSources个StripedBlockReader都完成了一轮读取,可以进行这一轮的Decode了。这是通过方法reconstructTargets()进行的:
private void reconstructTargets(int toReconstructLen) throws IOException {
// 收集所有的StripedBlockReader刚刚读取的数据
ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);
int[] erasedIndices = stripedWriter.getRealTargetIndices();
ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
markBuffers(inputs); // 记录当前的position
decode(inputs, erasedIndices, outputs); // 在这里进行解码,将解码以后的数据写入到outputs中,outputs其实就是StripedBlockWriter中的buf
resetBuffers(inputs); // 还原position
....
getValidator().validate(inputs, erasedIndices, outputs);
}
......
stripedWriter.updateRealTargetBuffers(toReconstructLen);
}
private void decode(ByteBuffer[] inputs, int[] erasedIndices,
ByteBuffer[] outputs) throws IOException {
long start = System.nanoTime();
getDecoder().decode(inputs, erasedIndices, outputs);
long end = System.nanoTime();
this.getDatanode().getMetrics().incrECDecodingTime(end - start);
}
可以看到,reconstructTargets的任务就是从所有的StripedBlockReader的Buffer中拿到刚刚读取的数据,调用decode(),将数据写入到StripedBlockWriter的Buffer中去。写入完成,并且校验成功,剩下的任务就交给StripedBlockWriter将自己的buffer数据发送给远程的targets。
decode的具体算法我们不做讨论,从其参数可以看到,inputs[]就是所有的StripedBlockReader读取的数据,erasedIndices就是Decode以后生成的internal block的index,outputs就是decode生成的数据即StripedBlockWriter的buffer。可以看到,decode()方法可以在一轮生成多个丢失的targets。
如上文所讲,此时计算好的internal block的数据已经存放到了对应的index的StripedBlockWriter的buffer 中去,StripedBlockWriter会通过调用调用transferData2Target()将数据发送到对应的target node上去。
上文已经说个,一个StripedBlockWriter和一个待恢复的internal block以及存放这个internal block的target DataNode是一一对应的关系,并且在StripedBlockWriter初始化的时候,已经构建好了和这个远程的DataNode的socket连接。
这个发送的过程根客户端写数据的过程是一模一样的。从原始数据、到以chunk为单位计算checksum、到组装成packet、到将数据通过socket发送给DataNode的过程都放在方法transferData2Target()中:
-------------------------------------------------StripedBlockWriter----------------------------------------------------
void transferData2Target(byte[] packetBuf) throws IOException {
// 现在数据已经存放在targetBuffer中(有可能是direct,有可能不是direct),如果targetBuffer是direct,那么计算
// checksum的时候存放checksum的ByteBuffer也得是direct,如果不是direct(在heap中),那么checksum也存放在heap中
if (targetBuffer.isDirect()) {
ByteBuffer directCheckSumBuf =
BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
stripedWriter.getChecksum().calculateChunkedSums(
targetBuffer, directCheckSumBuf); // 对targetBuffer的数据做校验,写入directCheckSumBuf
directCheckSumBuf.get(stripedWriter.getChecksumBuf()); // directCheckSumBuf写入到stripedWriter的buf中去
BUFFER_POOL.putBuffer(directCheckSumBuf); // 归还directCheckSumBuf到BUFFER_POOL中
} else { // 如果targetBuffer不是direct,那么直接基于数组进行计算
stripedWriter.getChecksum().calculateChunkedSums( // 只有在非direct(即存放在heap中)的情况下array()方法才会有返回值
targetBuffer.array(), 0, targetBuffer.remaining(),
stripedWriter.getChecksumBuf(), 0);
}
int ckOff = 0;
while (targetBuffer.remaining() > 0) { // 一个packet可能装不下这么多数据,因此会分多个packet进行发送
DFSPacket packet = new DFSPacket(packetBuf,
stripedWriter.getMaxChunksPerPacket(),
blockOffset4Target, seqNo4Target++,
stripedWriter.getChecksumSize(), false);
int maxBytesToPacket = stripedWriter.getMaxChunksPerPacket() // 单个packet中的chunk数量 * chunk的长度
* stripedWriter.getBytesPerChecksum();
int toWrite = targetBuffer.remaining() > maxBytesToPacket ?
maxBytesToPacket : targetBuffer.remaining(); // 实际发送的数据长度(不包含checksum,checksum此时是单独存放在checksumBuf中的)
int ckLen = ((toWrite - 1) / stripedWriter.getBytesPerChecksum() + 1)
* stripedWriter.getChecksumSize();// checksum的总长度,比如,我们需要10个checkum,每个checksum是4B,那么ckLen就是40Byte
packet.writeChecksum(stripedWriter.getChecksumBuf(), ckOff, ckLen); // 把checksum中的数据存入packet中
ckOff += ckLen;
// 把inBuffer中的数据写入到Packet中去(注意并不是发送,只是写入到Packet对应的buf中)
packet.writeData(targetBuffer, toWrite); // 把存放在targetBuffer中的长度为toWrite的数据存放到packet中
packet.writeTo(targetOutputStream); // 把packet中的数据发送到targetOutputStream绑定的远程 DN中去
......
}
Target接收写入的internal block的过程和普通客户端写入block的过程完全相同,接收完成以后会将块信息汇报给NameNode,然后NameNode的BlockManager会更新对应的块信息。这个过程本文不做详细讲解。有兴趣的读者可以自行查找资料学习。
验证在不同文件大小的情况下的基于复制和基于纠删码的块处理
基于RS-3-2-1024k
,一个cell是1024KB, 一个Stripe中有3个Data Cell和2个Parity Cell。假如一个Logical Block(Block Group)的大小是128MB, 那么一个Block Group写满数据,大概会有128MB / (3 * 1024KB) = 42个Stripe, 每个Internal Block大概42M左右。那么问题来了,假如数据写不满一个Block Group,比如一个文件本身就很小,或者一个大文件的最后一个 Block Group,NameNode怎么进行块的管理的呢 :
由于我的测试环境仅仅由5台测试机且在同一rack上,因此无法基于RS-6-2-1024k进行实验,该Policy明显要求至少6台DataNode并且分布在不同的Rack上。
因此,实验基于RS-3-2-1024k进行。Hadoop的rack-wareness是在NameNode端进行判断的,而不是DataNode进行报告的。参考《Hadoop Rack Awareness》的文档,用户可以自行通过Java、Shell、Python去随心所欲实现hostname到rack的映射关系。Cloudera通过Host -> Rack的映射文件和一个简单的Python脚本实现了Host到Rack的映射,我自己的测试Hadoop环境是开源环境,节约时间,我借用了Cloudera的处理方式,通过一个映射文件和一个Python脚本,将本来实际属于一个Rack的5台机器定义到了5个不同的Rack,让RS-3-2-1024k 能够在我的测试环境运行。
映射文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<topology>
<node name="rccd101-6a.sjc2.dev.com" rack="/rccd101-6a"/>
<node name="10.30.120.121" rack="/rccd101-6a"/>
<node name="rccd101-6b.sjc2.dev.com" rack="/rccd101-6b"/>
<node name="10.30.120.122" rack="/rccd101-6b"/>
<node name="rccd101-6c.sjc2.dev.com" rack="/rccd101-6c"/>
<node name="10.30.120.123" rack="/rccd101-6c"/>
<node name="rccd101-7a.sjc2.dev.com" rack="/rccd101-7a"/>
<node name="10.30.120.125" rack="/rccd101-7a"/>
<node name="rccd101-7b.sjc2.dev.com" rack="/rccd101-7b"/>
<node name="10.30.120.126" rack="/rccd101-7b"/>
</topology>
Python脚本:
------------------------------------------------------topology.py------------------------------------------------------
#!/usr/bin/env python
import os
import sys
import xml.dom.minidom
def main():
MAP_FILE = 'etc/hadoop/topology.map'
DEFAULT_RACK = '/default'
max_elements = 1
map = dict()
mapFile = open(MAP_FILE, 'r')
dom = xml.dom.minidom.parse(mapFile)
for node in dom.getElementsByTagName("node"):
rack = node.getAttribute("rack")
max_elements = max(max_elements, rack.count("/"))
map[node.getAttribute("name")] = node.getAttribute("rack")
default_rack = "".join([ DEFAULT_RACK for _ in xrange(max_elements)])
if len(sys.argv)==1:
print(default_rack)
else:
print(" ".join([map.get(i, default_rack) for i in sys.argv[1:]]))
return 0
if __name__ == "__main__":
sys.exit(main())
在core-site.xml中添加配置:
<property>
<name>net.topology.script.file.name</name>
<value>etc/hadoop/topology.py</value>
</property>
当我们尝试enable一个data unit的数量大于rack数量的hadoop集群中enable一个policy的时候,会抛出异常:
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs ec -enablePolicy -policy RS-10-4-1024k
Erasure coding policy RS-10-4-1024k is enabled
Warning: The cluster setup does not support EC policy RS-10-4-1024k. Reason: The number of DataNodes (5) is less than the minimum required number of DataNodes (14) for the erasure coding policies: RS-10-4-1024k
因此我们在集群中Enable了RS-3-2-1024k
这个Policy,这个Policy只要求集群由3个rack就够了:
# 创建基于纠删码RS(3,2)的目录
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -mkdir hdfs://olap-hdfs-test/test-RS-3-2-1024k
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs ec -setPolicy -path hdfs://olap-hdfs-test/test-RS-3-2-1024k -policy RS-3-2-1024k
Set RS-3-2-1024k erasure coding policy on hdfs://olap-hdfs-test/test-RS-3-2-1024k
# 创建基于3副本复制的目录
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -mkdir hdfs://olap-hdfs-test/test-replication
然后我们向这个policy的目录写入一个500KB和一个4000KB的文件:
hadoop@rccd101-6b:/root$ truncate -s 500KB /tmp/500KB.log
hadoop@rccd101-6b:/root$ truncate -s 4000KB /tmp/4000KB.log
hadoop@rccd101-6b:/root$ truncate -s 122MB /tmp/122MB.log
hadoop@rccd101-6b:/root$ truncate -s 420MB /tmp/420MB.log
# 准备好纠删码的测试文件
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/500KB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/500KB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/4000KB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/4000KB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/122MB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/122MB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/420MB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/420MB.log
# 准备好基于3副本复制的文件
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/4000KB.log hdfs://olap-hdfs-test/test-replication/4000KB.log
对于基于纠删码编码的500KB的文件,查看NameNode的日志:
2023-11-26 03:37:49,956 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/500KB.log._COPYING_ inodeId 16419 for DFSClient_NONMAPREDUCE_-2082370100_1
2023-11-26 03:37:49,959 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/500KB.log._COPYING_ with blk_-9223372036854775568_1018 block is added to the in-memory file system
2023-11-26 03:37:49,959 INFO StateChange: BLOCK* allocate blk_-9223372036854775568_1018, replicas=11.37.76.122:9866, 11.37.76.126:9866, 11.37.76.123:9866, 11.37.76.121:9866, 11.37.76.125:9866 for /test-RS-3-2-1024k/500KB.log._COPYING_
2023-11-26 03:37:49,959 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/500KB.log._COPYING_ with new block blk_-9223372036854775568_1018, current total block count is 1
2023-11-26 03:37:50,057 DEBUG BlockManager: Reported block blk_-9223372036854775568_1018 on 11.37.76.122:9866 size 500000 replicaState = FINALIZED
2023-11-26 03:37:50,057 DEBUG BlockManager: In memory blockUCState = UNDER_CONSTRUCTION
2023-11-26 03:37:50,059 DEBUG BlockManager: Reported block blk_-9223372036854775565_1018 on 11.37.76.121:9866 size 500000 replicaState = FINALIZED
2023-11-26 03:37:50,059 DEBUG BlockManager: In memory blockUCState = UNDER_CONSTRUCTION
2023-11-26 03:37:50,061 DEBUG BlockManager: Reported block blk_-9223372036854775564_1018 on 11.37.76.125:9866 size 500000 replicaState = FINALIZED
从上面的日志我们可以看到:
对于基于纠删码编码的4MB的文件,查看NameNode的日志:
2023-11-26 03:59:02,205 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_ inodeId 16420 for DFSClient_NONMAPREDUCE_-856274170_1
2023-11-26 03:59:02,206 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_ with blk_-9223372036854775552_1019 block is added to the in-memory file system
2023-11-26 03:59:02,206 INFO StateChange: BLOCK* allocate blk_-9223372036854775552_1019, replicas=11.37.76.122:9866, 11.37.76.125:9866, 11.37.76.121:9866, 11.37.76.123:9866, 11.37.76.126:9866 for /test-RS-3-2-1024k/4000KB.log._COPYING_
2023-11-26 03:59:02,206 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_ with new block blk_-9223372036854775552_1019, current total block count is 1
2023-11-26 03:59:02,455 DEBUG BlockManager: Reported block blk_-9223372036854775552_1019 on 11.37.76.122:9866 size 1902848 replicaState = FINALIZED
2023-11-26 03:59:02,457 DEBUG BlockManager: Reported block blk_-9223372036854775551_1019 on 11.37.76.125:9866 size 1048576 replicaState = FINALIZED
2023-11-26 03:59:02,458 DEBUG BlockManager: Reported block blk_-9223372036854775550_1019 on 11.37.76.121:9866 size 1048576 replicaState = FINALIZED
2023-11-26 03:59:02,460 DEBUG BlockManager: Reported block blk_-9223372036854775549_1019 on 11.37.76.123:9866 size 1902848 replicaState = FINALIZED
2023-11-26 03:59:02,464 DEBUG BlockManager: Reported block blk_-9223372036854775548_1019 on 11.37.76.126:9866 size 1902848 replicaState = FINALIZED
可以看到:
对于文件大小超过128MB * 3 = 402MB的文件,我们写入纠删码目录:
2023-11-26 04:36:45,130 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ inodeId 16427 for DFSClient_NONMAPREDUCE_185912574_1
2023-11-26 04:36:45,131 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with blk_-9223372036854775472_1025 block is added to the in-memory file system
2023-11-26 04:36:45,131 INFO StateChange: BLOCK* allocate blk_-9223372036854775472_1025, replicas=11.37.33.122:9866, 11.37.33.125:9866, 11.37.33.123:9866, 11.37.33.126:9866, 11.37.33.121:9866 for /test-RS-3-2-1024k/420MB.log._COPYING_
2023-11-26 04:36:45,131 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with new block blk_-9223372036854775472_1025, current total block count is 1
2023-11-26 04:36:47,317 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ inodeId 16427 for DFSClient_NONMAPREDUCE_185912574_1
2023-11-26 04:36:47,318 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with blk_-9223372036854775456_1026 block is added to the in-memory file system
2023-11-26 04:36:47,319 INFO StateChange: BLOCK* allocate blk_-9223372036854775456_1026, replicas=11.37.33.122:9866, 11.37.33.125:9866, 11.37.33.121:9866, 11.37.33.126:9866, 11.37.33.123:9866 for /test-RS-3-2-1024k/420MB.log._COPYING_
2023-11-26 04:36:47,319 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with new block blk_-9223372036854775456_1026, current total block count is 2
可以看到,客户端一共向NameNode申请了两次Block,这是因为文件的大小为420MB,由于HDFS默认的块大小是128MB(通过dfs.block.size
配置),这个配置对于基于3副本复制的情况很好理解,就是一个replica的大小最大为128MB,但是对于纠删码,它指的并不是一个Block Group的最大大小,而是一个Internal Block的最大大小。因此,在RS(3,2)的情况下,一个Block Group最大为128MB*3 = 402MB,所以一个420MB的文件就需要客户端两次向NameNode申请Block,这里申请的Block是Block Group。
对于基于3副本复制的500KB的文件,我们看看文件的size小于一个Block(默认128MB)的情况,查看NameNode的日志:
2023-11-26 04:09:41,604 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-replication/500KB.log._COPYING_ with blk_1073741826_1020 block is added to the in-memory file system
2023-11-26 04:09:41,604 INFO StateChange: BLOCK* allocate blk_1073741826_1020, replicas=11.37.33.122:9866, 11.37.33.123:9866, 11.37.33.126:9866 for /test-replication/500KB.log._COPYING_
2023-11-26 04:09:41,605 DEBUG StateChange: persistNewBlock: /test-replication/500KB.log._COPYING_ with new block blk_1073741826_1020, current total block count is 1
2023-11-26 04:09:41,705 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.126:9866 size 500000 replicaState = FINALIZED
2023-11-26 04:09:41,707 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.123:9866 size 500000 replicaState = FINALIZED
2023-11-26 04:09:41,708 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.122:9866 size 500000 replicaState = FINALIZED
可以看到,在基于副本复制的存储策略下,如果文件大小小于128MB的块大小,那么实际的块的大小是以文件大小为准的,而不是固定的128MB
https://issues.apache.org/jira/browse/HDFS-8734
Understanding the Performance of Erasure Codes in Hadoop Distributed File System
Introduction to HDFS Erasure Coding in Apache Hadoop
HDFS Erasure Coding
Hadoop3.2.1 【 HDFS 】源码分析 : BPOfferService 解析
Rack Awareness
HDFS Erasure Coding
Hadoop3.2.1 【 HDFS 】源码分析 :BlockManager解析
blockGroup的getNumBytes到底是128MB
还是128MB * 6
? 即通过addBlock()申请一个group的时候返回的size,倒是是128MB
还是128MB * 6
?
假如是128MB, 在客户端addBlock申请了一个block group以后,对internal block的大小按照data block的数量进行了切分StripedBlockUtil.parseStripedBlockGroup
,那么一个internal block是128MB/6? 这跟Cloudera的官方文档的图一致,但是从代码里面看,下面代码中的blockSize就是创建文件的时候根据配置文件中配置的block size申请以后返回的,应该是128MB,所以怎么解释currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
?如果下面的blockSize=128MB, 那就意味着一个block group的大小应该是128MB * 6 ? 可是blockGroup在进行addBlock()申请的时候返回的值是128MB才对。
如果
private boolean shouldEndBlockGroup() {
return currentBlockGroup != null &&
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}
把一个基于纠删码的文件copy到一个基于副本副本复制的目录下,会修改块布局方式吗?
创建文件的时候, ecPolicyName传入的是啥?如果是null,NameNode怎么设置所创建的文件的ECPolicy?