Flink, Big Data

flink-22 Handling late data

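Delaying watermark advancement

When generating watermarks, set an out-of-orderness tolerance (a delay). This holds back the system's event-time clock so that window computation runs later, giving out-of-order records more time to reach their window.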

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
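
To make the effect of the tolerance concrete, here is a minimal plain-Java sketch (no Flink dependency) of the rule that a bounded-out-of-orderness generator follows: the watermark trails the largest timestamp seen so far by the tolerance, minus 1 ms. The class and method names are hypothetical. Flink emits watermarks periodically rather than per record, so the sketch only shows the value that would be emitted after each record.

```java
// Hypothetical helper, not part of Flink: tracks the watermark value for a given tolerance.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // Remember the largest event timestamp seen so far.
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // Watermark = largest timestamp - tolerance - 1 ms.
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10_000L);
        long[] eventsMs = {5_000L, 12_000L, 8_000L, 20_000L};
        for (long ts : eventsMs) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " ms, watermark=" + wm.currentWatermark() + " ms");
        }
    }
}
```

With a 10-second tolerance, the out-of-order 8 s record does not move the watermark, and the watermark only reaches 9999 ms (the last millisecond of a [0,10) window) once a 20 s record has arrived.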

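Letting the window close late

Flink windows can also accept late data. When the window computation is triggered, Flink emits the current result but does not close the window yet. From then on, every late record that belongs to that window triggers another computation of the window (an incremental update). Only when the watermark passes the window end time plus the allowed lateness is the window really closed.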

.window(xxx)
.allowedLateness(Time.seconds(3))
package com.learn.flink.watermark;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("record=" + element + ", recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });
        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // allow the window to close 2 seconds late
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2));

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

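                // Count by iterating: the default Iterable spliterator does not guarantee an exact size.
                long count = 0;
                for (WaterSensor ignored : elements) {
                    count++;
                }

                out.collect("key=" + s + " window [" + windowStart + "," + windowEnd + ") contains " + count + " records: " + elements.toString());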
            }
        });
        process.print();
        env.execute();
    }
}

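With the watermark delayed by 3 seconds and the window closing 2 seconds late:

  • When the 13s record arrives, the watermark reaches 10s and triggers the computation of window [0,10). Because the window closes 2 seconds late, it is only closed once the watermark reaches 12s, which happens when the 15s record arrives.
  • A following 14s record does not trigger a computation of [0,10), but 4s and 5s records belong to [0,10), so each of them triggers a recomputation of that window.
  • Once the 15s record arrives, the window is closed; any later record in the [0,10) range no longer triggers a computation.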
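
The 13s and 15s thresholds above follow from simple arithmetic, sketched here in plain Java. It assumes the event-time rule that a window fires once the watermark reaches its last millisecond (window end minus 1 ms) and is cleaned up once the watermark reaches that value plus the allowed lateness. The class and method names are hypothetical.

```java
// Hypothetical helper: derives which record timestamps trigger firing and closing of a window.
final class WindowLifecycleSketch {
    // Watermark value at which the window fires for the first time.
    static long firstFireWatermark(long windowEndMs) {
        return windowEndMs - 1;
    }

    // Watermark value at which the window state is dropped (the window "closes").
    static long cleanupWatermark(long windowEndMs, long allowedLatenessMs) {
        return windowEndMs - 1 + allowedLatenessMs;
    }

    // Smallest event timestamp whose arrival lifts the watermark to the target,
    // given watermark = maxTimestamp - outOfOrderness - 1.
    static long eventTsNeeded(long targetWatermarkMs, long outOfOrdernessMs) {
        return targetWatermarkMs + outOfOrdernessMs + 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L, delay = 3_000L, lateness = 2_000L;
        long fireTs = eventTsNeeded(firstFireWatermark(windowEnd), delay);
        long closeTs = eventTsNeeded(cleanupWatermark(windowEnd, lateness), delay);
        System.out.println("first firing needs a record with ts >= " + fireTs + " ms");   // 13000
        System.out.println("closing needs a record with ts >= " + closeTs + " ms");       // 15000
    }
}
```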

Using a side output to receive late data

package com.learn.flink.watermark;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777).map(new WaterSensorMapFunction());
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println("record=" + element + ", recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });
        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // allow the window to close 2 seconds late
                .sideOutputLateData(lateTag); // records that arrive after the window has closed go to the side output

        SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long startTs = context.window().getStart();
                long endTs = context.window().getEnd();
                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

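                // Count by iterating: the default Iterable spliterator does not guarantee an exact size.
                long count = 0;
                for (WaterSensor ignored : elements) {
                    count++;
                }

                out.collect("key=" + s + " window [" + windowStart + "," + windowEnd + ") contains " + count + " records: " + elements.toString());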
            }
        });
        process.print("normal");
        // get the side output from the main stream
        process.getSideOutput(lateTag).print("late");
        env.execute();
    }
}

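1. The difference between out-of-order and late

Out-of-order: records arrive in the wrong order, i.e. a record with a smaller timestamp arrives after one with a larger timestamp.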

Late: the record's timestamp is already at or behind the current watermark (timestamp ≤ current watermark).
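
The two definitions can be told apart with a small plain-Java sketch. It assumes the bounded-out-of-orderness rule (watermark = largest timestamp seen minus the tolerance minus 1 ms) and treats a record as late when its timestamp is at or behind the watermark, since a watermark W declares that no more records with timestamp ≤ W are expected. The class name and labels are hypothetical.

```java
// Hypothetical classifier: labels each record relative to the records seen before it.
final class LatenessClassifierSketch {
    static String[] classify(long[] timestampsMs, long outOfOrdernessMs) {
        String[] labels = new String[timestampsMs.length];
        long maxTs = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (int i = 0; i < timestampsMs.length; i++) {
            long ts = timestampsMs[i];
            long watermark = maxTs - outOfOrdernessMs - 1; // watermark before this record
            if (ts <= watermark) {
                labels[i] = "late";          // the watermark has already passed it
            } else if (ts < maxTs) {
                labels[i] = "out-of-order";  // older than an earlier record, but still ahead of the watermark
            } else {
                labels[i] = "in-order";
            }
            maxTs = Math.max(maxTs, ts);
        }
        return labels;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 5_000L, 3_000L, 9_000L, 4_000L};
        String[] labels = classify(ts, 3_000L);
        for (int i = 0; i < ts.length; i++) {
            System.out.println(ts[i] + " ms -> " + labels[i]);
        }
    }
}
```

With a 3-second tolerance, the 3 s record is merely out-of-order (the watermark is at 1999 ms), while the 4 s record is late because the 9 s record has already pushed the watermark to 5999 ms.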

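2. Handling out-of-order and late data

1) Specify an out-of-orderness wait time in the watermark strategy.

2) If a window is used, set an allowed lateness on the window.

  • This postpones closing the window. A late record that arrives before the window closes is still included in the window, and each such record triggers one more computation.
  • After the window has closed, late records are no longer computed.

3) Records that arrive after the window has closed are sent to a side output.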
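
The three tiers above can be sketched per record in plain Java. This is a simplified model under stated assumptions, not Flink's implementation: tumbling windows aligned at 0, non-negative timestamps, and the watermark taken as the value derived from all earlier records. The class name and the labels `buffered`, `late-fire`, and `side-output` are hypothetical.

```java
// Hypothetical router: decides what happens to each record when it reaches the window operator.
final class LateRoutingSketch {
    static String[] route(long[] tsMs, long windowSizeMs, long outOfOrdernessMs, long allowedLatenessMs) {
        String[] out = new String[tsMs.length];
        long maxTs = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (int i = 0; i < tsMs.length; i++) {
            long ts = tsMs[i];
            long watermark = maxTs - outOfOrdernessMs - 1;          // watermark before this record
            long windowMaxTs = ts - (ts % windowSizeMs) + windowSizeMs - 1; // last ms of the record's window
            if (windowMaxTs + allowedLatenessMs <= watermark) {
                out[i] = "side-output"; // window already closed
            } else if (windowMaxTs <= watermark) {
                out[i] = "late-fire";   // window already fired but is still open: recompute
            } else {
                out[i] = "buffered";    // window has not fired yet
            }
            maxTs = Math.max(maxTs, ts);
        }
        return out;
    }

    public static void main(String[] args) {
        // Timestamps in ms: 1s, 13s, 14s, 4s, 5s, 15s, 6s
        long[] ts = {1_000L, 13_000L, 14_000L, 4_000L, 5_000L, 15_000L, 6_000L};
        String[] routed = route(ts, 10_000L, 3_000L, 2_000L);
        for (int i = 0; i < ts.length; i++) {
            System.out.println(ts[i] + " ms -> " + routed[i]);
        }
    }
}
```

With a 3-second watermark wait and 2 seconds of allowed lateness, the 4 s and 5 s records each cause a recomputation of [0,10), while the 6 s record, arriving after the 15 s record, ends up in the side output.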

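If the watermark waits 3s and the window allows 2s of lateness, why not simply make the watermark wait 5s, or make the window allow 5s of lateness?

  • The watermark wait time should not be set too large, because it delays the computation.
    • With a 3s wait, the window first fires and emits output when the 13s record arrives: 13 - 3 = 10s.
    • With a 5s wait, the window first fires and emits output when the 15s record arrives: 15 - 5 = 10s.
  • Allowed lateness on the window is there to handle most of the late data, so the result is as accurate as possible.
    • With only a 5s allowed lateness and no watermark wait, many records count as late and each one re-fires the window, causing frequent repeated output.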
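
The trade-off can be made concrete by replaying one stream under the three configurations. The plain-Java sketch below is a simplified model for the single window [0,10), assuming non-negative timestamps, the rule watermark = largest timestamp - wait - 1 ms, and a window that holds at least one record; the class name, the sample stream, and the result layout are hypothetical.

```java
// Hypothetical simulation: compares output latency and repeated output for one window.
final class LatenessTradeoffSketch {
    /**
     * Replays event timestamps (ms) against the window [0, windowEndMs).
     * Returns { timestamp of the record whose arrival causes the first firing (-1 if none),
     *           number of late re-firings,
     *           number of records routed to the side output }.
     */
    static long[] simulate(long[] tsMs, long windowEndMs, long waitMs, long allowedLatenessMs) {
        long windowMaxTs = windowEndMs - 1;
        long maxTs = Long.MIN_VALUE + waitMs + 1;
        long firstFireAt = -1, lateFirings = 0, sideOutputs = 0;
        for (long ts : tsMs) {
            long watermark = maxTs - waitMs - 1; // watermark before this record
            if (ts < windowEndMs) {
                if (windowMaxTs + allowedLatenessMs <= watermark) {
                    sideOutputs++;   // window already closed
                } else if (windowMaxTs <= watermark) {
                    lateFirings++;   // window fired earlier and is recomputed
                }
            }
            maxTs = Math.max(maxTs, ts);
            if (firstFireAt < 0 && maxTs - waitMs - 1 >= windowMaxTs) {
                firstFireAt = ts;    // this record lifts the watermark to the end of the window
            }
        }
        return new long[] {firstFireAt, lateFirings, sideOutputs};
    }

    public static void main(String[] args) {
        long[] stream = {2_000L, 9_000L, 10_000L, 7_000L, 12_000L, 8_000L,
                         13_000L, 6_000L, 14_000L, 5_000L, 15_000L, 4_000L};
        long[][] configs = {{3_000L, 2_000L}, {5_000L, 0L}, {0L, 5_000L}};
        for (long[] c : configs) {
            long[] r = simulate(stream, 10_000L, c[0], c[1]);
            System.out.println("wait=" + c[0] + " lateness=" + c[1]
                    + " -> firstFireAt=" + r[0] + " lateFirings=" + r[1] + " sideOutputs=" + r[2]);
        }
    }
}
```

On this sample stream all three configurations lose the same single record to the side output. Waiting 5s delays the first result until the 15 s record, allowing only 5s of lateness emits first at the 10 s record but re-fires four times, and the 3s + 2s split first emits at the 13 s record with two re-firings.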

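Rules of thumb

  • Watermark wait time: choose a value that is not too large, typically on the order of seconds, as a trade-off between out-of-order tolerance and output latency.
  • Set some allowed lateness on the window, sized to cover most of the late data; do not try to cover the small fraction that arrives very late.
  • Send that extreme minority of very late records to a side output; once collected there, they can be handled in whatever way is needed.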