hadoop-14 The Shuffle Mechanism
The Shuffle Mechanism
The shuffle is the data-processing stage that runs after the map method and before the reduce method.
Partitioning (Partitioner)
- Default partitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- The default partitioner assigns a record by taking the key's hashCode modulo the number of ReduceTasks, so the user has no control over which key lands in which partition (see the small demo below).
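A minimal standalone sketch of that arithmetic (hypothetical class and values, not part of any job; note that a real MapReduce key would be a Text, whose hashCode differs from String's):

public class DefaultPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 3; // hypothetical ReduceTask count
        String key = "hello";
        // Same formula as HashPartitioner: mask off the sign bit, then take the modulo
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        System.out.println(key + " -> partition " + partition);
    }
}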
Custom Partitioner
- Write a class that extends Partitioner and override the getPartition() method
- Register the custom Partitioner in the job driver
- job.setPartitionerClass(CustomPartitioner.class)
- After defining a custom Partitioner, set the number of ReduceTasks to match its logic
- job.setNumReduceTasks(5)
- ProvincePartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// The Partitioner type parameters are the Mapper's output key/value types
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phone = text.toString();
String prePhone = phone.substring(0, 3);
int partition;
switch (prePhone) {
case "136":
partition = 0;
break;
case "137":
partition = 1;
break;
case "138":
partition = 2;
break;
case "139":
partition = 3;
break;
default:
partition = 4;
break;
}
return partition;
}
}
- FlowDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// Set the custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// Set the number of ReduceTasks to match the partitioner
job.setNumReduceTasks(5);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\input"));
FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output2"));
// 7. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
- job.setPartitionerClass(ProvincePartitioner.class);
- job.setNumReduceTasks(5);
Partition summary (illustrated by the sketch after this list)
- If the number of ReduceTasks > the number of partitions returned by getPartition, the extra ReduceTasks produce empty output files part-r-000xx
- If 1 < number of ReduceTasks < number of partitions, some partitioned data has nowhere to go and an Exception is thrown
- If the number of ReduceTasks = 1, then no matter how many partitions the MapTasks produce, everything is handed to that single ReduceTask and only one output file part-r-00000 is produced
- Partition numbers must start at 0 and increase consecutively
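A hedged sketch of those cases against the five-way ProvincePartitioner above (the exact runtime error message may vary across Hadoop versions):

// ProvincePartitioner returns partition numbers 0..4
job.setNumReduceTasks(6); // runs, but part-r-00005 will be empty
job.setNumReduceTasks(5); // exact match: five populated output files
job.setNumReduceTasks(3); // fails at runtime with an "Illegal partition for ..." error
job.setNumReduceTasks(1); // runs: the single ReduceTask gets everything, one output file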
MR Sorting
- Both MapTask and ReduceTask sort their data by key
- MapTask: quicksort within each partition of the buffer
- Spill files are then merged on disk with merge sort
- ReduceTask: merge sort over the copied map outputs
- Having the map output pre-sorted is what makes the merge on the reduce side efficient
- The default order is lexicographic on the key, and the in-memory sort is implemented with quicksort
MapTask partition quicksort: a MapTask first writes its output records into a circular memory buffer. When the buffer's usage reaches a threshold, the buffered records are quick-sorted (within each partition) and the sorted run is spilled to disk. Once the map task has processed all its input, the spill files on disk are merged into a single sorted file with merge sort (see the tuning sketch below).
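The buffer size and spill threshold are configurable; a sketch of the commonly cited properties (the defaults noted here are the stock Hadoop values, worth verifying against your version):

// Map-side sort buffer is 100 MB by default; spilling starts at 80% usage by default
Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 200);            // enlarge the circular buffer
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // spill threshold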
ReduceTask merge sort: a ReduceTask copies its partition of the data from every MapTask over the network. If a copied chunk exceeds a size threshold it is spilled to disk, otherwise it is kept in memory. When the number of files on disk reaches a threshold, they are merged into one larger file; when the size or number of in-memory chunks exceeds a threshold, they are merged and spilled to disk. After all data has been copied, the ReduceTask performs one final merge sort over everything held in memory and on disk (see the tuning sketch below).
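The copy and merge thresholds on the reduce side are configurable as well; another hedged sketch of commonly cited properties with their stock defaults:

// Fraction of reducer heap used to hold copied map output (default 0.70),
// in-memory merge threshold (default 0.66), and merge fan-in (default 10 streams)
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
conf.setInt("mapreduce.task.io.sort.factor", 10);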
Sorting categories
- Partial sort
- MapReduce sorts the dataset by the keys of the input records, so each output file is internally sorted.
- Total sort
- The final output is a single file that is sorted as a whole.
- Implemented by using a single ReduceTask.
- Auxiliary sort (grouping keys on the reduce side)
- Secondary sort
- In a custom sort, if compareTo compares on two conditions, it is a secondary sort.
Custom sorting
- Have the bean class implement the WritableComparable interface and override the compareTo method
Code:
- FlowBean.java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1. Implement the WritableComparable interface
* 2. Override the serialization (write) and deserialization (readFields) methods
* 3. Provide a no-arg constructor
* 4. Override toString
*/
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow; // upstream traffic
private long downFlow; // downstream traffic
private long sumFlow; // total traffic
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
// No-arg constructor (required for deserialization)
public FlowBean() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
// Sort by total traffic in descending order
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
return 0;
}
}
}
- FlowMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private final FlowBean outK = new FlowBean();
private final Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Get the input line and split it by tab
String line = value.toString();
String[] split = line.split("\t");
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);
context.write(outK, outV);
}
}
- FlowReducer.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// values holds all phone numbers whose keys compare as equal (i.e. the same total traffic)
for (Text value : values) {
context.write(value, key);
}
}
}
- FlowDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the driver jar
job.setJarByClass(FlowDriver.class);
// 3. Wire up the Mapper and Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4. Set the Mapper output key/value types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\output1"));
FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output5"));
// 7. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Secondary sort:
@Override
public int compareTo(FlowBean o) {
// Sort by total traffic in descending order
if (this.sumFlow > o.sumFlow) {
return -1;
} else if (this.sumFlow < o.sumFlow) {
return 1;
} else {
// On equal total traffic, sort by upstream traffic in ascending order
if (this.upFlow > o.upFlow) {
return 1;
} else if (this.upFlow < o.upFlow) {
return -1;
} else {
return 0;
}
}
}
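Equivalently, the same descending-then-ascending comparison can be written more compactly with Long.compare (a sketch with identical behavior):

@Override
public int compareTo(FlowBean o) {
    // Descending by total traffic, then ascending by upstream traffic on ties
    int result = Long.compare(o.sumFlow, this.sumFlow);
    return result != 0 ? result : Long.compare(this.upFlow, o.upFlow);
}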
Sorting within each partition (partition first, then sort inside each partition):
- ProvincePartitioner2.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
String phone = text.toString();
String prePhone = phone.substring(0, 3);
int partition;
switch (prePhone){
case "136":
partition = 0;
break;
case "137":
partition = 1;
break;
case "138":
partition = 2;
break;
case "139":
partition = 3;
break;
default:
partition = 4;
}
return partition;
}
}
- FlowDriver.java
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the driver jar
job.setJarByClass(FlowDriver.class);
// 3. Wire up the Mapper and Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4. Set the Mapper output key/value types
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// Set the custom partitioner class and a matching number of ReduceTasks
job.setPartitionerClass(ProvincePartitioner2.class);
job.setNumReduceTasks(5);
// 5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\output5"));
FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output6"));
// 7. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Combiner merging
- The Combiner is a third kind of component in an MR program, besides the Mapper and Reducer
- The parent class of a Combiner is Reducer
- The difference between a Combiner and a Reducer is where they run
- The Combiner runs on the node of each individual MapTask
- The Reducer receives the output of all Mappers globally
- The point of the Combiner is to pre-aggregate each MapTask's output locally, reducing network traffic (see the sketch after this list)
- A Combiner may only be used when it does not affect the final business logic, and its output KV types must match the Reducer's input KV types
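A hypothetical word-count illustration of that local aggregation (values made up for the example):

// Map output on one node, without a Combiner -> 4 records shuffled over the network:
//   (hello, 1) (hello, 1) (hello, 1) (world, 1)
// With a summing Combiner, the node ships only the local totals -> 2 records:
//   (hello, 3) (world, 1)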
A custom Combiner just needs to extend Reducer
- WordCountCombiner.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
- WordCountDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the driver jar; its location is found by reflection on the fully qualified class name
job.setJarByClass(WordCountDriver.class);
// 3. Wire up the Mapper and Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. Set the final output key/value types (some MR jobs have no Reducer)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the custom Combiner class
job.setCombinerClass(WordCountCombiner.class);
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\wcinput"));
FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\wcoutput2"));
// 7. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
- If the Reducer is removed and the job runs map-only, the output is not combined; the Combiner only applies to the intermediate shuffle stage between Mapper and Reducer
- job.setNumReduceTasks(0);
- Because the Reducer and the Combiner here have identical logic, the Reducer class itself can be registered as the Combiner (see the sketch below)
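A one-line sketch of that reuse, assuming the WordCountReducer from the earlier WordCount example:

// WordCountReducer already sums IntWritable counts per key, so it can double as the Combiner
job.setCombinerClass(WordCountReducer.class);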