Blog
flink-20 窗口API
窗口分配器
// Key the stream by sensor id (fragment: assumes sensorDS is the mapped WaterSensor stream declared elsewhere)
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
// TODO 1 Choose the window assigner: which kind of window to use
// 1.1 Non-keyed window: all records of the window go to one subtask, so its parallelism can only be 1
// sensorDS.windowAll()
// 1.2 Keyed window: every key gets its own set of windows, each computed independently
// Time-based windows
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// tumbling window, size 10s
// sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))); // sliding window, size 10s, slide 2s
// sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); // session window, gap 5s
// Count-based windows
// sensorKS.countWindow(5); // tumbling window, size = 5 elements
// sensorKS.countWindow(5, 2); // sliding window, size = 5 elements, slide = 2 elements
窗口函数
定义了窗口分配器,我们只知道了数据属于哪个窗口,可以将数据收集起来了,至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的窗口函数。
package com.learn.flink.window;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Walk-through of the window API. Step 1 picks a window assigner; step 2 (listed only as
 * comments here) picks the window function that computes over each window's contents.
 */
public class WindowApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: text lines from a socket, parsed into WaterSensor records
        SingleOutputStreamOperator<WaterSensor> sensorStream =
                env.socketTextStream("hadoop003", 7777)
                        .map(new WaterSensorMapFunction());

        // Partition the stream by sensor id
        KeyedStream<WaterSensor, String> keyedSensors = sensorStream.keyBy(WaterSensor::getId);

        // TODO 1 Window assigner: which kind of window to use.
        //   Non-keyed: sensorStream.windowAll(...) sends every record to one subtask (parallelism 1).
        //   Keyed: every key owns an independent set of windows, as below.
        // Time-based: tumbling processing-time window of 10 s
        WindowedStream<WaterSensor, String, TimeWindow> windowedSensors =
                keyedSensors.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        //   keyedSensors.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));  // sliding: size 10 s, slide 2 s
        //   keyedSensors.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));               // session: gap 5 s
        // Count-based:
        //   keyedSensors.countWindow(5);     // tumbling: 5 elements
        //   keyedSensors.countWindow(5, 2);  // sliding: 5 elements, slide 2 elements

        // TODO 2 Window function: the computation applied to each window.
        //   Incremental aggregation (compute per record, emit when the window fires):
        //     windowedSensors.reduce(...) / windowedSensors.aggregate(...)
        //   Full-window function (buffer records, compute and emit when the window fires):
        //     windowedSensors.process(...)

        env.execute();
    }
}
reduce
package com.learn.flink.window;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Demonstrates incremental window aggregation with {@code reduce}: water-level values
 * ({@code vc}) are summed per sensor id over 5-second tumbling processing-time windows.
 */
public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<WaterSensor, String> keyed = env
                .socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .keyBy(WaterSensor::getId);

        // Window assigner: 5-second tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> windowed =
                keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        /*
         * Incremental aggregation with reduce:
         * 1. The first record of a key in a window is kept as-is; reduce is not called for it.
         * 2. Every later record is folded in right away, but nothing is emitted yet.
         * 3. Only when the window fires is the final reduced value emitted.
         */
        SingleOutputStreamOperator<WaterSensor> reduced = windowed.reduce((acc, next) -> {
            System.out.println("调用reduce方法:value1=" + acc + " value2=" + next);
            // Keep the id, take the newer timestamp, and sum the water levels
            return new WaterSensor(acc.getId(), next.getTs(), acc.getVc() + next.getVc());
        });

        reduced.print();
        env.execute();
    }
}
aggregate
package com.learn.flink.window;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Demonstrates incremental window aggregation with {@code aggregate}: water-level values
 * ({@code vc}) are summed per sensor id over 10-second tumbling processing-time windows
 * and emitted as a String.
 */
