Flink, 大数据

flink-30 状态后端

在flink中状态的存储、访问以及维护,都是由一个可拔插的组件决定:状态后端(state backend)

状态后端主要复制管理本地状态的存储方式和位置

状态后端的分类

  • HashMapStateBackend (默认)
    • 状态存储在内存里,将状态当作对象(objects)保存在TaskManager的JVM堆上
    • 普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表HashMap
  • RocksDB
    • 是一种内嵌的key-value存储介质,可以把数据存储到本地硬盘
    • 默认存储在TaskManager本地数据目录里
    • RocksDB的状态数据被存储为序列化的字节数组
      • 读写操作需要序列化和反序列化
      • 因为做了序列所以key的比较也会按照字节进行而不是直接调用hashCode()和equals()方法
    • 始终执行的是异步快照,不会因为保存检查点而阻塞数据的处理,它还提供了增量式保存检查点的机制,可以提升保存效率

状态后端的配置

(1)配置默认的状态后端

在flink-conf.yaml中,可以使用state.backend来配置默认状态后端

hashmap -> HashMapStateBackend

rocksdb -> EmbeddedRocksDBStateBackend

# 默认状态后端
state.backend: hashmap

# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop001:8020/flink/checkpoints

这里的state.checkpoints.dir定义了检查点和元数据写入的目录

(2)为每个作业Per-job单独配置状态后端

通过执行环境设置

env.setStateBackend(new HashMapStateBackend());
env.setStateBackend(new EmbeddedRocksDBStateBackend());

如果向在IDE中使用EmbeddedRocksDBStateBackend需要添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>