Blog
flink-28 按键分区状态
值状态 ValueState
- value() 取出值状态里的数据
- update(value) 更新值状态里的数据
- clear() 清除值状态里的数据
- 状态描述器 ValueStateDescriptor
计算相邻两个水位的差值,输出相差超过10的水位
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Keyed ValueState demo: for each sensor id, emits an alert when the water level (vc) of a record
 * differs from the previous record of the same sensor by more than 10.
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            // Step 1: declare the state handle. Values read/written through it are
                            // scoped to the key of the record currently being processed.
                            ValueState<Integer> lastVcState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                // Step 2: obtain the state in open(). Descriptor arguments: a unique
                                // state name, and the type of the stored value.
                                lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // value(): read, update(v): overwrite, clear(): remove -- all for the current key.
                                // Read the previous water level once; null means this key has no earlier record.
                                Integer lastVc = lastVcState.value();

                                // Only compare when a real previous value exists, so the first record of a
                                // key is not compared against a fabricated 0.
                                if (lastVc != null && Math.abs(value.getVc() - lastVc) > 10) {
                                    out.collect("传感器=" + value.getId() + "当前水位值=" + value.getVc() + ", 与上一条水位值=" + lastVc + ", 相差超过10!!!");
                                }

                                // Remember this record's water level for the next record of the same key.
                                lastVcState.update(value.getVc());
                            }
                        }
                ).print();
        env.execute();
    }
}
- 如果用普通成员变量保存上一条水位值,同一个并行子任务中的所有key会共享这一个变量、相互覆盖;而ValueState是按key隔离的,保存的是当前key对应的状态值。
- 此外,普通变量不受Flink状态管理(不会被checkpoint保存),程序重启或出错后变量就清空了,无法从故障点恢复。
列表状态 ListState
- get() 获取当前的列表状态,返回的是一个可迭代类型 Iterable
- update(values) 传入一个列表values,直接对状态进行覆盖
- add(value) 在状态列表中添加一个元素value
- addAll(values) 向列表中添加多个元素,以列表values形式传入
- 状态描述器ListStateDescriptor
计算每个传感器中水位值大小排名前三的水位
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
/**
 * Keyed ListState demo: for each sensor id, keeps the largest water levels (vc) seen so far
 * (at most {@code TOP_N}) and emits them in descending order after every record.
 */
public class KeyedListStateDemo {

    /** Number of largest water levels retained per sensor. */
    private static final int TOP_N = 3;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            // Per-key list; trimmed to at most TOP_N values after every record.
                            ListState<Integer> vcListState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // Append the new reading to this key's list state.
                                vcListState.add(value.getVc());

                                // Copy the state into a local list so it can be sorted.
                                List<Integer> arr = new ArrayList<>();
                                for (Integer vc : vcListState.get()) {
                                    arr.add(vc);
                                }

                                // Descending order; reverseOrder() avoids the overflow risk of `o2 - o1`.
                                arr.sort(Comparator.reverseOrder());

                                // Drop everything after the first TOP_N. Because the state is trimmed on
                                // every record it holds at most TOP_N + 1 values here, but clearing the whole
                                // tail stays correct even if it ever held more.
                                if (arr.size() > TOP_N) {
                                    arr.subList(TOP_N, arr.size()).clear();
                                }

                                // Overwrite the state with the trimmed, sorted list.
                                vcListState.update(arr);
                                String currentKey = ctx.getCurrentKey();
                                out.collect("传感器" + currentKey + "的前三个水位值: " + arr.toString());
                            }
                        }
                ).print();
        env.execute();
    }
}
Map状态 MapState
- get() 传入一个key作为参数,查询对应的value值
- put(key, value) 传入一个键值对,更新key对应的value值
- putAll(map) 将传入的映射map中所有的键值对,全部添加到映射状态中
- remove(key) 将指定key对应的键值对删除
- contains(key) 判断是否存在指定的key
- entries() 获取映射状态中所有的键值对
- keys() 获取映射状态中所有的键key返回一个可迭代Iterable类型
- values() 获取映射状态中所有的值value,返回一个可迭代Iterable类型
- isEmpty() 判断映射是否为空
- 状态描述器MapStateDescriptor
计算每个传感器中水位出现的次数
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
 * Keyed MapState demo: for each sensor id, counts how many times each water level (vc) has
 * occurred and emits all counts for that sensor after every record.
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            // Per-key map: water level -> number of occurrences.
                            MapState<Integer, Integer> vcMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // One lookup instead of contains() + get(): get() yields null for an unseen
                                // water level, in which case the count starts at 1.
                                Integer vc = value.getVc();
                                Integer count = vcMapState.get(vc);
                                vcMapState.put(vc, count == null ? 1 : count + 1);

                                // Chain append() calls rather than concatenating inside append(),
                                // which would build a throwaway String per entry.
                                StringBuilder str = new StringBuilder();
                                for (Map.Entry<Integer, Integer> entry : vcMapState.entries()) {
                                    str.append(value.getId())
                                            .append("中")
                                            .append(entry.getKey())
                                            .append("出现的次数: ")
                                            .append(entry.getValue())
                                            .append(" ");
                                }
                                out.collect(str.toString());
                            }
                        }).print();
        env.execute();
    }
}
归约状态 ReducingState
- 类似ValueState值状态,不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。
- get() 对本组的reducing状态 获取结果
- add() 对本组的reducing状态 添加数据
- clear() 对本组的reducing状态 清空数据
- 状态描述器ReducingStateDescriptor
计算每种传感器的水位和
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Map;
/**
 * Keyed ReducingState demo: keeps a running sum of the water level (vc) per sensor id and emits
 * the current sum after every record.
 */
