Flink, 大数据

flink-25 窗口处理函数

思路一:

所有数据汇聚到一起(windowAll),用HashMap统计,key=vc,value=count值,再按count降序排序取前N个

package com.learn.flink.process;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;

/**
 * Top-N demo built on a non-keyed sliding event-time window ({@code windowAll}).
 *
 * <p>Reads text lines from socket {@code hadoop003:7777}, maps them to {@link WaterSensor},
 * assigns event-time timestamps with 3 seconds of bounded out-of-orderness, and for every
 * 10s window (sliding by 5s) prints the most frequent {@code vc} values of that window.
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                // ts is treated as seconds and converted to epoch milliseconds
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)
                );

        // Sliding window: size 10s, slide 5s.
        // windowAll is non-keyed, so every record goes through a single window task.
        sensorDS
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new MyTopNPWAF())
                .print();

        env.execute();
    }

    /**
     * Counts how often each {@code vc} occurs in a window and emits the {@code topN}
     * most frequent ones (with their counts and the window end time) as a single string.
     */
    public static class MyTopNPWAF extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> {

        /** Number of entries emitted per window when no explicit value is given. */
        private static final int DEFAULT_TOP_N = 2;

        /** Number of (vc, count) entries emitted per window. */
        private final int topN;

        /** Creates a function that emits the top 2 vc values of each window. */
        public MyTopNPWAF() {
            this(DEFAULT_TOP_N);
        }

        /**
         * Creates a function that emits the top {@code topN} vc values of each window.
         *
         * @param topN how many (vc, count) entries to emit per window; must be positive
         * @throws IllegalArgumentException if {@code topN} is not positive
         */
        public MyTopNPWAF(int topN) {
            if (topN <= 0) {
                throw new IllegalArgumentException("topN must be positive: " + topN);
            }
            this.topN = topN;
        }

        @Override
        public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
            // Count the occurrences of each vc in this window.
            HashMap<Integer, Integer> vcCountMap = new HashMap<>();
            for (WaterSensor element : elements) {
                vcCountMap.merge(element.getVc(), 1, Integer::sum);
            }

            // Copy the (vc, count) pairs into a list so they can be sorted by count.
            ArrayList<Tuple2<Integer, Integer>> datas = new ArrayList<>(vcCountMap.size());
            vcCountMap.forEach((vc, count) -> datas.add(Tuple2.of(vc, count)));

            // Sort by count, descending. Integer.compare avoids the overflow risk of subtraction.
            Comparator<Tuple2<Integer, Integer>> byCountDesc = (o1, o2) -> Integer.compare(o2.f1, o1.f1);
            datas.sort(byCountDesc);

            // The window end is identical for every entry, so format it only once.
            String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            // The window may hold fewer distinct vc values than topN.
            int limit = Math.min(topN, datas.size());
            for (int i = 0; i < limit; i++) {
                Tuple2<Integer, Integer> vcCount = datas.get(i);
                outStr.append("Top").append(i + 1).append("\n");
                outStr.append("vc=").append(vcCount.f0).append("\n");
                outStr.append("count=").append(vcCount.f1).append("\n");
                outStr.append("窗口结束时间=").append(windowEnd).append("\n");
                outStr.append("==========\n");
            }
            out.collect(outStr.toString());
        }
    }
}

思路二:

按照不同的vc去keyby,分别取count

  • 按照vc做keyby,开窗,分别count
    • 增量聚合,计算count
    • 全窗口,对计算结果count值封装,带上窗口结束时间的标签
      • 为了让同一个窗口时间范围的计算结果到一起去
  • 对同一个窗口范围的count值进行处理:排序、取前N个
    • 按照windowEnd做keyby
    • 使用process:每来一条数据调用一次,因此需要先按窗口分开存到HashMap中,key=windowEnd,value=list
      • 使用定时器,对存起来的结果进行排序、取前N个
package com.learn.flink.process;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;

