Blog
flink-21 时间语义 水位线
比如1月1日买的牛奶,但是喝牛奶是1月2日,也就是买牛奶的事件时间1月1日,喝牛奶的处理时间1月2日
- 事件时间
- 数据产生的时间
- 处理时间
- 数据真正被处理的时刻
flink1.12版本开始,flink已经将事件时间作为默认的时间语义了。
事件时间和窗口
在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个逻辑时钟,这个时钟的时间不会自动流逝,它的时间进展,就是靠着新到的数据时间戳来推动的。
水位线
上面讲到的逻辑时钟 就是 水位线,用来衡量事件时间进展的标记。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳。
周期性的生成水位线:之前所有数据中的最大时间戳
周期性的生成水位线:之前所有数据中的最大时间戳
水位线特性
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线是基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线watermark(t) 表示在当前流中事件时间已经到达了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t' <= t的数据
窗口的理解
正确理解:在flink中,窗口其实并不是一个框,应该把窗口理解成一个桶,在flink中,窗口可以把流切割成有限大小的多个存储桶,每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对结束时间的桶中收集的数据进行计算处理。
生成水位线
- 如果要保证绝对正确,就必须等足够长的时间,就会带来更高的延迟。
- 如果我们希望处理的更快、实时性更强,那么可以将水位线延迟设的低一些
- 如果我们对准确性完全不考虑,一味的追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟
flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制。
有序流
package com.learn.flink.watermark;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Demo: 10-second tumbling event-time windows over an in-order (ascending timestamp) stream.
 *
 * <p>Reads text lines from a socket, maps each line to a {@code WaterSensor}, assigns
 * timestamps and watermarks with {@code forMonotonousTimestamps()}, keys the stream by sensor
 * id and prints one summary line per fired window.
 */
public class WatermarkMonoDemo {
    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());

        // Define the watermark strategy.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // Ascending timestamps: no out-of-orderness wait time.
                .<WaterSensor>forMonotonousTimestamps()
                // Timestamp assigner: extract the event time from the record itself.
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        // The manually entered ts is treated as seconds; multiply by 1000
                        // to obtain the millisecond timestamp used for event time.
                        return element.getTs() * 1000L;
                    }
                });

        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // Use event-time window semantics.
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                String windowStart = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

                // Count by iterating: Spliterator.estimateSize() is only an estimate and
                // yields Long.MAX_VALUE when the Iterable does not report a size.
                long count = 0L;
                for (WaterSensor ignored : elements) {
                    count++;
                }
                out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });

        process.print();
        env.execute();
    }
}
乱序流
package com.learn.flink.watermark;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Demo: 10-second tumbling event-time windows over an out-of-order stream.
 *
 * <p>Same pipeline shape as the in-order demo, but the watermark strategy is
 * {@code forBoundedOutOfOrderness(Duration.ofSeconds(3))}, so the watermark trails the
 * largest seen event time by 3 seconds.
 */
public class WatermarkOrdernessDemo {
    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());

        // Define the watermark strategy.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // Out-of-order stream: wait 3 seconds for late records.
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // Timestamp assigner: extract the event time from the record itself.
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        // The manually entered ts is treated as seconds; multiply by 1000
                        // to obtain the millisecond timestamp used for event time.
                        return element.getTs() * 1000L;
                    }
                });

        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // Use event-time window semantics.
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                String windowStart = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

                // Count by iterating: Spliterator.estimateSize() is only an estimate and
                // yields Long.MAX_VALUE when the Iterable does not report a size.
                long count = 0L;
                for (WaterSensor ignored : elements) {
                    count++;
                }
                out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });

        process.print();
        env.execute();
    }
}
设置水位线等待3秒,在ts=13的时候触发0-10秒窗口计算
内置watermark的生成原理
- 都是周期性生成的,默认200ms
env.getConfig().setAutoWatermarkInterval() 默认200ms
- 有序流: watermark = 当前最大的事件时间 - 1ms
- 乱序流:watermark = 当前最大的事件时间 - 延迟时间 - 1ms
自定义watermark
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
/**
 * Periodic watermark generator: tracks the largest event timestamp seen so far and, on every
 * periodic emit, outputs a watermark of {@code maxTs - delayTs - 1}.
 *
 * @param <T> type of the stream elements
 */
public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {
    /** Out-of-orderness wait time, in the same unit as the event timestamps. */
    private final long delayTs;
    /** Largest event timestamp seen so far. */
    private long maxTs;

    /**
     * @param delayTs out-of-orderness wait time; must not be negative
     * @throws IllegalArgumentException if {@code delayTs} is negative
     */
    public MyPeriodWatermarkGenerator(long delayTs) {
        // A negative delay would overflow the initial maxTs below to a huge positive value,
        // after which no event could ever advance the watermark.
        if (delayTs < 0) {
            throw new IllegalArgumentException("delayTs must not be negative: " + delayTs);
        }
        this.delayTs = delayTs;
        // Chosen so that the first emitted watermark (maxTs - delayTs - 1) equals
        // Long.MIN_VALUE instead of underflowing.
        this.maxTs = Long.MIN_VALUE + delayTs + 1;
    }

    /**
     * Called once per record: remembers the largest event timestamp seen so far.
     *
     * @param event the record
     * @param eventTimestamp event time extracted from the record
     * @param output watermark output (not used here; emission happens periodically)
     */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTs = Math.max(maxTs, eventTimestamp);
        System.out.println("调用onEvent方法, 获取目前位置最大时间戳=" + maxTs);
    }

    /**
     * Called periodically: emits the watermark derived from the current maximum timestamp.
     *
     * @param output where the watermark is emitted
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long watermarkTs = maxTs - delayTs - 1;
        output.emitWatermark(new Watermark(watermarkTs));
        System.out.println("调用onPeriodicEmit方法, 生成watermark=" + watermarkTs);
    }
}
package com.learn.flink.watermark;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Demo: 10-second tumbling event-time windows driven by a custom periodic watermark
 * generator ({@code MyPeriodWatermarkGenerator} with a delay of 3000).
 */
