HDFS的EC Coding(纠删码)和块管理(WIP)

发布时间:2024年01月22日

文章目录

前言

基于我们常见的副本复制机制,HDFS 会将每个replica复制到多个rack和host上。 复制(replication)提供了一种简单而强大的冗余形式,可以防止大多数故障情况。这种副本策略简单而直接:即然我们怕数据丢失,那就把相同的数据复制多份,放到不同的地方。

但是这种通过复制来实现数据安全的方式显然会带来额外的存储空间开销。比如,HDFS默认的3副本存储策略,会带来200%的额外存储开销,在写数据的时候也增加了额外的200%的写流量。而这部分额外的开销仅仅是在极少发生的某个副本丢失的情况下才会发生价值。多副本的放置策略带来的数据本地访问的收益我认为也非常有限和牵强,很多情况下,读取数据的客户端和数据根本不运行在一个集群,即使在一个集群(比如NM和DN伴随部署),假如集群规模很大,客户端和数据块在一起的概率也不大。

一个自然的改进是使用纠删码(Easure Coding)代替复制,它使用更少的存储空间,同时仍然提供相同级别的容错能力。 在典型配置下,与 3x 复制相比,EC 可以将存储成本降低约 50%。 纠删码是Cloudera 和英特尔的工程师与更广泛的 Apache Hadoop 社区合作,推动了 HDFS-7285下的 HDFS-EC 项目。 目前,Hadoop 3.0 开始支持纠删吗。

但EC也绝非完胜Replication,因为本质上EC是用计算换存储,即,虽然实现了更小的存储空间,但是需要用更多的计算资源,比如数据写入的时候的额外计算,以及数据恢复时候的额外计算。

在阅读跟块管理相关的代码时,一个典型的感觉是混乱

  1. NameNode代码中关于Block和Replica的定义不够清楚,很多时候相互混用。举个很简单的例子,我们最常说的Data的Block Report(块汇报),其实,DataNode汇报的并不是Block,而是Block中的一个Replica而已,比如基于复制的连续布局,DataNode汇报的仅仅是一个副本(replica),在纠删码中的条带布局中,一个DataNode汇报的仅仅是一个Internal Block(replica)。
  2. 有了EC和条带布局以后,逻辑块和物理块的定义不在清晰,很多类比如BlockInfo, Block在这里用来表示逻辑块,在那里用来表示物理块。
    这些混乱给跟块相关的代码的理解带来了很多困难。在本文中,我力求精确,约定如下:
连续布局条带布局
Block(块)三个相同物理块的副本集合一个逻辑块,包含了多个不同的物理块
Replica(副本)存储在某台机器的单个物理副本一个物理块(也叫Internal Block或者Physical Block),或者是数据物理块data block,或者是校验物理块parity block
Logical Block(逻辑块)逻辑块和物理块含义相同一个逻辑块,包含了多个不同的物理块
Internal/Physical Block(物理块/内部块)逻辑块和物理块含义相同一个物理块,有可能是数据物理块data block,有可能是校验物理块parity block

本文依然按照从上到下的原则,先讲解纠删码的基本原理,再讲解纠删码在Hadoop中的设计和实现。

这篇文章目前还处于草稿状态,没怎么校对,有时间会校对然后不断修改更新。

1. 纠删码的基本介绍以及和基于复制的方案的对比

1.1 纠删码简介以及和传统副本复制方式的对比

在Cloudera的文章Introduction to HDFS Erasure Coding in Apache Hadoop中对纠删码有很详细的上层介绍。本文有一小部分内容来源于该文章,如有侵权,我会立刻删除。
在我们比较不同的数据存储方案的时候,有两个考虑因素,数据持久性(这种存储方案可以容忍怎样的数据副本失效而保证数据依然不丢失),数据的使用效率(去重以后的有效数据占总存储数据的比值)。比如,HDFS的默认3副本存储方案,其数据持久是可以容忍最多两副本的丢失,同时,数据的使用效率是1/3,即虽然存储了三副本,但是有两份副本都作为备份。
基于纠删码的存储策略,就是为了在保证数据持久性不变的情况下,提高了数据的使用效率。但是也如上文介绍,引入了更多的CPU开销。

最简单形式的纠删码基于异或 (XOR)运算,如表-1 所示的亦或关系表

XYZ
011
000
101
110
表-1 亦或运算关系表

亦或运算可交换的,比如X ⊕ Y = Y ⊕ X。亦或运算也是可任意组合的: X ⊕ Y ⊕ Z = (X ⊕ Y) ⊕ Z。 这意味着 XOR 可以生成 1 来自任意数量的数据位的奇偶校验位。 例如,1 ⊕ 0 ⊕ 1 ⊕ 1 = 1,当第三位丢失时,可以通过对剩余数据位{1,0,1}和奇偶校验位1进行异或来恢复。而异或可以取任意数量的数据单元作为输入,它是非常有限的,因为它最多只能产生一个奇偶校验单元。 因此,基于 XOR 的编码最多可以容忍一个数据丢失,数据的使用效率为 n-1/n(一组 n 个总单元有 n-1 个数据单元),但对于像 HDFS 这样需要容忍多个单元的丢失的系统来说是不够的

EC 的另一种形式 Reed-Solomon (RS) 解决了这一限制。 RS 使用复杂的线性代数运算来生成多个奇偶校验单元,因此可以容忍每组多个故障。 这使其成为生产存储系统的常见选择。 RS 可通过两个参数进行配置:k 和 m。 如图 1 所示,RS(k,m) 的工作原理是将 k 个数据单元的向量与生成矩阵 (GT) 相乘,以生成具有 k 个数据单元和 m 个奇偶校验单元的扩展码字向量。 只要 (k + m) 个单元中的 k 个可用,就可以通过将幸存的单元(数据单元和校验单元)乘以 GT 的转置矩阵来恢复存储故障。 (GT 中对应于故障单元的行应在取其倒数之前删除。)这意味着该组可以容忍任何 m 个单元的故障。


图 1:具有四个数据单元和两个奇偶校验单元的 Reed-Solomon 编码

通过Reed-Solomon,用户可以通过选择不同的k和m值来灵活调整数据持久性和存储成本。 奇偶校验单元的数量 (m) 决定了可以容忍的同时存储故障的数量。 数据单元与奇偶校验单元的比率决定了存储效率, 即 k / k + m


图 1:具有四个数据单元和两个奇偶校验单元的 Reed-Solomon 编码

图 2:各种存储方案的数据持久性和存储效率对比

从图-2可以看到,我们和最常用的基于副本复制的3副本存储策略对比,RS(6,3)和RS(10,4)均表现出更好的数据持久性和存储效率,但是这是以更高的CPU消耗为代价的。我们在讲Hadoop的具体实现的时候会详细讲解。


图 3:RAID 5和RAID 6的EC校验示意图

EC 长期以来一直用于本地存储系统,特别是以 RAID-5 和 RAID-6 的形式。 RAID-5 通常使用 XOR 编码,因为它只需要容忍单个磁盘故障,而 RAID-6 使用带有两个奇偶校验单元的 Reed-Solomon 来容忍最多两次故障。 单元大小通常是可配置的,每个磁盘上具有相同偏移量的单元形成的纠删码组。

1.2 纠删码的存储布局–连续还是条带?

和我们的磁盘用扇区去划分存储结构一样,为了管理大小不一的各种文件,分布式存储系统通常将文件划分为固定大小的逻辑字节范围(称为逻辑块),然后这些逻辑块被映射到集群上的存储块,这反映了集群上数据的物理布局。

逻辑块和存储块之间最简单的映射是连续块布局,它将每个逻辑块一对一地映射到存储块。 读取具有连续块布局的文件就像按顺序线性读取每个存储块一样简单。因此,对于连续块布局,我们甚至都不需要区分逻辑块和物理块,他们都代表一个文件的一块连续字节范围。

相比之下,条带块布局将逻辑块分解为更小的存储单元(通常称为Cell),并在一组存储块中循环写入单元条带(Stripe)。 读取具有条带布局的文件需要查询逻辑块的存储块集合,然后从该存储块集合读取单元条带。
块布局方案(连续与条带)和块冗余形式(复制与 纠删码)是两个正交维度,形成了四种排列组合形式。尽管在HDFS的使用场景下,连续布局被用在基于复制的块冗余形式,而条带布局用在了纠删码中,但是,他们其实可以随意正交。
些系统(包括 Ceph 和 QFS)支持在每个目录或每个文件的基础上配置布局和/或冗余。
本节讨论如何在两种块布局上支持 EC。
下图显示了HDFS常用了基于复制的冗余方式下,使用连续布局:

图 4:基于复制的冗余方式下,使用连续布局/div>

下图显示了HDFS常用了基于复制的冗余方式下,使用条带布局:

图 5:基于复制的冗余方式下,使用条带布局/div>

下图显示了纠删码冗余形式下的连续布局和条带布局:

图 6:基于复制的冗余方式下,使用条带布局/div>

对于 HDFS-EC,应该选择哪种布局方式? 连续布局更容易实现,因为读写路径仍然与当前具有复制的系统非常相似。 然而,它仅适用于文件非常大的情况,因为只有在写入完整条带时才能实现全部成本节省。 例如,对于 RS (10,4),只有一个 128MB 数据块的条带最终仍会写入四个 128MB 奇偶校验块,存储开销为 400%(比 3 路复制更糟糕)。 连续布局方式下,客户端不适合进行EC校验码的计算,因为这需要将GB级别的校验块load到内存计算校验码。

另一方面,采用条带布局的纠删码可以实现小文件和大文件的存储节省,因为单元(Cell)大小要小得多(通常为 64KB 或 1MB, HDFS默认为1MB)。 这种整体较小的组大小还支持在线 EC,其中客户端直接写入纠删码数据,因为计算奇偶校验信息只需要几兆字节的缓冲。 但是也有一个缺点,一些对本地化很敏感的工作场景会收到影响,因为一个文件的连续部分总是会分散到不同的机器上取,而不是向连续布局一样,一个大的Block只会写到一台机器上。 为了更好地服务此类工作负载,可以将条带文件转换为连续布局,但这几乎需要重写整个文件。

基于以上考虑,HDFS中基于复制的块冗余策略使用了连续布局,基于纠删码的块冗余策略使用了条带布局,而不允许两种冗余策略和两种布局方式可以随意正交。

连续块布局的假设广泛而深入地嵌入到 HDFS 内部逻辑中,为了在这种情况下让HDFS支持基于纠删码的条带布局的块冗余形式,需要定义一个统一的块的概念,即严格讲逻辑块和物理块分开 。 为了支持条带布局,逻辑块的概念必须与存储块的概念分开。 前者表示文件中的逻辑字节范围,后者是DataNode上存储的数据块的基本单位。 图 -7 演示了逻辑块和存储块的概念。 在示例中,文件 /tmp/foo 在逻辑上分为 13 个条带单元(cell_0 到 cell_12)。 逻辑块0代表单元0~8的逻辑字节范围,逻辑块1代表单元9~12。 Cell 0、3、6 形成一个物理存储块,它将作为单个数据块存储在 DataNode 上。 为了简洁起见,该图不包括奇偶校验块/单元:

图 7:逻辑块和物理块映射关系/div>

支持这种泛化的一个简单机制是 HDFS NameNode 监视其块映射中的每个存储块,该块映射从块 ID 映射到相应的块,然后使用另一个映射从逻辑块到其成员存储块。 然而,这意味着小文件将在 NameNode 上产生大量内存开销,因为条带化会产生比复制更多的存储块。

为了减少这种开销,我们引入了一种新的分层块命名协议。 目前HDFS根据块创建时间顺序分配块ID。 相反,该协议将每个块 ID 分为 2~3 个部分,如图 7 所示。每个块 ID 以指示其布局的标志开头(连续 = 0,条带 = 1)。 对于条带块来说,ID的其余部分由两部分组成:中间部分是逻辑块的ID,尾部分表示逻辑块中存储块的索引。 这允许 NameNode 将逻辑块作为其存储块的摘要进行管理。 存储块ID可以通过屏蔽索引映射到其逻辑块; 当NameNode处理DataNode块报告时,这是必需的。

2. 纠删码在Hadoop中的具体实现

在讲纠删码的时候,基于复制的块冗余都使用3副本场景来举例,而纠删码都用RS(6,2)来举例。
在讲解纠删码的具体实现以前,我们先讲解HDFS中怎么维护Block、Replica、DataNode之间复杂的关联关系的。基于对这些信息的理解,我们再去理解纠删码的相关内容。

Hadoop的Block管理的基本逻辑和数据结构

从基本直觉出发,我们可以想到,hadoop的块管理,需要管理以下的信息和提供以下的功能:

  • Block以及Block -> Replica的对应: 整个集群的所有的Block的信息,以及Block中每一个Replica的信息。这个信息在Hadoop中封装成为BlockInfo对象,根据Block是连续布局还是条带布局,其具体实现类分为了BlockInfoContinuous和BlockInfoStriped。
  • Replica的存储位置: 对于Replica,我们显然需要知道replica存放在哪个DataNode了,这个存放的位置在Hadoop中用DatanodeStorageInfo对象表示,即不仅仅精确到DataNode,还精确到了DataNode的某个Storage的层次,因为有可能一个DataNode支持多种不同的存放,disk,ram或者flash等。
  • 对Block信息的快速查找: 拿到一个Block的ID,我们可以立刻找到这个Block的BlockInfo。显然,这里需要一个类似Hash的存储结构,比如Set或者Map。在Hadoop中,这个Hash的结构被封装在BlocksMap对象中。
  • DataNode到Replica的一对多信息:对于一个DatanodeStorageInfo,我们可以知道上面的所有的replica。显然,这里似乎需要一个链表的结构。这里并不需要一个hash的结构,因为如果我们需要知道一个Replica是否存在于某个DataNode上,其实只需要通过BlocksMap来查找到BlockInfo,进而知道其所有replica的DatanodeStorageInfo的信息
  • Replica的快速增删:对于一个replica的增加和删除(都是由某一个DataNode的快汇报产生),我们需要在它对应的BlockInfo中将replica的信息删除,同时,需要在DatanodeStorageInfo中将挂载的这个Replica删除。

BlockInfo的基本结构

BlockInfo封装的是一个Block的基本信息,比如这个Block属于哪个BlockPool,副本数多少(只有连续布局方式有副本数的概念,条带布局没有这个参数),还有下文将要讲到的BlockUnderConstructionFeature,代表这个Block当前正处于构造(正在被写入)的临时状态,以及最重要的,这个Block的所有Replica信息,这个信息存放在一个叫做triplets的数组中。其基本结构如下图所示。从triplets的名字可以看出来(triplet在英文中是三胞胎/三联体的意思),triplets是一个三元组的集合,这里的每个三元组代表了一个replica的相关信息,对于第i个replica,triplets[3 * i]存放的是这个replica i 的DatanodeStorageInfo信息,triplets[3*i + 1]存放的是这个replica所在的DatanodeStorageInfo的Replica链表中的前一个Block的BlockInfo,triplets[3*i + 2]存放的是这个replica所在的DatanodeStorageInfo的Replica链表中的后一个Block的BlockInfo。
请注意这里的的含义,这里的,不是指这个Replica所在的Block的前一个Replica或者后一个Replica,也不是指整个HDFS集群中存在一个Replica的全局链表的前一个或者后一个Block,而是指这个Replica所在的DatanodeStorageInfo对应的Replica链表的前一个或者后一个replica。我发现很多HDFS的书籍上都并未对这一点说清楚,导致读者不看代码根本不知道其真实含义,只是人云亦云地知道是
所以这个triplets数组中的每一个triplet有三个元素,整个triplets数组其实是一个一维数组,所以整个triplets的大小就是3 * baseSize,这个baseSize对于连续布局来说,就是副本数量,比如系统配置的文件副本数是5,那么某个BlockInfo.triplets[]数组的长度就是3 * 5 = 15,对于纠删码来说,就是一个BlockGroup中的internal block数量, 比如对于RS(6,2),就是8,tripelets[]数组的长度就是3 * 8 = 24
通过这样的一个3 * baseSize的数组,实际上形成了连接BlockInfo的一个双向链表

public abstract class BlockInfo extends Block
    implements LightWeightGSet.LinkedElement {

  private short replication; // 副本数量
  private volatile long bcId; // 这个block所属于的block collection,比如一个INodeFile就是一个BlockCollection
  private LightWeightGSet.LinkedElement nextLinkedElement;
  protected Object[] triplets; // 存放replica信息的三元组

下图是这个triplets数组的示意图。为了方便起见,我们将一维的triplets数组变成一个baseSize * 3 的二维数组:
在这里插入图片描述

DatanodeStorageInfo的基本结构

基本信息介绍

在NameNode端,使用DatanodeDescriptor来表示一个DataNode的相关信息,包括一些静态信息(静态信息存放在它的父类DatanodeInfo中)和一些动态信息比如不断变化的块信息。
DatanodeStorageInfo是NameNode端用来代表DataNode端的一块数据存储路径(比如某一个Volume),显然,这个存储介质具有特定的唯一ID,存储介质类型,容量,当前的使用率等等信息。在DataNode端与之对等的类是DatanodeStorage。显然,DatanodeStorageInfo和DatanodeDescriptor是多对一的关系。类关系图如下:
在这里插入图片描述

----------------------------------------------DatanodeStorageInfo----------------------------------------------
  // 一个DataNodeStorageInfo指的是某一台DN的某个storage
  private final DatanodeDescriptor dn;
  private final String storageID;
  private StorageType storageType;
  private State state;

  private long capacity;
  private long dfsUsed;
  private long nonDfsUsed;
  private volatile long remaining;
  private long blockPoolUsed;
    // 这个虽然叫blockList,但是由于并不是用Java的Collection集合,而是自己维护的数组,因此blockList只是这个数组的header
  private volatile BlockInfo blockList = null;

为了表达一个DatanodeStorage上挂载的Replica的信息,DatanodeStorage使用blockList保存了这个链表的head节点。链表之间的链接关系通过BlockInfo内部的对象引用来实现。我们下文讲BlockInfo的时候会讲到。
总之, DatanodeStorageInfo保存了这个 Storage上的Replica链表的头结点,用以维护这个Datanode上所有的Replica信息,方便遍历。比如,我们需要将一个DataNode进入maintenance状态,这意味着这个DataNode上的所有DatanodeStorageInfo上的所有的replica都需要进行进入maintenance前的处理,比如通过复制保证副本数等。

遍历这个DatanodeStoragreInfo上的所有Replica
  class BlockIterator implements Iterator<BlockInfo> {
    private BlockInfo current;

    BlockIterator(BlockInfo head) {
      this.current = head;
    }

    public boolean hasNext() {
      return current != null;
    }

    public BlockInfo next() {
      BlockInfo res = current;
      current =
          current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
      return res;
    }

    public void remove() {
      throw new UnsupportedOperationException("Sorry. can't remove.");
    }
  }

我们从next()方法可以看到,寻找当前的BlockInfo的下一个节点是通过调用当前节点的BlockInfo的findStorageInfo来实现的。
如果我们需要遍历一个Datanode上的所有Replica呢?其实只需要遍历这个 DatanodeDescriptor上的所有DatanodeStorageInfo,然后根据DatanodeStorageInfo.BlockIterator来进行遍历就行了。因此DatanodeDescriptor.BlockIterator是对DatanodeStorageInfo.BlockIterator的封装。这里不再赘述。

BlockInfo和DatanodeStorageInfo中块数据结构的动态维护

这里的块信息其实并不准确,应该是Replica的信息的维护。块的整体的增删是发生在客户端写文件或者删除文件导致的块的增加和删除。
这里的Replica信息的变化发生在DataNode 的blockReport时,NameNode会根据BlockReport的信息,对Replica或者Block进行同步的增删等操作。必须再次强调,这里的BlockReport,其实表达并不准确,DataNode汇报的并不是 Block,而是Replica。
NameNode收到新增Replica的 blockReport,会通过BlockManager.addStoredBlock()来将这个Replica添加到其属于的Block的BlockInfo中去。而这个BlockInfo对应的整个Block,是在客户端写数据过程中在NameNode端创建好的,只不过当时这个Block还没有收到任何的DataNode的块汇报。
这是通过调用DatanodeStorageInfo的insertToList方法来实现的。

--------------------------------------------------DatanodeStorageInfo-------------------------------------------------
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
    .......
    // 把BlockInfo b 添加到this(DataNodeStorageInfo中去)
    b.addStorage(this, reportedBlock); // 先更新这个Replica的Storage信息
    insertToList(b);// 设置这个block在this(当前的这个DataNodeStorageInfo)中的信息
    ......
  }

从上面的代码看到,会首先通过b.addStore()将这个Replica添加到其所属的Block对应的BlockInfo中去,代表这个Block增加了一个Replica。
addStorage()的代码如下所示,显然, addStorage的职责仅仅是将这个Block新汇报的 Replica放到BlockInfo的triplets中去,并设置了这个Replica的DatanodeStorageInfo信息,但是,还没有设置任何的前向(pre)或者后向(next)的任何信息:

----------------------------------------------BlockInfo---------------------------------------------------------------
  // reportedBlock是要进行插入的replica
  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
    // find the last null node
    int lastNode = ensureCapacity(1); // 可插入位置的三元组的第一个位置,这个位置存放这个block的DatanodeStorageInfo信息
    setStorageInfo(lastNode, storage); // 把lastnode设置为这个Storage,代表我设置了当前这个BlockInfoContinuous的存储信息
    setNext(lastNode, null); // 先暂时不设置previous和next的信息
    setPrevious(lastNode, null);
    return true;
  }

