Blog
flink-25 窗口处理函数
思路一:
所有数据汇总到一起(windowAll),用 HashMap 存储:key=vc,value=count 值
package com.learn.flink.process;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
public class TopNDemo {
/**
 * Builds and runs the "approach one" Top-N job: every record goes through a
 * single non-keyed sliding event-time window and {@link MyTopNPWAF} ranks the
 * vc values seen in that window.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Tolerate up to 3 seconds of out-of-orderness; the event timestamp is
    // taken from getTs() * 1000 (presumably seconds -> milliseconds; confirm
    // against WaterSensor).
    WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner((sensor, previousTimestamp) -> sensor.getTs() * 1000L);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
            .socketTextStream("hadoop003", 7777)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarks);

    // Sliding window: size 10s, slide 5s, applied to the whole (non-keyed) stream.
    sensorDS
            .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .process(new MyTopNPWAF())
            .print();

    env.execute();
}
/**
 * All-window function that counts how often each vc value occurs in a window
 * and emits the most frequent ones as one formatted string.
 *
 * <p>The number of entries to report is configurable; the no-arg constructor
 * keeps the original behavior of reporting the top 2.
 */
public static class MyTopNPWAF extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> {
    /** How many of the most frequent vc values to report per window. */
    private final int topN;

    /** Creates a function that reports the top 2 vc values per window. */
    public MyTopNPWAF() {
        this(2);
    }

    /**
     * Creates a function that reports the given number of vc values per window.
     *
     * @param topN maximum number of entries to output for each window
     */
    public MyTopNPWAF(int topN) {
        this.topN = topN;
    }

    /**
     * Counts the vc occurrences of the window, sorts them by count in
     * descending order and emits the leading entries.
     *
     * @param context  window context, used to read the window end time
     * @param elements all records assigned to the window
     * @param out      collector receiving the single formatted result string
     * @throws Exception declared by the overridden method
     */
    @Override
    public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
        // vc -> number of occurrences within this window
        HashMap<Integer, Integer> vcCountMap = new HashMap<>();
        for (WaterSensor element : elements) {
            // Inserts 1 for a new vc, otherwise adds 1 to the existing count.
            vcCountMap.merge(element.getVc(), 1, Integer::sum);
        }

        // Copy the (vc, count) pairs into a list so they can be sorted.
        ArrayList<Tuple2<Integer, Integer>> datas = new ArrayList<>(vcCountMap.size());
        vcCountMap.forEach((vc, count) -> datas.add(Tuple2.of(vc, count)));

        // Descending by count; Integer.compare avoids the subtraction idiom.
        datas.sort((left, right) -> Integer.compare(right.f1, left.f1));

        // The window end is the same for every entry, so format it once.
        String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