public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10-second tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Window function: incremental aggregation with aggregate
        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                /*
                 * Type parameters:
                 *   1st: input record type
                 *   2nd: accumulator type (the intermediate result kept per window)
                 *   3rd: output type
                 */
                new AggregateFunction<WaterSensor, Integer, String>() {
                    /** Creates and initializes the accumulator (called for a new window). */
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }

                    /** Aggregation step: folds one record into the accumulator. */
                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("调用add方法, value=" + value);
                        return accumulator + value.getVc();
                    }

                    /** Produces the final result; called when the window fires. */
                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getResult方法");
                        return accumulator.toString();
                    }

                    /**
                     * Combines two partial sums. Only merging window assigners (e.g. session
                     * windows) call this; it is unused by the tumbling windows here, but it
                     * must still return a valid accumulator rather than null.
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return a + b;
                    }
                });
        aggregate.print();
        env.execute();
    }
}
窗口函数:增量聚合 aggregate
- 属于本窗口的第一条数据到来时,创建窗口,并创建累加器
- 增量聚合:来一条计算一条,调用一次add方法
- 窗口输出时调用一次getResult方法
- 参数:输入、中间累加器、输出 类型可以不一样,非常灵活
全窗口函数
与增量聚合的不同之处在于:数据来了先存起来不计算,等窗口触发时才统一计算;并且可以通过上下文获取窗口信息(如窗口的起止时间)等。
package com.learn.flink.window;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Demonstrates a full-window function: records are buffered per key and window, and when the
 * window fires a single summary line (window bounds, record count, records) is emitted.
 */
public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10-second tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Full-window function: records are stored, then processed together when the window fires
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            /**
             * Emits one summary line for the given key and window.
             *
             * @param s        the key of this window
             * @param context  window context: gives the window object, plus side outputs etc.
             * @param elements all records buffered for this key and window
             * @param out      collector for the output lines
             * @throws Exception propagated from user code
             */
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
                // Count by iterating: Iterable makes no size guarantee, and
                // spliterator().estimateSize() reports Long.MAX_VALUE for iterables that are
                // not Collections (e.g. the view passed when an evictor is configured).
                long count = 0;
                for (WaterSensor ignored : elements) {
                    count++;
                }
                // Time windows are left-closed, right-open: [start, end)
                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });
        process.print();
        env.execute();
    }
}
增量聚合和全窗口函数结合使用
package com.learn.flink.window;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Combines incremental aggregation with a full-window function. {@link MyAgg} keeps only a
 * running sum per window, which is cheap to store. {@link MyProcess} then decorates that
 * single result with window metadata from the context.
 */
public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        // Window assigner: 10-second tumbling processing-time windows
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        // Incremental aggregation combined with a full-window function
        SingleOutputStreamOperator<String> DS = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );
        DS.print();
        env.execute();
    }

    /** Sums the water level ({@code vc}) of all records in a window and emits it as a String. */
    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String> {
        /** Creates and initializes the accumulator (called for a new window). */
        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }

        /** Aggregation step: folds one record into the accumulator. */
        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法, value=" + value);
            return accumulator + value.getVc();
        }

        /** Produces the final result; called when the window fires. */
        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        /**
         * Combines two partial sums. Only merging window assigners (e.g. session windows)
         * call this; it must return a valid accumulator rather than null.
         */
        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return a + b;
        }
    }

    /**
     * Wraps the pre-aggregated result from {@link MyAgg} with the key and window bounds.
     * When used this way, the iterable holds the aggregated result(s), not the raw records.
     */
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
        @Override
        public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            // The context exposes the window object, plus side outputs etc.
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
            // Count by iterating: Iterable has no size contract, and the default
            // spliterator().estimateSize() reports Long.MAX_VALUE for non-Collections.
            long count = 0;
            for (String ignored : elements) {
                count++;
            }
            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
        }
    }
}
结合两者的优点:
- 增量聚合:来一条计算一条,存储中间的计算结果,占用空间少
- 全窗口函数:可以通过 上下文实现灵活的功能
触发器 Trigger
触发器主要是用来控制窗口什么时候触发计算。所谓的触发计算本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器
以时间类型的滚动窗口为例,分析原理:
- 窗口什么时候触发 输出?
- 时间进展 >= 窗口的最大时间戳 (end - 1ms)
- 窗口是怎么划分的
- start = 向下取整,取窗口长度的整数倍,即 start = 时间戳 - 时间戳 % 窗口长度(不考虑偏移量 offset 时)
- end = start + 窗口长度
- 窗口左闭右开:属于本窗口的最大时间戳 = end -1ms
- 窗口的生命周期
- 创建:属于本窗口的第一条数据到来时,才new出这个窗口对象,并放入一个singleton单例的集合中
- 销毁(关窗):时间进展 >= 窗口的最大时间戳 (end - 1ms) + 允许迟到的时间(默认0)
自定义触发器案例:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
* 自定义触发器 每10s触发一次窗口计算
*/
/**
 * Custom event-time trigger for long windows (e.g. one-day windows). While the window is open,
 * it fires the window computation periodically, every 10 s of event time by default. It fires
 * once more when the watermark reaches the window's maximum timestamp.
 *
 * <p>Periodic firing times are aligned to multiples of the interval (10 s, 20 s, 30 s, ...,
 * not 11 s, 21 s, 31 s, ...), the same way tumbling windows align their start:
 * {@code aligned = ts - ts % interval}.
 *
 * @param <T> element type of the windowed stream
 */
