Flink, 大数据

flink-21 时间语义 水位线

比如1月1日买的牛奶,但是喝牛奶是1月2日,也就是买牛奶的事件时间1月1日,喝牛奶的处理时间1月2日

  • 事件时间
    • 数据产生的时间
  • 处理时间
    • 数据真正被处理的时刻

flink1.12版本开始,flink已经将事件时间作为默认的时间语义了。

事件时间和窗口

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个逻辑时钟,这个始终的时间不会自动流逝,它的时间进展,就是靠着新到的数据时间戳来推动的。

水位线

上面讲到的逻辑时钟 就是 水位线,用来衡量事件时间进展的标记。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳。

周期性的生成水位线:之前所有数据中的最大时间戳

周期性的生成水位线:之前所有数据中的最大时间戳

水位线特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线watermark(t) 表示在当前流中事件时间已经到达了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ <= t的数据

窗口的理解

正确理解:在flink中,窗口其实并不是一个框,应该把窗口理解成一个桶,在flink中,窗口可以把流切割成有限大小的多个存储桶,每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对结束时间的桶中收集的数据进行计算处理。

生成水位线

  • 如果要保证绝对正确,就必须等足够长的时间,就会带来更高的延迟。
  • 如果我们希望处理的更快、实时性更强,那么可以将水位线延迟设的低一些
  • 如果我们对准确性完全不考虑,一味的追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟

flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制。

有序流

package com.learn.flink.watermark;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        // 定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 升序的watermark,没有等待时间
                .<WaterSensor>forMonotonousTimestamps()
                // 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // 返回的时间戳
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L; // 这里将手动输入的数字 * 1000变成秒
                    }
                });
        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // 使用事件时间的语义窗口
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                long count = elements.spliterator().estimateSize();

                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });
        process.print();
        env.execute();
    }
}

乱序流

package com.learn.flink.watermark;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WatermarkOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        // 定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 升序的watermark,没有等待时间
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 指定时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // 返回的时间戳
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L; // 这里将手动输入的数字 * 1000变成秒
                    }
                });
        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // 使用事件时间的语义窗口
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                long count = elements.spliterator().estimateSize();

                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });
        process.print();
        env.execute();
    }
}

设置水位线等待3秒,在ts=13的时候触发0-10秒窗口计算

内置watermark的生成原理

  • 都是周期性生成的,默认200ms
env.getConfig().setAutoWatermarkInterval() 默认200ms
  • 有序流: watermark = 当前最大的事件时间 – 1ms
  • 乱序流:watermark = 当前最大的事件时间 – 延迟时间 – 1ms

自定义watermark

package com.learn.flink.watermark;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class MyPeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 乱序等待时间
    private long delayTs;
    // 用来保存 当前位置 最大的事件时间
    private long maxTs;

    public MyPeriodWatermarkGenerator(long delayTs) {
        this.delayTs = delayTs;
        this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
    }

    /*
        每条数据来,都会调用一次:用来提取最大的事件时间,保存下来
        eventTimestamp 提取到的数据的 事件时间
         */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTs = Math.max(maxTs, eventTimestamp);
        System.out.println("调用onEvent方法, 获取目前位置最大时间戳=" + maxTs);
    }

    /*
    周期性调用:生成watermark
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTs - delayTs - 1));
        System.out.println("调用onPeriodicEmit方法, 生成watermark=" + (maxTs - delayTs - 1));
    }
}
package com.learn.flink.watermark;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WatermarkCustomDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 默认周期200ms
        env.getConfig().setAutoWatermarkInterval(2000);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 指定自定义的水位线生成器
                .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
                        return new MyPeriodWatermarkGenerator<>(3000L);
                    }
                })
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L; // 这里将手动输入的数字 * 1000变成秒
                    }
                });
        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
                long count = elements.spliterator().estimateSize();
                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
            }
        });
        process.print();
        env.execute();
    }
}

断点式水位线生成器

取消周期性生成函数,onEvent发射watermark

package com.learn.flink.watermark;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class MyPunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {
    // 乱序等待时间
    private long delayTs;
    // 用来保存 当前位置 最大的事件时间
    private long maxTs;

    public MyPunctuatedWatermarkGenerator(long delayTs) {
        this.delayTs = delayTs;
        this.maxTs = Long.MIN_VALUE + this.delayTs + 1;
    }

    /*
        每条数据来,都会调用一次:用来提取最大的事件时间,保存下来
        eventTimestamp 提取到的数据的 事件时间
         */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTs = Math.max(maxTs, eventTimestamp);
        output.emitWatermark(new Watermark(maxTs - delayTs - 1));
        System.out.println("调用onEvent方法, 生成watermark=" + (maxTs - delayTs - 1));
    }


    @Override
    public void onPeriodicEmit(WatermarkOutput output) {

    }
}

重点是在对应函数中执行output.emitWatermark(new Watermark(maxTs – delayTs – 1));生成水位线

也可以在数据源指定水位线策略

env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkaSource").print();

多并行度下watermark的传递

  • 接受到上有多个,取最小
  • 往下游多个发送,广播

水位线空闲等待

package com.learn.flink.watermark;

import com.learn.flink.partition.MyPartitioner;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // 自定义分区器:数据 % 分区数,只输入奇数,都只会去往map的一个子任务
        SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("hadoop003", 7777)
                .partitionCustom(new MyPartitioner(), k -> k)
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                .withIdleness(Duration.ofSeconds(5)) // 空闲等待5秒
                );
        // 分成两组:奇数一组、偶数一组 开10s的事件时间滚动窗口
        socketDS.keyBy(r -> r % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, ProcessWindowFunction<Integer, String, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据:" + elements.toString());
                    }
                }).print();

        env.execute();
    }
}