MapReduce应用实例--二次排序之reduce内有序

MapReduce天生就具有排序的特性,但是面对稍微复杂的排序时我们还是希望能够充分利用其自身的设计原理来达到我们的目的。其中二次排序就是一个很好的例子,下面主要介绍下二次排序。

二次排序的场景是不仅需要对key排序依然需要对value中的某个值进行排序,也就是先对key排序然后对相同key的record再对value进行排序。

对key排序我们可以使用MR自身的排序,但是怎么对value中的某个值进行排序呢?

MapReduce留出了很多可以自定义的接口,比如partition、comparator和group等等接口,这里只需用到这三个,其它以后用的接口再介绍。

之前有几篇介绍MapReduce流程的blog,可以自行搜索,关键字为MapReduce源码解析

熟悉MapReduce流程的同学会知道key的第一次排序发生在map端的sortAndSpill阶段,此阶段是将内存中的数据先根据partition进行排序然后再对key排序(排序算法是改进的快排)第二次排序发生在reduce端的merge阶段,此阶段是将从map端copy来的segment(局部有序的数据,局部有序的原因是map端已将数据按key排序,reduce copy时从n个map中将此reduce需要的数据复制过来,则每个map内的数据是有序的,而各个map之间的数据是无序的)进行堆排序,使数据按照key有序。(这里的第二次排序其实也可以说是第三次排序,因为在map端也会有个merge阶段,将spill到磁盘的临时文件merge成一个大文件,这个过程将spills文件中按照partition和key进行排序)

由此可知MapReduce整个流程是以key排序为核心的,那么针对上面的需求是否可以在key上做点文章呢?

针对上面需求的解决方案是将需要排序的key和value1(value中的某个属性)组成一个复合键compositeKey,在map端按照key分区和排序,则相同的key被分到同一个reduce中,在reduce中对相同key的value1进行排序,然后根据key进行分组,以便形成相同key的Iterator,这时输出的数据就是按照key和value1排序的。

这里需要注意下由于partition选择的方法不一样,可能会导致最终的结果可能是reduce内有序和全局有序

问题

问题如下:
有两个文件a.txt和b.txt
a.txt中的内容是
1990 31
1991 20
1991 18
1991 33
1990 22
1990 17
b.txt中的内容是
1992 31
1991 27
1993 18
1993 33
1992 22
1990 10
想要的排序结果是
1990 10
1990 17
1990 22
1990 31
1991 18
1991 20
1991 27
1991 33
1992 22
1992 31
1993 18
1993 33

先来看下根据复合键compositeKey中的key进行hash分区的代码

hash分区代码示例

复合键compositeKey的形成有两种形式:一种是将key和value拼接成一个字符串,key和value之前用特殊字符分隔,另一种是实现WritableComparable接口自己写一个新的数据类型。

先来个简单的,将key和value拼接为一个字符串

key和value拼接字符串为复合键

key和value拼接为字符串是在map中执行的,先看下map的代码:

1
2
3
4
5
6
7
8
9
10
11
12
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
// 将内容key value作为复合key输出
context.write(new Text(arr[0] + " " + arr[1]), new IntWritable(Integer.parseInt(arr[1])));
}
}

源文件里的key和value是用空格分隔的,在map中将key和value分隔开然后将key和value组合为一个新的newKey,将value依然作为value输出。

这里既然看了map的代码,那么不防再看下reduce代码:

1
2
3
4
5
6
7
8
9
10
11
12
public static class MyReducer
extends Reducer<Text,IntWritable,Text,NullWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
for (IntWritable val : values) {
// 将内容key value作为复合key输出
context.write(key, NullWritable.get());
}
}
}

reduce只是将key(这里的key是指在map中组合之后的复合键)输出,value是hadoop中提供的null对象NullWritable

这里之所以只是直接将key输出是因为我们在整个MR流程中通过key的第一个字段进行分区和分组,比较两个key的大小时是先比较key的第一个字段,相同时再比较第二个字段(这里需要注意的是源文件中第二个字段是int,如果第二个字段也是string类型的,则就可以利用Text自身的比较器进行两个key的比较)。这样的好处是你可以自定义key和value之前的分隔符(而不用重写outputFormater去定义key和value之间的分隔符),但也有一些局限性,具体看自己需求吧。

下面来看下我们是怎么对这个复合键进行处理的。首先看下分区策略:

