推迟水印推进
在水印产生时,设置一个乱序容忍度(延迟时间),推迟系统的时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duratiuon.ofSeconds(10));
设置窗口延迟关闭
flink的窗口也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条数据,就触发一次这条数据所在窗口计算(增量计算)。直到watermark超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(xxx)
.allowedLateness(Time.seconds(3))
package com.learn.flink.watermark;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class WatermarkAllowLatenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
}
});
SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
// 允许推迟2秒关窗
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2));
SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
}
});
process.print();
env.execute();
}
}
watermark延迟3秒,窗口延迟2秒关闭
- 当数据来到13s,对应水位线 10s 触发[0,10)窗口计算, 因为设置窗口延迟2秒关闭 也就是 水位线12秒关闭计算,对应数据15s的时候
- 后续来了14s,不会触发计算,但是 4s和5s的数据属于[0,10)就触发了窗口计算
- 直到15s的数据到来窗口关闭,后续再来[0, 10)区间的数据也不会触发计算
使用侧输出流接收迟到的数据
package com.learn.flink.watermark;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class WatermarkAllowLatenessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
return element.getTs() * 1000L;
}
});
SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // 允许推迟2秒关窗
.sideOutputLateData(lateTag); // 关窗后的迟到的数据 存储到测输出流
SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long startTs = context.window().getStart();
long endTs = context.window().getEnd();
String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
long count = elements.spliterator().estimateSize();
out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
}
});
process.print("normal");
// 从主流获取侧流
process.getSideOutput(lateTag).print("late");
env.execute();
}
}
1、乱序与迟到的区别
乱序:数据的顺序乱了,出现时间小的比时间大的晚来
迟到:当前数据的时间戳 < 当前的watermark
2、乱序、迟到数据的处理
1)watermark中指定乱序等待时间
2)如果开窗,设置窗口允许迟到
- 推迟关窗时间,在关窗之前,迟到数据来了,还能被窗口计算,来一条数据触发一次计算
- 关窗之后,迟到数据不会被计算
3)关闭窗口后迟到的数据,放入侧输出流
如果watermark等待3s,窗口允许迟到2s,为什么不直接watermark等待5s或窗口允许迟到5s
- watermark等待时间不会设置太大,影响计算延迟
- 如果等待3s,窗口第一次触发计算和输出,13s的数据来。13-3=10s
- 如果等待5s,窗口第一次触发计算和输出,15s的数据来。15-5=10s
- 窗口允许迟到,是对 大部分迟到数据的处理,尽量让结果准确
- 如果只设置允许迟到5s,那么就会导致频繁的重复输出
设置经验
- watermark等待时间,设置一个不算特别大的,一般是秒级,在乱序和延迟取舍
- 设置一定的窗口允许迟到,只考虑大部分的迟到数据,计算小部分迟到很久的数据,不管。
- 极端小部分迟到很久的数据,放到侧输出流,获取到之后可以做各种处理。