flink-17 Combining Streams
union
package com.learn.flink.combine;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
        DataStreamSource<String> source3 = env.fromElements("111", "222", "333");

        // Chained form: union one stream at a time.
        // DataStream<Integer> union = source1.union(source2).union(source3.map(Integer::parseInt));
        // Varargs form: union several streams in one call; source3 must be converted to Integer first.
        DataStream<Integer> union = source1.union(source2, source3.map(Integer::parseInt));

        union.print();
        env.execute();
    }
}
- All unioned streams must have the same element type
- union can merge more than one stream in a single call
connect
package com.learn.flink.combine;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
        DataStreamSource<String> source2 = env.fromElements("111", "222", "333");

        // connect keeps the two element types separate inside one ConnectedStreams.
        ConnectedStreams<Integer, String> connect = source1.connect(source2);

        // CoMapFunction<IN1, IN2, OUT>: map1 handles the first stream, map2 the second;
        // both sides must produce the same output type.
        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });

        result.print();
        env.execute();
    }
}
- connect can only attach one other stream at a time (exactly two streams), but their element types may differ
- subsequent operations on the ConnectedStreams handle the two streams separately, yet must emit a single output type
- available operations: map, flatMap, process (a flatMap sketch follows below)
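The demo above only shows map. As a minimal sketch of the flatMap variant, where each side may emit zero or more records (the class name and sample elements here are illustrative, not from the original post):

package com.learn.flink.combine;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectFlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1, 2, 3)
                .connect(env.fromElements("11,22", "33"))
                .flatMap(new CoFlatMapFunction<Integer, String, String>() {
                    @Override
                    public void flatMap1(Integer value, Collector<String> out) {
                        // first stream: emit exactly one record per element
                        out.collect(value.toString());
                    }

                    @Override
                    public void flatMap2(String value, Collector<String> out) {
                        // second stream: split and emit zero or more records
                        for (String s : value.split(",")) {
                            out.collect(s);
                        }
                    }
                })
                .print();

        env.execute();
    }
}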
inner join effect
package com.learn.flink.combine;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        // With parallelism > 1, the connected streams must be keyed on the join field,
        // so that records with the same key land in the same subtask and can match up.
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyBy = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

        /*
         To make the two streams match against each other (either side's data may arrive first):
         1. Whenever a record arrives on either stream, buffer it in a variable:
            a HashMap with key = id (the first field) and value = List<records seen so far>.
         2. Since an arriving record cannot know whether the other side already has a match,
            also look its id up in the other stream's buffer and emit every match found.
        */
        SingleOutputStreamOperator<String> process = connectKeyBy.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
            // One buffer per stream, keyed by id.
            Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
            Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

            /**
             * Processing logic for the first stream.
             * @param value a record from the first stream
             * @param ctx context
             * @param out collector
             */
            @Override
            public void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                // TODO when s1 data arrives, buffer it in the variable
                if (!s1Cache.containsKey(id)) {
                    // key not seen yet: this is the first record for the key, so initialize a list and put it into the map
                    List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                    s1Values.add(value);
                    s1Cache.put(id, s1Values);
                } else {
                    // key already present: append to its list
                    s1Cache.get(id).add(value);
                }
                // TODO look up s2Cache for records with a matching id; emit each match
                if (s2Cache.containsKey(id)) {
                    for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                        out.collect("s1:" + value + "<=====>" + "s2:" + s2Element);
                    }
                }
            }
            /**
             * Processing logic for the second stream.
             * @param value a record from the second stream
             * @param ctx context
             * @param out collector
             */
            @Override
            public void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                // TODO when s2 data arrives, buffer it in the variable
                if (!s2Cache.containsKey(id)) {
                    // key not seen yet: this is the first record for the key, so initialize a list and put it into the map
                    List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                    s2Values.add(value);
                    s2Cache.put(id, s2Values);
                } else {
                    // key already present: append to its list
                    s2Cache.get(id).add(value);
                }
                // TODO look up s1Cache for records with a matching id; emit each match
                if (s1Cache.containsKey(id)) {
                    for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                        out.collect("s1:" + s1Element + "<=====>" + "s2:" + value);
                    }
                }
            }
        });

        process.print();
        env.execute();
    }
}
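One caveat: the plain HashMap fields above live on the JVM heap and are not part of Flink's checkpointed state, so the buffered records are lost on failure or restart. A hedged sketch of the same matching logic on keyed ListState (the class name is illustrative; because keyed state is automatically scoped to the current key, the id-keyed map is no longer needed):

package com.learn.flink.combine;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class StateBackedMatchFunction extends CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String> {

    // Keyed state: each list is scoped to the current key (the id), and is checkpointed.
    private ListState<Tuple2<Integer, String>> s1State;
    private ListState<Tuple3<Integer, String, Integer>> s2State;

    @Override
    public void open(Configuration parameters) {
        s1State = getRuntimeContext().getListState(
                new ListStateDescriptor<>("s1", Types.TUPLE(Types.INT, Types.STRING)));
        s2State = getRuntimeContext().getListState(
                new ListStateDescriptor<>("s2", Types.TUPLE(Types.INT, Types.STRING, Types.INT)));
    }

    @Override
    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
        s1State.add(value); // buffer under the current key
        for (Tuple3<Integer, String, Integer> other : s2State.get()) {
            out.collect("s1:" + value + "<=====>" + "s2:" + other);
        }
    }

    @Override
    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        s2State.add(value);
        for (Tuple2<Integer, String> other : s1State.get()) {
            out.collect("s1:" + other + "<=====>" + "s2:" + value);
        }
    }
}

It would be used in place of the anonymous function above, as connectKeyBy.process(new StateBackedMatchFunction()). The keyBy remains required, both to route matching records to the same subtask and because keyed state is only available on keyed streams.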