Hadoop, Big Data

hadoop-14 The Shuffle Mechanism

The Shuffle Mechanism

The data-processing stage that runs after the map method and before the reduce method.

Partitioning (Partitioner)

  • Default partitioner
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
  • The default partition is the key's hashCode modulo the number of ReduceTasks, so the user has no control over which partition a given key ends up in

Custom Partitioner

  • Write a class that extends Partitioner and override the getPartition() method
  • Register the custom Partitioner in the job driver
    • job.setPartitionerClass(CustomPartitioner.class)
  • After defining the custom Partitioner, set a number of ReduceTasks that matches its logic
    • job.setNumReduceTasks(5)
  • ProvincePartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// The Partitioner's generic types are the Mapper's output key/value types
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        int partition;
        switch (prePhone) {
            case "136":
                partition = 0;
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                partition = 4;
                break;
        }
        return partition;
    }
}

FlowDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Set the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // Set the number of ReduceTasks
        job.setNumReduceTasks(5);

        // 6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output2"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  • job.setPartitionerClass(ProvincePartitioner.class);
  • job.setNumReduceTasks(5);

Partitioning summary

  • If the number of ReduceTasks > the number of partitions produced by getPartition, a few extra empty output files part-r-000xx are created
  • If 1 < the number of ReduceTasks < the number of partitions produced by getPartition, some partitioned data has nowhere to go and the job fails with an Exception
  • If the number of ReduceTasks = 1, then no matter how many partition files the MapTask side produces, everything is handed to the single ReduceTask, and only one result file, part-r-00000, is produced
  • Partition numbers must start at 0 and increase consecutively
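A hedged driver sketch of the three cases above, reusing the ProvincePartitioner, FlowMapper, FlowReducer and FlowBean from the first example in this post (the class name and paths here are only placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical demo driver; all other classes are the ones shown earlier in this post.
public class PartitionCountDemoDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(PartitionCountDemoDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setPartitionerClass(ProvincePartitioner.class); // 5 partitions, numbered 0-4

        // Case 1: ReduceTasks > partitions -> the job runs, but part-r-00005 is an empty file
        job.setNumReduceTasks(6);
        // Case 2: 1 < ReduceTasks < partitions -> the MapTask throws an exception (e.g. "Illegal partition")
        // job.setNumReduceTasks(3);
        // Case 3: exactly one ReduceTask -> partitioning is effectively ignored, single part-r-00000
        // job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output_demo"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}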

MR Sorting

  • Both MapTask and ReduceTask sort the data by key
    • MapTask: quicksort within each partition
      • spill files are then combined with a merge sort
    • ReduceTask: merge sort
  • Sorting makes the reduce phase more efficient, because all values for the same key arrive together
  • The default ordering is dictionary (lexicographic) order on the key, and the in-memory sort is implemented with quicksort

MapTask partition quicksort: the MapTask temporarily stores its results in a circular memory buffer. When the buffer's usage reaches a certain threshold, the buffered data is sorted once with quicksort and the sorted data is spilled to disk. After all input has been processed, all the spill files on disk are combined with a merge sort.

ReduceTask merge sort: the ReduceTask remotely copies the relevant data files from every MapTask. If a copied file exceeds a size threshold it is spilled to disk, otherwise it is kept in memory. When the number of files on disk reaches a threshold, they are merge-sorted into one larger file; when the size or number of in-memory files exceeds a threshold, they are merged and spilled to disk. Once all data has been copied, the ReduceTask performs one final merge sort over everything in memory and on disk.
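The reason pre-sorted map output makes this cheap is that merging k already-sorted runs only needs a priority queue over the run heads. A minimal standalone sketch of that idea (an illustration only, not Hadoop's actual merge code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// K-way merge over already-sorted "spill" lists.
public class KWayMergeSketch {
    public static void main(String[] args) {
        List<List<Integer>> spills = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 8),
                Arrays.asList(5, 6, 7));

        // Each heap entry is {value, spillIndex, offsetInSpill}, ordered by value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < spills.size(); i++) {
            heap.offer(new int[]{spills.get(i).get(0), i, 0});
        }

        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            merged.add(top[0]);
            int next = top[2] + 1;
            if (next < spills.get(top[1]).size()) {
                heap.offer(new int[]{spills.get(top[1]).get(next), top[1], next});
            }
        }
        System.out.println(merged); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}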

Types of sorting

  • Partial sort
    • MapReduce sorts the dataset by the input records' keys, guaranteeing that each output file is internally sorted.
  • Total sort
    • The final output is a single file, and that file is internally sorted.
    • One way to achieve this is to configure a single ReduceTask
  • Auxiliary (grouping) sort: a grouping comparator controls which keys are grouped into the same reduce() call (see the sketch after this list)
  • Secondary sort
    • In a custom sort, if compareTo compares on two conditions, that is a secondary sort
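For the auxiliary (grouping) sort, a minimal sketch of a grouping comparator, reusing the FlowBean from this post; the class name FlowGroupingComparator and the choice to group by total flow are illustrative assumptions, not part of the original example:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Group FlowBean keys by total flow only, so that keys differing in upFlow but
// sharing sumFlow reach the same reduce() call.
public class FlowGroupingComparator extends WritableComparator {
    protected FlowGroupingComparator() {
        super(FlowBean.class, true); // true -> create FlowBean instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean fa = (FlowBean) a;
        FlowBean fb = (FlowBean) b;
        // Keys that compare as equal here are fed to the same reduce() call
        return Long.compare(fa.getSumFlow(), fb.getSumFlow());
    }
}

It would be registered in the driver with job.setGroupingComparatorClass(FlowGroupingComparator.class).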

Custom sorting

  • Have the bean class implement the WritableComparable interface and override the compareTo method

Code:

  • FlowBean.java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. Define a class that implements the WritableComparable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-arg constructor
 * 4. Override toString
 * 5. Override compareTo to define the sort order
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow; // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow; // total traffic

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // No-arg constructor
    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Sort by total flow in descending order
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            return 0;
        }
    }
}
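As a side note, the descending comparison above can be written more compactly with Long.compare; a sketch of an equivalent alternative, not a change to the class above:

@Override
public int compareTo(FlowBean o) {
    // Descending by total flow: compare the other bean's sumFlow against this one's
    return Long.compare(o.getSumFlow(), this.getSumFlow());
}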

FlowMapper.java

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private final FlowBean outK = new FlowBean();
    private final Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read the input line
        String line = value.toString();
        String[] split = line.split("\t");
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);
        context.write(outK, outV);
    }
}

FlowReducer.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // values: the phone numbers whose keys compare as equal (same total flow)
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

FlowDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);


        // 4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);


        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\output1"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output5"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Secondary sort:

@Override
public int compareTo(FlowBean o) {
    // Sort by total flow in descending order
    if (this.sumFlow > o.sumFlow) {
        return -1;
    } else if (this.sumFlow < o.sumFlow) {
        return 1;
    } else {
        // If the total flows are equal, sort by upstream flow in ascending order
        if (this.upFlow > o.upFlow) {
            return 1;
        } else if (this.upFlow < o.upFlow) {
            return -1;
        } else {
            return 0;
        }
    }
}

Sorting within each partition after partitioning:

  • ProvincePartitioner2.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        int partition;
        switch (prePhone){
            case "136":
                partition = 0;
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                partition = 4;
        }
        return partition;
    }
}

FlowDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the Mapper output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        
        // Specify the partitioner class and the number of ReduceTasks
        job.setPartitionerClass(ProvincePartitioner2.class);
        job.setNumReduceTasks(5);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\output5"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output6"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Combiner (local aggregation)

  • The Combiner is a component of an MR program in addition to the Mapper and Reducer
  • The Combiner's parent class is Reducer
  • The difference between the Combiner and the Reducer is where they run
    • The Combiner runs on the node where each MapTask runs
    • The Reducer receives the output of all Mappers globally
  • The point of the Combiner is to do a local aggregation of each MapTask's output, reducing the amount of data sent over the network
  • A Combiner may only be used when it does not change the final business logic, and its output KV types must match the Reducer's input KV types

A custom Combiner simply extends Reducer

  • WordCountCombiner.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}

WordCountDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the Driver jar path (located via reflection from the fully qualified class name)
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types (some MR jobs have no Reducer)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the custom Combiner class
        job.setCombinerClass(WordCountCombiner.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\hadoop\\wcinput"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\wcoutput2"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  • If the Reducer is removed and the job runs Mapper-only, the output is not combined; the Combiner only applies to the intermediate (shuffle) stage between Mapper and Reducer
    • job.setNumReduceTasks(0);
  • Because the Reducer and the Combiner here share the same logic, the Reducer class itself can also be registered as the Combiner
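In that case, assuming the WordCountReducer from the earlier word-count example, the driver simply registers the Reducer class as the Combiner:

job.setCombinerClass(WordCountReducer.class);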