        StringBuilder outStr = new StringBuilder();
        outStr.append("==========\n");
        // The window may hold fewer distinct vc values than requested.
        int limit = Math.min(topN, datas.size());
        for (int i = 0; i < limit; i++) {
            Tuple2<Integer, Integer> vcCount = datas.get(i);
            outStr.append("Top").append(i + 1).append("\n");
            outStr.append("vc=").append(vcCount.f0).append("\n");
            outStr.append("count=").append(vcCount.f1).append("\n");
            outStr.append("窗口结束时间=").append(windowEnd).append("\n");
            outStr.append("==========\n");
        }
        out.collect(outStr.toString());
    }
}
}
思路二:
按照不同的vc去keyby,分别取count
- 按照vc做keyby,开窗,分别count
- 增量聚合,计算count
- 全窗口,对计算结果count值封装,带上窗口结束时间的标签
- 为了让同一个窗口时间范围的计算结果到一起去
- 对同一个窗口范围的count值进行处理:排序、取前N个
- 按照windowEnd做keyby
- 使用process,来一条调用一次,需要先存,分开存 hashMap,key=windowEnd,value=list
- 使用定时器,对存起来的结果进行排序、取前N个
package com.learn.flink.process;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
public class KeyedProcessFunctionTopNDemo {
/**
 * Builds and runs the "approach two" Top-N job: count per vc inside a keyed
 * sliding window, tag each count with its window end, then regroup by window
 * end and let {@link TopN} rank the counts.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Tolerate up to 3 seconds of out-of-orderness; the event timestamp is
    // taken from getTs() * 1000 (presumably seconds -> milliseconds; confirm
    // against WaterSensor).
    WatermarkStrategy<WaterSensor> watermarks = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner((sensor, previousTimestamp) -> sensor.getTs() * 1000L);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
            .socketTextStream("hadoop003", 7777)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarks);

    // Step 1: key by vc, open a sliding window (size 10s, slide 5s), count
    // incrementally and attach the window end to every per-vc count.
    SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS
            .keyBy(WaterSensor::getVc)
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .aggregate(new VcCountAgg(), new WindowResult());

    // Step 2: key by the window-end tag so that all vc counts of one window
    // arrive at the same key, then pick the top 2.
    windowAgg
            .keyBy(vcCountWithEnd -> vcCountWithEnd.f2)
            .process(new TopN(2))
            .print();

    env.execute();
}
/**
 * Incremental aggregate that counts the records of one window.
 *
 * <p>Input: {@code WaterSensor}; accumulator: running count; result: final count.
 */
public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {
    /** @return the initial count of zero */
    @Override
    public Integer createAccumulator() {
        return 0;
    }

    /**
     * Counts one more record.
     *
     * @param value       the incoming record (its content is not inspected)
     * @param accumulator the count so far
     * @return the count including this record
     */
    @Override
    public Integer add(WaterSensor value, Integer accumulator) {
        return accumulator + 1;
    }

    /**
     * @param accumulator the final count
     * @return the count unchanged
     */
    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    /**
     * Combines two partial counts.
     *
     * <p>Previously returned {@code null}, which would discard both counts
     * whenever the framework merges accumulators; the sum is the correct
     * combined count.
     *
     * @param a first partial count
     * @param b second partial count
     * @return the sum of both counts
     */
    @Override
    public Integer merge(Integer a, Integer b) {
        return a + b;
    }
}
/**
 * Full-window function that tags an already aggregated count with its key and
 * the end timestamp of the window it belongs to.
 *
 * <p>Type parameters:
 * <ul>
 *   <li>input: the count produced by the incremental aggregate ({@code Integer})</li>
 *   <li>output: {@code Tuple3<Integer, Integer, Long>} = (vc, count, window end)</li>
 *   <li>key: the vc value ({@code Integer})</li>
 *   <li>window: {@code TimeWindow}</li>
 * </ul>
 */
public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
    /**
     * Emits one (key, count, window end) tuple for the window.
     *
     * @param key      the vc value this window is keyed by
     * @param ctx      window context, used to read the window end time
     * @param elements the window contents; only the first element is read
     * @param out      collector receiving the tagged tuple
     * @throws Exception declared by the overridden method
     */
    @Override
    public void process(Integer key, ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>.Context ctx, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
        final long endOfWindow = ctx.window().getEnd();
        final Integer vcCount = elements.iterator().next();
        out.collect(Tuple3.of(key, vcCount, endOfWindow));
    }
}
/**
 * Keyed by window end: buffers the per-vc counts of one window and, when an
 * event-time timer fires just after the window end, emits the entries with
 * the highest counts as one formatted string.
 */
public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
    // Buffered results per window: key = windowEnd, value = (vc, count, windowEnd) tuples.
    // NOTE(review): this is an ordinary field rather than Flink managed keyed
    // state, so presumably it is not restored after a failure - confirm
    // whether that matters for this job.
    private final HashMap<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
    // Number of top entries to output per window.
    private final Integer threshold;

    /**
     * @param threshold how many of the highest counts to output per window
     */
    public TopN(Integer threshold) {
        this.threshold = threshold;
        this.dataListMap = new HashMap<>();
    }

    /**
     * Buffers one per-vc count under its window end and registers the timer
     * that will later rank the buffered entries.
     *
     * @param value (vc, count, windowEnd) tuple
     * @param ctx   context giving access to the timer service
     * @param out   collector (unused here; output happens in {@link #onTimer})
     * @throws Exception declared by the overridden method
     */
    @Override
    public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
        // Only one record arrives per call, but ranking needs all of them, so
        // collect them per window: create the list on first use, then append.
        Long windowEnd = value.f2;
        dataListMap.computeIfAbsent(windowEnd, end -> new ArrayList<>()).add(value);

        // All results of one window should be output together, but they arrive
        // one call at a time, so fire 1 ms after the window end.
        ctx.timerService().registerEventTimeTimer(windowEnd + 1);
    }

    /**
     * Sorts the buffered entries of the current key's window by count in
     * descending order, emits the leading entries and releases the buffer.
     *
     * @param timestamp the timestamp the timer was registered for
     * @param ctx       timer context, used to read the current key (window end)
     * @param out       collector receiving the formatted result string
     * @throws Exception declared by the overridden method
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        Long windowEnd = ctx.getCurrentKey();

        // Remove the entry instead of only clearing its list; otherwise the
        // map keeps one (empty) entry per window and grows without bound.
        List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.remove(windowEnd);
        if (dataList == null) {
            // Nothing buffered for this window; avoid a NullPointerException.
            return;
        }

        // Descending by count; Integer.compare avoids the subtraction idiom.
        dataList.sort((left, right) -> Integer.compare(right.f1, left.f1));

        StringBuilder outStr = new StringBuilder();
        outStr.append("==========\n");
        // The window may hold fewer entries than requested.
        int limit = Math.min(threshold, dataList.size());
        for (int i = 0; i < limit; i++) {
            Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
            outStr.append("Top").append(i + 1).append("\n");
            outStr.append("vc=").append(vcCount.f0).append("\n");
            outStr.append("count=").append(vcCount.f1).append("\n");
            outStr.append("窗口结束时间=").append(windowEnd).append("\n");
            outStr.append("==========\n");
        }
        out.collect(outStr.toString());
    }
}
}