Blog
flink-23 基于时间的合流 双流联结
窗口联结
固定时间内两条流数据的匹配情况
flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中配对处理。
窗口联结的调用
窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams,接着通过.where()和.equalTo()方法分别指定两条流中用于联结的key,然后通过.window()开窗,最后调用.apply()传入联结窗口函数(JoinFunction)进行处理计算。
// Window join skeleton: pairs elements of stream1 and stream2 that share a key and fall into the same window.
stream1.join(stream2)
.where(KeySelector) // key selector applied to stream1
.equalTo(KeySelector) // key selector applied to stream2
.window(WindowAssigner) // window assigner shared by both streams
.apply(JoinFunction) // invoked for each matched (stream1, stream2) pair inside a window
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Window join demo: elements of two bounded streams that share the same key (field f0) and
 * fall into the same 5-second tumbling event-time window are paired; unmatched elements are
 * not emitted.
 */
public class WindowJoinDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run with a single parallel subtask so all printed output comes from one place.
        env.setParallelism(1);

        final SingleOutputStreamOperator<Tuple2<String, Integer>> left = leftSource(env);
        final SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> right = rightSource(env);

        final DataStream<String> joined = left
                .join(right)
                .where(t -> t.f0)
                .equalTo(t -> t.f0)
                // Both streams are windowed with the same 5 s tumbling event-time windows.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new PairFormatter());

        joined.print();
        env.execute();
    }

    /** Left input; field f1 (seconds) becomes the event timestamp in milliseconds. */
    private static SingleOutputStreamOperator<Tuple2<String, Integer>> leftSource(
            StreamExecutionEnvironment env) {
        final WatermarkStrategy<Tuple2<String, Integer>> strategy = WatermarkStrategy
                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTs) -> element.f1 * 1000L);
        return env.fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(strategy);
    }

    /** Right input; field f1 (seconds) becomes the event timestamp in milliseconds. */
    private static SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> rightSource(
            StreamExecutionEnvironment env) {
        final WatermarkStrategy<Tuple3<String, Integer, Integer>> strategy = WatermarkStrategy
                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTs) -> element.f1 * 1000L);
        return env.fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 4, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(strategy);
    }

    /** Renders one matched pair as {@code left<----->right}. */
    private static final class PairFormatter
            implements JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String> {
        @Override
        public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) {
            return first + "<----->" + second;
        }
    }
}
- 落在同一个时间窗口范围内才能匹配
- 根据 where() 和 equalTo() 指定的 key 进行匹配关联(无需事先调用 keyBy)
- 只能拿到匹配上的数据,类似inner join
间隔联结
以一条流中每个元素的时间戳为基准,加上指定的下界和上界偏移得到一个时间区间,再到另一条流中查找 key 相同且时间戳落在该区间内的数据进行匹配。
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Interval join demo: for each left element with event time t, every right element with the
 * same key (field f0) and an event time within [t - 2 s, t + 2 s] is emitted as a matched pair.
 */
public class IntervalJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Field f1 (seconds) is converted to the event timestamp in milliseconds.
        WatermarkStrategy<Tuple2<String, Integer>> leftWatermarks = WatermarkStrategy
                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTs) -> element.f1 * 1000L);
        WatermarkStrategy<Tuple3<String, Integer, Integer>> rightWatermarks = WatermarkStrategy
                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTs) -> element.f1 * 1000L);

        SingleOutputStreamOperator<Tuple2<String, Integer>> leftStream = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(leftWatermarks);
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> rightStream = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 4, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(rightWatermarks);

        // Interval join is only available on keyed streams.
        KeyedStream<Tuple2<String, Integer>, String> leftKeyed = leftStream.keyBy(e -> e.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> rightKeyed = rightStream.keyBy(e -> e.f0);

        SingleOutputStreamOperator<String> matches = leftKeyed
                .intervalJoin(rightKeyed)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new PairEmitter());

        matches.print();
        env.execute();
    }

    /** Emits each matched pair as {@code left<----->right}. */
    private static final class PairEmitter
            extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String> {
        @Override
        public void processElement(
                Tuple2<String, Integer> left,
                Tuple3<String, Integer, Integer> right,
                Context ctx,
                Collector<String> out) {
            out.collect(String.join("<----->", left.toString(), right.toString()));
        }
    }
}
IntervalJoin迟到数据(下面的代码与上文 IntervalJoinDemo 相同,没有对迟到数据做任何处理,迟到数据不会进入 process)
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Interval join over two bounded streams keyed by field f0, matching right elements whose
 * event time lies within [t - 2 s, t + 2 s] of each left element's event time t.
 *
 * <p>No late-data handling is configured here: no side outputs are registered on the join.
 */
public class IntervalJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<Tuple2<String, Integer>, String> ks1 = leftInput(env).keyBy(item -> item.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = rightInput(env).keyBy(item -> item.f0);

        SingleOutputStreamOperator<String> joined =
                ks1.intervalJoin(ks2)
                        .between(Time.seconds(-2), Time.seconds(2))
                        .process(new ArrowJoiner());

        joined.print();
        env.execute();
    }

    /** Bounded left stream with event time = f1 seconds. */
    private static SingleOutputStreamOperator<Tuple2<String, Integer>> leftInput(
            StreamExecutionEnvironment env) {
        return env.fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((item, previous) -> item.f1 * 1000L));
    }

    /** Bounded right stream with event time = f1 seconds. */
    private static SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> rightInput(
            StreamExecutionEnvironment env) {
        return env.fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 4, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((item, previous) -> item.f1 * 1000L));
    }

    /** Writes {@code left<----->right} for every matched pair. */
    private static final class ArrowJoiner
            extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String> {
        @Override
        public void processElement(
                Tuple2<String, Integer> left,
                Tuple3<String, Integer, Integer> right,
                Context ctx,
                Collector<String> out) {
            String line = left.toString() + "<----->" + right.toString();
            out.collect(line);
        }
    }
}
处理迟到数据
package com.learn.flink.watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class IntervalJoinWithLateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
.socketTextStream("hadoop003", 7777)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] fields = value.split(",");
return Tuple2.of(fields[0], Integer.valueOf(fields[1]));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
// 设置watermark3秒乱序延迟
.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
.socketTextStream("hadoop003", 8888)
.map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(String value) throws Exception {
String[] fields = value.split(",");
return Tuple3.of(fields[0], Integer.valueOf(fields[1]), Integer.valueOf(fields[2]));
}
}).assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((value, ts) -> value.f1 * 1000L)
);
// interval join
// 使用keyedStream
KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(value -> value.f0);
KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(value -> value.f0);
// 迟到数据
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));
SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2)
.between(Time.seconds(-2), Time.seconds(2))
.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据放到侧输出流
.sideOutputRightLateData(ks2LateTag) // 将kss的迟到数据放到侧输出流
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(left.toString() + "<----->" + right.toString());
}
});
process.print("主流:");
process.getSideOutput(ks1LateTag).print("ks1迟到数据:");
process.getSideOutput(ks2LateTag).print("ks2迟到数据:");
env.execute();
}
}
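/*
左流ks1数据 a,4
水位线: 4 - 3 = 1s(forBoundedOutOfOrderness 实际生成的是 4000 - 3000 - 1 = 999ms)
关联区间下界: 4 - 2 = 2s
关联区间上界: 4 + 2 = 6s
即与右流中 key 为 a、事件时间落在 [2s, 6s](默认两端都包含)内的数据匹配
*/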
interval join总结
- 只支持事件时间
- 通过 between() 指定下界、上界的偏移量(相对于左流元素的时间戳),负号代表时间往前,正号代表时间往后
- process中,只能处理join上的数据
- 两条流关联后的watermark,以两条流中最小的为准
- 如果 当前数据的事件时间 < 当前的watermark,就是迟到数据,主流的process不处理
- 在 between() 之后,可以通过 sideOutputLeftLateData() / sideOutputRightLateData() 将左流或右流的迟到数据放入侧输出流