Flink, 大数据

flink-24 处理函数

Flink提供了8个不同的处理函数:

  • ProcessFunction
    • 最基本的处理函数,基于DataStream直接调用.process()时作为参数传入
  • KeyedProcessFunction
    • 对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入,要想使用定时器,比如基于keyedstream
  • ProcessWindowFunction
    • 开窗之后的处理函数,也是窗口函数的代表,基于WindowedStream调用.process()时作为参数传入
  • ProcessAllWindowFunction
    • 同样时开窗之后的处理函数,基于AllWindowStream调用.process()时作为参数传入
  • CoProcessFunction
    • 合并(connect)两条流之后的处理函数,基于connectedStream调用.process()时作为参数传入。
  • ProcessJoinFunction
    • 间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
  • BroadcastProcessFunction
    • 广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入,这里的广播连接流时一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(connect)之后的产物。
  • KeyedBroadcastProcessFunction

按键分区处理函数 KeyedProcessFunction

只有在keyedStream中才支持使用TimerService设置定时器的操作

package com.learn.flink.process;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((r, ts) -> r.getTs() * 1000L)
                );

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        SingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            @Override
            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
                // 数据中提取出来的事件时间
                Long ts = ctx.timestamp();
                String currentKey = ctx.getCurrentKey();

                // 定时器
                TimerService timerService = ctx.timerService();
                // 注册定时器: 事件时间
                timerService.registerEventTimeTimer(5000L); // 这里不是当前数据+5s 只是timer定时器固定值是5s的定时器
                System.out.println("当前key=" + currentKey + ",当前时间是:" + ts + ",注册了一个5s的定时器");
                // 注册定时器:处理时间
                // timerService.registerProcessingTimeTimer();
                // 删除定时器:事件时间
                //timerService.deleteEventTimeTimer();
                // 删除定时器:处理时间
                // timerService.deleteProcessingTimeTimer();

                // 获取当前处理时间, 就是系统时间
                // long currentTs = timerService.currentProcessingTime();
                // 获取process当前的水位线 watermark
                // long wm = timerService.currentWatermark();
            }

            // 时间进展到定时器注册的时间,调用该方法
            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                String key = ctx.getCurrentKey();
                System.out.println("key=" + key + "现在时间是" + timestamp + "定时器触发");
            }
        });

        process.print();

        env.execute();
    }
}

代码中注册的定时器时间是watermark的时间

// 注册定时器: 事件时间
timerService.registerEventTimeTimer(5000L);

也就是说watermark到达5s的时候触发定时器,s3,9,9注册的定时器也是watermark 5s,所以会和s1,s2一起触发

处理时间定时器

long currentTs = timerService.currentProcessingTime();
// 处理时间 + 5s定时器
timerService.registerProcessingTimeTimer(currentTs + 5000L);
System.out.println("当前key=" + currentKey + ",当前时间是:" + currentTs + ",注册了一个5s后的定时器");

总结:

  • 只有keyedStream才有
  • 事件时间定时器,通过watermark来触发
    • watermark 》= 注册的时间
    • 注意:watermark = 当前最大事件时间 – 延迟等待时间 – 1ms,因为 -1ms 所以会推迟一条数据
      • 比如:5s的定时器
      • 如果延迟等待=3s,watermark=8s – 3s -1ms = 4999ms,不会触发5s的定时器
      • 需要watermark = 9s – 3s – 1ms = 5999ms >= 5s 触发
  • 在process中获取当前watermark,显示的是上一次的watermark(因为process还没接收到这条数据对应生成的新watermark)

process一次只能处理一条数据,且获取的watermark是process的watermark,因为当前数据对应的watermark还没到,所以process记录的还是上一条的watermark