Blog
flink-12 转换算子
map
package com.learn.flink.transform;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functiona.MyMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> streamSource = env.fromElements(
new WaterSensor("1001", 1000L, 10),
new WaterSensor("1002", 2000L, 20)
);
// 匿名实现类
// SingleOutputStreamOperator<String> map = streamSource.map(new MapFunction<WaterSensor, String>() {
// @Override
// public String map(WaterSensor value) throws Exception {
// return value.getId();
// }
// });
// lambda表达式
// SingleOutputStreamOperator<String> map = streamSource.map(WaterSensor::getId);
// .returns(Types.STRING);
// 单独定义类(建议)
SingleOutputStreamOperator<String> map = streamSource.map(new MyMapFunction());
map.print();
env.execute();
}
}
filter
package com.learn.flink.functiona;
import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
public class MyFilterFunction implements FilterFunction<WaterSensor> {
@Override
public boolean filter(WaterSensor value) throws Exception {
return "1001".equals(value.getId());
}
}
一般equals前两在前面 避免空值
package com.learn.flink.transform;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functiona.MyFilterFunction;
import com.learn.flink.functiona.MyMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> streamSource = env.fromElements(
new WaterSensor("1001", 1000L, 10),
new WaterSensor("1002", 2000L, 20)
);
// 单独定义类(建议)
SingleOutputStreamOperator<String> map = streamSource
.filter(new MyFilterFunction())
.map(new MyMapFunction());
map.print();
env.execute();
}
}
flatMap
package com.learn.flink.functions;
import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
public class MyFlatMapFunction implements FlatMapFunction<WaterSensor, String> {
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
if ("1001".equals(value.getId())) {
out.collect("vc: " + value.vc);
} else if ("1002".equals(value.getId())) {
out.collect("ts: " + value.getTs() + ", vc: " + value.vc);
}
}
}
package com.learn.flink.transform;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> streamSource = env.fromElements(
new WaterSensor("1001", 1000L, 10),
new WaterSensor("1002", 2000L, 20)
);
SingleOutputStreamOperator<String> flatMap = streamSource.flatMap(new MyFlatMapFunction());
flatMap.print();
env.execute();
}
}