public class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {

    /** Default interval between periodic firings: 10 seconds. */
    private static final long DEFAULT_INTERVAL_MS = 10 * 1000L;

    /** Event-time interval between periodic firings, in milliseconds. */
    private final long intervalMs;

    /** Per-window flag recording that the first periodic timer has already been registered. */
    private final ValueStateDescriptor<Boolean> isFirstStateDescriptor =
            new ValueStateDescriptor<>("isFirstState", Boolean.class);

    /** Creates a trigger that fires every 10 seconds of event time. */
    public MyTriggerFunction() {
        this(DEFAULT_INTERVAL_MS);
    }

    /**
     * Creates a trigger that fires every {@code intervalMs} milliseconds of event time.
     *
     * @param intervalMs firing interval in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code intervalMs <= 0}
     */
    public MyTriggerFunction(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive, got " + intervalMs);
        }
        this.intervalMs = intervalMs;
    }

    /** Called for every element assigned to the window. */
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Final firing when the watermark passes the window end; re-registering the same
        // timestamp for every element does not create duplicate timers.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ValueState<Boolean> isFirstState = ctx.getPartitionedState(isFirstStateDescriptor);
        // First element of this window: start the periodic timer chain
        if (isFirstState.value() == null) {
            isFirstState.update(true);
            // Round the element's timestamp down to a multiple of the interval, then add one
            // interval. E.g. with 10 s: 7 s - 7 s % 10 s + 10 s = 0 s + 10 s = 10 s.
            // floorMod keeps the alignment correct for negative timestamps as well.
            long alignedStart = timestamp - Math.floorMod(timestamp, intervalMs);
            ctx.registerEventTimeTimer(alignedStart + intervalMs);
        }
        // No firing on element arrival
        return TriggerResult.CONTINUE;
    }

    /** This trigger only uses event-time timers, so processing-time timers are ignored. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when an event-time timer fires.
     *
     * @param time the timestamp the timer was registered for
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        long end = window.getEnd();
        // A timer at or after the window end does not belong to this window's lifetime
        if (time >= end) {
            return TriggerResult.CONTINUE;
        }
        // Keep the periodic chain going while the next firing is still inside the window;
        // the last firing comes from the maxTimestamp timer registered in onElement.
        long next = time + intervalMs;
        if (next < end) {
            ctx.registerEventTimeTimer(next);
        }
        return TriggerResult.FIRE;
    }

    /** Called when the window is purged: removes the end-of-window timer and the per-window state. */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(isFirstStateDescriptor).clear();
    }
}
移除器 Evictor
移除器主要用来定义移除某些数据的逻辑,基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器。Evictor是一个接口,Flink也提供了几种预实现的移除器,例如CountEvictor、TimeEvictor和DeltaEvictor。