Flink, Big Data

flink-28 Keyed State

Value State (ValueState)

  • value() reads the value currently held in the state
  • update(value) overwrites the value held in the state
  • clear() clears the state
  • State descriptor: ValueStateDescriptor

Detect water levels that differ from the previous reading of the same sensor by more than 10

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            // TODO 1. declare the state
                            ValueState<Integer> lastVcState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                // TODO 2. initialize the state in open()
                                // the state descriptor takes two parameters: a unique name, and the type of the stored value
                                lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // lastVcState.value();       // read the value from the state
                                // lastVcState.update(value); // update the value in the state
                                // lastVcState.clear();       // clear the state

                                // 1. read the previous water level for this key
                                int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                                // 2. if the absolute difference exceeds 10, emit an alert
                                if (Math.abs(value.getVc() - lastVc) > 10) {
                                    out.collect("传感器=" + value.getId() + "当前水位值=" + value.getVc() + ", 与上一条水位值=" + lastVc + ", 相差超过10!!!");
                                }
                                // 3. store the current water level in the state
                                lastVcState.update(value.getVc());
                            }
                        }
                ).print();


        env.execute();
    }
}
  • If an ordinary member variable were used instead, every key processed by the same subtask would read and write the same variable; ValueState always resolves to the value belonging to the current key. A sketch contrasting the two follows below.
  • An ordinary variable is also lost whenever the job fails or restarts, so it cannot be restored from a checkpoint.
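
To make the per-key isolation concrete, here is a minimal sketch (not part of the original demo; it reuses the imports, the WaterSensor bean, and the .process(...) call site from the code above, and is only meant to illustrate the difference):

// Hypothetical sketch: the ordinary field is shared by every key handled by this subtask
// and is lost on restart, while ValueState resolves to the value of the current key
// and can be restored from checkpoints when checkpointing is enabled.
new KeyedProcessFunction<String, WaterSensor, String>() {
    int lastVcField = 0;              // one copy per subtask, shared across all keys
    ValueState<Integer> lastVcState;  // one logical copy per key

    @Override
    public void open(Configuration parameters) throws Exception {
        lastVcState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastVcState", Types.INT));
    }

    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
        lastVcField = value.getVc();        // overwritten by records of any sensor id
        lastVcState.update(value.getVc());  // only affects the state of ctx.getCurrentKey()
    }
};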

List State (ListState)

  • get() returns the current list state as an Iterable
  • update(values) replaces the entire state with the given list values
  • add(value) appends a single element value to the list state
  • addAll(values) appends multiple elements, passed in as a list values
  • State descriptor: ListStateDescriptor

Compute the top three water levels for each sensor

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            ListState<Integer> vcListState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // store each incoming vc in the list state
                                vcListState.add(value.getVc());

                                // copy the state into a List, sort descending, and keep only the 3 largest
                                List<Integer> arr = new ArrayList<>();
                                for (Integer integer : vcListState.get()) {
                                    arr.add(integer);
                                }
                                arr.sort(new Comparator<Integer>() {
                                    @Override
                                    public int compare(Integer o1, Integer o2) {
                                        return o2 - o1;
                                    }
                                });
                                if (arr.size() > 3) {
                                    arr.remove(3); // records arrive one at a time, so after the add there are at most 4 elements; drop the 4th (the smallest)
                                }
                                // write the trimmed list back to the state
                                vcListState.update(arr);

                                String currentKey = ctx.getCurrentKey();
                                out.collect("传感器" + currentKey + "的前三个水位值: " + arr.toString());
                            }
                        }
                ).print();


        env.execute();
    }
}
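
The sort-and-trim step in processElement can be written more compactly with Comparator.reverseOrder(). A minimal equivalent sketch of just that part, reusing the demo's variables and imports (top3 is a hypothetical local name):

// Equivalent sketch of the top-3 logic inside processElement
List<Integer> top3 = new ArrayList<>();
for (Integer v : vcListState.get()) {
    top3.add(v);
}
top3.sort(Comparator.reverseOrder());           // sort descending
if (top3.size() > 3) {
    top3 = new ArrayList<>(top3.subList(0, 3)); // keep only the three largest
}
vcListState.update(top3);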

Map State (MapState)

  • get(key) looks up the value for the given key
  • put(key, value) inserts or updates the value for the given key
  • putAll(map) adds every key/value pair of the given map to the map state
  • remove(key) removes the entry for the given key
  • contains(key) checks whether the given key exists
  • entries() returns all key/value pairs in the map state
  • keys() returns all keys as an Iterable
  • values() returns all values as an Iterable
  • isEmpty() checks whether the map state is empty
  • State descriptor: MapStateDescriptor

Count how many times each water level value occurs for each sensor

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            MapState<Integer, Integer> vcMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // look up the count for this vc: increment it if present, otherwise initialize it to 1
                                Integer vc = value.getVc();
                                if (vcMapState.contains(vc)) {
                                    vcMapState.put(vc, vcMapState.get(vc) + 1);
                                } else {
                                    vcMapState.put(vc, 1);
                                }
                                StringBuilder str = new StringBuilder();
                                for (Map.Entry<Integer, Integer> entry : vcMapState.entries()) {
                                    str.append(value.getId()).append(" water level ").append(entry.getKey()).append(" occurred ").append(entry.getValue()).append(" time(s); ");
                                }
                                out.collect(str.toString());
                            }
                        }).print();


        env.execute();
    }
}
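
The demo only exercises contains(), put(), get() and entries(). A hedged sketch of the remaining MapState methods from the list above, as they could be called on vcMapState inside the same processElement (the key 0 is just a placeholder):

// Sketch: the rest of the MapState API on vcMapState
vcMapState.putAll(java.util.Collections.singletonMap(0, 0)); // bulk-add entries from a java.util.Map
boolean hasEntries = !vcMapState.isEmpty();                   // check whether any entry exists
for (Integer k : vcMapState.keys()) { }                       // iterate all keys
for (Integer v : vcMapState.values()) { }                     // iterate all values
vcMapState.remove(0);                                         // delete a single entry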

Reducing State (ReducingState)

  • Similar to ValueState, except that every value added is reduced with the existing one, and only the reduced aggregate is kept as the state
  • get() returns the reduced result for the current key
  • add() adds a value to the reducing state of the current key
  • clear() clears the reducing state of the current key
  • State descriptor: ReducingStateDescriptor

Compute the sum of water levels for each sensor

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class KeyedReducingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            ReducingState<Integer> vcReducingState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcReducingState = getRuntimeContext().getReducingState(
                                        new ReducingStateDescriptor<Integer>(
                                                "vcReducingState",
                                                new ReduceFunction<Integer>() {
                                                    @Override
                                                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                                                        return value1 + value2;
                                                    }
                                                },
                                                Types.INT
                                        )
                                );
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // add each incoming vc to the state; the ReduceFunction accumulates it automatically
                                vcReducingState.add(value.getVc());
                                out.collect(value.getId() + " running vc sum: " + vcReducingState.get());
                            }
                        }).print();


        env.execute();
    }
}
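
Any associative ReduceFunction can be plugged in. As an illustration, a hedged sketch of a state that keeps the maximum water level instead of the sum (maxVcState is a hypothetical name; this would be registered in open() next to vcReducingState):

ReducingState<Integer> maxVcState = getRuntimeContext().getReducingState(
        new ReducingStateDescriptor<>(
                "maxVcState",
                (ReduceFunction<Integer>) Math::max, // keep the larger of the stored and incoming value
                Types.INT
        )
);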

Aggregating State (AggregatingState)

  • Very similar to ReducingState: the aggregating state is also a single value holding the aggregate of everything added so far, but the input, accumulator and output types may all differ
  • add() feeds the value into the configured AggregateFunction and updates the state
  • get() returns the aggregated result
  • clear() clears the state
  • State descriptor: AggregatingStateDescriptor

Compute the average water level for each sensor

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class KeyedAggregatingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            AggregatingState<Integer, Double> vcAggregatingState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcAggregatingState = getRuntimeContext().getAggregatingState(
                                        new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                                "vcAggregatingState",
                                                // the Tuple2 accumulator stores (count, sum)
                                                new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                                    @Override
                                                    public Tuple2<Integer, Integer> createAccumulator() {
                                                        return Tuple2.of(0, 0);
                                                    }

                                                    @Override
                                                    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                        return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value);
                                                    }

                                                    @Override
                                                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                        // multiply by 1D first to promote the Integer sum to double; if the division ran first, integer division would truncate the result
                                                        return accumulator.f1 * 1D / accumulator.f0;
                                                    }

                                                    @Override
                                                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
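                                                        // merge() is only invoked when Flink merges state (e.g. merging session windows); this demo never merges, so returning null is tolerated here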
                                                        return null;
                                                    }
                                                },
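                                                // the TypeInformation here describes the accumulator (ACC) type, not the input or output type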
                                                Types.TUPLE(Types.INT, Types.INT)
                                        )
                                );
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                vcAggregatingState.add(value.getVc());
                                out.collect(value.getId() + " average vc: " + vcAggregatingState.get());
                            }
                        }).print();


        env.execute();
    }
}

State Time-To-Live (TTL)

In practice, many states keep growing over time; without any limit they would eventually exhaust the available storage. One option is to call clear() explicitly in the code, but sometimes the business logic does not allow clearing the state directly. In that case you can configure a time-to-live (TTL) for the state: once an entry has existed longer than this value, it is cleared.

  • newBuilder(ttl) sets the state time-to-live
  • setUpdateType() specifies when the expiration timestamp is refreshed
    • OnCreateAndWrite: on state creation and on every write (the default)
    • OnReadAndWrite: additionally on every read
  • setStateVisibility() controls state visibility: clean-up is not immediate, so an expired entry may still physically exist; this setting decides whether such an entry can still be read (NeverReturnExpired or ReturnExpiredIfNotCleanedUp)
  • Configure the TTL in open(), because that is also where the state itself is initialized

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class StateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        sensorDS.keyBy(WaterSensor::getId)
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            // TODO 1. declare the state
                            ValueState<Integer> lastVcState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                // build the StateTtlConfig
                                StateTtlConfig stateTtlConfig = StateTtlConfig
                                        .newBuilder(Time.seconds(5)) // time-to-live of 5 seconds
                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the expiration timer when the state is created or written (updated)
                                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return state values that have expired
                                        .build();
                                // create the state descriptor and enable TTL on it
                                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
                                stateDescriptor.enableTimeToLive(stateTtlConfig);

                                lastVcState = getRuntimeContext().getState(stateDescriptor);
                            }

                            @Override
                            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                                // read the previous state value and print it
                                Integer stateValue = lastVcState.value();
                                out.collect("上一次状态值: " + stateValue);
                                // update the state value
                                lastVcState.update(value.getVc());
                            }
                        }
                ).print();


        env.execute();
    }
}
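
With this configuration, one way to observe the TTL: if a key receives no new records for more than 5 seconds, the next record for that key prints null as the previous state value, because OnCreateAndWrite only refreshes the expiration timer on writes and NeverReturnExpired hides entries that have expired but have not yet been physically cleaned up.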