Flink, 大数据

flink-16 分流

package com.learn.flink.split;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> source = env.socketTextStream("hadoop003", 7777);
        // 使用filter实现分流
        source.filter(value -> Integer.parseInt(value) % 2 == 0).print("偶数流");
        source.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");


        env.execute();
    }
}
package com.learn.flink.split;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
                .map(new WaterSensorMapFunction());

        // 使用侧输出流实现分流
        // 第二个参数WaterSensor是主流的输出类型

        // 如果是s1放到侧输出流s1中
        // 创建OutputTag对象
        // 第一个参数 标签名
        // 第二个参数 放入侧输出流中的数据的类型:Typeinfomation
        OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<WaterSensor> process = sensorDS.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                String id = value.getId();
                if ("s1".equals(id)) {
                    // 上下文调用output将数据放入侧输出流
                    // 第一个参数Tag对象
                    // 第二个参数 放入侧输出流中的数据
                    ctx.output(s1Tag, value);
                } else if ("s2".equals(id)) {
                    ctx.output(s2Tag, value);
                } else {
                    out.collect(value);
                }
            }
        });

        process.print("主流");

        // 从主流中根据标签获取侧输出流
        // 打印侧输出流
        SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);
        SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);

        s1.print("s1");
        s2.print("s2");

        env.execute();
    }
}