Flink, 大数据

flink-32 检查点配置

启用检查点

默认情况下flink是禁用检查点的,如果要开启需要在代码中显示的调用

// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

不传参默认的间隔周期为500毫秒(已弃用)

package com.learn.flink.checkpoint;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CheckConfigDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);

        // 代码中用到hdfs,需要导入hadoop依赖 指定访问hdfs的用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        // TODO 检查点配置
        // 1、启用检查点,默认是barrier对齐的,周期为5s,精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // 2、指定检查点的存储位置
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // 使用hdfs需要导入hadoop-client依赖,不建议打包的时间将hadoop依赖打包scope设置provided即可
        checkpointConfig.setCheckpointStorage("hdfs://hadoop001:8020/chk");
        // 3、checkpoint的超时时间:默认10分钟
        checkpointConfig.setCheckpointTimeout(60 * 1000L);
        // 4、同时运行中的checkpoint的最大数量
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // 5、最小等待间隔:上一轮checkpoint结束 到下一轮 checkpoint开始之间的间隔,设置了大于0,并发就会变成1
        checkpointConfig.setMinPauseBetweenCheckpoints(1000L);
        // 6、取消作业时,checkpoint是否保留在外部系统
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        // 7、允许checkpoint连续失败的次数,默认0,表示checkpoint一失败job就挂掉
        checkpointConfig.setTolerableCheckpointFailureNumber(3);


        DataStreamSource<String> socketDS = env.socketTextStream("hadoop003", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();

        env.execute();
    }
}

默认保留最近的一次成功的checkpoint,可以设置保留个数,取消任务的时候chk-132就会被删除,因为设置了DELETE_ON_CANCELLATION,生产建议设置RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 如果程序突然挂掉而非取消,则无法删除chk-132

// 开启非对其检查点(barrier非对其)
// 开启要求: checkpoint模式必须时精准一次,最大并发必须设为1
checkpointConfig.enableUnalignedCheckpoints();
// 开启非对齐检查点才生效:默认0,表示一开始就直接用 非对齐检查点
// 如果大于0,一开始用 对齐检查点(barrier对齐),对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)
checkpointConfig.enableUnalignedCheckpoints();
checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));

通用增量 checkpoint(changelog)

flink1.15开始不管hashmap还是rocksdb状态后端都可以通过开启changelog实现通用的增量checkpoint

  • 执行过程
    • 带状态的算子任务将状态写入变更日志(记录状态)
    • 状态物化:状态表定期保存,独立于检查点
    • 状态物化完成后,状态变更日志就可以被截断到相应的点
  • 注意事项
    • 目前标记为实验性功能,开启后可能会造成资源消耗增大
      • hdfs上保存的文件数变多
      • 消耗更多的IO带宽用于上传变更日志
      • 更多的CPU用于序列化状态更改
      • TaskManager使用更多内存来缓存状态更改
    • 使用限制
      • checkpoint的最大并发必须为1
      • 从flink1.15开始,只有文件系统的存储类型实现可用(memory测试阶段)
      • 不支持NO_CLAIM模式
  • 使用方式

(1)配置文件指定

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储changelog数据
dstl.dfs.base-path: hdfs://hadoop001:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM

(2)代码中设置

需要引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-changelog</artifactId>
    <version>${flink.version}</version>
    <scope>runtime</scope>
</dependency>

开启changelog

// 开启changelog
// 要求checkpoint的最大并发必须为1 setMaxConcurrentCheckpoints 其他参数建议在flink-conf配置文件中去指定(参考方式一)
env.enableChangelogStateBackend(true);