Flink, 大数据

flink-07 并行度

并行子任务和并行度

当要处理的数据量非常大时,我们可以把一个算子操作,复制多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行子任务(subtasks),再将他们分发到不同节点,就真正实现了并行计算。

在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask)这些子任务在不同的线程、不同的物理机或不同的容器中独立的执行。

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中不同 的算子可能具有不同的并行度。

并行度的设置

  • 在代码中设置
    • 可以在算子后面跟着调用setParallelism()方法设置当前算子的并行度
    • 只对当前算子有效
    • 使用env.setParallelism()设置全局并行度
package com.learn.flink.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountUnboundedDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop003", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).setParallelism(2)
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();

        env.execute();

    }
}

这里还需要添加包:在本地开启flink-webui方便查看

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

同时这里添加了provided 所以在运行的时候需要编辑运行配置 添加provided依赖

FlatMap的并行度为代码中设置的2,sink中的并行度16默认使用当前电脑的线程数(8核16线程)

并行度优先级:

算子 > env > 提交时指定 > 配置文件

算子链

一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式取决于算子的种类。

一对一(One to One, forwarding)

这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,比如上面第一张图中的source和map算子,source算子读取数据之后,可以直接发送给map算子做处理,它们之间不需要重分区,也不需要调整数据的顺序。这就意味着map算子的子任务,看到元素的个数和顺序跟source算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap等算子都是这种一对一的对应关系

重分区(redistributing)

在这种模式下,数据流的分区会发生变化。比如map和后面的keyby/window算子之间以及keyby/window和sink算子之间,都是这样的关系

每一个算子的子任务会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程。

合并算子链

在flink中,并行度相同的一对一的算子操作,可以直接链接在一起形成一个大的任务。这样原来的算子就成为了真正任务里的一部分。所以如下图,每个task会被一个现成执行。这样的技术被称为算子链(operator chain)

source子任务和map子任务合并成为一个子任务

将算子链接成task是非常有效的优化,可以减少现成之间的切换和基于缓存区的数据交换。在减少时延的同时提升吞吐量。

禁用算子链

// 禁用算子链
.map(xxx).disableChaining();
// 从当前算子开始新链
.map(xxx).startNewChain()

全局禁用合并算子链

env.disableOperatorChaining();

禁用算子链场景:flatMap计算逻辑复杂,Map计算逻辑也复杂 这时候需要分开