在BlockInfo中设置好了replica的位置信息以后,就要维护DatanodeStorageInfo中的BlockInfo的链表信息,这是通过调用DatanodeStorageInfo.insertToList()实现的:

--------------------------------------------------DatanodeStorageInfo-------------------------------------------------
  /**
   * this指的是当前的DatanodeStorageInfo, b是新的replica所对应的BlockInfo, blockList是this.blockList,即
   * 当前这个DatanodeStorageInfo上的BlockInfo链表的头结点
   * @param b
   */
  public void insertToList(BlockInfo b) {
    blockList = b.listInsert(blockList, this); // 把当前节点b插入到DatanodeStorageInfo(this)的头结点blockList所代表的链表的头部
    numBlocks++;
  }

insertToList()会将当前新汇报的这个Replica所属的BlockBlockInfo插入到DatanodeStorageInfo的链表的头部,因此原来的头结点blockList将会更新为当前新插入的节点(参数b)。这是通过调用BlockInfo.listInsert()来实现的:

----------------------------------------------BlockInfo---------------------------------------------------------------
   /*
   * 根据当前的这个this(block)所在的DataNodeBlockInfo中维护的BlockInfoContinuous链表的头节点head,
   * 把this(block)的pre和next插入进去。显然,this(block)会成为旧的head的pre节点,旧的head是this(block)的next
   */
  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
    // 在前面已经通过setStorage设置了这个BlockInfo的对应的Replica的storage了,因此肯定可以找到这个DatanodeStorageInfo
    int dnIndex = this.findStorageInfo(storage);
    this.setPrevious(dnIndex, null);
    this.setNext(dnIndex, head); // next节点就是head, head的pre节点就是自己
    if (head != null) {
      head.setPrevious(head.findStorageInfo(storage), this);// 把head节点的pre设置为当前的BlockInfoContinous节点
    }
    return this;
  }
 // 将BlockInfo to设置为当前的BlockInfo(this)的pre节点
  BlockInfo setPrevious(int index, BlockInfo to) {
    BlockInfo info = (BlockInfo) triplets[index*3+1];
    triplets[index*3+1] = to;
    return info;
  }
  // 将BlockInfo to设置为当前的BlockInfo(this)的next节点
  BlockInfo setNext(int index, BlockInfo to) {
    BlockInfo info = (BlockInfo) triplets[index*3+2];
    triplets[index*3+2] = to;
    return info;
  }

其基本流程如下图所示:
在这里插入图片描述
至于删除一个Replica,道理相同,本质上也是通过每一个DatanodeStorageInfo挂载的BlockInfo以及更新每一个BlockInfo的triplets[]数组来表达的双向链表关系,来维护前向和后缀指针,这里不做赘述。

BlocksMap的基本结构和动态管理

BlocksMap是一个对象,其成员变量blocks是一个GSet,这个GSet中的元素是一个个的BlockInfo, GSet其一个基于链地址法处理hash冲突的一个hash表,用来基于hash的方式查找BlockInfo。

----------------------------------------------------BlocksMap-----------------------------------------------------
class BlocksMap {
  ......
  /** Constant {@link LightWeightGSet} capacity. */
  private final int capacity;
  
  private GSet<Block, BlockInfo> blocks; // 这是一个Set,一个BlockInfo的HashSet,不是一个map
  1. 获取一个Block(Replica)的BlockInfo
    比如,根据一个Block查找一个BlockInfo(这个典型的情况是DataNode汇报上来一个Replica,需要将这个 Replica存放到对应的 Block的BlockInfo中),这里寻址显然需要依赖BlockInfo.hashCode()方法,这里BlockInfo的hashCode()返回值就是blockId。
----------------------------------------------------BlocksMap-----------------------------------------------------
  /** Returns the block object if it exists in the map. */
  BlockInfo getStoredBlock(Block b) {
    return blocks.get(b);
  }
  1. 获取一个Block的Location信息,即这个Block的所有的Replica的DatanodeStorageInfo信息,其实就是根据这个Block查找到对应的BlockInfo,然后在这个BlockInfo的triplets[]中逐个遍历replica的DatanodeStorageInfo信息
----------------------------------------------------BlocksMap-----------------------------------------------------
  Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
    return new Iterable<DatanodeStorageInfo>() {
      @Override
      public Iterator<DatanodeStorageInfo> iterator() {
        return new StorageIterator(storedBlock);
      }
    };
  }
  
  public static class StorageIterator implements Iterator<DatanodeStorageInfo> {
    private final BlockInfo blockInfo;
    private int nextIdx = 0;

    StorageIterator(BlockInfo blkInfo) {
      this.blockInfo = blkInfo;
    }

    @Override
    public boolean hasNext() {
      if (blockInfo == null) {
        return false;
      }
      while (nextIdx < blockInfo.getCapacity() &&
          blockInfo.getDatanode(nextIdx) == null) {
        // note that for striped blocks there may be null in the triplets
        nextIdx++;
      }
      return nextIdx < blockInfo.getCapacity();
    }
  1. 添加或者删除Block

这里的添加block显然不是添加Replica,而是添加一个Block的元数据信息,它不是datanode的块汇报导致的,而是客户端在写文件的过程中添加一个block或者客户端删除一个文件导致block被删除导致的。
添加Block是通过addBlockCollection()来进行的,因为一个Block肯定是属于一个BlockCollection的:

----------------------------------------------------BlocksMap-----------------------------------------------------
  BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
    BlockInfo info = blocks.get(b);
    if (info != b) { // 如果找不到(info==null)或者不一致
      info = b;
      blocks.put(info);
      incrementBlockStat(info);
    }
    info.setBlockCollectionId(bc.getId());
    return info;
  }

删除Block是通过removeBlock()来进行的,除了要将BlocksMap中Block的信息删除,还要检查这个Block的所有Replica对应的 DatanodeStorageInfo,将这些replica从对应的DatanodeStorageInfo的块链表中清除:

----------------------------------------------------BlocksMap-----------------------------------------------------
 void removeBlock(BlockInfo block) {
    BlockInfo blockInfo = blocks.remove(block); // 从BlocksMap中删除
    final int size = blockInfo.isStriped() ?
        blockInfo.getCapacity() : blockInfo.numNodes();
    for(int idx = size - 1; idx >= 0; idx--) { //将这个block的每一个replica从其DatanodeStorageInfo中清除
      DatanodeDescriptor dn = blockInfo.getDatanode(idx);
      if (dn != null) {
        removeBlock(dn, blockInfo); // remove from the list and wipe the location
      }
    }
  }

2.1 Hadoop对纠删码在用户层的支持

在Hadoop中,纠删码的Policy定义为ErasureCodingPolicy类:

public final class ErasureCodingPolicy implements Serializable {
  private final String name; // policy的名字
  private final ECSchema schema; // 纠删码的模式定义,包含了纠删码的名字,
  private final int cellSize; // cell的大小
  private final byte id;
public final class ECSchema implements Serializable 

  /**
   * The erasure codec name associated.
   */
  private final String codecName; // schema的名字,比如基于RS的schema的名字是rs,基于xor的schema的名字是xor

  /**
   * Number of source data units coded
   */
  private final int numDataUnits; // 数据部分的单元数量,比如RS(6,2), numDataUnits=6, numParityUnits=2

  /**
   * Number of parity units generated in a coding
   */
  private final int numParityUnits; // 校验部分的数量

HDFS在启动的时候,会默认加载多个已经默认支持的ECPolicy,这些Policy定义在SystemErasureCodingPolicies中:
RS-3-2-1024k, RS-6-3-1024k, RS-10-4-1024k, RS-LEGACY-6-3-1024k, XOR-2-1-1024k
从Policy的名字就能看出其含义:{编码方式,比如基于RS还是XOR}-{data block的数量}-{parity block的数量}-{一个Cell的大小}

。用户通过官方文档https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html去进行EC相关的操作,比如

  • 设定一个目录基于纠删码进行副本管理,而不是基于复制进行,那么这个目录下面新创建的文件都会基于父目录的ECPolicy进行存储,但是当一个目录下面已经存放了基于复制进行副本管理文件,这时候我们设定这些文件的父目录,不会改变已有文件的布局方式,因为这将引起大量的块的拷贝等操作。
  • 修改文件的布局方式,或者将一个基于纠删码的文件拷贝到另外一个基于复制副本的目录下,都将引发文件背后的block的大量拷贝
  • rename(move)一个文件(比如从一个基于纠删码的目录移动到一个基于多副本复制的目录下),不会修改文件的块布局方式,否则一个简单的move操作会带来block的大量拷贝

显然,由于纠删码本身的特殊块布局方式,导致有些情况无法再像基于复制的方式一样支持:

