Hadoop, 大数据

hadoop-10 MapReduce

MapReduce定义

  • 是一个分布式运算程序的编程框架,是用户开发基于hadoop的数据分析应用的核心框架
  • MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并运行在一个hadoop集群上

优点:

  • 易于编程
    • 用户只需要关心业务逻辑,实现框架的接口
  • 良好扩展性
    • 可以动态增加服务器,解决计算资源不够问题
  • 高容错性
    • 任何一台机器挂掉,可以将任务转移到其他节点
  • 适合海量数据计算

缺点:

  • 不擅长实时计算
  • 不擅长流式计算
  • 不擅长DAG有向无环图计算
    • 顺序迭代式计算

核心思想

  • MapReduce运算程序一般需要分成2个阶段,Map阶段和Reduce阶段
  • Map阶段的并发MapTask,完全并行运行,互不相干
  • Reduce阶段的并发ReduceTask,完全互不相干,但是他们的数据依赖上一个阶段的所有MapTask并发实例的输出
  • MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce串行运行

MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  • MrAppMaster:负责整个程序的过程调度及状态协调
    • 是ApplicationMaster的子进程
  • MapTask:负责Map阶段的整个数据处理流程
  • ReduceTask:负责Reduce阶段整个数据处理流程

MapReduce编程规范

  • Mapper阶段
    • 用户自定义的Mapper要继承Mapper类
    • Mapper的输入数据是KV对的形式(kv的类型可自定义)
      • k是这一行的偏移量 v是这一行内容
    • Mapper中的业务逻辑写在map()方法中
    • Mapper的输出数据是KV对的形式(KV的类型可自定义)
    • map()方法(MapTask进程)对每一个调用一次
      • 每一行调用一次map()方法
      • 一行一行处理
  • Reducer阶段
    • 用户自定义的Reducer要继承Reducer类
    • Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
    • Reducer的业务逻辑写在reduce()方法中
    • ReduceTask进程对每一组相同k的组调用一次reduce方法
      • 参数(k, 迭代器v)
  • Driver阶段
    • 相当于Yarn集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象

WordCount案例

  • 准备数据words.txt
hello spark
hello hadoop
hive hbase
hive

分别编写Mapper Reducer Driver

WordCountMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * KEYIN map阶段输入key的类型 LongWriteable
 * VALUEIN map阶段输入value的类型 Text
 * KEYOUT map阶段输出的key类型Text
 * VALUEOUT map阶段输出的value类型IntWriteable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // 提到外边防止资源浪费
    private final Text outK = new Text();
    private final IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取一行
        String line = value.toString();
        // 切割单词
        String[] words = line.split(" ");
        // 循环写出
        for (String word : words) {
            // 封装outK
            outK.set(word);
            // 写出
            context.write(outK, outV);
        }
    }
}

WordCountReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outV = new IntWritable();
    /**
     * hello, (1, 1, 1, ...)
     * @param key Map阶段输出的key:hello
     * @param values Iterable 类似集合 不是迭代器
     *               Map阶段输出的相同key的value集合:(1, 1, 1, ...)
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        // 写出
        context.write(key, outV);
    }
}

WordCountDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2.设置Driver jar包路径 通过全类名反射得到jar包位置
        job.setJarByClass(WordCountDriver.class);
        // 3.关联mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4.设置map输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5.设置最终输出的kv类型,有的mr没有reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6.设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("E:\\words.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadoop\\output1"));
        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

集群运行

  • 输入路径args[0]
  • 输出路径args[1]
hadoop jar wc.jar com.learn.mapreduce.wordcount2.WordCountDriver /input /output