Flink, 大数据

flink-20 窗口API

窗口分配器

        // Partition by key: one logical sub-stream per sensor id
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        // TODO 1 Choose a window assigner, i.e. which kind of window to use
        // 1.1 Non-keyed windows: every record goes to the same subtask, so parallelism can only be 1
        //     sensorDS.windowAll(...)
        // 1.2 Keyed windows: each key owns its own set of windows, computed independently
        //
        // Time-based: tumbling window, size 10s
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS =
                sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Time-based: sliding window, size 10s, slide 2s
        //     sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
        // Time-based: session window, gap 5s
        //     sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        //
        // Count-based: tumbling window of 5 elements
        //     sensorKS.countWindow(5);
        // Count-based: sliding window of 5 elements, slide 2 elements
        //     sensorKS.countWindow(5, 2);

窗口函数

定义了窗口分配器,我们只知道了数据属于哪个窗口,可以将数据收集起来了,至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的窗口函数。

package com.learn.flink.window;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowApiDemo {
    /**
     * Sketches the two steps of the window API on a keyed sensor stream:
     * (1) choosing a window assigner and (2) choosing a window function.
     * Only step 1 is wired up; step 2 is outlined in comments.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Parse every socket line into a WaterSensor
        final SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction());

        // Partition by key (sensor id)
        final KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        // TODO 1 Choose a window assigner, i.e. which kind of window to use
        // 1.1 Non-keyed windows: every record goes to the same subtask, so parallelism can only be 1
        //     sensorDS.windowAll(...)
        // 1.2 Keyed windows: each key owns its own set of windows, computed independently
        //
        // Time-based: tumbling window, size 10s
        final WindowedStream<WaterSensor, String, TimeWindow> sensorWS =
                sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Time-based: sliding window, size 10s, slide 2s
        //     sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
        // Time-based: session window, gap 5s
        //     sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        //
        // Count-based: tumbling window of 5 elements
        //     sensorKS.countWindow(5);
        // Count-based: sliding window of 5 elements, slide 2 elements
        //     sensorKS.countWindow(5, 2);

        // TODO 2 Choose a window function: how the data inside a window is computed
        // Incremental aggregation: each record is folded in on arrival; the result is emitted when the window fires
        //     sensorWS.reduce(...)
        //     sensorWS.aggregate(...)
        // Full-window function: records are only buffered; they are computed and emitted when the window fires
        //     sensorWS.process(...)

        env.execute();
    }
}

reduce

package com.learn.flink.window;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowReduceDemo {
    /**
     * Incremental aggregation with {@code reduce()} over 5-second tumbling processing-time windows.
     *
     * <p>Behavior of the reduce window function:
     * <ol>
     *   <li>The first element of a key does not trigger a reduce call.</li>
     *   <li>Incremental: every further element is reduced immediately, but nothing is emitted yet.</li>
     *   <li>Only when the window fires is the final result of the window emitted.</li>
     * </ol>
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction());
        final KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        // Window assigner: 5s tumbling processing-time windows
        final WindowedStream<WaterSensor, String, TimeWindow> sensorWS =
                sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Window function: keeps value1's id, takes value2's ts, and sums the vc values
        final ReduceFunction<WaterSensor> sumVc = (value1, value2) -> {
            System.out.println("调用reduce方法:value1=" + value1 + " value2=" + value2);
            return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
        };
        final SingleOutputStreamOperator<WaterSensor> reduce = sensorWS.reduce(sumVc);

        reduce.print();

        env.execute();
    }
}

aggregate

package com.learn.flink.window;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAggregateDemo {
    /**
     * Incremental aggregation with {@link AggregateFunction}. Within 10-second tumbling
     * processing-time windows, the vc values of each key are summed into an Integer accumulator.
     * The sum is emitted as a String when the window fires.
     *
     * <p>Lifecycle: the first element of a window creates the accumulator. Every element calls
     * {@code add} once. {@code getResult} is called once when the window fires.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10s tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Window function: incremental aggregation with aggregate()
        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                /*
                Type parameters:
                1st: type of the input elements
                2nd: accumulator type, i.e. the type of the intermediate result kept per window
                3rd: output type
                 */
                new AggregateFunction<WaterSensor, Integer, String>() {
                    /*
                    Create and initialize the accumulator (sum starts at 0)
                     */
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }

                    /*
                    Aggregation logic: add the element's vc to the running sum
                     */
                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("调用add方法, value=" + value);
                        return accumulator + value.getVc();
                    }

                    /*
                    Produce the final result; called when the window fires
                     */
                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getResult方法");
                        return accumulator.toString();
                    }

                    /*
                    Combine two partial sums. Only used by merging windows such as session windows.
                    Both partial results must be kept, so they are added rather than dropped;
                    returning null here would leave a null accumulator that later add()/getResult()
                    calls dereference.
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return a + b;
                    }
                });

        aggregate.print();

        env.execute();
    }
}

窗口函数:增量聚合 aggregate

  • 属于本窗口的第一条数据来,创建窗口,创建累加器
  • 增量聚合:来一条计算一条,调用一次add方法
  • 窗口输出时调用一次getResult方法
  • 参数:输入、中间累加器、输出 类型可以不一样,非常灵活

全窗口函数

与增量聚合的不同之处在于:全窗口函数先把窗口内的数据全部缓存起来,等到窗口触发时才统一计算并输出;同时它可以获取上下文信息(例如窗口的起止时间)。

package com.learn.flink.window;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowProcessDemo {
    /**
     * Full-window function demo. A {@link ProcessWindowFunction} receives all buffered elements of
     * a 10-second tumbling processing-time window at once. It prints the window bounds, the element
     * count and the elements themselves.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10s tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Full-window function
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             * Called once per key and window when the window fires.
             *
             * @param s        the key of this window
             * @param context  context giving access to the window object (and e.g. side outputs)
             * @param elements all elements buffered for this window
             * @param out      collector for the emitted results
             * @throws Exception propagated to Flink
             */
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                // Count by walking the Iterable. Iterable makes no size guarantee, and
                // spliterator().estimateSize() returns Long.MAX_VALUE when the size is unknown.
                long count = countElements(elements);

                // Time windows are half-open [start, end), hence "[" ... ")"
                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });
        process.print();

        env.execute();
    }

    /** Counts the elements of an Iterable by iterating over it once. */
    private static long countElements(Iterable<?> elements) {
        long count = 0;
        for (Object ignored : elements) {
            count++;
        }
        return count;
    }
}

