MapReduce是一个抽象的分布式计算模型,主要对键值对进行运算处理。用户需要提供两个自定义函数:
在这个案例中,目标是将文本作为输入,将其中的单词出现频率进行统计。 此时,map和reduce的伪代码如下:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map函数通过接受文本(这里输入的是文本的名字)作为key,文本的内容作为value,并通过遍历文本中的单词来生成单词/出现频率的键值对。这里单个单词被遍历的时候,出现频率的贡献都为1。 在这过程中,相同的key会被整合到一起作为中间键值对交付给reduce。 reduce函数则接受中间键值对,单词作为key,一个列表作为value(实际上由于每个单词的贡献为1,因此列表的长度就为单词的出现频率)统计出来单词的出现频率。具体方法如上述伪代码,将每个word的出现频率相加。
golang实现版本如下
在这个例子当中,要写一个关于气象数据的分析程序。分布在全球各地的很多气象传感器每隔一小时收集气象数据和大量日志数据,且这些数据按行并以ASCII格式存储,半结构化且按照记录方式存储,很适合用MapReduce来分析。
如下图,这是一行采样数据,其中重要字段添加了注释。这一行数据被分成很多行以突出每个字段,但实际是存储在一行中,没有分隔符的。
数据文件按照日期和气象站进行组织。从1901年到2001年,每一年都有一个目录,包含各个气象站该年气象数据的打包文件和说明文件。
现在,我们的目的是要提取出每一年的最高气温。 假设我们现在需要统计传统处理按行存储数据的工具是awk,如该脚本
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk'{ temp = substr($0,88,5) + 0;
q = substr($0,93,1);
if(temp != 9999 && q ~ /[01459]/ && temp > max) max = temp}
END {print max}'
done
这个脚本遍历压缩文件,首先显示年份,然后使用awk处理每一个文件。 awk主要提取两个字段:气温和质量代码。气温+0后转换成整数,然后测试气温值是否有效。用9999代替NCDC数据中丢失的值。如果数据读取正确,那么就和目前的最大值进行比较。如果比目前的最大值大,那么替换最大值。 处理完所有行之后,再输出最大气温值。 如下是部分运行结果(这里的气温值被放大了十倍,1901年的最高温度是31.7度)
为了加快速度,我们需要并行数据分析。 同样还是两个阶段,map和reduce。map函数很简单,我们只对年份和气温感兴趣,因此将数据中的行作为输入,将气温和年份等提取出来。提取之后,将年份相同,也就是key相同的值聚合到一起,交付给reduce。reduce函数对每个key选出一个最大的气温值即可。 此处,我们假设一行中属性的偏移量是固定的,如下每一行的字体标粗部分分别为年份和气温
key是文件中的行偏移量,我们不需要因此忽略。我们只需要提取出来年份和气温即可,就有
(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1949,78)
reduce将相同key的键值对进行排序与整合,因此有
(1949,[111,78])
(1950,[22,0,-11])
最后,reduce遍历每一个键值对,将最大的值选出来,就是我们需要的数据。
(1949,111)
(1950,22)
从这个例子,我们可以看出来mapreduce的流程
MapReduce框架主要由三个部分组成
传入Map的输入数据会被自动切分成M个数据片段,用于分布到多台机器上并行处理。 利用分区函数,将Map生成的中间键值对分成R个不同的分区,以此让R个reduce worker也并行地进行reduce工作。
分区数量R和分区函数由用户指定,以下是一个简单的实现
其中KeyValuePairs是经过调用了mapF生成的中间键值对。在下面的循环中,通过对键值对的key进行hash函数得到哈希值,然后再模nReduce得到分区位置。也就是hash(key)%nReduce 由user program调用MapReduce开始
首先,创建nReduce个文件,文件名则是通过一个函数来创建的,目的是方便reduce worker通过有规律的名字去寻找这些文件。 然后通过调用用户自定义的mapF函数,将输入文件拆分成键值对。 遍历键值对,利用ihash(kv.key)%nReduce计算得到文件下标,并将键值对附加到该文件上。
reduce通过reduceName约定的名字从文件中寻找自己消费的文件。 然后遍历中间键值对,利用sort对这些键值对进行排序(key相同的键值对都会被排到一起) 然后调用reduceF函数进行reduce,最后输出到文件当中。
Master中保存了每个Map任务和每个Reduce任务的状态(闲置,正在运行,以及完成),以及非空闲任务的worker机器的ID。 Master的数据结构之所以要保存这些,是因为需要保证容灾能力。
Master对Worker进行心跳检测。如果一定时间内无法ping通该worker,它就会被标记成failed。所有由该worker完成的map或reduce任务都会重新设置为初始的闲置状态,然后将这些任务重新分配给其他worker。 如果是完成map的worker,由于输出结果保存在worker的磁盘之中无法访问,因此就要重新运行。 如果是reducer的worker,结果已经存储在全局的文件系统中了,因此无需再次执行
上文中我们提到,Master中保存了每个Map任务和每个Reduce任务的状态(闲置,正在运行,以及完成),以及非空闲任务的worker机器的ID。 master周期性的将上文的数据结构写入磁盘备份,如果出错了,就可以从这些数据结构中恢复状态。
我们将map和reduce任务分别拆分成了M个和R个子任务。理想情况下,M和R应该远大于worker的数量。每个worker也会执行不同的许多任务,以此保证负载均衡的能力。同时,worker故障了的话,通过能够将该机器的任务发送到其他完好的worker上加快恢复的速度 但M和R的数量不应过多,因为master需要在内存中保存O(M*R)个状态,需要执行O(M+R)次调度。过大的M和R会加重master的负担。
网络带宽在这个模型中是一个相对稀缺的资源,因此MapReduce尽量将输入数据存储在本地的硬盘中以节约网络资源。 其中输入和输出是不可避免的网络通信,一般会通过一个分布式存储系统进行输入和存储,如GFS。 Master调度任务的时候会考虑文件的位置,尽量在调度一个包含该输入数据副本的机器上执行。如果任务失败,则会调度到同样有该副本的较近的机器上执行。 一种优化形式是令运行着GFS的集群进行。MapReduce工作,那么Master可以将任务分配给已经拥有该数据的Worker中,从而直接从本地磁盘进行读取,减少网络通信。
从上述的信息来看,MapReduce主要的优点有