flink-13 Aggregation Operators
```java
package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        /*
         Group by id.
         Key points:
         1. keyBy returns a KeyedStream (a keyed stream).
         2. keyBy is not a computing operator; it only repartitions the data, so its parallelism cannot be set.
         3. How keyBy grouping relates to partitions:
            1) keyBy groups the data and guarantees that records with the same key go to the same partition.
            2) Partition: one subtask can be thought of as one partition.
         */
        KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                // The sensor id is the grouping key
                return value.getId();
            }
        });

        keyedStream.print();
        env.execute();
    }
}
```
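- keyBy returns a KeyedStream (a keyed stream).
- keyBy is not a computing operator: it only repartitions the data, so you cannot set a parallelism on it (KeyedStream has no setParallelism method).
- How keyBy grouping relates to partitioning:
  - keyBy groups the data and guarantees that records with the same key end up in the same partition.
  - Partition: one subtask can be thought of as one partition, and one partition (subtask) can hold several groups (keys).

Think of a classroom (partition) whose students are split into several study groups (groups); each group is put together according to a logical rule (the key).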
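To make the "one partition, several keys" point concrete, here is a plain-Java sketch that needs no Flink dependency. It assumes a simplified rule, `Math.floorMod(key.hashCode(), parallelism)`, for picking a subtask; Flink's real assignment first hashes keys into key groups and then maps key groups to subtasks, so the exact partition numbers will differ. The helper names `partitionFor` and `keysPerPartition` are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified model of keyBy partitioning (NOT Flink's actual algorithm).
public class KeyPartitionSketch {

    // Hypothetical helper: choose a subtask (partition) for a key by hashing it.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes each record id to its partition and collects the distinct keys per partition.
    static Map<Integer, Set<String>> keysPerPartition(List<String> ids, int parallelism) {
        Map<Integer, Set<String>> result = new TreeMap<>();
        for (String id : ids) {
            result.computeIfAbsent(partitionFor(id, parallelism), p -> new TreeSet<>()).add(id);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same ids as the KeyByDemo input, with parallelism 2
        List<String> ids = List.of("s1", "s1", "s2", "s3");
        // Every "s1" record lands in the same partition, and partition 0 holds two keys.
        System.out.println(keysPerPartition(ids, 2)); // {0=[s1, s3], 1=[s2]}
    }
}
```

Because the partition is a pure function of the key, all records of a key always meet in one subtask, while several keys (groups) can share that subtask.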
Simple aggregation operators
```java
package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        /*
         Simple aggregation operators
         1. Can only be called after keyBy.
         2. Aggregate within a group, i.e. over the records that share the same key.
         */
        // Pass a field position (index) for Tuple types
        // Pass a field name for POJO types
        /*
         Difference between max and maxBy:
         max:   only the compared field takes the maximum; non-compared fields keep the values of the first record
         maxBy: the compared field takes the maximum, and non-compared fields come from the record holding that maximum
         */
        // Rolling sum of "vc" per id
        SingleOutputStreamOperator<WaterSensor> sum = keyedStream.sum("vc");

        sum.print();
        env.execute();
    }
}
```
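- Aggregation operators can only be called after keyBy.
- They aggregate within a group, i.e. over the records that share the same key.
- An aggregation operator accepts either a field position or a field name:
  - a field position (index) works for Tuple types;
  - a field name is used for POJO types.
- Difference between max and maxBy:
  - max: only the compared field takes the maximum value; the other fields keep the values of the first record.
  - maxBy: the compared field takes the maximum value, and the other fields come from the record that holds that maximum.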
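The max/maxBy difference is easiest to see on a few records of one key. The sketch below is plain Java that simulates the rolling results described above; it is not Flink code. `Sensor` is a minimal stand-in for the article's WaterSensor (fields `id`, `ts`, `vc` assumed), the third record is hypothetical data, and tie handling (earlier record wins) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of rolling max("vc") vs maxBy("vc") for a single key.
public class MaxVsMaxBySketch {

    // Minimal stand-in for WaterSensor (assumed fields: id, ts, vc).
    static final class Sensor {
        final String id;
        final long ts;
        final int vc;

        Sensor(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }

        @Override
        public String toString() {
            return "(" + id + "," + ts + "," + vc + ")";
        }
    }

    // max: keep the first record's other fields and only raise vc.
    static List<String> rollingMax(List<Sensor> input) {
        List<String> out = new ArrayList<>();
        Sensor acc = null;
        for (Sensor s : input) {
            acc = (acc == null) ? s : new Sensor(acc.id, acc.ts, Math.max(acc.vc, s.vc));
            out.add(acc.toString());
        }
        return out;
    }

    // maxBy: keep the whole record holding the largest vc (earlier record wins ties).
    static List<String> rollingMaxBy(List<Sensor> input) {
        List<String> out = new ArrayList<>();
        Sensor acc = null;
        for (Sensor s : input) {
            if (acc == null || s.vc > acc.vc) {
                acc = s;
            }
            out.add(acc.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical records for key "s1"; the third one is made up for illustration.
        List<Sensor> s1 = List.of(
                new Sensor("s1", 1L, 1), new Sensor("s1", 11L, 11), new Sensor("s1", 20L, 5));
        System.out.println("max:   " + rollingMax(s1));   // [(s1,1,1), (s1,1,11), (s1,1,11)]
        System.out.println("maxBy: " + rollingMaxBy(s1)); // [(s1,1,1), (s1,11,11), (s1,11,11)]
    }
}
```

With max, `ts` stays at 1 (the first record's value) even though `vc` rises to 11; with maxBy, the whole record `(s1,11,11)` is kept.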
The reduce operator
```java
package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });

        /*
         reduce: combines the elements of one group two at a time
         1. Called after keyBy.
         2. Input type = output type; the type cannot change.
         3. The first record of each group does not enter reduce(); it is stored and emitted directly.
         4. The two parameters of reduce():
            value1: the previous result, kept in state
            value2: the record that just arrived
         */
        SingleOutputStreamOperator<WaterSensor> reduce = keyedStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override // parameter types and return type are the same
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                System.out.println("value1=" + value1);
                System.out.println("value2=" + value2);
                // Keep id and ts from the accumulated result, add up vc
                return new WaterSensor(value1.getId(), value1.getTs(), value1.getVc() + value2.getVc());
            }
        });

        reduce.print();
        env.execute();
    }
}
```
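- Call it after keyBy.
- Input type = output type; the type does not change.
- When the first record for a key arrives, reduce() is not executed: the record is stored and emitted directly.
- The two parameters of reduce():
  - value1: the previous result, kept in state
  - value2: the record that just arrived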
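The rules above can be modeled in plain Java with one stored value per key. This is a sequential simulation, not Flink's implementation: a `HashMap` stands in for Flink's keyed state, `keyedReduce` and `Sensor` are hypothetical names, and output order here is simply input order (with parallelism 2, Flink's printed lines may interleave). A counter confirms the reducer runs only once for the ReduceDemo input, because each key's first record bypasses it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Sequential model of keyBy(...).reduce(...): one stored value per key.
public class KeyedReduceSketch {

    // Minimal stand-in for WaterSensor (assumed fields: id, ts, vc).
    static final class Sensor {
        final String id;
        final long ts;
        final int vc;

        Sensor(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }

        @Override
        public String toString() {
            return "(" + id + "," + ts + "," + vc + ")";
        }
    }

    static <T, K> List<T> keyedReduce(List<T> input, Function<T, K> keySelector, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>(); // stands in for per-key state
        List<T> out = new ArrayList<>();
        for (T value2 : input) {
            K key = keySelector.apply(value2);
            T value1 = state.get(key); // previous result for this key, if any
            // First record of a key: store and emit it as-is, without calling the reducer.
            T result = (value1 == null) ? value2 : reducer.apply(value1, value2);
            state.put(key, result);
            out.add(result);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Sensor> input = List.of(
                new Sensor("s1", 1L, 1), new Sensor("s1", 11L, 11),
                new Sensor("s2", 2L, 2), new Sensor("s3", 3L, 3));
        int[] calls = {0};
        List<Sensor> out = keyedReduce(input, s -> s.id, (v1, v2) -> {
            calls[0]++;
            return new Sensor(v1.id, v1.ts, v1.vc + v2.vc);
        });
        System.out.println(out);      // [(s1,1,1), (s1,1,12), (s2,2,2), (s3,3,3)]
        System.out.println(calls[0]); // 1
    }
}
```

Only the second "s1" record triggers the reducer; "s2" and "s3" each arrive once, so they are emitted unchanged.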