  • 对纠删码文件执行append() 和truncate() 将抛出IOException。
  • 如果文件与不同的纠删码策略或复制的文件混合在一起,concat() 将抛出 IOException。
  • setReplication() 对纠删码文件不执行任何操作。
  • DFSStripedOutputStream 上的 hflush() 和 hsync() 是无操作的。 因此,在纠删码文件上调用 hflush() 或 hsync() 不能保证数据持久。客户端可以使用StreamCapability API查询OutputStream是否支持hflush()和hsync()。 如果客户端希望通过 hflush() 和 hsync() 实现数据持久化,当前的补救措施是在非纠删码目录中创建常规 3x 复制文件,或者使用 FSDataOutputStreamBuilder#replicate() API 在以下位置创建 3x 复制文件: 纠删码目录。

2.2 条带布局和连续布局的BlockID的分配和管理

早期没有条带布局方式时,Logical Block和Internal Block是一一对应,NameNode端的BlockID很好分配,自增即可。但是有了条带布局,Logical Block和Internal Block不再一一对应,因此,为了对这两种情况进行统一管理,HDFS统一了BlockID的分配方式, 一种新的分层块命名协议。 目前HDFS根据块创建时间顺序分配块ID。 相反,该协议将每个块 ID 分为 2~3 个部分,如图 7 所示。每个块 ID 以指示其布局的标志开头(连续 = 0,条带 = 1)。 对于条带块来说,ID的其余部分由两部分组成:中间部分是逻辑块的ID,尾部分表示逻辑块中存储块的索引。 这允许 NameNode 将逻辑块作为其存储块的摘要进行管理。 存储块ID可以通过屏蔽索引映射到其逻辑块; 当NameNode处理DataNode块报告时,这是必需的。
在HDFS中,BlockIdManager负责生成对应的BlockID,它委托SequentialBlockIdGenerator来生成基于复制的连续块布局的BlockID,而SequentialBlockGroupIdGenerator来生成基于条带的连续块布局的BlockID(实际上这是一个BlockGroupID,即按照上面将的分层的块命名协议,低四位为0,即还没有将BlockGroup切分为Internal Block)。下文会讲,NameNode会将BlockGroup进行切分,并把每一个切分成的InternalBlock分配到对应的DataNode上。

------------------------------------- BlockIdManager -------------------------------------
  long nextBlockId(BlockType blockType) {
    switch(blockType) {
    case CONTIGUOUS: return blockIdGenerator.nextValue(); //使用SequentialBlockIdGenerator为连续块布局方式分配Block ID
    case STRIPED: return blockGroupIdGenerator.nextValue(); // 使用SequentialBlockGroupIdGenerator为条带布局方式分配Block ID
    }

在这里插入图片描述

2.3 基于条带布局的纠删码文件的写入过程

纠删码的写入过程与普通的连续布局的写入过程的基本区别是:

  1. 物理写入逐条带进行:由于我们重新区分了逻辑块和物理块,逻辑块代表了文件本身的字节顺序,条带布局情况下,逻辑块内部的物理块顺序不再对应文件的实际字节顺序,代表文件的字节顺序的变成了条带,因此,在条带布局的时候,写入会逐条带进行,而不是逐物理块进行。
  2. 写入并发:由于写入按照条带进行,并且一个条带跨了多个物理块,因此在一个条带的每个Internal Block的每个cell的写入可以异步并行(Ansynchronized)进行,但是条带之间则是串行同步(Synchronized)进行。
  3. 需要计算纠删码:计算纠删码(Parity Block)的过程叫做encode的过程。没写完一个条带(Stipe)的data block部分,就要开始计算纠删码然后写parity block,然后开始写下一个Stipe。纠删码的计算在客户端进行,因此会给客户端引入一定的CPU负载。
    在这里插入图片描述

2.3.1 客户端的流程

基于条带的块布局方式改变了基本的数据存储结构,因此,对于条带布局和连续布局文件的读写方式发生了很大变化,但是基本的写流程没有发生变化:

  1. 客户端通过调用DFSClient.create()创建对应文件

    • 服务器端收到create请求以后,会构建对应文件的INodeFile并维护在内存和Editlog中,INode信息中包含了文件的元数据信息,比如,读写权限,文件名,副本的块冗余形式(纠删码还是复制?)等等信息。注意,这时候还没有创建任何block信息
  2. 客户端收到服务器端返回的包含文件元数据信息的HdfsFileStatus对象,基于该对象,构建DFSOutputStream对象的实现

  3. 客户端开始进行writeChunk操作,并根据需要,调用addBlock()接口开始申请block

    • 服务器端根据block申请,创建对应的的BlockID, 副本的块冗余形式(纠删码还是复制?),调用不同的块放置策略,确定块放置的目标机器,将这些所有信息封装在LocatedBlock对象中,返回给客户端
  4. 创建DataStreamer对象,负责Block的数据写入。在三副本情况下,DataStreamer只会和第一个节点建立Socket连接,然后第一个节点会把收到的packet转发给剩余的两个节点。

    • DataStreamer是一个Daemon,用来进行异步的数据发送。请注意,数据发送的基本单位是Packet。在HDFS中,一个Packet 的默认大小是64KB。checksum的基本单位是chunk,长度为512B,HDFS会对每个chunk计算一个checksum值,长度为4B。一个Packet的总容量是64KB,按照64KB/516B = 127,貌似大概一个packet可以有127个 chunk, 但是由于每一个packet都由header 信息,比如packet的长度信息,以及和protobuf相关的一些信息,因此,一个packet可以承载126个chunk。按照数据和checksum的比例,默认一个128MB的Block会由大概1MB的checksum数据。
     protected void computePacketChunkSize(int psize, int csize) {
        final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
        final int chunkSize = csize + getChecksumSize();
        chunksPerPacket = Math.max(bodySize/chunkSize, 1);
        packetSize = chunkSize*chunksPerPacket;
    
    • 客户端在通过DFSOutputStream写数据过程中,写满64KB,数据就会被封装为DFSPacket对象,并将该对象交付给DataStreamer的dataQueue。dataQueue中存放了所有需要发送给pipeline的待发送数据。chunk, checksum和最终组装成Packet入下图所示。可以看到写数据的过程就是将数据按照chunkj进行拆分、到以chunk为单位计算checksum、到组装成packet、到将数据通过socket发送给DataNode的过程
      在这里插入图片描述
  5. DataStream所代表的异步线程负责从自己的dataQueue中poll数据,并发送给pipeline,并将这个数据包从dataQueue中移除,放到ackQueue中,代表这个packet正等待ack

    • 根据HDFS写入数据的 pipeline规则,比如在3副本的情况下,client端只需要把数据发送给第一个target节点,而数据的复制则是由第一个target节点自己像pipeline一样复制给剩下的几台机器。
  6. 响应处理器ResponseProcessor从数据节点接收确认。 当从所有数据节点接收到数据包的成功确认时,响应处理器将从确认队列中删除相应的数据包。

    • 注意,DataStreamer中负责获取ACK的Input Socket和负责发送Packet的Output Socket都只会和第一个DN沟通,在3副本情况下,量外两个副本的ack是返回给第一个节点,然后第一个节点把ack发送给客户端,从而客户端确认所有副本写成功
图-8 HDFS写文件基本框架

由于块布局方式的变化,基于纠删码的条带布局为了尽量减少对已有的基于复制的块布局代码的侵入,采用的继承的方式,比如,StripedDataStreamer继承了DataStreamer, 而DFSStripedOutputStream继承了DFSOutputStream,LocatedStripedBlock继承了LocatedBlock。
在具体写方式上,条带布局方式沿用了连续布局方式的异步写的整体架构,比如:

  1. 文件写还是基于chunk进行checksum校验,当chunk已经写满了Packet的时候,以Packet为单位进行异步发送
  2. 发送依然异步进行,即写满Packet以后,会将Packet 交付给StripedDataStreamer的dataQueue, 发送完成,数据从dataQueue移动到ackQueue,确认完毕,从ackQueue移出。
  3. 文件写的时候还是由DFSOutputStream来进行(stripe模式下DFSStripedOutputStream进行了重载),与pipeline中的DataNode建立通信和发送packet还是由DataStreamer来进行(StripedDataStreamer)。

但是由于条带布局的逻辑块和物理块不再一一对应,写的逻辑也发生了根本变化。在概念上变化的基本基本原则是,HDFS基于原有的连续布局方式的block到了stripe布局中对应block group,而block group中的具体物理块,在hadoop中叫做internal block。

写方式不同的地方主要包括:

  1. 多Streamer的并行写,一个DFSOutputStream只有一个DataStreamer来和一个DataNode通信,因为是串行写。但是在条带写的情况下,一个DFSOutputStream会创建多个StripedDataStreamer,每个DataStreamer i 负责写Block Group中index为i的一个internal block(可能是数据块,可能是校验块)。显然,比如RS(6,2),会创建8个StripedDataStreamer对象,并行进行数据写。写完一个条带,再写下一个条带。写完一个Block Group,会申请新的Block,然后继续一个条带一个条带的写。但是,写下一个Block Group的时候,不会重新创建StripedDataStreamer,每个StripedDataStreamer在新的Block Group中还是负责index=i的internal block的写。
  2. 由于条带布局以Cell为单位,因此在数据写入过程中,每写完一个Cell,就要切换负责下一个Cell的StripedDataStreamer。可见,在写过程中,StripedDataStreamer是依次轮流被切换。
  3. 副本放置策略:客户端通过addBlock()接口申请一个块,NameNode会根据当前集群的状态申请相应数量的机器。连续布局方式(三副本为例)下,默认使用BlockPlacementPolicyDefault会将第一个replica放在一个rack的某台机器上,另外两个replica会放在另外一个rack的两台不同机器上。这种策略显然不适用条带布局,因为条带布局情况下不希望任何情况下损失多于一个replica,因此使用BlockPlacementPolicyRackFaultTolerant策略,这个策略的基本原则是,将条带布局下的所有replica分配到不同的基架。
    在这里插入图片描述
    下面的DFSStripedOutputStream构造方法显示了在构造DFSStripedOutputStream,会根据ECPolicy所需要的DataStreamer数量(dataBlock + parityBlock),创建对应的StripedDataStreamer,负责和DataNode通信。根据internal block的数量,为每一个internal block构建一个StripedDataStreamer,负责BlockGroup中的一个索引位置的Internal Block的写操作,并且从一个Group到下一个Group以后,之前创建的StripedDataStreamer依然会负责新的BlockGroup中对应的index的internal block的写入操作。
    同时,由于条带布局的写方式需要频繁地切换internal block和负责这个internal block写操作的StripedDataStreamer,因此,特意创建了一个叫做Coordinator的类,负责internal block和对应的StripedDataStreamer的协调和切换。他时刻维护着internal block和StripedDataStreamer的对应关系:
-------------------------------------DFSStripedOutputStream.java ------------------------------------------
/** Construct a new output stream for creating a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                         EnumSet<CreateFlag> flag, Progressable progress,
                         DataChecksum checksum, String[] favoredNodes)
                         throws IOException {
    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
    ....
    ecPolicy = stat.getErasureCodingPolicy();
    final int numParityBlocks = ecPolicy.getNumParityUnits();
    cellSize = ecPolicy.getCellSize();
    numDataBlocks = ecPolicy.getNumDataUnits();
    .....

    coordinator = new Coordinator(numAllBlocks);
    cellBuffers = new CellBuffers(numParityBlocks);// 虽然初始化是用的numParityBlocks, 其实内部放的是datablock 和 parity block

    streamers = new ArrayList<>(numAllBlocks);// 并行写一个logical block中的所有internal block,但是串型写不同的logical block
    for (short i = 0; i < numAllBlocks; i++) { // 每一个physical block会有一个DataStreamer与之对应,即StrippedDataStreamer对象,用来与对应的DN进行流数据写操作
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator, getAddBlockFlags());
      streamers.add(streamer);
    }
    currentPackets = new DFSPacket[streamers.size()];
    datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
    setCurrentStreamer(0); // 肯定从第一个streamer开始写
  }
    static class Coordinator {
    /**
     * followingBlocks是一个List<BlockingQueue<T>> queues,每一个BlockingQueue代表index=i的多个LocatedBlock
     */
    private final MultipleBlockingQueue<LocatedBlock> followingBlocks; // 指的是block group的下一个internal block, 每一个index对应了一系列block的list 

下面的代码显示了在写文件过程中,是以chunk为单位计算checksum和写数据到Packet。

  • 在写数据过程中,如果是第一个BlockGroup,或者旧的BlockGroup写完了(判断依据是当前已经写的数据量等于internal block的数据总量),就向NameNode申请新的Block。我门可以理解成通过addBlock()接口申请的Block叫做logical block(即一个Block Group), 申请到的LogicalBlock的对象是LocatedStripedBlock,其中包含了BlockGroup ID,分配到的节点,以及BlockGroup中每一个Internal Block与节点之间的对应关系。拿到LocatedStripedBlock响应以后,客户端会将其作为一个Block Group,切分成data block 和 parity block。
  • 由于Striped本身的特性,如果Cell写满了,就需要切换到下一个StripedDataStreamer。由于Cell默认为1MB,所以可以想见StripedDataStreamer的切换是随着数据写入不断进行的。这就是上面提到的Coordinator的作用。
  • 如果当前写完的Cell是当前Stripe的data block的最后一个Cell,那么意味着下一个Cell就不是写数据了,而是计算parity block然后写parity
    block到指定的机器上。
-------------------------------------DFSStripedOutputStream.java ------------------------------------------
/**
   * chunk是计算checksum的单位,以chunk为单位计算checksum,以packet为单位发送数据
   * 将bytes中[offset, offset+len]的数据作为一个chunk写入packet(从调用者的代码可以看到,这个数据只可能小于等于一个chunk的长度)
   * 通过currentPacket和currentStream来确定是由哪个streamer来负责写这个数据
   *
   * @throws IOException
   */
  @Override
  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    final int index = getCurrentIndex();
    final int pos = cellBuffers.addTo(index, bytes, offset, len); // 往data block中写入数据,返回更新以后的position
    final boolean cellFull = pos == cellSize; // 根据不同的Stripe Policy确定的,默认是1MB

    //  如果是第一个block,或者当前的block group写完了,需要写到一个新的block group,那么就创建一个新的
    if (currentBlockGroup == null || shouldEndBlockGroup()) {
      // the incoming data should belong to a new block. Allocate a new block.
      allocateNewBlock(); // 对于一个刚刚创建的currentBlockGroup, numBytes=0
    }

    currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);// 在写入过程中,numbers逐渐增加
    // note: the current streamer can be refreshed after allocating a new block
    final StripedDataStreamer current = getCurrentStreamer(); // 获取当前的streamer
     // 将当前的chunk写入到Packet中(不代表packet需要enqueue了)
     super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
     .....

    if (cellFull) { // cell写满了,需要将stream切到下一个,同时,需要写parity block了
      int next = index + 1;
      // 如果不是最后一个data block, 是不需要写parity chunk的
      if (next == numDataBlocks) { // 刚刚写的是这个logic group中的最后一个data block的chunk,意味着下一个chunk是写parity block的chunk
        cellBuffers.flipDataBuffers();
        writeParityCells(); // 在写parity的过程中,stream也是在往后切换的
        ....
      setCurrentStreamer(next);// 将当前stream切换到下一个,因为当前cell写满了。不写满不切换stream
    }
  }
  
  private boolean shouldEndBlockGroup() {
    return currentBlockGroup != null && 
        currentBlockGroup.getNumBytes() == blockSize * numDataBlocks; 
  }

下面的代码可以看到,拿到LocatedStripedBlock以后,客户端会根据LocatedStripedBlock中携带的信息,将这个返回的Group切分成一个一个的Internal Block,每个Internal Block也用LocatedStripedBlock表示,同时,创建对应的DataStreamer,用来和这个Internal Block所分配的机器进行socket通信已实现Block的写操作。

-------------------------------------DFSStripedOutputStream.java ------------------------------------------
private void allocateNewBlock() throws IOException {
    if (currentBlockGroup != null) {
      for (int i = 0; i < numAllBlocks; i++) {
        // sync all the healthy streamers before writing to the new block
        waitEndBlocks(i);
      }
    
    DatanodeInfo[] excludedNodes = getExcludedNodes();
    //......
    final LocatedBlock lb;
    try {
      // 向NameNode申请Block.对于Stripped的场景,是申请一个BlockGroup(Logic block)
      // 这个addBlock是调用的super class的addBlock(), 即DFSStrippedOutputStream和DFSOutputStream都是一样的addBlock()逻辑
      // 从代码来看,这里并没有blocksize的信息
      lb = addBlock(excludedNodes, dfsClient, src,
          prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
    // assign the new block to the current block group
    currentBlockGroup = lb.getBlock();
    blockGroupIndex++;
    // 从申请到的LocatedBlock(对应了一个Block Group)切分出Internal block
    // 这里的blocks的索引已经是跟internal block在group中的索引一致了,每一个internal block的大小也从Block group中拆分了
    final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
        numAllBlocks - numDataBlocks);
    for (int i = 0; i < blocks.length; i++) {
        // ......
        // 把这个Block group里面的所有的internal block分配给每一个streamer
        coordinator.getFollowingBlocks().offer(i, blocks[i]);
    }

2.3.2 服务端的流程

在这里插入图片描述

文件的创建

服务端收到create()请求以后,会做以下事情:

  • 权限校验和设置,是否有权限创建文件,是否需要循环创建parent目录
  • 路径校验:路径校验,创建对应parent目录,overwrite模式下需要删除同名文件,非overwrite模式下遇到同名文件会抛出异常,
  • 参数合法性校验, 各种参数是否合法,replica和ec不可同时存在,如果是EC,Blocksize不可以比CellSize还要小
  • 如果是基于replication,将确认replication factor,如果是EC, 将设置对应的ECPolicy。 基本原则是,用户如果在创建请求中制定了 ECPolicy,那么就按照用户请求的ECPolicy存储文件,如果用户请求中使用复制方式存储文件,那就按照复制方式,否则,新创建的文件将继承父目录的副本方式。
  • 创建该文件对应的INodeFile,并挂载到系统的INodeDirectory下面(一个HDFS服务的一个Nameservice只有一个INodeDirectory对象,代表了整个文件系统的文件目录树的根入口),这个INodeFile包含了该文件的所有元数据信息,权限,创建和修改时间戳信息,blockSize,块布局方式blockType,ECPolicy, 副本个数,文件的StoragePolicy
    • StoragePolicy管理的是文件应该存放在哪种存储上,SSD, DISK 等,以及在副本复制replication或者文件创建过程中如果要求的StoragePolicy不存在该怎么选择(SSD不够用了,是否允许用DISK?),这些默认支持的Policy在BlockStoragePolicySuite.createDefaultSuite()中初始化
  • 将文件的租约(lease)交给这个创建者
  • 将文件元数据信息持久化到EditLog
  • 将文件信息封装为HdfsFileStatus返回给客户端
Block的添加

文件创建完成以后,就可以开始进行写操作了,写操作在任何时刻一定是处在某一个Logical block中,因此,刚开始写,或者写完一个 block,都需要向NameNode申请Block。有了条带存储以后,这个addBlock()的block归一化为Logical Block, 即申请一个逻辑块。但是, NameNode在处理申请的时候,创建了一个逻辑块,还会根据分配的机器将这个逻辑块切分成物理块,然后返回给客户端。
连接文件的写过程,必须了解文件在写入过程中的各种状态,NameNode将块写入过程中的信息封装在BlockUnderConstructionFeature对象中。
在NameNode端,每一个块都有一个BlockInfo对象,每一个BlockInfo对象都有一个BlockUnderConstructionFeature uc,BlockUnderConstructionFeature中维护了跟块写入过程相关的信息,最重要的是当前正在写入的块的状态(BlockUCState)。一个块正在被写入的时候,uc是不为null的,当这个块写入完成(NameNode收到了这个块的足够数量的DataNode 的汇报),进入COMPLETE状态,uc会被置为空,代表这个块处于UnderConstruction(UC)的状态。块写入过程中的状态被表示称为BlockUCState:

  enum BlockUCState {
	  UNDER_CONSTRUCTION, //这个block应该正处于被写的状态
	  COMMITTED //  客户端已经写完了,但是还没有DataNode收到确认的汇报,这个状态较COMPLETE要更早
	  COMPLETE, // block的构建完成,block的FINALIZED的副本数量已经达到了最小副本数量,这是block的最后的稳定状态
	  UNDER_RECOVERY,// block写入失败,需要进行recovvery操作
  }
  • UNDER_CONSTRUCTION : 这个块处于正常地被写入状态

  • COMMITTED: 这个块已经被写入到Data,但是目前NameNode还没有收到任何一个或者还没有收到足够的DataNode的汇报。

  • COMPLETE: NameNode已经收到了关于这个Block的汇报,汇报数量已经达到的要求的最小块数量,并且这个块不会再被修改。

    • 连续布局:对于连续布局,只要一个块已经存放到dfs.namenode.replication.min所配置的数量的DataNode上(即NameNode已经收到了这么多个DataNode汇报了这个replica),就认为是COMPLETE。默认情况下,dfs.namenode.replication.min为1,即只要这个replica对应的target有一个汇报上来,NameNode端的Block的状态就是COMPLETE。
    • 条带布局:只要NameNode收到了达到numDataBlock个块的DataNode的汇报,就会将这个EC Logical Block的状态置为COMPLETE。对于RS(6,2),只要有6个节点(无论是data block还是parity block)的DataNode汇报给了NameNode,这个logical block 就COMPLETE了。

    从上面的描述看到,COMPLETE状态表达的是一个Block已经到了immutatable的状态了,但是很显然,在COMPLETE状态的副本只要丢失任何一个replica,就会造成数据丢失。

  • UNDER_RECOVERY: 块写入过程中失败,因此正处在恢复之中。
    下文将会讲到的块的reconstruction(重构)都是指的处于COMPLETE状态的replica由于出现一些特殊情况道导致块的副本数量无法满足要求因此进行重构。而Recover(恢复)是指处于UNDER_CONSTRUCTION状态下的块由于异常情况(客户端中断,DataNode崩溃)导致副本出现不一致状态而进行的修复。
    服务器端收到了addBlock()请求以后,会做以下事情:

  1. 相关校验,通过validateAddBlock()方法对addBlock()操作进行校验,把校验结果封装在ValidateAddBlockResult中,校验的内容包括:

    • 租约是否匹配(只有拿到文件租约的客户端才能写文件)

      static ValidateAddBlockResult validateAddBlock(){
      					 ........
      					FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
                                             previous, onRetryBlock);
                          ........
      }
      
      private static FileState analyzeFileState(
      	      FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
      	      ExtendedBlock previous, LocatedBlock[] onRetryBlock)
      	      ...........
      	      final INodeFile file = fsn.checkLease(iip, clientName, fileId);
      	      .......
      	      
      
    • 上一个block是否已经彻底完成复制。只要新添加的block不是文件的第一个block,那么在基于副本复制的块布局方式下,必须确认上一个block已经完成了副本复制。

      final INodeFile pendingFile = fileState.inode; // 校验文件块的状态,比如倒数第二个文件块是否已经complete或者committed了
      if (!fsn.checkFileProgress(src, pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet: " + src);
      }
      

    无论是Continuous Block还是 Striped Block,在addBlock()或者close()的时候,都会通过checkFileProgress()方法对该文件的block进行校验,校验的规则是:

    • 客户端通过配置dfs.namenode.file.close.num-committed-allowed代表文件在关闭或者添加新的block的时候,最多允许多少个block仅仅处于COMMITTED的状态而还未到达COMPLETE的状态

      • 如果客户端尝试关闭文件DFSOutputStream.close(),那么要求:
        • 倒数第二个block必须是COMPLETE的状态
        • 当确认倒数第二个Block已经COMPLETE了,NameNode经过校验将文件状态改为COMMITTED,如果已经有一定数量的副本汇报,则把状态改为COMPLETED
        • 保证在blocks.size() - num-committed-allowed之前的所有block都已经是COMPLETE状态,剩余的num-committed-allowed个Block都是COMMIT的状态
      • 如果客户端在写文件,客户端申请添加一个Block,在创建并添加这个新的Block以前,必须确保倒数第二个Block必须是COMPLETE的状态,对倒数第一个Block的状态没有要求。
    • block数量不可以超过单个文件最大的副本数量

      if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
        throw new IOException("File has reached the limit on maximum number of"
            + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
            + "): " + pendingFile.getBlocks().length + " >= "
            + fsn.maxBlocksPerFile);
      }
      
    • 将校验结果,包括文件所需要的副本数量(对于连续布局,target数量就是该文件的副本数,对于条带布局,target数量就是ECSchema中定义的dataUnit + parityUnit的数量,以及blockSize)和blocksize封装到ValidateAddBlockResult中,供后续创建block使用

      return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
                                            clientMachine, blockType, ecPolicy);
      
  2. 为这个addBlock()请求分配对应的DataNode。 在addBlock()的时候,通过制定的excludedNode,以及favoredNode,和上一步确定的所需要的机器数量,交给对应的PlacementPolicy,交由BlockManager.chooseTarget4NewBlock()进行机器的分配。

    • BlockPlacementPolicies类负责对PlacementPolicy进行管理,在NameNode启动的时候,就会根据配置文件,决定不同的布局方式所应该使用的PlacementPolicy。通过dfs.block.replicator.classname配置基于复制的连续块布局的副本放置策略,默认是BlockPlacementPolicyDefault,典型的3场景,会将第一个replica放在一个rack,剩下两个replica放在另外一个rack的不同host。通过dfs.block.placement.ec.classname配置纠删码的PlacementPolicy,不配置的默认策略是BlockPlacementPolicyRackFaultTolerant,一种尽量将所有块放置到不同Rack上的策略。

      -------------------------------------BlockPlacementPolicies-------------------------------------
        public class BlockPlacementPolicies{
        	public BlockPlacementPolicy getPolicy(BlockType blockType){
      	    switch (blockType) {
      	    case CONTIGUOUS: return replicationPolicy; // 基于复制的布局方式的PlacementPolicy
      	    case STRIPED: return ecPolicy; // 基于条带的布局方式的PlacementPolicy
      	    ......
          }
      
  3. 为这个addBlock()请求分配对应的BlockID。如果是StripedBlock,这个ID仅仅是BlockGroup ID(低四位为0,还没有分配具体的Internal Block)

  4. 分配Internal BlockID,创建Block和文件的对应关系并创建replica和targets之间的对应关系

    • 对于StripedBlock, 这里的BlockID其实是BlockGroup ID,上文已经讲过BlockID的具体格式和分配方式。

    • 将创建的Block挂在到这个文件的INodeFile上,构建对应关系。显然,假如是一个大文件,那么这个INodeFile对应的Block数量不止一个,因此INodeFile通过一个 BlockInfo[] blocks;来维护自己的所有的Block

    • 构建Replica和Datanode的对应关系。这是因为刚刚只是委托PlacementPolicy分配了指定数量的DataNode,基于复制的连续布局很简单,所有的DataNode存储相同的replica,但是在条带布局下,这个申请的Block其实是一个BlockGroup,那么这个Group中的每一个Internal Block 该放到哪个DataNode上去呢?这就是这里需要确定的。

      BlockManager.newLocatedBlock()就是将对应的replica(Internal Block)具体分配到对应的DataNode上面去,并返回一个LocatedStripedBlock对象。LocatedStripedBlock是LocatedBlock针对StripedBlock 的具体实现,最重要的是它有了一个叫做blockIndices的成员,blockIndices[i]的含义是第i个DataNode存放的InternalBlock的block index,而DatanodeInfoWithStorage[] locs中locs[i]则存放了对应位置i的DataNode信息,联合blockIndices和locs,就知道了任何一个Internal Block的存储位置。

      public class LocatedStripedBlock extends LocatedBlock {
        // blockIndices[i]代表了replicas[i]对应的replica在block group中的索引值,
        // 查看BlockUnderConstructionFeature.setExpectedLocations L105
        private final byte[] blockIndices; // physical block在block group中的索引
      
图-8 HDFS写文件基本框架

NameNode 构建完成LocatedStripedBlock信息,就完成了整个StripedBlock的创建过程,包括Block Group ID(上面讲了block id的分层方式,Block Group ID的低四位是0),分配的节点,以及这个BlockGroup 中的每一个InternalBlock 和节点之间的对应关系。在客户端拿到NameNode返回的LocatedStripedBlock以后,就通过方法constructInternalBlock中按照要求将LocatedStripedBlock拆分成一个一个的Internal block(也是一个LocatedBlock对象,由于是Internal Block 那么di四位就是internal block在group中的index),然后将这些internal block绑定给对应的DataStreamer,然后开始数据写操作。

  public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
      int idxInReturnedLocs, int cellSize, int dataBlkNum,
      int idxInBlockGroup) {
    final ExtendedBlock blk = constructInternalBlock(
        bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
    .....
      locatedBlock = new LocatedBlock(blk, // 每一个internal都从block group中切分属于自己的location, storage id, storage type
          new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},// 这个internal block对应的DataNode info
          new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, // 这个internal block对应的storage id
          new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, // 这个internal block对应的storage type
          bg.getStartOffset(), bg.isCorrupt(), null);
     return locatedBlock;

2.4 2.3 基于条带布局的纠删码文件的读取过程

2.4 故障块的重构(Reconstruct)

故障块的重构都是基于已经处于BlockUCState.COMPLETE状态的,通过COMPLETE的定义我们知道,意味着重构(Reconstruct)针对的是已经写完成(汇报上来的块副本达到了要求比如最小允许副本数),但是后来块的状态发生问题的那些块。

2.4.1 故障块的状态定义和各个状态的统计信息

本章节讲解块重构过程中的块状态,这些状态是StoredReplicaState的状态。StoredReplicaState是块已经完成写操作以后的状态阶段中的不同状态,而不是块在写入过程中的状态BlockUCState。
BlockManager通过方法checkReplicaOnStorage()来统一收集并计算副本状态,并返回一个收集了副本状态信息的对象NumberReplicas。 从NumberReplicas的名字听起来似乎这个类是统计副本的总数量,但是其实这个类是统计在各个状态下的副本数量的分类统计信息。那么,副本会有哪些状态呢?
所有的副本状态被StoredReplicaState表述。顾名思义,StoredReplicaState所表述的副本状态是已经存储下来的副本的整个状态,这些层面的状态一方面是为了给用户的诸如fsck的命令返回副本的统计信息,更重要的,这些状态的统计信息,将用来决定下一步对副本是否需要进行重构的策略,比如副本数是否太低,太低的副本需要进行重构,副本数是否太高,副本数太高的副本需要进行部分的删除。

public enum StoredReplicaState {
    // live replicas. for a striped block, this value excludes redundant
    // replicas for the same internal block
    LIVE,
    READONLY,
    // decommissioning replicas. for a striped block ,this value excludes
    // redundant and live replicas for the same internal block.
    DECOMMISSIONING,
    DECOMMISSIONED,
    // We need live ENTERING_MAINTENANCE nodes to continue
    // to serve read request while it is being transitioned to live
    // IN_MAINTENANCE if these are the only replicas left.
    // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
    // Live ENTERING_MAINTENANCE.
    // 当 一个节点处于ENTERING_MAINTENANCE中(还没到达最终的IN_MAINTENANCE), 这个节点上的internal block如果没有其它副本,
    // 那么这个node还是会接着serve 这个replica 的读请求。显然,当节点进入到IN_MAINTENANCE中的时候,读请求就不会过来了,
    // 因此进入MAINTENANCE_NOT_FOR_READ
    MAINTENANCE_NOT_FOR_READ,
    // Live ENTERING_MAINTENANCE nodes to serve read requests.
    MAINTENANCE_FOR_READ,
    CORRUPT,
    // excess replicas already tracked by blockmanager's excess map
    EXCESS, // 副本数量超过了要求,这些超过要求的replica 最终会被删除
    STALESTORAGE,
    // for striped blocks only. number of redundant internal block replicas
    // that have not been tracked by blockmanager yet (i.e., not in excess)
    REDUNDANT
  }


副本的状态StoredReplicaState是对连续布局和条带布局统一而言的,并不是专指某种布局方式。但是,某些特定状态在两种布局模式下的含义稍有不同。副本每一个状态的具体含义,我们可以从BlockManager构造NumberReplicas对象的方法checkReplicaOnStorage()清楚地看到:

private StoredReplicaState checkReplicaOnStorage(NumberReplicas counters,
      BlockInfo b, DatanodeStorageInfo storage,
      Collection<DatanodeDescriptor> nodesCorrupt, boolean inStartupSafeMode) {
    final StoredReplicaState s;
    if (storage.getState() == State.NORMAL) {
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
      if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
        s = StoredReplicaState.CORRUPT;
      } else if (inStartupSafeMode) {
        s = StoredReplicaState.LIVE;
        counters.add(s, 1);
        return s;
      } else if (node.isDecommissionInProgress()) {
        s = StoredReplicaState.DECOMMISSIONING;
      } else if (node.isDecommissioned()) {
        s = StoredReplicaState.DECOMMISSIONED;
      } else if (node.isMaintenance()) {
        if (node.isInMaintenance() || !node.isAlive()) {
          s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
        } else {
          s = StoredReplicaState.MAINTENANCE_FOR_READ;
        }
      } else if (isExcess(node, b)) {
        s = StoredReplicaState.EXCESS;
      } else {
        s = StoredReplicaState.LIVE;
      }
      counters.add(s, 1);
      // //如果这个Storage是stale storage,那么,认为这个replica是stale状态,直到收到对应DN的heartbeat
      if (storage.areBlockContentsStale()) {
        counters.add(StoredReplicaState.STALESTORAGE, 1);
      }
    } else if (!inStartupSafeMode &&
        storage.getState() == State.READ_ONLY_SHARED) {
      s = StoredReplicaState.READONLY;
      counters.add(s, 1);

副本的大部分状态是由这个副本所在的存储状态决定的:

  • LIVE: 就是我们最常见的正常的存活状态。对于条带布局模式,虽然预期是每一个internal block只有一个副本,但是有可能存在某个internal block同时存在LIVE状态和其它状态,在这种情况下,LIVE状态是排他的,即,这个Internal Block只要有一个副本是LIVE,那么我们就认为这个Internal Block的状态是LIVE。关于节点状态的一些去重操作,参考BlockManager.countLiveAndDecommissioningReplicas()
  • READONLY: 如果replica所在的Storage的类型是StorageType.State.READ_ONLY_SHARED, 那么这个副本的状态就是READONLY。这个READ_ONLY_SHARED是一个特殊的HDFS特性,我看了一下对应的HDFS issue HDFS-5318(issue里面有对应的design doc, 感兴趣的读者可以看看),其大致意思就是将我们传统的通过物理磁盘存储block的方式转移到通过共享存储(NAS, S3等)存储块信息,由于不再存放在某台DataNode上,因此客户端可以通过任意DataNode读取到这个Block。这个READONLY状态和本文的关系不大,不做详细解释。
  • DECOMMISSIONING: 一个机器decommision的过程就意味着上面的replica需要全部转移(copy)到其它机器上,在全部转移完成以前,这个机器上的block都是DECOMMISSIONING的状态。显然,对于一个条带布局的 internal block,如果这个块已经成功转移到其它的Live的机器上,那么这同一个internal block就会在NameNode端同时存在LIVE和DECOMMISSIONING的状态,这时候,NameNode的判定状态是Live状态。关于节点状态的一些去重操作,参考BlockManager.countLiveAndDecommissioningReplicas()
  • DECOMMISSIONED: 副本所在的机器已经decommission结束。
  • MAINTENANCE_FOR_READ: 关于机器的maintenance状态,感兴趣的读者可以自行学习,它发生在我们需要暂时将某个节点进行下线或者升级同时又不希望这个节点的短暂下线引起集群大量的副本拷贝的场景。MAINTENANCE_NOT_FOR_READ的状态是指这个副本所在的机器正在进入(尚未成功)maintenance状态,这时候,这个机器显然不能再服务写请求,但是却可以在某些极端情况下服务一些读请求,直到完全进入maintenance状态。
  • MAINTENANCE_NOT_FOR_READ:这是指这个机器已经完全进入了MAINTENANCE状态,这时候这个机器将拒绝任何请求,包括读请求。
  • CORRUPT: 这个replica所在的机器存储已经CORRUPT。请注意区分Block corrupt和Replica corrupt的区别,当且仅当一个Block的所有的replica都已经corrupt了,那么这个Block会被认为是corrupt。NameNode端corrupt的replica都被一个叫做CorruptReplicasMap的对象管理,存放了所有的corrupted的replica以及对应的Corrupt的原因:
    public class CorruptReplicasMap{
     /** The corruption reason code */
     public enum Reason {
       NONE,                // not specified.
       ANY,                 // wildcard reason
       GENSTAMP_MISMATCH,   // mismatch in generation stamps
       SIZE_MISMATCH,       // mismatch in sizes
       INVALID_STATE,       // invalid state
       CORRUPTION_REPORTED  // client or datanode reported the corruption
     }
     // 存放了所有corrupt的replica
     private final Map<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
       new HashMap<Block, Map<DatanodeDescriptor, Reason>>();
    
    在BlockManager中维护了CorruptReplicasMap的引用,所有corrupt replica都是通过BlockManager.markReplicaAsCorrupt()来添加到corruptReplicas中的,主要有以下情况:
    1. 来自DataNode的增量块汇报(DatanodeProtocol.blockReceivedAndDelete接口)或者全量块汇报(DatanodeProtocol.blockReport)接口。在收到这些块汇报以后,NameNode会对所有汇报上来的块进行时间戳和size的检测,如果发现汇报上来的块和自己在blockMap中存储的块的时间戳或者size不一致,则认定该块是corrupt块
    2. 来自DataNode直接汇报的badBlock(通过DatanodeProtocol.reportBadBlocks()),NameNode收到这些会报上来的badBlock会直接通过调用markBlockAsCorrupt()将其标记为corrupt block。DataNode在什么情况下会直接上报badBlocks呢?这主要发生在比如DataNode被分配了reconstruct的任务时,会从远程读取一些replica进行重构,这时候如果读取发生问题,就会认为是corrupted replica,然后通过DatanodeProtocol.reportBadBlocks()接口上报NameNode。
    3. 客户端在读取块的过程中发现块的校验失败,会通过(ClientProtocol.reportBadBlocks()接口)告知NameNode
    4. DataNode在完成了某个Recovery以后,通过DataNodeProtocol.commitBlockSynchronization()接口告知NameNode,NameNode会在适当情况下将replica标记为corrupt
  • STALESTORAGE:这种状态其实是一种Corner Case。当NameNode发生重启或者Failover(从standby到达active状态)发生,NameNode会将所有的DataNode 的所有的Storage标记为Stale(陈旧)状态,这时候这些storage上的replica也全部为stale状态,直到收到了对应的DataNode发送过来的关于这个Storage的心跳信息,才会解除Stale状态。Stale状态是为了处理在NameNode发生状态转移的时候DataNode和新的NameNode发生的一些不一致状态。在Stale状态的副本的大多数处理都会延迟,因为这是一个中间状态,比如我们发现一个replica的副本数过多,但是其中有一个副本是STALESTORAGE状态,那么这时候我们不可以贸然去删除多余副本,因为这时候有可能对应的 DataNode已经将副本删除,等汇报上来的时候,NameNode也将副本在另外机器上删除,造成副本丢失。
  • REDUNDANT: 只适用于条带布局,某一个internal block的副本数量多余一个。
  • EXCESS: 含义其实于REDUNDANT,只不过EXCESS指的是NameNode已经发现了该replica有多余副本(比如通过fsck,或者通过RedundancyMonitor的定时扫描线程),同时将这个replica的信息放到excessRedundancyMap中去,所有放到excessRedundancyMap中的internal block都会采用响应策略删除一个replica的多余副本。
    从上面的状态分析可以看到,副本的状态和一些存储的状态有一部分是用户操作的,比如Decommission和Maintenance,有些是机器自己汇报的,比如CORRUPT, 有些是集群本身的状态发展而成的,比如LIVE, STALESTORAGE, REDUNDANT, EXCESS,

2.4.1 故障文件块的查找收集

在这里插入图片描述

2.4.1.1 misReplica的检测

一个Block的replica的数量处于不正常的状态,比如,有多余的副本,或者低于预期的副本等,都属于副本不正常的情况。注意,副本不正常的情况都是指已经处于COMPLETE状态的副本,即不可变的块。可变的、正在写的块不在重构的处理范围内,而是块恢复(Recover)的职责。
NameNode会通过各种方式不断检测每个副本,确定这个副本是否属于错误副本,如果属于错误副本,则进行进一步处理

  enum MisReplicationResult {
    /** The block should be invalidated since it belongs to a deleted file. */
    INVALID, // 这个块属于一个被删除的文件,副本可以删除了
    /** The block is currently under-replicated. */
    UNDER_REPLICATED, // 这个块的副本数不足
    /** The block is currently over-replicated. */
    OVER_REPLICATED,
    /** A decision can't currently be made about this block. */
    POSTPONE,
    /** The block is under construction, so should be ignored. */
    UNDER_CONSTRUCTION,
    /** The block is properly replicated. */
    OK
  }

BlockManager.run()通过调用processMisReplicatedBlock() 来扫描当前replica的位置有问题(放置不正确,或者缺少replica)的所有block, 如果这个Replica的确需要重构,那么就放入到neededConstruction中。 processMisReplicatedBlock() 的扫描操作会在以下情况下被触发:

  • 用户在运行fsck命令的过程中,如果添加了-replicate参数,那么fsck不仅仅会检查会返回并检查块的状态,并且会将检查到的副本数量不足的块(misReplicatedBlocks)进行进一步处理。
  • NameNode启动时,Active NameNode会通过BlockManager启动一个异步独立的Daemon线程,这个线程会周期性扫描当前BlockManager管理的所有的Block,每个block都会检查其replication状态。
  • RedundancyMonitor每一轮运行的时候,都会扫描postponedMisreplicatedBlocks并尝试对着里面的超时的block进行处理,如果依然处理失败
2.4.1.2 延迟队列(postponedMisreplicatedBlocks)的构造和实现

BlockManager使用postponedMisreplicatedBlocks保存了一些需要延迟处理的、放置(MisReplicationResult中表述的比如UNDER_REPLICATED,OVER_REPLICATED)状态不正常的Block。

  /**
   * After a failover, over-replicated blocks may not be handled
   * until all of the replicas have done a block report to the
   * new active. This is to make sure that this NameNode has been
   * notified of all block deletions that might have been pending
   * when the failover happened.
   */
  private final Set<Block> postponedMisreplicatedBlocks =
      new LinkedHashSet<Block>();

延迟处理是因为,系统刚刚才启动,或者,Failover刚发生(当前的这个Active NameNode是刚刚才从Standby到Active状态的),因此,还没有收到一部分DataNode的第一次block report,即目前NameNode关于Block中副本的状态是过时(Stale)的,因此需要延迟对这个misReplicatedBlock进行进一步处理。

本质上,这是为了解决新的NameNode所看到的集群状态其实并非集群最新状态的特殊情况。这种特殊情况在HDFS特指需要删除副本数过多的Block的情况:
新的NameNode发现这个Block的副本超过了要求的副本数量(over-replicated),因此需要将某一个或者多个replica 加入到invalidateBlocks中,随后被删掉。但是,NameNode也发现这个Block有一个副本是在Stale DataNode上,这时候问题出现了:由于NameNode当前掌握的某个replica-1的DataNode(DN-1)的状态是过期(Stale)的,很有可能之前的Active NameNode已经发现了这个over-replicated的Block并且已经让这个Stale DataNode删掉了副本,但是当前的新的Active NameNode还不知道。如果这时候Active NameNode直接指示其它DataNode(DN-2)删掉副本,当之前的DN-1汇报说我已经把副本replica-1已经删除,这时候就造成Block的副本数不足。因此最好等待这个Block中位于Stale DataNode的heartbeat上来再对这个Invalidate操作进行处理。

在NameNode刚刚切换到 Active状态的时候,会将所有的DataNodeStorageInfo的状态置为Stale,直到第一次收到这个DataNode对应的heartbeat:

// NameNode刚启动,会将所有DataNode标记为Stale
  void markStaleAfterFailover() {
    heartbeatedSinceFailover = false;
    blockContentsStale = true;
  }
   // 收到heartbeat
  void receivedHeartbeat(StorageReport report) {
    updateState(report);
    heartbeatedSinceFailover = true;
  }

  //收到heart
  void receivedBlockReport() {
    if (heartbeatedSinceFailover) { // Failover以后已经收到了heartbeat
      blockContentsStale = false; // 将DataNodeStorageInfo置为非Stale状态
    }
    blockReportCount++;
  }
postponedMisreplicatedBlocks中Block的添加

上面讲过,postponedMisreplicatedBlocks中的所有Block都是那种1) replica数量超过预期值并且 2) 有Replica在stale DataNode上。
大概有下面的这些情况,NameNode会往postponedMisreplicatedBlocks中添加Block:

  1. BlockManager.run()是一个不断执行并扫描所有Block的Daemon线程,对于每一个Block,一旦发现这个Block存在上面的情况,就放到postponedMisreplicatedBlocks中
  2. 每当收到DataNode的Block report以后,都会对Block的状态进行一系列的检查,同样的,如果发现这个Block满足上面的条件,就放到postponedMisreplicatedBlocks中
  3. 当用户进行了手动降副本操作,比如,手动通过setReplica命令将副本从3降低到2,这时候这个Block的副本数量可能会超过预期并且有副本在Stale DataNode上,就会将这个replica放到postponedMisreplicatedBlocks中
  4. 在尝试处理一个Corrupted Block的时候(markBlockAsCorrupt()方法),也可能将这个Block的某些replica进行invalidate操作(invalidateBlock()操作),如果发现这个replica存在上述情况,也会放到postponedMisreplicatedBlocks中
postponedMisreplicatedBlocks中Block的移除

RedundancyMonitor这个Daemon线程负责重新扫描postponedMisreplicatedBlocks中的每一个Block,用来对这里的Block的状态进行重新的确认。重新扫描的逻辑发生在方法rescanPostponedMisreplicatedBlocks()中:
显然,postponedMisreplicatedBlocks中的Block会发生以下的各种情况:

  1. 最简单的,这个Block依然是over-replicated的:
    • 但是有副本存放在Stale DataNode上,这时候什么也不用做,对这个Block的处理继续延迟
    • 没有副本在Stale DataNode上,那么直接通过processExtraRedundancyBlock()进行over-replicated块的处理(比如删除多余replica等)
  2. 如果块的状态是已经删除,比如,对应文件已经删掉了,则加入到InvalidateBlocks中,这个块将被删除
  3. 这个块并不是COMPLETE状态,不做任何处理,不是块重构的处理范围。
  4. 检查发现这个Block需要进行Reconstruction(下文讲pendingReconstruction的时候会将isNeededReconstruction(),判断一个Block是否需要进行重构),那么就将其从postponedMisreplicatedBlocks中移除,转而加入到neededReconstruction
  5. 如果都不是,那么这个Block的状态完全正常,不需要处理。
2.4.1.3 待定队列(PendingReconstructionBlock)的构造和实现

BlockManager中维护了一个PendingReconstructionBlocks pendingReconstruction对象,用来监控那些尚且需要获取更多已经存储的副本的Block:

class PendingReconstructionBlocks {
  private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
  static class PendingBlockInfo {
    private long timeStamp;
    private final List<DatanodeStorageInfo> targets; // 这个block需要复制到的位置

对pendingReconstruction的添加和删除操作

从PendingReconstructionBlocks的定义可以看到,pendingReconstructions是一个以block作为key、以PendingBlockInfo作为value的map,代表这个block对应的PendingBlockInfo里面的target尚未向NameNode进行汇报。我们都知道客户端在写数据并且addBlock()的时候,NameNode会通过PlacementPolicy为这个Block寻找合适的target location并且返回给客户端,在这个block commit了以后, NameNode就会把这个block和对应的 target添加到PendingReconstructionBlocks中,意思是:这个block会写入到这些targets,但是我目前还没有收到这些targets对这个block的汇报,所以就把这个Block和它对应的targets 记录下来,当收到了DataNode的汇报以后,就把这个DataNode从这个block的记录里删除,如果这个blocks的所有targets都汇报了这个block,就可以把这个block从pendingReconstructions中彻底删除。
PendingReconstructionBlocks.increment()方法是向PendingReconstructionBlocks对象中添加targets记录的方法,我们跟踪这个方法的调用者,可以看到什么时候NameNode会往pendingReconstruction中添加block -> targets 记录:

  1. 客户端在addBlock()的时候:往往在创建新的block以前,需要确认当前文件的最后一个block是commit的状态,因此,如果这个block commit成功了,那么会将这当前的最后一个block -> targets 添加到pendingReconstructions中
  2. 客户端在close()文件的时候: 客户端关闭文件的时候,当所有的Block已经写到DataNode,客户端会向NameNode发起close文件的请求,NameNode收到请求以后,会通过addCommittedBlocksToPending()方法,将Block -> targets对应关系添加到pendingReconstructions中
  3. DataNode进行块汇报的时候:NameNode收到来自DataNode的block report,会通过addCommittedBlocksToPending()方法将这个block当前还期待的但是没有收到report的block -> targets的关系放到pendingReconstruction中

PendingReconstructionBlocks.increment()相反,PendingReconstructionBlocks.decrement()会将一个block -> datanode的对应关系从pendingReconstruction中删除,这发生在DataNode在通过DatanodeProtocol.blockReceivedAndDeleted 接口进行增量块汇报的时候,会将这个block -> datanode的对应关系从pendingReconstruction中删除。同时,如果这个block对应的所有targets都已经完成了汇报,就把block从这个 pendingReconstruction中删除。

对pendingReconstruction的超时处理

上面讲过,PendingReconstructionBlocks就像Block的守护者一样,NameNode通过它可以知道目前还有哪些block在等待Datanode向上汇报的信息。正常情况下,一个block存放到在PendingReconstructionBlocks,当所有的targets完成块汇报,很快就会从pendingReconstruction中删除。但是,分布式系统中,任何异常状态都有可能发生,如果pendingReconstruction 中的某个Block所预期的DataNode始终没有全部汇报上来,应该怎么样呢?
原来,为了识别这种很长时间依然没有删除的block, PendingReconstructionBlocks通过一个单独的线程PendingReconstructionMonitor去监控所有添加进来的block并为每个添加进来的block记时,超过指定时间,就会被标记为超时是block。在RedundancyMonitor线程的每一轮运行中,都会通过方法processPendingReconstructions()尝试去处理pendingReconstruction中超时的block:

  private class RedundancyMonitor implements Runnable {

    @Override
    public void run() {
      while (namesystem.isRunning()) { // 这是一个无限循环,只要是active namenode,就不断运行
        try {
          // Process recovery work only when active NN is out of safe mode.
          if (isPopulatingReplQueues()) {
            computeDatanodeWork(); // 选择并且将需要re-construct的节点发送给对应的Node 去执行
            processPendingReconstructions(); // 看看pendingReconstruction 中有哪些block需要加入到neededConstruction 中去
            rescanPostponedMisreplicatedBlocks();


 /**
   * If there were any reconstruction requests that timed out, reap them
   * and put them back into the neededReconstruction queue
   */
  void processPendingReconstructions() {
    // pendingReconstruction主要来自与文件最后close的时候的最后一个block,在这里,如果pendingReconstruction
    // 中的block在指定时间内依然没有完成construction,那么就需要放到neededConstruction中去
    BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
    // .......
        for (int i = 0; i < timedOutItems.length; i++) {
          BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
          NumberReplicas num = countNodes(timedOutItems[i]);
          if (isNeededReconstruction(bi, num)) { //这个pendingReconstruction 中的block的确需要re-construction
            neededReconstruction.add(bi, num.liveReplicas(),
                num.readOnlyReplicas(), num.outOfServiceReplicas(),
                getExpectedRedundancyNum(bi));
          }
        }
      } finally {
        namesystem.writeUnlock();

如上面代码所示,RedundancyMonitor线程会从pendingReconstruction取出超时的blocks,如果发现这些block需要进行重构,则将这需要重构的block加入到neededReconstruction中。下一轮循环过来,就会通过computeDatanodeWork()对needConstruction中需要重构的block进行处理。而一个block是否需要重构的判断入下代码所示:

  boolean isNeededReconstruction(BlockInfo storedBlock,
      NumberReplicas numberReplicas, int pending) {
    return storedBlock.isComplete() &&
        !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
  }
  // 如果 live + pending replicas 的数量不小于所需要的replica数量,并且(还有pending的replica,或者没有pending的并且已经满足放置策略)
  // 这里的含义是,如果有pending的,那么我们先不用考虑是否满足placement policy
  boolean hasEnoughEffectiveReplicas(BlockInfo block,
      NumberReplicas numReplicas, int pendingReplicaNum) {
    int required = getExpectedLiveRedundancyNum(block, numReplicas); // 先看看需要多少个live的副本
    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
    return (numEffectiveReplicas >= required) &&
        (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
  }

  // 可容忍的最少的live replica的数量
  public short getExpectedLiveRedundancyNum(BlockInfo block,
      NumberReplicas numberReplicas) {
    // 对于striped block,expectedRedundancy 数量指的是data block + parity block的数量,
    // 对于replication,expectedRedundancy指的是replication factor
    final short expectedRedundancy = getExpectedRedundancyNum(block);
    // 假如当前我的block配置的副本是5, 有2个副本是处于maintenance,那么我期待的live 副本数量是5-2=3
    return (short)Math.max(expectedRedundancy -
        numberReplicas.maintenanceReplicas(), // 处于maintenance中的replica也算是live,这本身就是maintenance的目的,让replica短时间可以容忍丢失,但是处于decommission的不能算live
        // 最小的maintenance 副本数量。对于striped block,最小的maintenance副本数量就是data block 的数量,说明对于
        // stripe block, 不需与data block丢失
        getMinMaintenanceStorageNum(block));
  }
  
  public short getExpectedRedundancyNum(BlockInfo block) {
    return block.isStriped() ?
        ((BlockInfoStriped) block).getRealTotalBlockNum() : // 比如RS(6,2), realTotalBlockNum 就是 6 + 2 = 8
        block.getReplication(); // 连续布局情况下,就是配置的块副本数
  }
  // 能够允许进入maintenance状态
  private short getMinMaintenanceStorageNum(BlockInfo block) {
    if (block.isStriped()) {
      return ((BlockInfoStriped) block).getRealDataBlockNum(); // stripe block实际占用数据的块的数量
    } else {
      return (short) Math.min(minReplicationToBeInMaintenance,
          block.getReplication()); // 或者是replication factor, 或者是最小允许的进入MAINTENANCE状态live replica的数量
    }
  }

可以看到,对于一个Block是否需要进行reconstruction,是基于已经统计好的Replica的各种状态信息的统计(统一存放在NumberReplicas对象中),就是:

  • 这个节点处于COMPLETE状态并且有足够多的有效Replica(参考isNeededReconstruction())
  • 什么是待定副本数(PendingReplica): 就是我们讲的PendingReconstructionBlocks对象维护的replica -> targets信息,即一个block写完(COMMITTED或者COMPLETED)但是NameNode还没收到足够的DataNode汇报,那么预期还需要收到多少个DataNode的汇报的数量。比如3副本的Block,NameNode目前只收到1个DataNode的汇报,那么这个Block的pendingReplicaNum是2。
  • 什么叫有效副本数量(EffectiveReplicas):从方法hasEnoughEffectiveReplicas()的代码 int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;,即存活状态的副本数量再加上待定状态的副本数量。存活状态的副本是有效副本很容易理解,但是为什么待定的副本也算作有效副本呢?因为待定的副本是正常状态下还缺失的待汇报的数量,这种缺失是很正常的,正常情况下只需要再等等就可以,因此,待定的副本也算作有效。
  • 什么叫允许进入Maintenance状态的最小存活副本数:即要想将某一个replica所载的DataNode进入maintenance状态,那么我必须最少有多少个存活的副本?这是为了避免某个节点进入maintenance状态造成了某些数据不可读的状态。从方法getMinMaintenanceStorageNum()可以看到,对于纠删码,如果进入ENTERING_MAINTENANCE状态以后,依然最少有dataBlockNum个副本存活,那么这个replica就可以进入Maintenance状态。对于3副本,这个默认值是1,即ENTERING_MAINTENANCE中如果一个Block有一个存活的副本,那么这个副本就可以正式进入MAINTENANCE状态。这就是为什么StoredReplicaState在进入正式的MAINTENANCE状态以前,有一个ENTERING_MAINTENANCE状态,在ENTERING_MAINTENANCE状态时,DataNode会有对应的Monitor确认所有的Block的live replica数量都大于getMinMaintenanceStorageNum(),才会从ENTERING_MAINTENANCE进入到MAINTENANCE状态,避免进入到MAINTENANCE状态以后导致数据副本缺失。
  • 当前期望的存活副本数量(Expected Live Redundancy): 从getExpectedLiveRedundancyNum()方法的return (short)Math.max(expectedRedundancy - numberReplicas.maintenanceReplicas(),getMinMaintenanceStorageNum(block));代码可以看到,不考虑特殊情况,对于连续布局,期望的副本数量就是配置的副本数,对于纠删码,期待的副本数量就是dataBlocksNum + parityBlocksNum, 比如RS(6,2)中等于8。但是,由于有些Block的部分副本有可能处于maintenance状态,这些副本虽然暂时不可用,但是并没有丢失,因此,期待的副本数量应该减去这些处于Maintenance状态的副本。比如RS(6,2)中,有3个副本都处于maintenance状态,那么我期待的存活副本数量是8-3=5。同时期望的存活副本数量应该不小于允许进入maintenance的最小存活副本数量。
  • 什么叫足够多的有效Replcia: 即当前有效的副本数量不小于期望的副本数量,并且当前还有pending的副本,或者虽然没有pending的副本,但是整个Block的replica分布处于满足PlacementPolicy要求的状态。
    • 这意味着如果有效的副本数量不小于期望的副本数量,并且还有pending的副本,这时候即使整个Block的副本不满足 PlacementPolicy要求的状态,也认为有足够多的有效Replcia,这样判定是因为存在pending的副本,所以认为极有可能当pending的副本的DataNode汇报上来以后,PlacementPolicy就会被满足。
2.4.1.3 重构队列(neededReconstruction)的构造和实现

BlockManager中有一个LowRedundancyBlocks neededReconstruction变量,记录了当前需要进行reconstruction的所有block。
由于不同情况的恢复优先级不同(比如,3副本情况下丢失1个副本和丢失2个副本的紧急程度显然不同,RS(6,2)的情况下丢失1个replica和丢失2个replica的紧急程度显然不同),LowRedundancyBlocks对象根据紧急程度定义了优先级,每个优先级都有一个对应的block队列,代表了这个block需要进行该优先级的re-construct。

class LowRedundancyBlocks implements Iterable<BlockInfo> {
  static final int QUEUE_HIGHEST_PRIORITY = 0;
  static final int QUEUE_VERY_LOW_REDUNDANCY = 1;
  static final int QUEUE_LOW_REDUNDANCY = 2;
  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;

显然,对于连续布局和条带布局,确定优先级的基本思想一致,但是计算方式并不相同。我们通过getPriorityContiguous()方法和getPriorityStriped()方法可以清晰地看到对于两种布局方式确定其优先级的计算逻辑。
先抛开布局方式的差异,一般来讲,各种优先级的含义是:

  • QUEUE_HIGHEST_PRIORITY的最高优先级指的是这个block再丢失一个replica就会corrupt(无法再通过re-construct恢复),因此情况非常紧急。
  • QUEUE_VERY_LOW_REDUNDANCY指的是这个Block再丢一个replica依然还有恢复的可能,但是丢两个就无法恢复了。
  • QUEUE_LOW_REDUNDANCY则是指这个Block还能容忍两个或者两个以上的Replica的丢失而依然可以通过reconstruct来恢复数据。
  • QUEUE_WITH_CORRUPT_BLOCKS则代表这个Block已经corrupt,不可能通过reconstruct恢复了,因此不用对其构建reconstruction任务。
    /**
   * @param curReplicas 当前live的replica
   * @param readOnlyReplicas 处在READONLY状态的replica,
   * @param outOfServiceReplicas 指的是状态处在MAINTENANCE_NOT_FOR_READ ||  MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica
   * @param expectedReplicas 预期的replica数量,比如我们配置的文件副本数量为3
   */
  private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
      int outOfServiceReplicas, int expectedReplicas) {
    if (curReplicas == 0) {
      // If there are zero non-decommissioned replicas but there are
      // some out of service replicas, then assign them highest priority
      if (outOfServiceReplicas > 0) {
        return QUEUE_HIGHEST_PRIORITY;
      }
      if (readOnlyReplicas > 0) {
        // only has read-only replicas, highest risk
        // since the read-only replicas may go down all together.
        return QUEUE_HIGHEST_PRIORITY;
      }
      //all we have are corrupt blocks
      return QUEUE_WITH_CORRUPT_BLOCKS;
    } else if (curReplicas == 1) {
      // only one replica, highest risk of loss
      // highest priority
      return QUEUE_HIGHEST_PRIORITY;
    } else if ((curReplicas * 3) < expectedReplicas) {
      //can only afford one replica loss 还能再承受一个replica loss,即如果还有两个replica loss,数据就丢失了,block就corrupt了
      //this is considered very insufficiently redundant blocks.
      return QUEUE_VERY_LOW_REDUNDANCY;
    } else {
      //add to the normal queue for insufficiently redundant blocks
      return QUEUE_LOW_REDUNDANCY;
    }
  }
  
    /**
   * @param curReplicas 当前live的replica的数量
   * @param outOfServiceReplicas 指的是状态处在MAINTENANCE_NOT_FOR_READ ||  MAINTENANCE_FOR_READ || DECOMMISSIONED || DECOMMISSIONING的replica
   * @param dataBlkNum 配置的预期的数据块的数量,比如RS(6,2)中,dataBlkNum=6
   * @param parityBlkNum 配置的预期的校验块的数量,比如RS(6,2)中,parityBlkNum=2
   */
  private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,
      short dataBlkNum, short parityBlkNum) {
    if (curReplicas < dataBlkNum) {
      // There are some replicas on decommissioned nodes so it's not corrupted
      if (curReplicas + outOfServiceReplicas >= dataBlkNum) { //有一部分replica在decommissiong 节点上,因此它还没有corrupt,但是必须立刻恢复了
        return QUEUE_HIGHEST_PRIORITY;
      }
      return QUEUE_WITH_CORRUPT_BLOCKS; // 已经corrupt了,无法恢复了
    } else if (curReplicas == dataBlkNum) { // 再丢失一个replica数据就无法恢复了
      // highest risk of loss, highest priority
      return QUEUE_HIGHEST_PRIORITY;
    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
      // can only afford one replica loss
      // this is considered very insufficiently redundant blocks.
      return QUEUE_VERY_LOW_REDUNDANCY; // 再丢失一个replica就corrupt了
    } else {
      // add to the normal queue for insufficiently redundant blocks.
      return QUEUE_LOW_REDUNDANCY;// 一般优先级
    }
  }

在这里插入图片描述
在这里插入图片描述
LowRedundancyBlocks会通过getPriorityContiguous()获取优先级,然后将对应的需要进行reconstruct的block放入到对应的优先级队列中:

  private boolean add(BlockInfo blockInfo, int priLevel, int expectedReplicas) {
    if (priorityQueues.get(priLevel).add(blockInfo)) {
      return true;
    }
    return false;
  }

然后,RedundancyMonitor对应的Daemon 线程会根据LowRedundancyBlocks neededReconstruction中维护的优先级队列,按照优先级选择Block, 构造对应的ReconstructionTask。通过blocksToProcess控制每轮循环最多需要进行re-construct的block的数量,每次循环都通过bookmark的方式从上一轮循环处理的位置接着进行:

 synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
      int blocksToProcess, boolean resetIterators) {
    final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
    int count = 0;
    int priority = 0;
    HashSet<BlockInfo> toRemove = new HashSet<>();
    // 依次遍历每一个LEVEL的queue,形成一个需要进行re-construct的List<List>, 外层list就是优先级,内层list就是这个优先级下面的需要进行re-construct的block
    for (; count < blocksToProcess && priority < LEVEL; priority++) {
      final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
      // 这个PriorityQueue的bookmark,每次循环都从上一次循环的位置开始,而不是重新开始
      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
      final List<BlockInfo> blocks = new LinkedList<>(); 
      if (!inCorruptLevel) { // 因为corrupt block已经没有恢复的必要了
        blocksToReconstruct.add(blocks);
      }
      for(; count < blocksToProcess && i.hasNext(); count++) {
        BlockInfo block = i.next();
        if (block.isDeleted()) { // 这个block在neededReconstruction中,但是其实已经被NN删除了,因此必须要构造re-construct了
          toRemove.add(block);
          continue;
        }
        if (!inCorruptLevel) {
          blocks.add(block);
        }
      }
      for (BlockInfo bInfo : toRemove) {
        remove(bInfo, priority); // 这个Block已经被系统删除了(比如,文件删除了),因此已经不需要reconstruct了
      }
      toRemove.clear();
    }
  ....
    return blocksToReconstruct;
  }
2.4.1.2 RedundancyMonitor的运行

块的重构是由一个持续运行的Daemon线程RedundancyMonitor负责调度的,由于待重构的Block信息逗存放在BlockManager.neededReconstruction(重构队列)中,因此RedundancyMonitor主要功能就是从重构队列中取出Block,构造重构任务,调度出去。同时,它还负责从待定队列中取出那些超时的Block,以及从延迟队列中取出已经不需要继续延迟(Stale状态结束)的节点,如果这些节点需要重构,则创建重构任务和调度重构任务。

2.4.2 故障块的重构过程

本节将会讲解故障块的重构(Reconstruction)的基本流程。下一节会讲解故障块的恢复(Recovery)过程。
故障块的恢复(Recovery)指的是写数据过程中,由于某些非正常原因,造成最终的块文件没有达到最后的COMPLETE状态,比如,写过程客户端的突然离开,写过程中DataNode的突然当机。
而故障块的重构(Reconstruction)发生在写完成以后,由于系统情况的改变,比如,DataNode宕机,DataNode汇报上来的存储故障,用户手动修改了副本数导致副本数量过多,用户手动进行decommission或者maintenance导致副本的数量变化,都需要进行副本相关的重构。

在这里插入图片描述

  private class RedundancyMonitor implements Runnable {

    @Override
    public void run() {
      while (namesystem.isRunning()) { // 这是一个无限循环,只要是active namenode,就不断运行
        try {
          // Process recovery work only when active NN is out of safe mode.
          if (isPopulatingReplQueues()) {
            computeDatanodeWork(); // 从重构队列中取出Block, 选择并且将需要re-construct的节点发送给对应的Node 去执行
            processPendingReconstructions(); // 从待定队列中取出Block,,如果需要reconstruct,加入到neededReconstruction中
            rescanPostponedMisreplicatedBlocks(); // 从延迟队列中取出Block,如果需要reconstruct,加入到neededReconstruction中
          }

在这里插入图片描述

2.4.2.1 从优先级队列中选择待重构块

RedundancyMonitor的computeDatanodeWork()负责创建块重建的任务并调度出去

我们在上文讲解过,重构队列(neededReconstruction)是一个优先级队列,重构任务的创建和调度肯定是按照优先级从高到底进行。在LowRedundancyBlocks.chooseLowRedundancyBlocks()中可以看到基于优先级从重构队列中获取Block的逻辑:

  int computeBlockReconstructionWork(int blocksToProcess) {
    List<List<BlockInfo>> blocksToReconstruct = null;
    namesystem.writeLock();
    ........
    // Choose the blocks to be reconstructed
      blocksToReconstruct = neededReconstruction // blocksToProcess存储了每次最多可以处理的blocks的数量
          .chooseLowRedundancyBlocks(blocksToProcess, reset); // 每次最多处理blocksToProcess个,然后每次都是从上次的bookmark的位置接着进行
    ......
    // 根据挑选的 需要进行reconstruct的block,对他们进行重新构建
    return computeReconstructionWorkForBlocks(blocksToReconstruct);
  }

 // 选择那些处于风险中的块,按照优先级构成一个2层list。注意,这时候只是选出了需要进行重构的Block,至于怎么重构,需要computeReconstructionWorkForBlocks()来确定
 synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
      int blocksToProcess, boolean resetIterators) {
    final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);

    int count = 0;
    int priority = 0;
    HashSet<BlockInfo> toRemove = new HashSet<>();
    // 依次遍历每一个LEVEL的queue,形成一个需要进行re-construct的List<List>, 外层list就是优先级,内层list就是这个优先级下面的需要进行re-construct的block
    for (; count < blocksToProcess && priority < LEVEL; priority++) {
      // Go through all blocks that need reconstructions with current priority.
      // Set the iterator to the first unprocessed block at this priority level
      // We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
      // to look for deleted blocks if any.
      final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
      // 这个PriorityQueue的bookmark,每次循环都从上一次循环的位置开始,而不是重新开始
      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
      final List<BlockInfo> blocks = new LinkedList<>();
      if (!inCorruptLevel) { // 因为corrupt block已经没有恢复的必要了
        blocksToReconstruct.add(blocks);
      }
      .......
    return blocksToReconstruct;
  }
  }

可以看到,chooseLowRedundancyBlocks每次都按照优先级从高到低的顺序,最多选取blocksToProcess 个Block进行重构。请注意,整个分布式系统是一个不断变化的系统,从重构队列中取出的Block很可能已经不需要再重构了,比如(文件已经删除,由于DataNode恢复因此块状态已经完全恢复)等,所以从重构队列中取出的块都会时刻进行状态的重新判断,只有确定这个块的确需要重构,才会放入到blocksToReconstruct。

2.4.2.2 构建待重构块的统计信息和元数据信息,为选择source节点做准备

在上一节选定了需要进行重构的Block(基于复制或者基于纠删码),下面,就开始对这个块的信息进行分析,比如,这个Block是基于复制还是基于基于纠删码,这个Block的每一个replica在哪里(比如EC编码中每一个Internal Block的Storage信息,或者基于复制的Block的每一个Replica的信息),每一个replica的状态是什么样的(LIVE, DECOMMISSIONGING, MAINTENANCE_FOR_READ…),目前丢失因此需要重构的physical block有几个,是哪些(比如EC编码中用internal block index表示,而在复制方式中则只需要关心还缺几个副本)。

由于重构的读取过程显然会带来对应source节点的负载,假如有多种选择方案(比如,ReplicationWork中,有两个副本可用,只需要选择一个source replica拷贝到第三个节点,选哪个更好?),那么基本原则是选择负载最轻的、对系统运行影响最小的节点。

// 为chooseLowRedundancyBlocks()返回的blocks构造BlockReconstructionWork,并调度出去
 int computeReconstructionWorkForBlocks(
      List<List<BlockInfo>> blocksToReconstruct) { // 按照优先级进行排列的,每一个List<BlockInfo>都是这个对应优先级的block的列表
    List<BlockReconstructionWork> reconWork = new ArrayList<>();
    ....
      // 对这些block的重构任务进行分类,构造BlockReconstructionWork的具体实现,ErasureCodingWork或者ReplicationWork
        for (int priority = 0; priority < blocksToReconstruct 
            .size(); priority++) { // 优先级从高到低,0的优先级最高
          for (BlockInfo block : blocksToReconstruct.get(priority)) {
            // 创建对应的ReconstructionWork,但是还没有到挑选节点的阶段
            BlockReconstructionWork rw = scheduleReconstruction(block,
                priority);
            if (rw != null) {
              reconWork.add(rw);
            }
        ......

选择Source节点的逻辑在方法chooseSourceDataNodes()中,其基本思路为:

  • 一个非正常不可读状态的Replica,肯定不会作为source节点,比如,StoredReplicaState.CORRUPT,StoredReplicaState.EXCESS,StoredReplicaState.MAINTENANCE_NOT_FOR_READ,StoredReplicaState.DECOMMISSIONED
  • 考虑一个节点上目前已经分配的replica任务,超过了hard limit(dfs.namenode.replication.max-streams-hard-limit,默认值为4), 即使是优先级最高的重构任务,也不会将该节点作为source节点
  • 对于一个处于正在decommission(DECOMMISSIONING)或者正在准备进入maintenance(MAINTENANCE_FOR_READ),是比普通的正常replica更应该被选为source,因为这种状态的节点的workload往往很低,因此重构过程中对系统影响较小,其具体规则为:
  • 如果是最高优先级(LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY),不管该节点目前是LIVE,或者DECOMMISSIONING或者MAINTENANCE_FOR_READ, 只要该节点当前正在运行和准备运行的replica任务数量没有超过hard limit(dfs.namenode.replication.max-streams-hard-limit),那么依然会作为source nodes。
  • 或者,虽然不是最高优先级,但是这个replica的状态是DECOMMISSIONING或者MAINTENANCE_FOR_READ,即使超过了soft limit(dfs.namenode.replication.max-streams),那么依然会作为source,因为这两个状态的Storage往往负载较低,是优选的对象。
  • 或者,虽然优先级不高,并且replica的状态是LIVE状态(不是优选的DECOMMISSIONING或者MAINTENANCE_FOR_READ状态),只要该节点上的任务没有超过soft limit(dfs.namenode.replication.max-streams),这个节点就可以作为source

下面是chooseSourceDatanodes()的代码,请注意,这些参数比如containingNodes, nodesContainingLiveReplicas,numReplicas,liveBlockIndices,liveBusyBlockIndices都是将这个Block对以的状态信息从方法里面带出方法外,供chooseSourceDatanodes()的调用者scheduleReconstruction()进行重构的下一步调度。

  • containingNodes表示包含这个Block的某个replica的节点
  • nodesContainingLiveReplicas 表示包含这个Block的某个replica并且状态为LIVE的节点的信息
  • numReplicas 就是这个Block的各个replica的在各个StoredReplicaState的统计信息
  • liveBlockIndices 只针对条带布局,可以看到liveBlockIndices不仅仅是包含LIVE状态的replica,而是经过chooseSourceDataNodes()的判断,认为可以从上面读取replica的节点的internal block index(比如节点是LIVE的状态,节点是MAINTENANCE_FOR_READ或者DECOMMISSIONING状态等),都放入到liveBlockIndices中。
  • liveBusyBlockIndicies 只针对条带布局,所有处于LIVE或者DECOMMISSIONING并且这个节点上目前已经存在的复制任务超过一定数量
DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
      List<DatanodeDescriptor> containingNodes,
      List<DatanodeStorageInfo> nodesContainingLiveReplicas,
      NumberReplicas numReplicas,  // 传到这里的numReplicas是一个刚刚初始化、空的统计信息
      List<Byte> liveBlockIndices,
      List<Byte> liveBusyBlockIndices, int priority) {
    
    if (isStriped) {
      int blockNum = ((BlockInfoStriped) block).getTotalBlockNum(); // data unit + parity unit
      liveBitSet = new BitSet(blockNum);
      decommissioningBitSet = new BitSet(blockNum);
    }

    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { // 对于存储这个Block的每一个 DatanodeStorageInfo
      final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);//
      final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block, // 获取这个节点上的 replica的状态
          storage, corruptReplicas.getNodes(block), false);
      if (state == StoredReplicaState.LIVE) { //存活状态
        if (storage.getStorageType() == StorageType.PROVIDED) {
          storage = new DatanodeStorageInfo(node, storage.getStorageID(),
              storage.getStorageType(), storage.getState());
        }
        nodesContainingLiveReplicas.add(storage); // 加入存活列表
      }
      containingNodes.add(node);

      // do not select the replica if it is corrupt or excess
      if (state == StoredReplicaState.CORRUPT ||
          state == StoredReplicaState.EXCESS) { // CORRUPT和EXCESS状态不考虑
        continue;
      }

      // Never use maintenance node not suitable for read
      // or unknown state replicas.
      if (state == null
          || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
        continue;// 不可读状态不考虑
      }
      if (state == StoredReplicaState.DECOMMISSIONED) { // 已经decommissioned,这个replica不考虑
        if (decommissionedSrc == null ||
            ThreadLocalRandom.current().nextBoolean()) {
          decommissionedSrc = node;
        }
        continue;
      }
      byte blockIndex = -1;
      if (isStriped) {
        blockIndex = ((BlockInfoStriped) block)
            .getStorageBlockIndex(storage); // 这个节点所存储的internal block的block index
        countLiveAndDecommissioningReplicas(numReplicas, state,
            liveBitSet, decommissioningBitSet, blockIndex);
      }

      if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { // maxReplicationStreams默认值是2
        if (isStriped && (state == StoredReplicaState.LIVE
            || state == StoredReplicaState.DECOMMISSIONING)) {
          liveBusyBlockIndices.add(blockIndex);
        }
        continue; // already reached replication limit 已经超过限制,查看下一个replica
      }


      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { // replicationStreamsHardLimit默认值是4
        if (isStriped && (state == StoredReplicaState.LIVE
            || state == StoredReplicaState.DECOMMISSIONING)) {
          liveBusyBlockIndices.add(blockIndex);
        }
        continue;
      }

      if(isStriped || srcNodes.isEmpty()) {
        srcNodes.add(node);
        if (isStriped) {
          liveBlockIndices.add(blockIndex); // 加入到healthy block中
        }
        continue;
      }
      .....
    } //循环结束
    //  对于基于复制的布局方式,如果没有一个存活的节点包含replica,还没有找到一个srcNodes,但是发现了一个已经decommissioned节点包含,那么,依然准备使用这个 decommissioned节点
    // Pick a live decommissioned replica, if nothing else is available.
    if (!isStriped && nodesContainingLiveReplicas.isEmpty() &&
        srcNodes.isEmpty() && decommissionedSrc != null) {
      srcNodes.add(decommissionedSrc);
    }
    // 如果是基于复制,那么srcNodes肯定只有一个元素
    return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
  }

scheduleReconstruction()通过chooseSourceDatanodes()选定Source节点以后,就个妞刚刚选择节点过程中获取的Block的source 节点的一些统计信息比如containingNodes, nodesContainingLiveReplicas,liveBlockIndices,liveBusyBlockIndices,计算还需要多少个replica需要重构,将这些综合信息封装成BlockReconstructionWork,然后开始进行重构任务的调度。
块的重构细节都被封装在对应的BlockReconstructionWork对象中,根据块布局方式的不同,分为ErasureCodingWorkReplicationWork两种实现。显然,ReplicationWork不需要block index的这类信息的,但是ErasureCodingWork需要。在方法的,两个子类都重写chooseTargets()方法和addTaskToDataNode()方法,因为在选择目标节点上,不同的块布局有不同的PlacementPolicy,同时,在将任务添加到DataNode的时候,也有不同的逻辑。下文将会详细讲解。

在这里插入图片描述

class ErasureCodingWork extends BlockReconstructionWork {
  private final byte[] liveBlockIndicies;
  private final byte[] liveBusyBlockIndicies;
  private final String blockPoolId;

  public ErasureCodingWork(String blockPoolId,
                           BlockInfo block,
      BlockCollection bc,
      DatanodeDescriptor[] srcNodes,// srcNodes.size()和liveBlockIndicies.size()一样,并且liveBlockIndicies相同位置的值就是对应的internal block在group中的位置
      List<DatanodeDescriptor> containingNodes, // 包含这个Block的所有的节点,包括DECOMMISSION和MAINTENANCE节点
      List<DatanodeStorageInfo> liveReplicaStorages, // 包含这个Block的Live节点,因此不包含DECOMMISSION和MAINTENANCE
      int additionalReplRequired,  // 还缺少个replica
      int priority,
      byte[] liveBlockIndicies,// 只针对条带布局,所有live状态的replica的internal block index
      byte[] liveBusyBlockIndicies) { // 只针对条带布局,所有处于LIVE或者DECOMMISSIONING并且这个节点上目前已经存在的复制任务超过一定数量
    super(block, bc, srcNodes, containingNodes,
        liveReplicaStorages, additionalReplRequired, priority);
    this.blockPoolId = blockPoolId;
    this.liveBlockIndicies = liveBlockIndicies;
    this.liveBusyBlockIndicies = liveBusyBlockIndicies;
  }
class ReplicationWork extends BlockReconstructionWork {
  public ReplicationWork(BlockInfo block, 
                         BlockCollection bc,
                         DatanodeDescriptor[] srcNodes, // 源节点,对于ReplicationWork, srcNodes的大小一定是1
                         List<DatanodeDescriptor> containingNodes,// 包含这个Block的所有的节点,包括DECOMMISSION和MAINTENANCE节点
                         List<DatanodeStorageInfo> liveReplicaStorages,  // 包含这个Block的Live节点,因此不包含DECOMMISSION和MAINTENANCE
                         int additionalReplRequired, // 还需要多少个副本
      int priority) {
    super(block, bc, srcNodes, containingNodes,
        liveReplicaStorages, additionalReplRequired, priority);
    // 构造ReplicationWork的时候,还没有确定target节点,但是先进行计数
    getSrcNodes()[0].incrementPendingReplicationWithoutTargets();

scheduleReconstruction()方法的具体代码如下,其基本逻辑为:

  • 先通过调用getExpectedLiveRedundancyNum()看看我当前期望这个Block有多少个副本存活。上文在讲getExpectedLiveRedundancyNum()方法的时候已经详细讲解了期望的存活副本数量(Expected Live Redundancy)的具体含义。

  • 基于当前的期望副本数,和这个副本当前的实际情况(有多少replica是正常的LIVE状态,有多少个replica是pending状态(即pendingReconstructions中的副本,pendingReconstructions的含义上文已经讲过)),我还额外需要多少个副本呢?这是通过下面的代码计算的:

     if (numReplicas.liveReplicas() < requiredRedundancy) { // 当前的live replica还不够requiredRedundancy的数量
          additionalReplRequired = requiredRedundancy - numReplicas.liveReplicas()
              - pendingNum;
        }
    
  • 即使在数量上我看起来不再需要副本(当前Block的实际存活的副本数已经不小于期望的存活副本数量(Expected Live Redundancy)),那有可能副本的位置并不满足PlacementPolicy的需求,这也是需要进行重构的,代码如下:

    else { // 尽管副本总数量够了,但是有可能按照Placement policy,副本的位置不符合要求
          // Violates placement policy. Needed on a new rack or domain etc.
          BlockPlacementStatus placementStatus = getBlockPlacementStatus(block);
          additionalReplRequired = placementStatus.getAdditionalReplicasRequired();
        }
    
  • 对于条带布局,在确定targets的数量的时候,还去掉了正处在DECOMMISSIONING和处在MAINTENANCE_FOR_READ的Replica,因为这些replica目前算作LIVE的replica(查看chooseSourceDatanodes()在确定liveBusyBlockIndicies的基本逻辑),而这些Replica即将不可读(因为即将进入DECOMMISSIONED和MAINTENANCE_NOT_FOR_READ状态),因此必须尽快利用这些节点上的Replica进行其它Replica的重构。

  • 在按照上述逻辑确定了targets的数量以后,构造ErasureCodingWork或者ReplicationWork对象,开始选择目标节点。

    // 虽然创建了ErasureCodingWork但是有可能是进行replica work,具体调度出去的时候会进行具体分析
          return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
              containingNodes, liveReplicaNodes, additionalReplRequired,
              priority, newIndices, busyIndices);
        } else {
          return new ReplicationWork(block, bc, srcNodes,
              containingNodes, liveReplicaNodes, additionalReplRequired,
              priority);
        }
    
2.4.2.3 选择target节点

目标节点是重构的时候需要写入重构数据的节点。显然,这刚好是PlacementPolicy做的事情。代码如下:

 //  为每一个reconstruction work挑选target节点
    for (BlockReconstructionWork rw : reconWork) {
      // Exclude all of the containing nodes from being targets.
      // This list includes decommissioning or corrupt nodes.
      final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());

      // Exclude all nodes which already exists as targets for the block
      List<DatanodeStorageInfo> targets = // 排除掉这个Block当前所在的targets
          pendingReconstruction.getTargets(rw.getBlock());
      if (targets != null) {
        for (DatanodeStorageInfo dn : targets) {
          excludedNodes.add(dn.getDatanodeDescriptor());
        }
      }

      //根据布局方式(连续布局还是条带布局),选择合适的块放置策略
      final BlockPlacementPolicy placementPolicy =
          placementPolicies.getPolicy(rw.getBlock().getBlockType());
      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);// 最终会调BlockPlacementPolicy.chooseTarget()
    }

从上面的代码可以看到,选择目标节点是通过调用BlockReconstructionWork.chooseTargets()方法实现的,这个方法在父类BlockReconstructionWork上是抽象方法,需要子类去实现。两个子类对这个方法的实现方式大致一致,都是选择PlacementPolicy.chooseTargets()去调度,这里不贴二者实现的代码。

在上文讲条带布局的写过程时我们说过,连续布局的默认块放置策略BlockPlacementPolicyDefault会将第一个replica放在一个rack的某台机器上,另外两个replica会放在另外一个rack的两台不同机器上。而条带布局的默认块放置策略是BlockPlacementPolicyRackFaultTolerant,倾向于讲所有的internal block放到不同的基架上去。
我们看一下BlockPlacementPolicy接口,就知道它需要哪些信息:

  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath, // 这个块对应的文件的path
                                             int numOfReplicas, // 额外需要的节点数量
                                             Node writer, // 哪个节点负责写数据,这种情况下,如果这个节点的确是一个DataNode,并且这个DataNode不在excludedNodes里面,那么第一个replica会写到这个节点上去
                                             List<DatanodeStorageInfo> chosen, // 已经被选择作为targets的节点
                                             boolean returnChosenNodes, // 是否返回被选择的节点
                                             Set<Node> excludedNodes, // 需要排除的节点
                                             long blocksize, // 一个group中的data block的大小的总和
                                             BlockStoragePolicy storagePolicy, // 这个节点应该放在哪种类型(StorageType比如SSD,DISK等)的介质上
                                             EnumSet<AddBlockFlag> flags //分配block的时候的一些hint(暗示)的flag信息以修改这个PlacementPolicy的默认行为,比如默认会将第一个replica放到writer所在的DataNode上,但是如果指定了NO_LOCAL_WRITER,就不会把第一个replica放到跟writer在一个host上
                                             );

在调用BlockPlacementPolicy.chooseTarget()进行目标节点选择的时候,基本思路是,我告诉你 1) 已经选择作为target的节点(chosen) 2) 需要排除的节点,即不能作为目标的节点,那么,请根据你的BlockPlacementPolicy的实现,为我选择numOfReplicas个节点作为target节点。

  • chosen: 在块重构的场景下,已经选择作为target的节点(chosen)是这个Block的处于LIVE状态的replica。为什么需要这个参数,可以参考具体BlockPlacementPolicy的代码,本文不做论述。
  • excludedNodes: 而根据上面的代码,我们知道excludedNodes的选择规则,即只要是这个Block的replica所在的节点(存放在containingNodes中,无论节点的状态),以及这个Block还处于pending状态的节点(上文讲过pendingReconstruction,这些节点是写Block完成但是还没有进行Block report的节点,我们assume它过一段时间就会汇报,因此重构块的其它节点的时候,不应该选择它)。
2.4.2.4 将任务调度出去

到这一阶段,我们已经构建了BlockReconstructionWork的具体实现,ErasureCodingWork或者ReplicationWork,包含了srcNodes(这个Block的每一个replica所在的并且满足一定条件比如节点状态,当前上面已经有的replication的task的数量),containingNodes(这个Block的每一个replica所在的节点,无论什么状态,无论上面已经有多少replication的task),targets(重构的目标节点),liveBlockIndicies(处于LIVE状态或者其他状态但是满足要求的replica的internal block index,只针对条带布局),liveReplicaStorages(处于LIVE状态的DataNodeStorage),现在可以将任务调度出去了。调度的主要逻辑在方法validateReconstructionWork()中(方法名字似乎跟它做的事情并不一致。。。。)

    // 将task调度给对应的DN,进行重构
    namesystem.writeLock();
    try { // 遍历每一个reconstruction work, 然后调度出去
      for (BlockReconstructionWork rw : reconWork) {
        .......
        synchronized (neededReconstruction) {
          if (validateReconstructionWork(rw)) { //validateReconstructionWork会将任务调度出去,即attach到对应的DataNode,等待收到心跳,就把attach到DataNode的任务作为response发送给DataNode
          ....
      }
    .....
    return scheduledWork;

validateReconstructionWork()做了以下事情

  1. 再次判断是否有足够副本,是否真的需要重构
    首先再次确认BlockReconstructionWork对应的Block的确没有足够的redundancy,即是否有足够的有效副本数。如果有效副本数已经足够,则取消任务。关于有效副本数(hasEnoughEffectiveReplicas()方法)的定义,上文已经讲解。
        NumberReplicas numReplicas = countNodes(block);// 再次统计这个block当前的状态统计信息
        final short requiredRedundancy =
            getExpectedLiveRedundancyNum(block, numReplicas);
        final int pendingNum = pendingReconstruction.getNumReplicas(block);
        if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) { // 算上pending的,已经有足够的replica,不需要进行re-construct了
          neededReconstruction.remove(block, priority);
          .......
          return false;
        }
    
  2. 是副本不够,还是副本够但是机架不够
    这里的考虑是,一个重构任务,包括ErasureCodingWork,有可能并不是因为某一个replica的缺失,而是,尽管replica并不缺失,但是位置不对,比如,纠删码要求每个replica都尽量在不同的机架上,但是现在却发现有两个replica在同一机架上,并且有多余的机架可以满足要求,此时依然需要进行重构,只不过,即使是ErasureCodingWork,也是通过复制的方式去实现(将一个replica从一个rack拷贝到另一个rack以)。
    if ((numReplicas.liveReplicas() >= requiredRedundancy) &&
            (!placementStatus.isPlacementPolicySatisfied())) { // 如果replica数量足够,仅仅是分布方式不满足放置策略
          BlockPlacementStatus newPlacementStatus =
              getBlockPlacementStatus(block, targets); // 当添加了targets以后,获取对应的status
          // 节点添加进来以后,依然没有满足当前的放置策略,并且,即使把target加进来,还需要额外的节点数量大于不加入target的时候还需要的节点数量,那么,加入这些target没有任何价值
          // 也就是说,如果这些targets加进来以后,放置策略被满足,或者,虽然依然不满足,但是至少让所需要的节点数量减少了,那么,把这些target加进来就有意义
          if (!newPlacementStatus.isPlacementPolicySatisfied() &&
              (newPlacementStatus.getAdditionalReplicasRequired() >=
                  placementStatus.getAdditionalReplicasRequired())) { // 分布不满足分布策略,
            // If the new targets do not meet the placement policy, or at least
            // reduce the number of replicas needed, then no use continuing.
            return false;
          }
          // mark that the reconstruction work is to replicate internal block to a
          // new rack.
          rw.setNotEnoughRack(); // 仅仅需要把replica放到一个新的rack上
        }
    
    每一个BlockPlacementPolicy都必须实现 verifyBlockPlacement()方法,它返回一个BlockPlacementStatus对象,封装了当前的副本放置是否满足要求、还需要几个副本等等信息,供调用者进行决策。
     public abstract BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numOfReplicas);
    
    	public interface BlockPlacementStatus {
    	  public boolean isPlacementPolicySatisfied(); // 当前的放置状态是否满足了放置策略
    	  public String getErrorDescription();// 当前的放置状态没有满足放置策略的一些错误或者描述信息
    	  int getAdditionalReplicasRequired(); // 还需要多少个额外副本才能保证放置策略得到满足
    	}
    

可以看到,如果副本数量足够但是verifyBlockPlacement()返回的BlockPlacementStatus显示Block当前的Replica放置不满足要求,那么就会调用BlockReconstructionWork.setNotEnoughRack(),将boolean notEnoughRack置为True,这个变量将会影响到向DataNode添加任务的逻辑,因为如果notEnoughRack=True,那么即使是ErasureCodingWork,这个任务也不再是进行encode/decode类型的计算,而是通过将某个internal block进行拷贝来买满足PlacementPolicy的要求。下文会详细介绍向DataNode添加任务的流程。
3. 将任务Attach到DataNode
在BlockReconstructionWorkd的类图中可以看到,addTaskToDataNode()方法是一个抽象方法,因此ErasureCodingWork和ReplicationWork对这个方法都有自己的实现。

  • ErasureCodingWork的任务分配
    请注意,ErasureCodingWork实际分配出去的任务不一定就是纠删码的encode/decode任务,也可能也是复制任务。其基本逻辑是:
    1. 是否是不缺副本但是缺少机架。在纠删码场景下,缺少rack的意思是,有rack上的存放了这个Block的多个副本。在这种情况下,通过chooseSource4SimpleReplication(),选择一个Replica的DataNode,将任务调度给这个DataNode,任务是:将你上面的这个replica 挪到target上面去。选择这个DataNode的原则也很直观:看看这个Block在哪个Rack上存放了多个replica,那么就选择这个Rack上的某个DataNode,让这个DataNode自己负责把自己的Replica挪走。
    2. 是否整个副本数量足够(无需通过encode/decode进行internal block的重算),但是当前有的Replica的状态是DECOMMISSIONING或者MAINTENANCE_FOR_READ的状态。这种情况下,会给每一个处在DECOMMISSIONING或者MAINTENANCE_FOR_READ状态的DataNode分发任务,任务的内容是:将你上面这个Block的replica立刻拷贝到远程的targets机器上去。
    3. 既不是缺少机架,同时也不是有节点即将进行decommioning或者maintenance,而是的确缺少internal block。这种情况下,的确需要调度纠删码的encode/decode任务。调度的基本逻辑是,将任务调度给选定的targets节点中的第一个节点,显然,这个节点将会负责从source节点上读取internal block,通过encode或者decode,让生成的一个或者多个internal block,发送到targets上去。由于它自己本身就是一个target,显然有一个internal block是给分配给自己的。
  • ReplicationWork的任务分配
    ReplicationWork的任务就很简单了,都是基于拷贝的多副本情况下副本数量不够,因此将任务调度给source节点的第一个节点,这个节点负责将自己存放的这个replica拷贝到target节点上去。

将任务attach到DataNode是通过DatanodeDescriptor.addBlockToBeReplicated()和DatanodeDescriptor.addBlockToBeErasureCoded()方法进行的。我们从下面的DatanodeDescriptor的类图可以看到,DatanodeDescriptor上保存了NameNode即将发送给该DataNode的各种类型的副本操作信息,这些信息将会在下一次DataNode到来的时候作为repsonse发送给DataNode.

在这里插入图片描述
从下面的代码可以看到,addBlockToBeReplicated()和addBlockToBeErasureCoded()就是将对应的任务添加到该DatanodeDescriptor的对应的任务队列replicaBlocks和erasurecodeBlocks中去。

---------------------------------------------DatanodeDescriptor----------------------------------------------------
  
  public void addBlockToBeReplicated(Block block, // 这个block是internal block
      DatanodeStorageInfo[] targets) {
    assert(block != null && targets != null && targets.length > 0);
    replicateBlocks.offer(new BlockTargetPair(block, targets));
  }

  /**
   * Store block erasure coding work.
   */
  void addBlockToBeErasureCoded(ExtendedBlock block,
      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
      byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
    assert (block != null && sources != null && sources.length > 0);
    // 构造这个BlockECReconstructionInfo的block是一个block group,即我还不知道需要对哪个internal block进行重算
    BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
        sources, targets, liveBlockIndices, ecPolicy);
    erasurecodeBlocks.offer(task);
  }
  1. 任务的派发
    NameNode从来不会主动向DataNode发送信息(你清高行了吧?),只会在收到DataNode发送过来的心跳信息的时候,将任务通过心跳的响应返回给DataNode。

当NameNode 收到了DataNode发过来的信息,会在handleHeartbeat()方法中查看当前需要回复给这个DataNode的命令,其中就包括需要该DataNode复制replica到别的节点的命令,或者进行纠删码的encode/decode以恢复纠删码内部块的命令。
所有的命令封装在DatanodeCommand中的对应具体实现类中,比如,对Block进行基于复制的重构new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, pendingList), 对Block进行基于纠删码编解码的重构new BlockECReconstructionCommand( DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList), 删除一个Block的Replica的命令new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks)等等。几个重要的DatanodeProtocol:

  final static int DNA_TRANSFER = 1;   // 将副本从一个节点传输到另一个节点
  final static int DNA_INVALIDATE = 2; // 删除一个replica
  final static int DNA_SHUTDOWN = 3;   // 关闭节点
  final static int DNA_REGISTER = 4;   // 重新注册
  final static int DNA_RECOVERBLOCK = 6;  // 恢复一个eblock
  final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
  final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // 更新balancer的带宽
  final static int DNA_CACHE = 9;      // 缓存一个block
  final static int DNA_UNCACHE = 10;   // 将一个block从缓存中拿掉
  final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // 基于纠删码的重构命令
2.4.2.5 DataNode对重构任务的处理

在DataNode端,DataNode会为每一个NameNode(每一个NameService的每一个NameNode,Active或者Standby)构建一个BPOfferService,用来处理这个NameNode发过来的命令。最终,会在BPOfferService.processCommandFromActive()处理来自Active NameNode的命令

----------------------------------------------BPOfferService------------------------------------------------
boolean processCommandFromActor(DatanodeCommand cmd,
      BPServiceActor actor) throws IOException {
    .......
    try {
      if (actor == bpServiceToActive) {
        return processCommandFromActive(cmd, actor);
      } else {
        return processCommandFromStandby(cmd, actor);
      }


 private boolean processCommandFromActive(DatanodeCommand cmd,
      BPServiceActor actor) throws IOException {
    .......
    switch(cmd.getAction()) {
    case DatanodeProtocol.DNA_TRANSFER: // 处理replica复制任务非常简单,就是根据副本信息,将副本复制到指定的远程节点
      // Send a copy of a block to another datanode
      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(),
          bcmd.getTargets(), bcmd.getTargetStorageTypes(),
          bcmd.getTargetStorageIDs());
      break;
    case DatanodeProtocol.DNA_INVALIDATE:
    ......
    case DatanodeProtocol.DNA_CACHE:
    .....
    case DatanodeProtocol.DNA_UNCACHE:
    .......
    case DatanodeProtocol.DNA_SHUTDOWN:
    ......
    case DatanodeProtocol.DNA_FINALIZE:
    ......
    case DatanodeProtocol.DNA_RECOVERBLOCK:
    .......
    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
    .......
      // 对于continuous block的恢复,参考DatanodeProtocol.DNA_TRANSFER
    case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
      LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
      Collection<BlockECReconstructionInfo> ecTasks =
          ((BlockECReconstructionCommand) cmd).getECTasks();
      dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
      .......
    }
    return true;
  }

NameNode对应,DatanodeProtocol.DNA_TRANSFER用来进行副本的复制,而DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION用来进行纠删码的块重构。基于副本复制的重构方式很简单,我们不做赘述。主要讲解纠删码的块重构。
DataNode会构造一个全局的叫做ErasureCodingWorker的对象负责进行纠删码的块重构任务(注意和NameNode端的ErasureCodingWork类区分开)。DataNode启动的时候会初始化ErasureCodingWorker,在ErasureCodingWorker构造时,会用一个线程池专门执行块的重构任务

 public ErasureCodingWorker(Configuration conf, DataNode datanode) {
    ......
    initializeStripedBlkReconstructionThreadPool(conf.getInt(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT));
  }


  private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
    LOG.debug("Using striped block reconstruction; pool threads={}",
        numThreads);
    stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(numThreads,
        numThreads, 60, new LinkedBlockingQueue<>(),
        "StripedBlockReconstruction-", false);
    stripedReconstructionPool.allowCoreThreadTimeOut(true);
  }

其实就是创建了一个corePoolSize=8, maxPoolSize=8, keepAliveTime=60s, 任务队列是一个阻塞无界队列LinkedBlockingQueue的线程池。我们都知道,这个在线程池的划分上叫做叫做固定线程数量的线程池(可以参考文章《Java的5中线程池》)。线程池中的每一个线程负责一个Block(注意不是replica)的重构。

------------------------------------------------ErasureCodingWorker---------------------------------------------------
 public void processErasureCodingTasks(
      Collection<BlockECReconstructionInfo> ecTasks) {
    // 对于每一个重构的task
    for (BlockECReconstructionInfo reconInfo : ecTasks) {
      try {
        StripedReconstructionInfo stripedReconInfo =
            new StripedReconstructionInfo(
            reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
            reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
            reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
            reconInfo.getTargetStorageIDs());
        
        final StripedBlockReconstructor task =
            new StripedBlockReconstructor(this, stripedReconInfo);
        if (task.hasValidTargets()) {
          stripedReconstructionPool.submit(task);

可以看到,ErasureCodingWorker将块的重构信息封装到StripedReconstructionInfo中,然后封装为一个StripedBlockReconstructor,提交给ErasureCodingWorker在初始化的时候构造的固定线程数的线程池stripedReconstructionPool。由此可见,StripedBlockReconstructor肯定是一个Runnable,它的run()方法结构非常清晰,做的事情一目了然:

--------------------------------------------StripedBlockReconstructor----------------------------------------------
class StripedBlockReconstructor extends StripedReconstructor
    implements Runnable {
  private StripedWriter stripedWriter;
  ......
  @Override
  public void run() {
    try {
      initDecoderIfNecessary(); // 根据ECPolicy创建对应的decoder
      initDecodingValidatorIfNecessary();  // 根据ECPolicy创建对应的decoder的validator
      getStripedReader().init();//初始化条带块的读取
      stripedWriter.init(); //初始化条带块的写入
      reconstruct(); // 重构
      stripedWriter.endTargetBlocks(); // 结束目标块的写入,重构完成
      .....

从上面的代码可以看到,StripedBlockReconstructor会首先初始化Decoder和DecodingValidator,然后初始化了Reader(getStripedReader().init())和Writer(stripedWriter.init()),然后进行重构,最后进行一些关闭操作。
具体架构如下图所示:
在这里插入图片描述

关于Decoder和DecodingValidator:

  1. 关于Decoder和DecodingValidator
    Decoder就是通过已有的replica计算出缺失的block,比如RS(6,2),index=(3,6)的块丢失,因此需要通过Decode的方式重算丢失块。DecodingValidator的过程就是将已经Decode的结果重新作为Input进行Decode,看看生成的块与当前的块是否匹配。
    比如,通过decode(d0,d1,d2,d4,p0,p1) 恢复出 (d0, d5), 然后校验过程就可以是decode(d1, d2, d3,d4,d5, p0) 计算出d0’,然后对比d0’和d0,如果一致则校验成功。
    在这里插入图片描述

  2. 关于Reader

StripedReader的构造

显然,纠删码的重构需要从多个Source读取internal block,才能进行重构。比如RS(6,2),应该至少需要从6台DataNode上读取6个internal block(待会儿会讲到文件最后的一个Group可能不需要读这么多的block),才能重构丢失的internal block,和每一个DataNode的通信都需要建立独立的连接,需要多个reader。StripedBlockReconstructor将这些reader的管理交付给StripedReader负责,它管理了StripedBlockReader的List, 每一个StripedBlockReader负责一个DataNode上Replica的读取。
StripeReader的构造方法如下:

	 StripedReader(StripedReconstructor reconstructor, DataNode datanode,
	      Configuration conf, StripedReconstructionInfo stripedReconInfo) {
	    ......
	    dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits();
	    parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits();
	
	    int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1)
	        / stripedReconInfo.getEcPolicy().getCellSize() + 1);
	    minRequiredSources = Math.min(cellsNum, dataBlkNum); // minRequiredSources可能等于data block的数量,有可能等于当前这个stripe的cell数量
	
	    if (minRequiredSources < dataBlkNum) { // cellNum小于dataBlkNum,说明这个block group的总的数据量甚至都没有一个stripe那么多,这往往是一个文件的最后一个Block Group
	      int zeroStripNum = dataBlkNum - minRequiredSources;
	      zeroStripeBuffers = new ByteBuffer[zeroStripNum];
	      zeroStripeIndices = new short[zeroStripNum];
	    }
	    ......
	    readers = new ArrayList<>(sources.length);
	    readService = reconstructor.createReadService(); //用来进行并发读取的StripedBlockReader的线程池
	  }
			```
	```java
	  CompletionService<BlockReadStats> createReadService() {
	    return new ExecutorCompletionService<>(stripedReadPool);
	  }
      // StripedReader的初始化
	  void init() throws IOException {
	    initReaders();
	    initBufferSize();
	    initZeroStrip();
	  }

从刚上面的代码可以看到,在构造的时候主要做了以下3件事情:

  • 确定最少需要从多少个Source读取数据
    • 以RS(6,2),cell大小为1MB为例,为了重构内部物理块,难道不都是需要至少6个内部块吗?答案是不一定。特殊情况发生在文件的最后一个Block Group,假如这个Block Group的大小小于等于(6 - 1) * 1M = 6M,那么就至少有一个internla block是全0的,HDFS对于这种小文件,尽管NameNode会照常分配6+2=8个物理块,但是客户端也只会写有有效数据的块,根本不会写没有任何有效数据的块。这个结果我们在本文的实验验证部分已经验证了。下面的StripedReader的构造方法就是确定最少需要多少个source,即需要构造多少个实际读取数据的reader。剩下的空的 internalblock 也会构建对应的Buffer,但是数据全是0。

      根据上面的计算方式,我们以RS-6-2-1024k为例:
      如果这个BlockGroup的大小是3.5 * 1024k = 1536kB,由于一个cell大小为1024KB, 那么这个Block Group只会有 (3584 - 1)/1024 + 1 = 4个internal block,剩下两个internal Block不会生成。

      如果这个BlockGroup的大小是8 * 1024 = 8192KB,由于一个cell大小为1024KB,那么(8192 - 1)/1024 + 1 = 8,与最多的6取最小值,因此是需要6个internal block。
      两种情况的纠删码的条带布局如下图所示:
      在这里插入图片描述

  • 按照source的数量,构造Reader的数组
  • 构造Reader的线程池readService

可以看到,这里是构造了一个线程池stripedReadPool,corePoolSize=0, maxPoolSize=Integet.MAX_VALUE, keepAliveTime=60s, 任务队列是一个阻塞无界队列SynchronousQueue的线程池。我们都知道,这个在线程池的划分上叫做叫做缓存线程池(Cached Thread Pool,可以参考文章[《Java的5中线程池》]。SynchronousQueue一般用在无界的缓存线程池中,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
请把这个线程池与上文提到的ErasureCodingWorker中构造的线程池stripedReconstructionPool区分开,stripedReconstructionPool中的每一个task是一个StripedBlockReconstructor,每一个task负责一个Block的reconstruction,这里的线程池stripedReadPool的每一个task则负责一个Block下的Replica的读取。

 // 构造线程池
 private void initializeStripedReadThreadPool() {
    // Essentially, this is a cachedThreadPool.
    stripedReadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60, TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new Daemon.DaemonFactory() {
          .....
        },
        new ThreadPoolExecutor.CallerRunsPolicy() {
          .......
        });

    stripedReadPool.allowCoreThreadTimeOut(true);
  }
  // 将线程池封装为ExecutorCompletionService
  CompletionService<BlockReadStats> createReadService() {
    return new ExecutorCompletionService<>(stripedReadPool);
  }
StripedReader的初始化

刚刚说过,在StripedBlockReconstructor.run()中,会通过调用getStripedReader().init()来初始化Reader。
我们可以看到,StripedReader初始化主要是初始化每一个Replica的StripedBlockReader,以及初始化buffer和zeroStripe(上面讲到的空的internal block,这些block没有有效数据,decode运算的时候需要假设上面的数据是0)。
从下面的代码可以看到,一共初始化了minRequiredSources个reader,每一个reader会负责与一个DataNode通信拉去对应的replica,所有的reader放在一个readers数组中。

  void init() throws IOException {
    initReaders();
    initBufferSize();
    initZeroStrip();
  }

  private void initReaders() throws IOException {
    // 初始化一个success list, 代表可以进行internal block读取的DN,由于每次读取的过程中可能发现有些DN不好用,
    // 因此会更新这个list
    successList = new int[minRequiredSources];

    StripedBlockReader reader;
    int nSuccess = 0; // 我们有sources.length个reader,但是实际上我们只需要minRequiredSources个reader
    for (int i = 0; i < sources.length && nSuccess < minRequiredSources; i++) {
      reader = createReader(i, 0); // offset都是0,因为每一个reader都是从自己负责的replica的0的位置开始读取
      readers.add(reader);
      if (reader.getBlockReader() != null) { // 连接已经建立好了
        initOrVerifyChecksum(reader);
        successList[nSuccess++] = i; //successList[nSuccess++]存放了readers数组的索引
      }
    }
    }
  /**
   * 注意,idxInSources只是在liveIndices数组中的索引,而liveIndices[idxInSources]才是internal block在block group中的索引
   */
  StripedBlockReader createReader(int idxInSources, long offsetInBlock) {
    return new StripedBlockReader(this, datanode,
        conf, liveIndices[idxInSources], // liveIndices[idxInSources] 存的是internal block在group中的index
        reconstructor.getBlock(liveIndices[idxInSources]),
        sources[idxInSources], offsetInBlock);
  }

initBufferSize()的代码如下,buffer的大小通过dfs.datanode.ec.reconstruction.stripedread.buffer.size配置,默认64KB。每次构造的时候,minRequiredSources中的每一个StripedBlockReader都会读取bufferSize的数据,进行decode操作,然后写入到target中。

  private void initBufferSize() {
    int bytesPerChecksum = checksum.getBytesPerChecksum(); // 默认一个chunk是512B,算上4B的checksum,是516B
    // The bufferSize is flat to divide bytesPerChecksum
    int readBufferSize = stripedReadBufferSize; // 默认64kb
    // 假如用户配置的stripedReadBufferSize是9KB, bytesPerChecksum是4KB,那么真正的bufferSize只需要8KB就行,不需要9KB
    bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum :
        readBufferSize - readBufferSize % bytesPerChecksum;
  }

initZeroStrip()方法主要是为上文讲的空block构造buffer。空的internal block中虽然没有有效数据,却需要置0来进行统一decode运算。这里不做详细讲解。

  1. 关于Writer
StripedWriter的构造

同StripedReader在逻辑上相似,StripedWriter也是封装了一个或者多个StripedBlockWriter,每一个StripedBlockWriter负责一个target replica的写操作。

 StripedWriter(StripedReconstructor reconstructor, DataNode datanode,
      Configuration conf, StripedReconstructionInfo stripedReconInfo) {
   ........
    /**
     * 有多个writer,可以看到一个StripedWriter负责一个BlockGroup 的一个或者多个internal block的恢复
     * 一个StripedBlockWriter负责一个 replica的生成操作
     */
    writers = new StripedBlockWriter[targets.length];

    targetIndices = new short[targets.length];//targetIndices数组中的每一个元素的值代表了这个target在Block Index中的索引
    Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
        "Too much missed striped blocks.");
    initTargetIndices();
    long maxTargetLength = 0L;
    for (short targetIndex : targetIndices) { // 遍历每一个internal block,获取最大的internal block的长度
      maxTargetLength = Math.max(maxTargetLength,
          reconstructor.getBlockLen(targetIndex));
    }
    reconstructor.setMaxTargetLength(maxTargetLength);
}
  // 初始化targetIndices数组,targetIndices数组的每一个元素代表了需要恢复的block在Block Group中的index
  private void initTargetIndices() {
    BitSet bitset = reconstructor.getLiveBitSet(); // 从StripedReconstructor构造函数可以看到,liveBitSet中存放了liveIndices中的每一个value,即每一个存活的(也包含decommissioning和maintenance_for_read,参考chooseSourceDataNodes()方法)的internal block的index


    int m = 0;
    hasValidTargets = false;
    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
      if (!bitset.get(i)) { // 缺数据
        if (reconstructor.getBlockLen(i) > 0) { // 的确需要数据
          if (m < targets.length) { // 最多构造m个target
            targetIndices[m++] = (short)i;
            hasValidTargets = true;
          }
          .......
 }

StripedWriter的构造方法主要做了以下几件事情:

  1. 根据targets的数量,初始化StripedBlockWriter[]数组
  2. 设置targetIndices数组。数组中的每一个元素保存了需要进行恢复的internal block在Block Group中的索引。
  3. 设置最大的目标长度
    这里指的是最长的internal block的长度。对于一个小文件,或者一个大文件的最后一个BlockGroup,由于没有将Block Group写满,不一定每一个Internal Block的大小都是满载的128MB,很有可能这个Block Group的所有的Internal Block都很小,因此我们先计算这个Block Group的所有 internal block的最大size,只要写满这个size,就结束了,不用再写了。比如下图,一个文件8MB,最大的internal block也就2MB:
StripedWriter的初始化

刚刚说过,在StripedBlockReconstructor.run()中,会通过调用stripedWriter.init()来初始化writer。
StripedWriter的初始化主要是初始化了写数据过程中的data buffer和checksum buffer。一个data buffer用来存放一个packet,一个packet是有头部信息和多个chunk组成。这里不做赘述

  void init() throws IOException {
    DataChecksum checksum = reconstructor.getChecksum();
    checksumSize = checksum.getChecksumSize();// 4B
    bytesPerChecksum = checksum.getBytesPerChecksum(); // 512B
    int chunkSize = bytesPerChecksum + checksumSize; // 516B
    maxChunksPerPacket = Math.max( // WRITE_PACKET_SIZE是64KB,需要包含PKT_MAX_HEADER_LEN的header
        (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize, 1);
    int maxPacketSize = chunkSize * maxChunksPerPacket
        + PacketHeader.PKT_MAX_HEADER_LEN; // 最大的packet的大小

    packetBuf = new byte[maxPacketSize]; // 数据的buffer
    int tmpLen = checksumSize *
        (reconstructor.getBufferSize() / bytesPerChecksum);
    checksumBuf = new byte[tmpLen]; // checksum的buffer的长度
  }
开始重构

上面讲过,方法reconstruct()是重构的具体过程。抛开细节,整个重构是一轮一轮进行,一轮重构是指读取一个Stripe中的source的数据(通过上面的线程池stripedReadPool并发进行,但是显然,在进行decode之前必须等所有task成功结束),进行decode,然后发送给一个或者多个targets (注意,多个targets的发送并没有使用线程池的方式并发进行,为什么?)

void reconstruct() throws IOException {
    // 在internal block中的位置依然小于最大的target internal block的长度, 意味着还需要接着读数据
    while (getPositionInBlock() < getMaxTargetLength()) { // 当前的position还小于最大的internal block的长度
      DataNodeFaultInjector.get().stripedBlockReconstruction();
      long remaining = getMaxTargetLength() - getPositionInBlock();
      final int toReconstructLen = // 最多只能读取buffer size大小的数据(从StripedReader的构造函数可以看到,buffer size肯定是chunk的整数倍)
          (int) Math.min(getStripedReader().getBufferSize(), remaining);
      // step1: read from minimum source DNs required for reconstruction.
      // The returned success list is the source DNs we do real read from
      getStripedReader().readMinimumSources(toReconstructLen);// 从远程最多读取toReconstructLen byte的数据

      // step2: decode to reconstruct targets
      reconstructTargets(toReconstructLen); // 重算,构造数据

      // step3: transfer data
      // 把数据发送给target
      if (stripedWriter.transferData2Targets() == 0) { // 将数据发送给 target
        String error = "Transfer failed for all targets.";
        throw new IOException(error);
      }
      updatePositionInBlock(toReconstructLen);

      clearBuffers();
    }
读取

读取的逻辑很简单,根据需要读取的reconstructLength,并发提交minRequiredSources个任务进行读取操作,每一个任务是一个StripedBlockReader,负责和远程的DataNode通信以读取一个Internal Block,每一个submit都会返回一个Future句柄,供调用者查询结果。提交以后,开始轮询所有的Future。
正常情况下,minRequiredSources个reader都成功,则这一轮读取结束,所有读取的结果都存放在各自的StripedBlockReader的缓存中,供待会儿Decode使用。
如果失败或者超市,则会尝试更换另外一个DataNode读取。显然,只有在minRequiredSources小于liveSources的时候,还有可以更换的DataNode,否则,重构失败。

  int[] doReadMinimumSources(int reconstructLength,
                             CorruptedBlocks corruptedBlocks)
      throws IOException {
    int nSuccess = 0;
    int[] newSuccess = new int[minRequiredSources];
    BitSet usedFlag = new BitSet(sources.length);
    /*
     * Read from minimum source DNs required, the success list contains
     * source DNs which we think best.
     */
    // 只需要真正的从minRequiredSources个reader中读取数据,剩下的则直接向buffer中填充0
    for (int i = 0; i < minRequiredSources; i++) { // 最小的reader数量中的每一个reader都要去读数据
      StripedBlockReader reader = readers.get(successList[i]);// successList[i]代表对应的需要从中拉取数据的block,readers.get将获取这个block对应的StripedBlockReader
      int toRead = getReadLength(liveIndices[successList[i]],
          reconstructLength);
      .....
        Callable<BlockReadStats> readCallable =
            reader.readFromBlock(toRead, corruptedBlocks);
        Future<BlockReadStats> f = readService.submit(readCallable);
        futures.put(f, successList[i]);
        .....
    }

    while (!futures.isEmpty()) { // 一直循环等待futures清空
      try {
        StripingChunkReadResult result =
            StripedBlockUtil.getNextCompletedStripedRead(
                readService, futures, stripedReadTimeoutInMills); // 每次取出一个成功的Future,即一个Replica的读取结果,这个结果有可能是成功,有可能是超时,有可能是失败
        int resultIndex = -1;
        if (result.state == StripingChunkReadResult.SUCCESSFUL) {
          resultIndex = result.index;
        } else if (result.state == StripingChunkReadResult.FAILED) { // 和下面的Timeout的处理策略一样,换一个DataNode读取(因为有可能minRequiredSources小于sources的数量,即还有剩余的DataNode可以尝试 )
          StripedBlockReader failedReader = readers.get(result.index);
          failedReader.closeBlockReader();
          resultIndex = scheduleNewRead(usedFlag,
              reconstructLength, corruptedBlocks);
        } else if (result.state == StripingChunkReadResult.TIMEOUT) {
          resultIndex = scheduleNewRead(usedFlag,
              reconstructLength, corruptedBlocks); // 和上面的Failed的处理策略一样,换一个DataNode读取(因为有可能minRequiredSources小于sources的数量,即还有剩余的DataNode可以尝试 )
        }
        if (resultIndex >= 0) {
          newSuccess[nSuccess++] = resultIndex; // 记录这个成功的读取,如果成功数量足够了,则这一轮结束
          if (nSuccess >= minRequiredSources) { // 足够了,剩下的不用再读了
            cancelReads(futures.keySet());
            clearFuturesAndService();
            break; // 成功数量足够了,这一轮读取结束
          ......
    }
Decode

在minRequiredSources个StripedBlockReader都完成了一轮读取,可以进行这一轮的Decode了。这是通过方法reconstructTargets()进行的:

  private void reconstructTargets(int toReconstructLen) throws IOException {
    // 收集所有的StripedBlockReader刚刚读取的数据
    ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen);

    int[] erasedIndices = stripedWriter.getRealTargetIndices();
    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
    markBuffers(inputs); // 记录当前的position
    decode(inputs, erasedIndices, outputs); // 在这里进行解码,将解码以后的数据写入到outputs中,outputs其实就是StripedBlockWriter中的buf
    resetBuffers(inputs); // 还原position
    ....
    getValidator().validate(inputs, erasedIndices, outputs);
    }
    ......
    stripedWriter.updateRealTargetBuffers(toReconstructLen);
  }

  private void decode(ByteBuffer[] inputs, int[] erasedIndices,
      ByteBuffer[] outputs) throws IOException {
    long start = System.nanoTime();
    getDecoder().decode(inputs, erasedIndices, outputs);
    long end = System.nanoTime();
    this.getDatanode().getMetrics().incrECDecodingTime(end - start);
  }

可以看到,reconstructTargets的任务就是从所有的StripedBlockReader的Buffer中拿到刚刚读取的数据,调用decode(),将数据写入到StripedBlockWriter的Buffer中去。写入完成,并且校验成功,剩下的任务就交给StripedBlockWriter将自己的buffer数据发送给远程的targets。

decode的具体算法我们不做讨论,从其参数可以看到,inputs[]就是所有的StripedBlockReader读取的数据,erasedIndices就是Decode以后生成的internal block的index,outputs就是decode生成的数据即StripedBlockWriter的buffer。可以看到,decode()方法可以在一轮生成多个丢失的targets。

传输到目标target

如上文所讲,此时计算好的internal block的数据已经存放到了对应的index的StripedBlockWriter的buffer 中去,StripedBlockWriter会通过调用调用transferData2Target()将数据发送到对应的target node上去。
上文已经说个,一个StripedBlockWriter和一个待恢复的internal block以及存放这个internal block的target DataNode是一一对应的关系,并且在StripedBlockWriter初始化的时候,已经构建好了和这个远程的DataNode的socket连接。
这个发送的过程根客户端写数据的过程是一模一样的。从原始数据、到以chunk为单位计算checksum、到组装成packet、到将数据通过socket发送给DataNode的过程都放在方法transferData2Target()中:

-------------------------------------------------StripedBlockWriter----------------------------------------------------
 void transferData2Target(byte[] packetBuf) throws IOException {
    // 现在数据已经存放在targetBuffer中(有可能是direct,有可能不是direct),如果targetBuffer是direct,那么计算
    // checksum的时候存放checksum的ByteBuffer也得是direct,如果不是direct(在heap中),那么checksum也存放在heap中
    if (targetBuffer.isDirect()) {
      ByteBuffer directCheckSumBuf =
          BUFFER_POOL.getBuffer(true, stripedWriter.getChecksumBuf().length);
      stripedWriter.getChecksum().calculateChunkedSums(
          targetBuffer, directCheckSumBuf); // 对targetBuffer的数据做校验,写入directCheckSumBuf
      directCheckSumBuf.get(stripedWriter.getChecksumBuf()); // directCheckSumBuf写入到stripedWriter的buf中去
      BUFFER_POOL.putBuffer(directCheckSumBuf); //  归还directCheckSumBuf到BUFFER_POOL中
    } else { // 如果targetBuffer不是direct,那么直接基于数组进行计算
      stripedWriter.getChecksum().calculateChunkedSums( // 只有在非direct(即存放在heap中)的情况下array()方法才会有返回值
          targetBuffer.array(), 0, targetBuffer.remaining(),
          stripedWriter.getChecksumBuf(), 0);
    }

    int ckOff = 0;
    while (targetBuffer.remaining() > 0) { // 一个packet可能装不下这么多数据,因此会分多个packet进行发送
      DFSPacket packet = new DFSPacket(packetBuf,
          stripedWriter.getMaxChunksPerPacket(),
          blockOffset4Target, seqNo4Target++,
          stripedWriter.getChecksumSize(), false);
      int maxBytesToPacket = stripedWriter.getMaxChunksPerPacket() // 单个packet中的chunk数量 * chunk的长度
          * stripedWriter.getBytesPerChecksum();
      int toWrite = targetBuffer.remaining() > maxBytesToPacket ?
          maxBytesToPacket : targetBuffer.remaining(); // 实际发送的数据长度(不包含checksum,checksum此时是单独存放在checksumBuf中的)
      int ckLen = ((toWrite - 1) / stripedWriter.getBytesPerChecksum() + 1)
          * stripedWriter.getChecksumSize();// checksum的总长度,比如,我们需要10个checkum,每个checksum是4B,那么ckLen就是40Byte
      packet.writeChecksum(stripedWriter.getChecksumBuf(), ckOff, ckLen); // 把checksum中的数据存入packet中
      ckOff += ckLen;
      //  把inBuffer中的数据写入到Packet中去(注意并不是发送,只是写入到Packet对应的buf中)
      packet.writeData(targetBuffer, toWrite); // 把存放在targetBuffer中的长度为toWrite的数据存放到packet中
      packet.writeTo(targetOutputStream); // 把packet中的数据发送到targetOutputStream绑定的远程 DN中去
      ......
    }
Target节点接收数据

Target接收写入的internal block的过程和普通客户端写入block的过程完全相同,接收完成以后会将块信息汇报给NameNode,然后NameNode的BlockManager会更新对应的块信息。这个过程本文不做详细讲解。有兴趣的读者可以自行查找资料学习。

一些实验

验证在不同文件大小的情况下的基于复制和基于纠删码的块处理

需要验证的问题:

基于RS-3-2-1024k,一个cell是1024KB, 一个Stripe中有3个Data Cell和2个Parity Cell。假如一个Logical Block(Block Group)的大小是128MB, 那么一个Block Group写满数据,大概会有128MB / (3 * 1024KB) = 42个Stripe, 每个Internal Block大概42M左右。那么问题来了,假如数据写不满一个Block Group,比如一个文件本身就很小,或者一个大文件的最后一个 Block Group,NameNode怎么进行块的管理的呢 :

  1. 假如文件大小只有500KB, 很明显,有效数据只会写到第一个 (index = 0 ) Stripe的第一个cell,这时候,index=1和index=2的internal block是生成但是数据都是0,还是压根都不会生成?
  2. 假如文件大小由4MB,很明显,4MB已经超过了一个Stipe,这时候显然这个Block Group的所有的internal block都会有有效数据,但是每个internal group的有效数据部分的大小都很小,那么这时候这些internal Group是固定大小,多余数据补0,还是只存放有效数据部分呢?

环境准备:

由于我的测试环境仅仅由5台测试机且在同一rack上,因此无法基于RS-6-2-1024k进行实验,该Policy明显要求至少6台DataNode并且分布在不同的Rack上。
因此,实验基于RS-3-2-1024k进行。Hadoop的rack-wareness是在NameNode端进行判断的,而不是DataNode进行报告的。参考《Hadoop Rack Awareness》的文档,用户可以自行通过Java、Shell、Python去随心所欲实现hostname到rack的映射关系。Cloudera通过Host -> Rack的映射文件和一个简单的Python脚本实现了Host到Rack的映射,我自己的测试Hadoop环境是开源环境,节约时间,我借用了Cloudera的处理方式,通过一个映射文件和一个Python脚本,将本来实际属于一个Rack的5台机器定义到了5个不同的Rack,让RS-3-2-1024k 能够在我的测试环境运行。

映射文件:

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<topology>
  <node name="rccd101-6a.sjc2.dev.com" rack="/rccd101-6a"/>
  <node name="10.30.120.121" rack="/rccd101-6a"/>
  <node name="rccd101-6b.sjc2.dev.com" rack="/rccd101-6b"/>
  <node name="10.30.120.122" rack="/rccd101-6b"/>
  <node name="rccd101-6c.sjc2.dev.com" rack="/rccd101-6c"/>
  <node name="10.30.120.123" rack="/rccd101-6c"/>
  <node name="rccd101-7a.sjc2.dev.com" rack="/rccd101-7a"/>
  <node name="10.30.120.125" rack="/rccd101-7a"/>
  <node name="rccd101-7b.sjc2.dev.com" rack="/rccd101-7b"/>
  <node name="10.30.120.126" rack="/rccd101-7b"/>
</topology>

Python脚本:

------------------------------------------------------topology.py------------------------------------------------------
#!/usr/bin/env python
import os
import sys
import xml.dom.minidom

def main():
    MAP_FILE = 'etc/hadoop/topology.map'
    DEFAULT_RACK = '/default'
    max_elements = 1
    map = dict()
    mapFile = open(MAP_FILE, 'r')

    dom = xml.dom.minidom.parse(mapFile)
    for node in dom.getElementsByTagName("node"):
        rack = node.getAttribute("rack")
        max_elements = max(max_elements, rack.count("/"))
        map[node.getAttribute("name")] = node.getAttribute("rack")

    default_rack = "".join([ DEFAULT_RACK for _ in xrange(max_elements)])
    if len(sys.argv)==1:
        print(default_rack)
    else:
        print(" ".join([map.get(i, default_rack) for i in sys.argv[1:]]))
    return 0

if __name__ == "__main__":
    sys.exit(main())

在core-site.xml中添加配置:

  <property>
    <name>net.topology.script.file.name</name>
    <value>etc/hadoop/topology.py</value>
  </property>

过程

创建各种测试需要的文件

当我们尝试enable一个data unit的数量大于rack数量的hadoop集群中enable一个policy的时候,会抛出异常:

hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs ec -enablePolicy -policy RS-10-4-1024k
Erasure coding policy RS-10-4-1024k is enabled
Warning: The cluster setup does not support EC policy RS-10-4-1024k. Reason: The number of DataNodes (5) is less than the minimum required number of DataNodes (14) for the erasure coding policies: RS-10-4-1024k

因此我们在集群中Enable了RS-3-2-1024k这个Policy,这个Policy只要求集群由3个rack就够了:

# 创建基于纠删码RS(3,2)的目录
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs  -mkdir hdfs://olap-hdfs-test/test-RS-3-2-1024k
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs ec -setPolicy -path hdfs://olap-hdfs-test/test-RS-3-2-1024k -policy RS-3-2-1024k
Set RS-3-2-1024k erasure coding policy on hdfs://olap-hdfs-test/test-RS-3-2-1024k

# 创建基于3副本复制的目录
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs  -mkdir hdfs://olap-hdfs-test/test-replication

然后我们向这个policy的目录写入一个500KB和一个4000KB的文件:

hadoop@rccd101-6b:/root$ truncate -s 500KB /tmp/500KB.log
hadoop@rccd101-6b:/root$ truncate -s 4000KB /tmp/4000KB.log
hadoop@rccd101-6b:/root$ truncate -s 122MB /tmp/122MB.log
hadoop@rccd101-6b:/root$ truncate -s 420MB /tmp/420MB.log
# 准备好纠删码的测试文件
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/500KB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/500KB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/4000KB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/4000KB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/122MB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/122MB.log
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/420MB.log hdfs://olap-hdfs-test/test-RS-3-2-1024k/420MB.log


# 准备好基于3副本复制的文件
hadoop@rccd101-6b:/root$ $HADOOP_HOME/bin/hdfs dfs -copyFromLocal /tmp/4000KB.log hdfs://olap-hdfs-test/test-replication/4000KB.log

测试小于一个Stripe的小文件的纠删码块构成

对于基于纠删码编码的500KB的文件,查看NameNode的日志:

2023-11-26 03:37:49,956 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/500KB.log._COPYING_  inodeId 16419 for DFSClient_NONMAPREDUCE_-2082370100_1
2023-11-26 03:37:49,959 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/500KB.log._COPYING_ with blk_-9223372036854775568_1018 block is added to the in-memory file system
2023-11-26 03:37:49,959 INFO StateChange: BLOCK* allocate blk_-9223372036854775568_1018, replicas=11.37.76.122:9866, 11.37.76.126:9866, 11.37.76.123:9866, 11.37.76.121:9866, 11.37.76.125:9866 for /test-RS-3-2-1024k/500KB.log._COPYING_
2023-11-26 03:37:49,959 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/500KB.log._COPYING_ with new block blk_-9223372036854775568_1018, current total block count is 1
2023-11-26 03:37:50,057 DEBUG BlockManager: Reported block blk_-9223372036854775568_1018 on 11.37.76.122:9866 size 500000 replicaState = FINALIZED
2023-11-26 03:37:50,057 DEBUG BlockManager: In memory blockUCState = UNDER_CONSTRUCTION
2023-11-26 03:37:50,059 DEBUG BlockManager: Reported block blk_-9223372036854775565_1018 on 11.37.76.121:9866 size 500000 replicaState = FINALIZED
2023-11-26 03:37:50,059 DEBUG BlockManager: In memory blockUCState = UNDER_CONSTRUCTION
2023-11-26 03:37:50,061 DEBUG BlockManager: Reported block blk_-9223372036854775564_1018 on 11.37.76.125:9866 size 500000 replicaState = FINALIZED

从上面的日志我们可以看到:

  1. 在NameNode为第一个BlockGroup分配Block和选定目标节点的时候,分配到了5台机器上,这是因为我们的RS(3,2)的policy要求5台机器
  2. 块汇报的时候,只有3台机器汇报了块,这三台汇报块的DataNode包含了一个存放数据的DataNode和两个存放校验码的DataNode。这说明:当一个Block Group的数据不足以写满一个Stripe的时候,没有数据的Internal Block虽然会在NameNode端分配(因为NameNode进行块分配的时候完全不知道这个块会有多大,只能按照EC Policy的Data Unit + Parity Unit的大小进行分配),但是,没有数据的internal block都不会产生,这说明客户端都不会往没有数据的internal block 上写任何东西,自然也不会有Block Report。
  3. 每个internal block(包括 Data Block 和 Parity Block)的大小都是根据文件的实际大小来的,不是固定的。这个文件500KB,那么这个internal block大小就是500KB,校验块也只有500KB。

测试大于一个Stripe但是小于一个Block Group的小文件的纠删码块构成

对于基于纠删码编码的4MB的文件,查看NameNode的日志:

2023-11-26 03:59:02,205 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_  inodeId 16420 for DFSClient_NONMAPREDUCE_-856274170_1
2023-11-26 03:59:02,206 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_ with blk_-9223372036854775552_1019 block is added to the in-memory file system
2023-11-26 03:59:02,206 INFO StateChange: BLOCK* allocate blk_-9223372036854775552_1019, replicas=11.37.76.122:9866, 11.37.76.125:9866, 11.37.76.121:9866, 11.37.76.123:9866, 11.37.76.126:9866 for /test-RS-3-2-1024k/4000KB.log._COPYING_
2023-11-26 03:59:02,206 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/4000KB.log._COPYING_ with new block blk_-9223372036854775552_1019, current total block count is 1
2023-11-26 03:59:02,455 DEBUG BlockManager: Reported block blk_-9223372036854775552_1019 on 11.37.76.122:9866 size 1902848 replicaState = FINALIZED
2023-11-26 03:59:02,457 DEBUG BlockManager: Reported block blk_-9223372036854775551_1019 on 11.37.76.125:9866 size 1048576 replicaState = FINALIZED
2023-11-26 03:59:02,458 DEBUG BlockManager: Reported block blk_-9223372036854775550_1019 on 11.37.76.121:9866 size 1048576 replicaState = FINALIZED
2023-11-26 03:59:02,460 DEBUG BlockManager: Reported block blk_-9223372036854775549_1019 on 11.37.76.123:9866 size 1902848 replicaState = FINALIZED
2023-11-26 03:59:02,464 DEBUG BlockManager: Reported block blk_-9223372036854775548_1019 on 11.37.76.126:9866 size 1902848 replicaState = FINALIZED

可以看到:

  1. NameNode依然分配了5台DataNode,RS(3,2)的policy要求5台机器
  2. 由于文件的大小超过了一个Stripe, 因此,所有的internal block,包括data block和parity block,都存放了一部分有效数据,都进行了汇报。
  3. 由于数据是4M,因此index=0的internal block会存放两个Stripe的数据,而index = [1,2]的internal block只会存放一个stripe的数据,总的数据量刚好匹配: (1902848B + 1048576B + 1048576B) = 4000KB
  4. 由于index = 0的internal block的size是1902848B,导致两个parity block的大小都会是1902848B,这也完全合理

测试大于一个Block Group的大文件的纠删码块构成

对于文件大小超过128MB * 3 = 402MB的文件,我们写入纠删码目录:

2023-11-26 04:36:45,130 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/420MB.log._COPYING_  inodeId 16427 for DFSClient_NONMAPREDUCE_185912574_1
2023-11-26 04:36:45,131 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with blk_-9223372036854775472_1025 block is added to the in-memory file system
2023-11-26 04:36:45,131 INFO StateChange: BLOCK* allocate blk_-9223372036854775472_1025, replicas=11.37.33.122:9866, 11.37.33.125:9866, 11.37.33.123:9866, 11.37.33.126:9866, 11.37.33.121:9866 for /test-RS-3-2-1024k/420MB.log._COPYING_
2023-11-26 04:36:45,131 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with new block blk_-9223372036854775472_1025, current total block count is 1
2023-11-26 04:36:47,317 DEBUG StateChange: BLOCK* getAdditionalBlock: /test-RS-3-2-1024k/420MB.log._COPYING_  inodeId 16427 for DFSClient_NONMAPREDUCE_185912574_1
2023-11-26 04:36:47,318 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with blk_-9223372036854775456_1026 block is added to the in-memory file system
2023-11-26 04:36:47,319 INFO StateChange: BLOCK* allocate blk_-9223372036854775456_1026, replicas=11.37.33.122:9866, 11.37.33.125:9866, 11.37.33.121:9866, 11.37.33.126:9866, 11.37.33.123:9866 for /test-RS-3-2-1024k/420MB.log._COPYING_
2023-11-26 04:36:47,319 DEBUG StateChange: persistNewBlock: /test-RS-3-2-1024k/420MB.log._COPYING_ with new block blk_-9223372036854775456_1026, current total block count is 2

可以看到,客户端一共向NameNode申请了两次Block,这是因为文件的大小为420MB,由于HDFS默认的块大小是128MB(通过dfs.block.size配置),这个配置对于基于3副本复制的情况很好理解,就是一个replica的大小最大为128MB,但是对于纠删码,它指的并不是一个Block Group的最大大小,而是一个Internal Block的最大大小。因此,在RS(3,2)的情况下,一个Block Group最大为128MB*3 = 402MB,所以一个420MB的文件就需要客户端两次向NameNode申请Block,这里申请的Block是Block Group。

测试小于一个物理块的文件的3副本复制快的管理

对于基于3副本复制的500KB的文件,我们看看文件的size小于一个Block(默认128MB)的情况,查看NameNode的日志:

2023-11-26 04:09:41,604 DEBUG StateChange: DIR* FSDirectory.addBlock: /test-replication/500KB.log._COPYING_ with blk_1073741826_1020 block is added to the in-memory file system
2023-11-26 04:09:41,604 INFO StateChange: BLOCK* allocate blk_1073741826_1020, replicas=11.37.33.122:9866, 11.37.33.123:9866, 11.37.33.126:9866 for /test-replication/500KB.log._COPYING_
2023-11-26 04:09:41,605 DEBUG StateChange: persistNewBlock: /test-replication/500KB.log._COPYING_ with new block blk_1073741826_1020, current total block count is 1
2023-11-26 04:09:41,705 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.126:9866 size 500000 replicaState = FINALIZED
2023-11-26 04:09:41,707 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.123:9866 size 500000 replicaState = FINALIZED
2023-11-26 04:09:41,708 DEBUG BlockManager: Reported block blk_1073741826_1020 on 11.37.33.122:9866 size 500000 replicaState = FINALIZED

可以看到,在基于副本复制的存储策略下,如果文件大小小于128MB的块大小,那么实际的块的大小是以文件大小为准的,而不是固定的128MB

3 总

https://issues.apache.org/jira/browse/HDFS-8734

4. 引用

Understanding the Performance of Erasure Codes in Hadoop Distributed File System
Introduction to HDFS Erasure Coding in Apache Hadoop
HDFS Erasure Coding
Hadoop3.2.1 【 HDFS 】源码分析 : BPOfferService 解析
Rack Awareness
HDFS Erasure Coding
Hadoop3.2.1 【 HDFS 】源码分析 :BlockManager解析

现在遗留的问题

blockGroup的getNumBytes到底是128MB还是128MB * 6 ? 即通过addBlock()申请一个group的时候返回的size,倒是是128MB还是128MB * 6
假如是128MB, 在客户端addBlock申请了一个block group以后,对internal block的大小按照data block的数量进行了切分StripedBlockUtil.parseStripedBlockGroup,那么一个internal block是128MB/6? 这跟Cloudera的官方文档的图一致,但是从代码里面看,下面代码中的blockSize就是创建文件的时候根据配置文件中配置的block size申请以后返回的,应该是128MB,所以怎么解释currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;?如果下面的blockSize=128MB, 那就意味着一个block group的大小应该是128MB * 6 ? 可是blockGroup在进行addBlock()申请的时候返回的值是128MB才对。

如果

private boolean shouldEndBlockGroup() {
return currentBlockGroup != null &&
currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
}
把一个基于纠删码的文件copy到一个基于副本副本复制的目录下,会修改块布局方式吗?
创建文件的时候, ecPolicyName传入的是啥?如果是null,NameNode怎么设置所创建的文件的ECPolicy?

文章来源:https://blog.csdn.net/zhanyuanlin/article/details/133789966
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。