自定义分区策略时要继承Partitioner类,重写getPartition方法,代码如下:

1
2
3
4
5
6
7
8
9
public class FirstPartition extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
// 拿到复合键中的第一个值,也就是源文件中的key,
// 然后根据其值进行hash分区
return Math.abs(text.toString().split(" ")[0].hashCode() * 127) % i;
}
}

对数据分区之后,就该对复合键进行排序了,这里的比较原则是先将复合键进行切分,然后先对第一个字段进行比较,相同之后在对第二个字段进行比较。由于源文件中key是string,value是int,将key和value组成Text类型的复合键时,不能使用Text自身的比较器(因为value是int),则这里需要自定义比较器

Hadoop中自定义比较器要继承WritableComparator并且重载compare(WritableComparable a, WritableComparable b)方法(需要特别注意的是,必须有一个构造函数),自定义比较器也可以实现RawComparator接口。这里是继承WritableComparator类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class KeyComparator extends WritableComparator {
// 构造函数必须有
public KeyComparator(){
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 对复合键进行切分
String[] arr_a = a.toString().split(" ");
String[] arr_b = b.toString().split(" ");
System.out.println("=========KeyComparator=========");
// 先比较第一个字段然后再比较第二个
if (arr_a[0].compareTo(arr_b[0]) != 0){
return arr_a[0].compareTo(arr_b[0]);
}else {
return Integer.parseInt(arr_a[1]) - Integer.parseInt(arr_b[1]);
}
}
// 构造函数是必须的,但当构造函数和下面的方法同时存在的时候,排序会出问题
// 如果没有构造函数时,执行的比较方法是下面的方法,但排序貌似有问题
// @Override
// public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// System.out.println("Key++++++++++++Comparator");
// String str1 = new String(b1,s1,l1);
// String str2 = new String(b1,s1,l1);
// System.out.println(str1.split(" ")[0]);
//// byte[] bytes1 = str1.split(" ")[0].getBytes();
//// byte[] bytes2 = str2.split(" ")[0].getBytes();
// if (!str1.split(" ")[0].equals(str2.split(" ")[0])){
// return str1.split(" ")[0].compareTo(str2.split(" ")[0]);
// }else {
// return str1.split(" ")[1].compareTo(str2.split(" ")[1]);
// }
//// return str1.split(" ")[0].compareTo(str2.split(" ")[0]);
//// return this.compareBytes(b1, s1, l1, b2, s2, l2);
// }
}

到此代码其实就已经实现了reduce输出的局部有序,但有些场景也需要重写reduce端的分组策略,所以这里也加上自定义的分组策略。自定义分组策略时要其实也是重写一个比较器,这里依然采用继承WritableComparator类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class GroupComparator extends WritableComparator {
// 继承WritableComparator时必须有构造方法
public GroupComparator(){
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
String[] arr_a = a.toString().split(" ");
String[] arr_b = b.toString().split(" ");
System.out.println("=========GroupComparator=========");
// 根据复合键中的第一个字段进行分组比较
return arr_a[0].compareTo(arr_b[0]);
}
}