/**
 * Top-N demo that avoids {@code windowAll}.
 *
 * <p>Stage 1: key by vc, open a 10s window sliding by 5s, count the records incrementally
 * ({@link VcCountAgg}) and tag each count with the window end time ({@link WindowResult}).
 * Stage 2: key by that window end, buffer all counts of the same window in {@link TopN}, and
 * emit the N most frequent vc values when an event-time timer fires.
 */
public class KeyedProcessFunctionTopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                // ts is treated as seconds and converted to epoch milliseconds
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)
                );

        // Sliding window: size 10s, slide 5s.
        // Key by vc, window, count incrementally, and tag each count with its window end.
        // Output tuple: (vc, count, windowEnd).
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(WaterSensor::getVc)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());

        // Key by the window-end tag: one window contains many vc values, and this brings
        // all of their counts to the same key.
        windowAgg.keyBy(tuple -> tuple.f2)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    /** Incremental aggregate that counts the records of one vc within one window. */
    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial counts. Used only with merging windows (e.g. session windows).
         * Returning the sum, rather than null, keeps this function correct if it is ever reused there.
         */
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }

    /**
     * Wraps the pre-aggregated count of one vc with its window end time.
     *
     * <p>Type parameters:
     * input = the count produced by {@link VcCountAgg} (Integer);
     * output = (vc, count, windowEnd) as {@code Tuple3<Integer, Integer, Long>};
     * key = vc (Integer);
     * window = {@link TimeWindow}.
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {

        @Override
        public void process(Integer key, ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>.Context ctx, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            // With aggregate(...) the iterable holds exactly one element: the final count.
            Integer count = elements.iterator().next();
            long windowEnd = ctx.window().getEnd();
            out.collect(Tuple3.of(key, count, windowEnd));
        }
    }

    /**
     * Buffers the (vc, count, windowEnd) results of each window and, once all of them have
     * arrived, emits the {@code threshold} entries with the highest count as one string.
     *
     * <p>Note: the buffer is a plain Java field, not Flink managed state. It is therefore
     * not part of checkpoints and is lost on failure/restart.
     */
    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        // Buffered results per window: key = windowEnd, value = (vc, count, windowEnd) tuples.
        // A single function instance serves every key of its subtask, hence the explicit map.
        private final HashMap<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
        // Number of entries to emit per window.
        private final Integer threshold;

        /**
         * @param threshold how many (vc, count) entries to emit per window
         */
        public TopN(Integer threshold) {
            this.threshold = threshold;
            this.dataListMap = new HashMap<>();
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            // Each call delivers a single result. Sorting needs the whole window, so buffer it per window.
            Long windowEnd = value.f2;
            dataListMap.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
            // Fire only after the watermark has moved past windowEnd, i.e. after the upstream
            // window operator has emitted the counts of every vc for this window.
            // Flink keeps at most one timer per key and timestamp, so re-registering it for each
            // element is harmless.
            ctx.timerService().registerEventTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // All results for this window have arrived. Remove (not just clear) the buffer so the map
            // does not keep an empty entry for every window ever seen.
            Long windowEnd = ctx.getCurrentKey();
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.remove(windowEnd);
            if (dataList == null) {
                // Defensive: nothing buffered for this window.
                return;
            }
            // Sort by count, descending. Integer.compare avoids the overflow risk of subtraction.
            dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {
                @Override
                public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {
                    return Integer.compare(o2.f1, o1.f1);
                }
            });
            // Emit the top N. The window may contain fewer distinct vc values than threshold.
            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            int limit = Math.min(threshold, dataList.size());
            for (int i = 0; i < limit; i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top").append(i + 1).append("\n");
                outStr.append("vc=").append(vcCount.f0).append("\n");
                outStr.append("count=").append(vcCount.f1).append("\n");
                outStr.append("窗口结束时间=").append(windowEnd).append("\n");
                outStr.append("==========\n");
            }
            out.collect(outStr.toString());
        }
    }
}