增量聚合和全窗口函数结合使用

package com.learn.flink.window;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowAggregateAndProcessDemo {
    /**
     * Combines incremental aggregation with a full-window function. {@link MyAgg} keeps only a
     * running Integer sum per window. {@link MyProcess} then decorates that result with window
     * metadata taken from the context.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10s tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Incremental aggregation combined with a full-window function
        SingleOutputStreamOperator<String> resultDS = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );

        resultDS.print();

        env.execute();
    }

    /** Sums the vc values of a window and emits the sum as a String. */
    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {

        /*
        Create and initialize the accumulator (sum starts at 0)
         */
        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }

        /*
        Aggregation logic: add the element's vc to the running sum
         */
        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法, value=" + value);
            return accumulator + value.getVc();
        }

        /*
        Produce the final result; called when the window fires
         */
        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        /*
        Combine two partial sums. Only used by merging windows such as session windows.
        Both partial results must be kept, so they are added; returning null would leave a null
        accumulator that later add()/getResult() calls dereference.
         */
        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return a + b;
        }
    }

    /**
     * Wraps the pre-aggregated result of {@link MyAgg} with the key and window bounds.
     * When used as {@code aggregate(new MyAgg(), new MyProcess())}, {@code elements} holds
     * the aggregation result(s) for the window rather than the raw input records.
     */
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            // The context exposes the window object, plus other things such as side outputs
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

            // Count by walking the Iterable. spliterator().estimateSize() may return
            // Long.MAX_VALUE for an Iterable of unknown size.
            long count = countElements(elements);

            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
        }

        /** Counts the elements of an Iterable by iterating over it once. */
        private static long countElements(Iterable<?> elements) {
            long count = 0;
            for (Object ignored : elements) {
                count++;
            }
            return count;
        }
    }
}

结合两者的优点:

  • 增量聚合:来一条计算一条,存储中间的计算结果,占用空间少
  • 全窗口函数:可以通过 上下文实现灵活的功能

触发器 Trigger

触发器主要是用来控制窗口什么时候触发计算。所谓的触发计算本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器

