Flink, 大数据

flink-13 聚合算子

package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates {@code keyBy}: grouping a stream of {@link WaterSensor} records by sensor id.
 *
 * <p>Key points:
 * <ul>
 *   <li>{@code keyBy} returns a {@link KeyedStream} (a "keyed stream").</li>
 *   <li>{@code keyBy} performs no computation of its own; it only repartitions the data, which is
 *       why the returned {@code KeyedStream} offers no {@code setParallelism(...)}.</li>
 *   <li>Grouping vs. partitioning: {@code keyBy} guarantees that all records with the same key end
 *       up in the same partition, where a partition corresponds to one parallel subtask. A single
 *       partition may still contain records of several different keys.</li>
 * </ul>
 */
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Two subtasks, so the printed output shows which subtask each key was routed to.
        env.setParallelism(2);

        final DataStreamSource<WaterSensor> sensors = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        // Group by sensor id: every record of a given id is sent to the same subtask.
        final KeyedStream<WaterSensor, String> sensorsById = sensors.keyBy(WaterSensor::getId);

        sensorsById.print();

        env.execute();
    }
}
  • keyBy 返回的是一个 KeyedStream(键控流)
  • keyBy 本身不做计算,只是对数据按 key 重新分区,所以不能对它单独设置并行度
  • keyBy 分组与分区的关系
    • keyBy 是对数据分组,保证相同 key 的数据在同一个分区
    • 分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以有多个分组(key)

可以这样理解:一个教室(分区)里的学生可以分成好几个小组(分组),每个小组都是按照某种逻辑规则(key)划分到一起的。

简单聚合算子

package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates the simple built-in aggregations of a keyed stream, using {@code sum("vc")}.
 *
 * <p>Rules for simple aggregations ({@code sum}, {@code min}, {@code max}, {@code minBy},
 * {@code maxBy}):
 * <ul>
 *   <li>They are only available after {@code keyBy}, i.e. on a {@link KeyedStream}.</li>
 *   <li>They aggregate within a group: records sharing the same key are aggregated together.</li>
 *   <li>The field is given by position index for Tuple types and by field name for POJO types.</li>
 * </ul>
 *
 * <p>{@code max} vs. {@code maxBy}:
 * <ul>
 *   <li>{@code max}: only the compared field is updated to the running maximum; the other fields
 *       keep the values of the first record seen for that key.</li>
 *   <li>{@code maxBy}: emits the record that holds the maximum, so the other fields also come from
 *       that record.</li>
 * </ul>
 */
public class SimpleAggregateDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final DataStreamSource<WaterSensor> sensors = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        final KeyedStream<WaterSensor, String> sensorsById = sensors.keyBy(WaterSensor::getId);

        // WaterSensor is a POJO, so the aggregated field is addressed by its name.
        final SingleOutputStreamOperator<WaterSensor> vcTotals = sensorsById.sum("vc");
        vcTotals.print();

        env.execute();
    }
}
  • 聚合算子需要在 keyBy 之后才能调用
  • 是分组内的聚合,即对同一个 key 的数据进行聚合
  • 聚合算子的参数可以是位置索引,也可以是字段名
    • 传位置索引适用于 Tuple 类型
    • POJO 类型传字段名
  • max、maxBy 区别
    • max:只会取比较字段的最大值,其他字段保留该 key 第一条数据的值
    • maxBy:取比较字段的最大值,其他字段取最大值所在那条数据的值

reduce算子

package com.learn.flink.aggregate;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates {@code reduce} on a keyed stream: records of the same key are combined pairwise.
 *
 * <ul>
 *   <li>{@code reduce} is only available after {@code keyBy}.</li>
 *   <li>The input type equals the output type; reduce cannot change the element type.</li>
 *   <li>The first record of each key does not go through the reduce function; it is stored as the
 *       initial state and emitted directly.</li>
 *   <li>In the reduce function, {@code value1} is the previous result (kept in state) and
 *       {@code value2} is the newly arrived record.</li>
 * </ul>
 */
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        final DataStreamSource<WaterSensor> sensors = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        final KeyedStream<WaterSensor, String> sensorsById = sensors.keyBy(WaterSensor::getId);

        // accumulated = previous result for this key; incoming = the record that just arrived.
        final SingleOutputStreamOperator<WaterSensor> vcTotals = sensorsById.reduce((accumulated, incoming) -> {
            System.out.println("value1=" + accumulated);
            System.out.println("value2=" + incoming);
            // Keep id and ts of the accumulated record; only vc is summed.
            return new WaterSensor(
                    accumulated.getId(),
                    accumulated.getTs(),
                    accumulated.getVc() + incoming.getVc());
        });

        vcTotals.print();

        env.execute();
    }
}
  • 在 keyBy 之后调用
  • 输入类型 = 输出类型,类型不能改变
  • 每个 key 的第一条数据到来时不会执行 reduce 方法,而是存起来(作为状态)并直接输出
  • reduce 方法中的两个参数
    • value1:之前的计算结果,保存在状态中
    • value2:现在来的数据