Flink, 大数据

flink-14 用户自定义函数UDF

  • 方式一:匿名实现类
  • 方式二:lambda表达式
  • 方式三:定义一个类来实现MapFunction
    • 建议单独一个类文件

定义类的同时 指定初始化参数

package com.learn.flink.functions;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;

public class FilterFunctionImpl implements FilterFunction<WaterSensor> {
    public String id;

    public FilterFunctionImpl(String id) {
        this.id = id;
    }

    @Override
    public boolean filter(WaterSensor value) throws Exception {
        return this.id.equals(value.getId());
    }
}

使用

SingleOutputStreamOperator<String> map = streamSource
                .filter(new FilterFunctionImpl("s1"))
                .map(new MyMapFunction());

富函数

package com.learn.flink.transform;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
        /*
        RichXXXFunction:富函数
        1、多了声明周期管理方法
            open(): 每个子任务,在启动时,调用一次
            close(): 每个子任务,在结束时,调用一次
                如果是flink程序异常挂掉,不会调用close
                如果是正常调用cancel命令,可以close
        2、多了一个 运行时上下文
            可以获取一些运行时的环境信息,比如子任务编号、名称、其他信息等...
         */
        SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 可以获取到环境和运行的信息
                // RuntimeContext runtimeContext = getRuntimeContext();
                System.out.println("子任务编号:" + getRuntimeContext().getIndexOfThisSubtask() + ", 子任务名称:" + getRuntimeContext().getTaskNameWithSubtasks() + ", 调用open()");
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("子任务编号:" + getRuntimeContext().getIndexOfThisSubtask() + ", 子任务名称:" + getRuntimeContext().getTaskNameWithSubtasks() + ", 调用close()");
            }

            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });
        map.print();
        env.execute();
    }
}
  • 多了声明周期管理方法
    • open() 每个子任务在启动时 调用一次
    • close()每个子任务在结束时,调用一次
      • 如果是flink程序异常挂掉,不会调用close
      • 如果是正常调用cancel命令,可以close
  • 多了一个运行时上下文
    • 可以获取一些运行时的环境信息,比如子任务的编号、名称、等信息

About 蓝染君

喜爱编程开发的程序猿