Flink, 大数据

flink-34 状态一致性 端到端一致性

状态一致性

一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估

流式计算在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确性。

一致性三种级别:

  • 最多一次 At-Most-Once
  • 至少一次 At-Least-Once
  • 精确一次 Exactly-Once

端到端一致性

从源头到结果输出整个过程都是正确的

数据源可重放数据,或者说可重置读取数据偏移量,加上flink的source算子将偏移量作为状态保存进检查点,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。

两阶段提交事务

Flink写入Kafka两阶段提交

具体实现:

(1)Flink内部

flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义

(2)输入端

输入数据源端的kafka可以对数据进行持久化保存,并可以重置偏移量offset,所以我们可以在source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中,当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向kafka重新提交偏移量,就可以重新消费数据、保证结果一致性了。

(3)输出端

输出端保证exactly-once的最佳实现,当然就是两阶段提交2PC

我们写入kafka的过程实际上时一个两个段的提交:

  • 处理完毕得到结果,写入kafka时是基于事务的预提交。
  • 等到检查点保存完毕,才会提交事务进行正式提交
  • 如果中间出现故障事务进行回滚,预提交就会被放弃,恢复状态之后,也只能恢复所有已经确认提交的操作

需要的配置

  • 必须启用检查点
  • 指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE
  • 配置kafka读取数据的消费者的隔离级别

这里所说的kafka是写入的外部系统。预提交阶段数据已经写出,只是被标记为未提交(uncommitted),而kafka中默认的隔离级别isolation.level是read_uncommitted也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了,所以应该将隔离级别配置为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消息。

  • 事务超时配置

flink的kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以检查点保存时间很长时,有可能出现kafka已经认为事务超时了,丢弃了预提交的数据,而sink任务还可以继续等待。如果接下来检查点保存成功了,发生故障后回滚到间隔检查点的状态,这部分的数据就被真正丢掉了。所以这两个超时时间,前者应该小于后者。

package com.learn.flink.checkpoint;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaEOSDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        System.setProperty("HADOOP_USER_NAME", "root");
        // 启动检查点 设置精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop001:8020/chk");
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 读取kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop001:9092")
                .setGroupId("test")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStreamSource<String> kafkaDS = (DataStreamSource<String>) env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");

        // 写出kafka
        // 精准一次写入kafka 需满足条件:开启checkpoint、设置事务前缀、设置事务超时时间 大于checkpoint间隔 小于max 15分钟
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop001:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 精准一次,开启两阶段提交
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 精准一次必须设置事务的前缀
                .setTransactionalIdPrefix("fs-")
                // 精准一次 必须设置事务超时时间:大于checkpoint间隔(5s)小于max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        kafkaDS.sinkTo(kafkaSink);

        env.execute();
    }
}

消费被两阶段提交的topic

KakfaSource.xxx
    // 作为下游的消费者,要设置事务的隔离级别 = 读已提交
    .serProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    .build();

checkpoint时间设置了5s 所以读取到数据也要差不多5s