Blog
flink-16 分流
package com.learn.flink.split;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Splits a socket text stream of integers into an even stream and an odd
 * stream by applying two independent {@code filter} operators to the same source.
 *
 * <p>Every incoming line is parsed with {@link Integer#parseInt(String)}, once
 * per filter, so a non-numeric line raises {@link NumberFormatException}
 * inside the filter function.
 */
public class SplitDemo {
    /**
     * Builds and runs the job: reads lines from {@code hadoop003:7777}, routes
     * each parsed integer to the even or the odd branch, and prints both branches.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> source = env.socketTextStream("hadoop003", 7777);

        // Split the stream with filter: each branch applies its own predicate to the same source.
        source.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");
        // Java's % keeps the sign of the dividend (-3 % 2 == -1), so test "!= 0"
        // rather than "== 1"; otherwise negative odd numbers match neither branch
        // and are silently dropped.
        source.filter(value -> Integer.parseInt(value) % 2 != 0).print("奇数流");

        env.execute();
    }
}
package com.learn.flink.split;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Splits a stream of {@code WaterSensor} records using side outputs: records
 * whose id is {@code "s1"} or {@code "s2"} are diverted to a side output of the
 * same name, and every other record stays on the main output.
 */
public class SideOutputDemo {
    /**
     * Builds and runs the job, printing the main stream and both side outputs.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read text lines from the socket and convert them with WaterSensorMapFunction
        // (defined elsewhere; presumably parses each line into a WaterSensor — confirm).
        DataStreamSource<String> lines = env.socketTextStream("hadoop003", 7777);
        SingleOutputStreamOperator<WaterSensor> sensors = lines.map(new WaterSensorMapFunction());

        // An OutputTag identifies one side output.
        //   1st argument: the tag name
        //   2nd argument: TypeInformation of the records placed in that side output
        final OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        final OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));

        // Route records with a ProcessFunction; its second type parameter (WaterSensor)
        // is the element type of the main output.
        SingleOutputStreamOperator<WaterSensor> routed =
                sensors.process(new ProcessFunction<WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out)
                            throws Exception {
                        String sensorId = value.getId();

                        // ctx.output(tag, record) emits the record to the side output named by tag.
                        if ("s1".equals(sensorId)) {
                            ctx.output(s1Tag, value);
                            return;
                        }
                        if ("s2".equals(sensorId)) {
                            ctx.output(s2Tag, value);
                            return;
                        }

                        // Any other id (including a null id) stays on the main output.
                        out.collect(value);
                    }
                });

        routed.print("主流");

        // Retrieve each side output from the processed stream by its tag, then print it.
        SideOutputDataStream<WaterSensor> s1Stream = routed.getSideOutput(s1Tag);
        SideOutputDataStream<WaterSensor> s2Stream = routed.getSideOutput(s2Tag);
        s1Stream.print("s1");
        s2Stream.print("s2");

        env.execute();
    }
}