以时间类型的滚动窗口为例,分析原理:

  • 窗口什么时候触发 输出?
    • 时间进展 >= 窗口的最大时间戳 (end – 1ms)
  • 窗口是怎么划分的
    • start = 向下取整,取窗口长度的整数倍
    • end = start + 窗口长度
    • 窗口左闭右开:属于本窗口的最大时间戳 = end -1ms
  • 窗口的生命周期
    • 创建:属于本窗口的第一条数据到来时,才new出该窗口对象,并放入一个只含单个元素的singleton集合中
    • 销毁(关窗):时间进展 >= 窗口的最大时间戳 (end – 1ms) + 允许迟到的时间(默认0)

自定义触发器案例:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Custom event-time trigger that fires the window periodically, every 10 seconds by default.
 *
 * <p>Periodic firings are aligned to multiples of the interval in event time (10s, 20s, 30s, ...
 * rather than 11s, 21s, 31s, ...). The window also fires once when the watermark reaches its max
 * timestamp (end - 1ms), so records arriving during the last interval are not lost. In addition,
 * every late element accepted within the allowed lateness fires the window immediately.
 *
 * <p>Intended for event-time {@link TimeWindow}s; processing-time timers never fire the window.
 *
 * @param <T> type of the window elements
 */
public class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {

    /** Default firing interval: 10 seconds. */
    private static final long DEFAULT_INTERVAL_MS = 10 * 1000L;

    /** Firing interval in milliseconds of event time. */
    private final long intervalMs;

    /** Per-window flag; non-null once the first periodic timer has been registered for the window. */
    private final ValueStateDescriptor<Boolean> isFirstStateDescriptor =
            new ValueStateDescriptor<>("isFirstState", Boolean.class);

    /** Creates a trigger that fires every 10 seconds. */
    public MyTriggerFunction() {
        this(DEFAULT_INTERVAL_MS);
    }

    /**
     * Creates a trigger that fires every {@code intervalMs} milliseconds of event time.
     *
     * @param intervalMs firing interval in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code intervalMs <= 0}
     */
    public MyTriggerFunction(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive, got " + intervalMs);
        }
        this.intervalMs = intervalMs;
    }

    /** Called for every element added to a window. */
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Late element: the watermark is already past the window end, and the element is still
        // within the allowed lateness. Fire right away, as Flink's EventTimeTrigger does.
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        }
        // Always fire once at the end of the window. This custom trigger replaces the default one,
        // so nothing else registers this timer. Registering the same timestamp again is harmless.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ValueState<Boolean> isFirstState = ctx.getPartitionedState(isFirstStateDescriptor);
        // First element of this window: start the chain of periodic timers
        if (isFirstState.value() == null) {
            isFirstState.update(true);
            // Round the event time down to a multiple of the interval, then add one interval.
            // This is the same alignment idea as tumbling windows (ts - ts % size).
            // e.g. interval = 10s: ts = 7s -> 7s - 7s + 10s = 10s; ts = 13s -> 13s - 3s + 10s = 20s.
            // The modulus must apply to the whole interval in ms; floorMod also handles negative ts.
            long firstFire = timestamp - Math.floorMod(timestamp, intervalMs) + intervalMs;
            if (firstFire < window.getEnd()) {
                ctx.registerEventTimeTimer(firstFire);
            }
        }
        // Elements never fire the window directly; firing is driven by timers
        return TriggerResult.CONTINUE;
    }

    /** Processing-time timers are not used by this trigger, so they never fire the window. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Never return null: the caller expects a TriggerResult
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when an event-time timer fires for this window.
     *
     * @param time the timestamp of the timer that fired
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        // Timers at or after the window end (e.g. a cleanup timer with allowed lateness) do not fire
        if (time >= end) {
            return TriggerResult.CONTINUE;
        }
        // Schedule the next periodic firing while it still falls inside the window. The final
        // firing at window.maxTimestamp() is registered in onElement.
        long nextFire = time + intervalMs;
        if (nextFire < end) {
            ctx.registerEventTimeTimer(nextFire);
        }
        return TriggerResult.FIRE;
    }

    /**
     * Called when the window is purged. Deletes the end-of-window timer and the per-window
     * flag state, which is not removed automatically.
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(isFirstStateDescriptor).clear();
    }
}

移除器 Evictor

移除器主要用来定义移除某些数据的逻辑,基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器。Evictor是一个接口,不同的窗口类型都要有各自预实现的移除器。