- 方式一:匿名实现类
- 方式二:lambda表达式
- 方式三:定义一个类来实现MapFunction
- 建议单独一个类文件
定义类的同时 指定初始化参数
package com.learn.flink.functions;
import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
public class FilterFunctionImpl implements FilterFunction<WaterSensor> {
public String id;
public FilterFunctionImpl(String id) {
this.id = id;
}
@Override
public boolean filter(WaterSensor value) throws Exception {
return this.id.equals(value.getId());
}
}
使用
SingleOutputStreamOperator<String> map = streamSource
.filter(new FilterFunctionImpl("s1"))
.map(new MyMapFunction());
富函数
package com.learn.flink.transform;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class RichFunctionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
/*
RichXXXFunction:富函数
1、多了声明周期管理方法
open(): 每个子任务,在启动时,调用一次
close(): 每个子任务,在结束时,调用一次
如果是flink程序异常挂掉,不会调用close
如果是正常调用cancel命令,可以close
2、多了一个 运行时上下文
可以获取一些运行时的环境信息,比如子任务编号、名称、其他信息等...
*/
SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 可以获取到环境和运行的信息
// RuntimeContext runtimeContext = getRuntimeContext();
System.out.println("子任务编号:" + getRuntimeContext().getIndexOfThisSubtask() + ", 子任务名称:" + getRuntimeContext().getTaskNameWithSubtasks() + ", 调用open()");
}
@Override
public void close() throws Exception {
super.close();
System.out.println("子任务编号:" + getRuntimeContext().getIndexOfThisSubtask() + ", 子任务名称:" + getRuntimeContext().getTaskNameWithSubtasks() + ", 调用close()");
}
@Override
public Integer map(Integer value) throws Exception {
return value + 1;
}
});
map.print();
env.execute();
}
}
- 多了声明周期管理方法
- open() 每个子任务在启动时 调用一次
- close()每个子任务在结束时,调用一次
- 如果是flink程序异常挂掉,不会调用close
- 如果是正常调用cancel命令,可以close
- 多了一个运行时上下文
- 可以获取一些运行时的环境信息,比如子任务的编号、名称、等信息