Flink, Big Data

flink-29 Operator State

Operator state is state defined on a single parallel instance of an operator; its scope is limited to the current operator task.

  • Operator state has fewer practical use cases than Keyed State; it is mostly used in operators that connect to external systems, such as Sources and Sinks, or in scenarios where no key is defined at all
    • Flink's Kafka connector uses operator state internally
  • When the parallelism of the operator changes, operator state also supports being redistributed among the parallel operator task instances

List State (ListState)

  • State descriptor: ListStateDescriptor
package com.learn.flink.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("hadoop003", 7777)
                .map(new MyMapFunction())
                .print();

        env.execute();
    }

    public static class MyMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        private Long count = 0L;
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persist the local variable: copy its current value into the operator state. Only called when checkpointing is enabled.
         *
         * @param context the context for drawing a snapshot of the operator
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            // Clear the operator state first
            state.clear();
            // Then add the current value of the local variable to the operator state
            state.add(count);
        }

        /**
         * Initialize the local variable: on job recovery, copy the data from the state back into the local variable. Called once per subtask.
         *
         * @param context the context for initializing the operator
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            // Initialize the operator state from the context
            state = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
            // Copy the data from the operator state back into the local variable
            if (context.isRestored()) { // restoring from a previous checkpoint/savepoint
                for (Long c : state.get()) {
                    count += c;
                }
            }
        }
    }
}
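
snapshotState is only invoked when checkpointing is enabled, which the demo above omits. A minimal sketch of the extra line for the main method (the 3-second interval is an arbitrary choice for local testing, not taken from the original demo):

// Trigger a checkpoint every 3000 ms so that snapshotState() runs periodically
// and the count can later be restored from the latest completed checkpoint.
env.enableCheckpointing(3000);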

Union List State (UnionListState)

  • State descriptor: ListStateDescriptor (the union list state is obtained via getUnionListState instead of getListState)

Within operator state, the difference between list state and union list state lies in how the state is redistributed when the parallelism changes (see the sketch after this list):

  • List state: the state entries are split round-robin, evenly, among the new parallel subtasks
  • Union list state: the state lists of all the original subtasks are merged into one complete list, which is then broadcast to every new parallel subtask (each one gets a full copy)
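
Based on the MyMapFunction above, a minimal sketch of switching to union list state: only the call on the OperatorStateStore in initializeState changes, the descriptor is still a ListStateDescriptor (the state name "union-state" here is just an illustrative choice):

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // getUnionListState instead of getListState: on restore, every subtask
    // receives the full merged list rather than a round-robin slice of it
    state = context
            .getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
    if (context.isRestored()) {
        for (Long c : state.get()) {
            count += c;
        }
    }
}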

Broadcast State (BroadcastState)

Sometimes we want all parallel subtasks of an operator to hold the same global state, which users maintain for unified configuration and rule settings. In that case, all data in all partitions accesses the same state; this special kind of operator state is called broadcast state.

package com.learn.flink.state;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // data stream
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        // configuration stream
        DataStreamSource<String> configDS = env.socketTextStream("hadoop003", 8888);
        // convert the configuration stream into a broadcast stream
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // connect the data stream with the broadcast configuration stream
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        // call process on the connected stream
        sensorBCS.process(
                new BroadcastProcessFunction<WaterSensor, String, String>() {
                    /**
                     * Processing method for elements of the data stream
                     * @param value The stream element.
                     * @param ctx the read-only context, giving read access to the broadcast state
                     * @param out The collector to emit resulting elements to
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Read the current value from the broadcast state via the context (read-only here, cannot be modified)
                        ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                        Integer threshold = broadcastState.get("threshold");
                        // The broadcast state may still be empty right after startup, because a data-stream record can arrive before the first config record.
                        threshold = threshold == null ? 0 : threshold;
                        if (value.getVc() > threshold) {
                            out.collect("Sensor " + value.getId() + ": water level " + value.getVc() + " exceeds the configured threshold " + threshold + "!!!");
                        }
                    }

                    /**
                     * Processing method for elements of the broadcast configuration stream
                     * @param value The stream element.
                     * @param ctx the context, giving write access to the broadcast state
                     * @param out The collector to emit resulting elements to
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {
                        // Get the broadcast state via the context and write the new threshold into it
                        BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                        broadcastState.put("threshold", Integer.valueOf(value));
                    }
                }
        ).print();

        env.execute();
    }
}
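
To test the broadcast demo: send an integer threshold (e.g. 10) on port 8888 first, then send sensor records on port 7777 in whatever format WaterSensorMapFunction expects; any record whose vc exceeds the current threshold produces the alert string. If a sensor record arrives before any config record, processElement falls back to a threshold of 0, as handled above.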