Flink, 大数据

flink-15 物理分区算子

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)

package com.learn.flink.partition;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates Flink's built-in physical partitioning operators on a socket text stream.
 *
 * <p>Exactly one strategy is active at a time; the alternatives are kept commented out so
 * they can be swapped in and their output compared.
 */
public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);

        final DataStreamSource<String> lines = environment.socketTextStream("hadoop003", 7777);

        // Shuffle (random): channel = random.nextInt(downstream parallelism)
        // lines.shuffle().print();

        // Rebalance (round-robin):
        //   nextChannelToSendTo = (nextChannelToSendTo + 1) % downstream parallelism
        // When the source itself is skewed, calling rebalance right after reading it
        // evens out the data skew.
        // lines.rebalance().print();

        // Rescale: round-robin within local groups of subtasks; cheaper than rebalance.
        // lines.rescale().print();

        // Broadcast: every record is sent to all downstream subtasks.
        // lines.broadcast().print();

        // Global: every record is sent to the first downstream subtask, which is
        // effectively the same as running the downstream operator with parallelism 1.
        lines.global().print();

        // keyBy: records are routed by key; equal keys go to the same subtask.
        // One-to-one connections use the Forward partitioner.

        environment.execute();
    }
}

分区器

  • Flink 提供了 7 种内置分区器(Shuffle、Rebalance、Rescale、Broadcast、Global、KeyGroup(keyBy)、Forward),另外支持 1 种自定义分区器(partitionCustom)

自定义分区

package com.learn.flink.partition;

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Custom Flink {@link Partitioner} that routes a record by the integer value of its key.
 *
 * <p>The key is parsed as a decimal integer and reduced modulo the number of partitions.
 */
public class MyPartitioner implements Partitioner<String> {

    /**
     * Computes the target partition for a key.
     *
     * @param key           text of a decimal integer; negative values are accepted
     * @param numPartitions number of downstream partitions (assumed to be positive)
     * @return a partition index in {@code [0, numPartitions)} for positive {@code numPartitions}
     * @throws NumberFormatException if {@code key} is not a parsable integer
     */
    @Override
    public int partition(String key, int numPartitions) {
        // floorMod keeps the index non-negative for negative keys; the plain % operator
        // would return a negative value (e.g. -3 % 2 == -1), which is not a valid partition.
        return Math.floorMod(Integer.parseInt(key), numPartitions);
    }
}
package com.learn.flink.partition;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates custom partitioning: each line read from the socket is used as its own key
 * and routed to a downstream subtask by {@link MyPartitioner}.
 */
public class PartitionCustomDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);

        final DataStreamSource<String> lines = environment.socketTextStream("hadoop003", 7777);

        // The whole input line serves as the partitioning key.
        final KeySelector<String, String> wholeLineAsKey = line -> line;

        lines.partitionCustom(new MyPartitioner(), wholeLineAsKey).print();

        environment.execute();
    }
}