public class WatermarkCustomDemo {
    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Emit periodic watermarks every 2000 ms instead of the default 200 ms.
        env.getConfig().setAutoWatermarkInterval(2000);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // Plug in the custom watermark generator.
                .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
                        return new MyPeriodWatermarkGenerator<>(3000L);
                    }
                })
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        // The manually entered ts is treated as seconds; multiply by 1000
                        // to obtain the millisecond timestamp used for event time.
                        return element.getTs() * 1000L;
                    }
                });

        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                String windowStart = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

                // Count by iterating: Spliterator.estimateSize() is only an estimate and
                // yields Long.MAX_VALUE when the Iterable does not report a size.
                long count = 0L;
                for (WaterSensor ignored : elements) {
                    count++;
                }
                out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });

        process.print();
        env.execute();
    }
}
断点式水位线生成器
取消周期性生成函数,onEvent发射watermark
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
/**
 * Punctuated watermark generator: emits a watermark of {@code maxTs - delayTs - 1} on every
 * incoming record and does nothing on the periodic callback.
 *
 * @param <T> type of the stream elements
 */
public class MyPunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    /** Out-of-orderness wait time, in the same unit as the event timestamps. */
    private final long delayTs;
    /** Largest event timestamp seen so far. */
    private long maxTs;

    /**
     * @param delayTs out-of-orderness wait time; must not be negative
     * @throws IllegalArgumentException if {@code delayTs} is negative
     */
    public MyPunctuatedWatermarkGenerator(long delayTs) {
        // A negative delay would overflow the initial maxTs below to a huge positive value,
        // after which no event could ever advance the watermark.
        if (delayTs < 0) {
            throw new IllegalArgumentException("delayTs must not be negative: " + delayTs);
        }
        this.delayTs = delayTs;
        // Chosen so that maxTs - delayTs - 1 starts at Long.MIN_VALUE instead of underflowing.
        this.maxTs = Long.MIN_VALUE + delayTs + 1;
    }

    /**
     * Called once per record: updates the largest event timestamp seen so far and immediately
     * emits the corresponding watermark.
     *
     * @param event the record
     * @param eventTimestamp event time extracted from the record
     * @param output where the watermark is emitted
     */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTs = Math.max(maxTs, eventTimestamp);
        long watermarkTs = maxTs - delayTs - 1;
        output.emitWatermark(new Watermark(watermarkTs));
        System.out.println("调用onEvent方法, 生成watermark=" + watermarkTs);
    }

    /**
     * Intentionally empty: this generator emits only from {@link #onEvent}.
     *
     * @param output unused
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}
重点是在对应函数中执行output.emitWatermark(new Watermark(maxTs - delayTs - 1));生成水位线
也可以在数据源指定水位线策略
// The watermark strategy (bounded out-of-orderness, 3 s) is passed as the second argument of fromSource; kafkaSource is defined elsewhere.
env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkaSource").print();
多并行度下watermark的传递
- 接收到上游多个,取最小
- 往下游多个发送,广播
水位线空闲等待
package com.learn.flink.watermark;
import com.learn.flink.partition.MyPartitioner;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * Demo: watermark idleness with parallelism 2.
 *
 * <p>Socket lines are routed with a custom partitioner, parsed to integers, and given
 * ascending-timestamp watermarks with an idleness timeout of 5 seconds. The stream is then
 * split into odd/even keys and each 10-second tumbling event-time window is printed.
 */
public class WatermarkIdlenessDemo {
    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Custom partitioner: per the original note it routes by value % partition count, so
        // entering only odd numbers sends every record to a single map subtask.
        SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("hadoop003", 7777)
                .partitionCustom(new MyPartitioner(), k -> k)
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                // The entered number is treated as seconds; convert to milliseconds.
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                // Idleness timeout of 5 seconds.
                                .withIdleness(Duration.ofSeconds(5))
                );

        // Two groups (odd / even), each with 10-second tumbling event-time windows.
        socketDS.keyBy(r -> r % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, ProcessWindowFunction<Integer, String, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        String windowStart = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");

                        // Count by iterating: Spliterator.estimateSize() is only an estimate and
                        // yields Long.MAX_VALUE when the Iterable does not report a size.
                        long count = 0L;
                        for (Integer ignored : elements) {
                            count++;
                        }
                        out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
                    }
                }).print();

        env.execute();
    }
}