public class KeyedReducingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorStream = env
                .socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs()));

        sensorStream
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Per-key running sum; each add() folds the new value in with Integer::sum.
                    ReducingState<Integer> vcReducingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ReducingStateDescriptor<Integer> sumDescriptor =
                                new ReducingStateDescriptor<Integer>("vcReducingState", Integer::sum, Types.INT);
                        vcReducingState = getRuntimeContext().getReducingState(sumDescriptor);
                    }

                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // Adding to the reducing state accumulates it; get() returns the reduced total.
                        vcReducingState.add(value.getVc());
                        Integer runningSum = vcReducingState.get();
                        out.collect(value.getId() + "的vc累加值" + runningSum);
                    }
                })
                .print();

        env.execute();
    }
}
聚合状态 AggregatingState
- 与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果
- add()方法调用会直接使用指定的AggregateFunction进行聚合并更新状态
- get() 获取状态结果
- clear() 清空状态
计算每种传感器的平均水位
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import javafx.scene.chart.ValueAxis;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class KeyedAggregatingStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
sensorDS.keyBy(WaterSensor::getId)
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
AggregatingState<Integer, Double> vcAggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcAggregatingState = getRuntimeContext().getAggregatingState(
new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
"vcAggregatingState",
// 2元组 存储 count值, sum值
new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
// 这里 * 1D 将Integer转Double,不能放在分母位置
return accumulator.f1 * 1D / accumulator.f0;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return null;
}
},
Types.TUPLE(Types.INT, Types.INT)
)
);
}
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
vcAggregatingState.add(value.getVc());
out.collect(value.getId() + "的vc平均值: " + vcAggregatingState.get());
}
}).print();
env.execute();
}
}
状态生存时间 TTL
实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路就是直接在代码中调用clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除,这时就需要配置一个状态的生存时间TTL,当状态在内存中存在的时间超过这个值时,就将它清除。
- newBuilder() 设定状态生存时间
- setUpdateType() 指定什么时候更新状态失效时间
- OnCreateAndWrite: 只有创建状态和更改状态(写操作)时才更新失效时间
- OnReadAndWrite: 无论读操作还是写操作都会更新失效时间
- setStateVisibility() 设置状态的可见性。因为清除操作并不是实时的,所以状态过期之后还有可能继续存在,这时对它进行访问能否读取到,就由可见性配置决定:NeverReturnExpired(从不返回过期值)或ReturnExpiredIfNotCleanedUp(过期但尚未被清除时仍然返回)。
- TTL在open方法中配置:先构建StateTtlConfig,再调用状态描述器的enableTimeToLive()启用,因为状态正是在open方法中通过描述器获取的
package com.learn.flink.state;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * State TTL demo: keeps the last water level (vc) per sensor id in a ValueState configured with a
 * 5-second time-to-live, and prints the stored value (or null once it has expired) before
 * overwriting it with the current record's value.
 */
public class StateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs()))
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Per-key last water level, subject to the TTL configured in open().
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> lastVcDescriptor =
                                new ValueStateDescriptor<>("lastVcState", Types.INT);
                        // TTL of 5 seconds, refreshed only when the state is created or written,
                        // and expired values are never handed back to the reader.
                        lastVcDescriptor.enableTimeToLive(
                                StateTtlConfig.newBuilder(Time.seconds(5))
                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                        .build());
                        lastVcState = getRuntimeContext().getState(lastVcDescriptor);
                    }

                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                        // Emit what is currently stored (read happens before the write below).
                        out.collect("上一次状态值: " + lastVcState.value());
                        // Overwrite with the current reading, which also refreshes the TTL.
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        env.execute();
    }
}