flink-29 Operator State
Operator state is state defined on a single parallel instance of an operator; its scope is limited to the current operator task.
- Operator state has fewer practical use cases than keyed state. It is typically used in operators that connect to external systems, such as sources and sinks, or in scenarios where no key is defined at all.
- Flink's Kafka connector, for example, uses operator state.
- When the operator's parallelism changes, operator state can be redistributed among the parallel operator task instances.
List State (ListState)
- State descriptor: ListStateDescriptor
package com.learn.flink.state;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class OperatorListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("hadoop003", 7777)
                .map(new MyMapFunction())
                .print();

        env.execute();
    }

    public static class MyMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        private Long count = 0L;
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persists the local variable by copying it into operator state.
         * Only called when checkpointing is enabled.
         *
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            // Clear the operator state
            state.clear();
            // Add the local variable to the operator state
            state.add(count);
        }

        /**
         * Initializes the local variable: on recovery, copies the data from state
         * back into the local variable. Called once per subtask.
         *
         * @param context the context for initializing the operator
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            // Initialize the operator state from the context
            state = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
            // Copy the data from the operator state back into the local variable
            if (context.isRestored()) { // restore succeeded
                for (Long c : state.get()) {
                    count += c;
                }
            }
        }
    }
}
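Note that snapshotState is only invoked when checkpointing is enabled, so to actually see the snapshot/restore path in this demo, checkpointing also has to be turned on in main. A minimal addition (the 3-second interval is an arbitrary example value):

// Enable checkpointing so snapshotState() is actually triggered (interval in ms)
env.enableCheckpointing(3000);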
UnionListState
- State descriptor: ListStateDescriptor
In operator state, the difference between list state and union list state is how the state is redistributed when the parallelism changes (see the sketch after this list):
- List state: the entries are split round-robin (evenly) across the new parallel subtasks.
- Union list state: the lists of the original subtasks are merged into one complete list, which is then broadcast to every new parallel subtask (each one gets a full copy).
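In code, the only change from the ListState demo above is the getter used in initializeState. A minimal sketch, reusing the state and count fields of MyMapFunction (the descriptor name "union-state" is arbitrary):

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // getListState: entries are split round-robin across subtasks on rescaling.
    // getUnionListState: every subtask receives the full, merged list on rescaling.
    state = context
            .getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<>("union-state", Types.LONG));
    if (context.isRestored()) {
        for (Long c : state.get()) {
            count += c;
        }
    }
}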
Broadcast State (BroadcastState)
Sometimes we want every parallel subtask of an operator to hold the same global state, used for unified configuration and rule settings. In that case, the data of all partitions accesses the same state. This special kind of operator state is called broadcast state.
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Data stream
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        // Config stream
        DataStreamSource<String> configDS = env.socketTextStream("hadoop003", 8888);

        // Turn the config stream into a broadcast stream
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // Connect the data stream with the broadcast config stream
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        // Call process to handle both streams
        sensorBCS.process(
                new BroadcastProcessFunction<WaterSensor, String, String>() {
                    /**
                     * Handles elements of the data stream.
                     * @param value The stream element.
                     * @param ctx A read-only context for accessing the broadcast state
                     * @param out The collector to emit resulting elements to
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Read the value from the broadcast state via the context (read-only, cannot be modified)
                        ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                        Integer threshold = broadcastState.get("threshold");
                        // Check whether the broadcast state has data yet: right after startup,
                        // elements of the data stream may arrive before the config stream's.
                        threshold = threshold == null ? 0 : threshold;
                        if (value.getVc() > threshold) {
                            out.collect("Sensor " + value.getId() + ": water level " + value.getVc() + " exceeds threshold " + threshold + "!!!");
                        }
                    }

                    /**
                     * Handles elements of the broadcast config stream.
                     * @param value The stream element.
                     * @param ctx A context that allows writing to the broadcast state
                     * @param out The collector to emit resulting elements to
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
                        // Get the broadcast state via the context and write to it
                        BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                        broadcastState.put("threshold", Integer.valueOf(value));
                    }
                }
        ).print();

        env.execute();
    }
}
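To try this out (assuming netcat is available and WaterSensorMapFunction parses comma-separated id,ts,vc records), feed sensor records such as s1,1,5 into port 7777 with nc -lk 7777 and threshold values such as 10 into port 8888 with nc -lk 8888; any sensor reading whose water level exceeds the currently broadcast threshold is printed as an alert on every parallel subtask.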