Flink, 大数据

flink-12 转换算子

map

package com.learn.flink.transform;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functiona.MyMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> streamSource = env.fromElements(
                new WaterSensor("1001", 1000L, 10),
                new WaterSensor("1002", 2000L, 20)
        );
        // 匿名实现类
//        SingleOutputStreamOperator<String> map = streamSource.map(new MapFunction<WaterSensor, String>() {
//            @Override
//            public String map(WaterSensor value) throws Exception {
//                return value.getId();
//            }
//        });
        
        // lambda表达式
//        SingleOutputStreamOperator<String> map = streamSource.map(WaterSensor::getId);
//                .returns(Types.STRING);
        
        // 单独定义类(建议)
        SingleOutputStreamOperator<String> map = streamSource.map(new MyMapFunction());

        map.print();


        env.execute();
    }
}

filter

package com.learn.flink.functiona;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;

public class MyFilterFunction implements FilterFunction<WaterSensor> {

    @Override
    public boolean filter(WaterSensor value) throws Exception {
        return "1001".equals(value.getId());
    }
}

一般equals前两在前面 避免空值

package com.learn.flink.transform;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functiona.MyFilterFunction;
import com.learn.flink.functiona.MyMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> streamSource = env.fromElements(
                new WaterSensor("1001", 1000L, 10),
                new WaterSensor("1002", 2000L, 20)
        );
        // 单独定义类(建议)
        SingleOutputStreamOperator<String> map = streamSource
                .filter(new MyFilterFunction())
                .map(new MyMapFunction());

        map.print();


        env.execute();
    }
}

flatMap

package com.learn.flink.functions;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class MyFlatMapFunction implements FlatMapFunction<WaterSensor, String> {


    @Override
    public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
        if ("1001".equals(value.getId())) {
            out.collect("vc: " + value.vc);
        } else if ("1002".equals(value.getId())) {
            out.collect("ts: " + value.getTs() + ", vc: " + value.vc);
        }
    }
}
package com.learn.flink.transform;

import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> streamSource = env.fromElements(
                new WaterSensor("1001", 1000L, 10),
                new WaterSensor("1002", 2000L, 20)
        );

        SingleOutputStreamOperator<String> flatMap = streamSource.flatMap(new MyFlatMapFunction());

        flatMap.print();


        env.execute();
    }
}