Blog
flink-24 处理函数
Flink提供了8个不同的处理函数:
- ProcessFunction
- 最基本的处理函数,基于DataStream直接调用.process()时作为参数传入
- KeyedProcessFunction
- 对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入,要想使用定时器,比如基于keyedstream
- ProcessWindowFunction
- 开窗之后的处理函数,也是窗口函数的代表,基于WindowedStream调用.process()时作为参数传入
- ProcessAllWindowFunction
- 同样时开窗之后的处理函数,基于AllWindowStream调用.process()时作为参数传入
- CoProcessFunction
- 合并(connect)两条流之后的处理函数,基于connectedStream调用.process()时作为参数传入。
- ProcessJoinFunction
- 间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
- BroadcastProcessFunction
- 广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入,这里的广播连接流时一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(connect)之后的产物。
- KeyedBroadcastProcessFunction
按键分区处理函数 KeyedProcessFunction
只有在keyedStream中才支持使用TimerService设置定时器的操作
package com.learn.flink.process;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class KeyedProcessTimerDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((r, ts) -> r.getTs() * 1000L)
);
KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
SingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {
// 数据中提取出来的事件时间
Long ts = ctx.timestamp();
String currentKey = ctx.getCurrentKey();
// 定时器
TimerService timerService = ctx.timerService();
// 注册定时器: 事件时间
timerService.registerEventTimeTimer(5000L); // 这里不是当前数据+5s 只是timer定时器固定值是5s的定时器
System.out.println("当前key=" + currentKey + ",当前时间是:" + ts + ",注册了一个5s的定时器");
// 注册定时器:处理时间
// timerService.registerProcessingTimeTimer();
// 删除定时器:事件时间
//timerService.deleteEventTimeTimer();
// 删除定时器:处理时间
// timerService.deleteProcessingTimeTimer();
// 获取当前处理时间, 就是系统时间
// long currentTs = timerService.currentProcessingTime();
// 获取process当前的水位线 watermark
// long wm = timerService.currentWatermark();
}
// 时间进展到定时器注册的时间,调用该方法
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
String key = ctx.getCurrentKey();
System.out.println("key=" + key + "现在时间是" + timestamp + "定时器触发");
}
});
process.print();
env.execute();
}
}
代码中注册的定时器时间是watermark的时间
// 注册定时器: 事件时间
timerService.registerEventTimeTimer(5000L);
也就是说watermark到达5s的时候触发定时器,s3,9,9注册的定时器也是watermark 5s,所以会和s1,s2一起触发
处理时间定时器
long currentTs = timerService.currentProcessingTime();
// 处理时间 + 5s定时器
timerService.registerProcessingTimeTimer(currentTs + 5000L);
System.out.println("当前key=" + currentKey + ",当前时间是:" + currentTs + ",注册了一个5s后的定时器");
总结:
- 只有keyedStream才有
- 事件时间定时器,通过watermark来触发
- watermark 》= 注册的时间
- 注意:watermark = 当前最大事件时间 - 延迟等待时间 - 1ms,因为 -1ms 所以会推迟一条数据
- 比如:5s的定时器
- 如果延迟等待=3s,watermark=8s - 3s -1ms = 4999ms,不会触发5s的定时器
- 需要watermark = 9s - 3s - 1ms = 5999ms >= 5s 触发
- 在process中获取当前watermark,显示的是上一次的watermark(因为process还没接收到这条数据对应生成的新watermark)
process一次只能处理一条数据,且获取的watermark是process的watermark,因为当前数据对应的watermark还没到,所以process记录的还是上一条的watermark