最后就是main主类了,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 本地ide远程连接集群时,需要设置文件存储格式
conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(SecondSort.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置了下reduce的个数
job.setNumReduceTasks(2);
// 由于map和reduce的输出格式不一样,需要分别设置
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 重定义partition
job.setPartitionerClass(FirstPartition.class);
// 重写排序方法
job.setSortComparatorClass(KeyComparator.class);
// 重定义分组方法
job.setGroupingComparatorClass(GroupComparator.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
// 删除out输出的内容,以免每次执行之前手动删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))){
fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

输出结果如下:
因为有两个reduce则有两个输出,part-r-00000和part-r-00001。
part-r-00000的内容如下:
1991 18
1991 20
1991 27
1991 33
1993 18
1993 33
part-r-00001的内容如下:
1990 10
1990 17
1990 22
1990 31
1992 22
1992 31

自定义数据类型为reduce的key

将需要排序的字段组合为一个新的数据类型,由此新数据类型作为key。Hadoop自定义数据类型时需要实现WritableComparable接口。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class EntityPair implements WritableComparable<EntityPair>{
private Text firstKey;
private IntWritable secondKey;
public Text getFirstKey() {
return firstKey;
}
public void setFirstKey(Text firstKey) {
this.firstKey = firstKey;
}
public IntWritable getSecondKey() {
return secondKey;
}
public void setSecondKey(IntWritable secondKey) {
this.secondKey = secondKey;
}
public EntityPair(Text firstKey, IntWritable secondKey) {
this.firstKey = firstKey;
this.secondKey = secondKey;
}
public EntityPair() {
}
@Override
public int compareTo(EntityPair o) {
return this.firstKey.compareTo(o.getFirstKey());
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(firstKey.toString());
dataOutput.writeInt(secondKey.get());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
firstKey = new Text(dataInput.readUTF());
secondKey = new IntWritable(dataInput.readInt());
}
@Override
public String toString(){
return this.getFirstKey() + " " + this.getSecondKey();
}
}

则分区策略、比较规则和分组策略和上一节的思路大体一样,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 自定义分区策略
public class EntityPartition extends Partitioner<EntityPair, IntWritable> {
@Override
public int getPartition(EntityPair text, IntWritable intWritable, int i) {
// 得到EntityPair中的第一个firstKey
return Math.abs(text.getFirstKey().hashCode() * 127) % i;
}
}
// 自定义比较器
public class EntityComparator extends WritableComparator {
public EntityComparator(){
super(EntityPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
EntityPair entityPair1 = (EntityPair) a;
EntityPair entityPair2 = (EntityPair) b;
System.out.println("=========Comparator=========");
if (!entityPair1.getFirstKey().toString().equals(entityPair2.getFirstKey().toString())){
return entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString());
}else {
return entityPair1.getSecondKey().get() - entityPair2.getSecondKey().get();
}
}
}
// 自定义分组策略
public class EntityGroup extends WritableComparator {
public EntityGroup(){
super(EntityPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
EntityPair entityPair1 = (EntityPair) a;
EntityPair entityPair2 = (EntityPair) b;
System.out.println("=========GroupComparator=========");
return entityPair1.getFirstKey().toString().compareTo(entityPair2.getFirstKey().toString());
}
}

下面是MR类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class EntitySecondSort {
private static final Logger log = Logger.getLogger(EntitySecondSort.class);
public static class MyMapper
extends Mapper<Object, Text, EntityPair, IntWritable> {
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
// 将内容key value作为复合key输出
context.write(new EntityPair(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1]))),
new IntWritable(Integer.parseInt(arr[1])));
}
}
public static class MyReducer
extends Reducer<EntityPair,IntWritable,Text,IntWritable> {
public void reduce(EntityPair key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
for (IntWritable val : values) {
// 分组之后
context.write(new Text(key.getFirstKey()), val);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.244.131:9000");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(EntitySecondSort.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2);
job.setMapOutputKeyClass(EntityPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 重定义partition
job.setPartitionerClass(EntityPartition.class);
// 重写排序方法
job.setSortComparatorClass(EntityComparator.class);
// 自定义分组策略
job.setGroupingComparatorClass(EntityGroup.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(otherArgs[otherArgs.length - 1]))){
fs.delete(new Path(otherArgs[otherArgs.length - 1]), true);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

这里的分组策略也不是必须的,(只有在reduce中需要对相同key的values进行合并操作时,才需要对其records根据复合键的第一个值进行分组。)

上面的reduce代码输出的是键值对,之所以这样是考虑到value中可能包含很多属性,而只需要对其中的某一个属性value1进行排序,则将剩余的属性列出。

关于二次排序的例子Hadoop自带的MapReduce例子中也有样例,类名SecondarySort

上面的代码只实现了reduce内有序,各个reduce之间是无序的,那么如何得到一个全局有序的结果呢?看下篇

附加: equals和hashCode的关系

这里补充下equals和hashCode的关系

equals和hashCode都是Object类的方法,可以在任务一个类中重写,Object默认的equals实现是判断两个对象的地址是否相等(即,是否是同一个对象)来区分它们是否相等,此时等价于”==”。但是equals方法往往被类重写,用来判断两个对象的内容是否相等,如String类的equals方法。

hashCode主要是用来判断对象在散列表中的位置,则如果两个对象相同则在散列表中的位置也应该是相同的,但是判断两个对象是否相同是由equals判断的,则如果某个类的对象要在散列表中使用,重写equals方法时往往也要重写hashCode方法,以保证equals为true时,hashCode是相同的。如果某个类不会出现在散列表中,则equals和hashCode并没有什么直接的关系。

您的肯定,是我装逼的最大的动力!