Blog
flink-34 状态一致性 端到端一致性
状态一致性
一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估
流式计算在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确性。
一致性三种级别:
- 最多一次 At-Most-Once
- 至少一次 At-Least-Once
- 精确一次 Exactly-Once
端到端一致性
从源头到结果输出整个过程都是正确的
数据源可重放数据,或者说可重置读取数据偏移量,加上flink的source算子将偏移量作为状态保存进检查点,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。
两阶段提交事务
Flink写入Kafka两阶段提交
具体实现:
(1)Flink内部
flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义
(2)输入端
输入数据源端的kafka可以对数据进行持久化保存,并可以重置偏移量offset,所以我们可以在source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中,当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向kafka重新提交偏移量,就可以重新消费数据、保证结果一致性了。
(3)输出端
输出端保证exactly-once的最佳实现,当然就是两阶段提交2PC
我们写入kafka的过程实际上时一个两个段的提交:
- 处理完毕得到结果,写入kafka时是基于事务的预提交。
- 等到检查点保存完毕,才会提交事务进行正式提交
- 如果中间出现故障事务进行回滚,预提交就会被放弃,恢复状态之后,也只能恢复所有已经确认提交的操作
需要的配置
- 必须启用检查点
- 指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE
- 配置kafka读取数据的消费者的隔离级别
这里所说的kafka是写入的外部系统。预提交阶段数据已经写出,只是被标记为未提交(uncommitted),而kafka中默认的隔离级别isolation.level是read_uncommitted也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了,所以应该将隔离级别配置为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消息。
- 事务超时配置
flink的kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以检查点保存时间很长时,有可能出现kafka已经认为事务超时了,丢弃了预提交的数据,而sink任务还可以继续等待。如果接下来检查点保存成功了,发生故障后回滚到间隔检查点的状态,这部分的数据就被真正丢掉了。所以这两个超时时间,前者应该小于后者。
package com.learn.flink.checkpoint;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
public class KafkaEOSDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.setProperty("HADOOP_USER_NAME", "root");
// 启动检查点 设置精准一次
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop001:8020/chk");
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 读取kafka
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop001:9092")
.setGroupId("test")
.setTopics("topic_1")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource<String> kafkaDS = (DataStreamSource<String>) env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
// 写出kafka
// 精准一次写入kafka 需满足条件:开启checkpoint、设置事务前缀、设置事务超时时间 大于checkpoint间隔 小于max 15分钟
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop001:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 精准一次,开启两阶段提交
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 精准一次必须设置事务的前缀
.setTransactionalIdPrefix("fs-")
// 精准一次 必须设置事务超时时间:大于checkpoint间隔(5s)小于max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
kafkaDS.sinkTo(kafkaSink);
env.execute();
}
}
消费被两阶段提交的topic
KakfaSource.xxx
// 作为下游的消费者,要设置事务的隔离级别 = 读已提交
.serProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.build();
checkpoint时间设置了5s 所以读取到数据也要差不多5s