Blog
flink-15 物理分区算子
常见的物理分区策略有：随机分配（Random）、轮询分配（Round-Robin）、重缩放（Rescale）和广播（Broadcast）；此外，下面的示例中还涉及全局分区（Global）、按键分区（keyBy）以及一对一直连（Forward）。
package com.learn.flink.partition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates Flink's built-in physical partitioning strategies on a socket text stream.
 *
 * <p>Only one strategy is active at a time (currently {@code global()}); the other strategies are
 * kept as commented-out alternatives so they can be swapped in one at a time.
 */
public class PartitionDemo {

    public static void main(String[] args) throws Exception {
        final String host = "hadoop003";
        final int port = 7777;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // shuffle(): random partitioning, target = random.nextInt(downstream parallelism).
        // lines.shuffle().print();

        // rebalance(): round-robin,
        //   nextChannelToSendTo = (nextChannelToSendTo + 1) % downstream parallelism.
        // If the source itself is skewed, calling rebalance() right after reading
        // spreads the records evenly and removes that skew.
        // lines.rebalance().print();

        // rescale(): round-robin within local groups of downstream subtasks;
        // more efficient than rebalance().
        // lines.rescale().print();

        // broadcast(): every record is sent to all downstream subtasks.
        // lines.broadcast().print();

        // keyBy(): records with the same key are sent to the same subtask.
        // One-to-one connections use the Forward partitioner.

        // global(): every record goes to the first downstream subtask,
        // which effectively makes the downstream parallelism 1.
        lines.global().print();

        env.execute();
    }
}
分区器
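- Flink 提供了 7 种内置分区器（Global、Shuffle、Rebalance、Rescale、Broadcast、Forward、KeyGroupStream，即 keyBy 所用的分区器），外加 1 种自定义分区方式（通过 `partitionCustom` 传入自定义的 `Partitioner`）。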
自定义分区
package com.learn.flink.partition;
import org.apache.flink.api.common.functions.Partitioner;
/**
 * Custom partitioner that routes each record by the integer value of its key.
 *
 * <p>A numeric key {@code n} is sent to partition {@code Math.floorMod(n, numPartitions)}, so
 * negative numbers still map into {@code [0, numPartitions)`; a plain {@code %} would yield a
 * negative (invalid) channel index. Surrounding whitespace in the key is ignored. Keys that are
 * not valid {@code int}s fall back to their hash code instead of throwing, so a malformed input
 * line does not fail the job. A {@code null} key maps to value 0.
 */
public class MyPartitioner implements Partitioner<String> {

    /**
     * Chooses the downstream partition for a record.
     *
     * @param key the record key produced by the key selector
     * @param numPartitions number of downstream channels; assumed positive
     * @return the target partition index in {@code [0, numPartitions)}
     */
    @Override
    public int partition(String key, int numPartitions) {
        // floorMod (not %) keeps the result non-negative for negative routing values.
        return Math.floorMod(toRoutingValue(key), numPartitions);
    }

    /** Parses the key as an int, falling back to its hash code when it is not a valid int. */
    private static int toRoutingValue(String key) {
        if (key == null) {
            return 0;
        }
        try {
            return Integer.parseInt(key.trim());
        } catch (NumberFormatException notAnInt) {
            // Non-numeric or out-of-int-range input: route deterministically by hash instead of crashing.
            return key.hashCode();
        }
    }
}
package com.learn.flink.partition;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates custom partitioning: each socket line is used as its own key and routed to a
 * downstream subtask by {@link MyPartitioner}.
 */
public class PartitionCustomDemo {

    public static void main(String[] args) throws Exception {
        final String host = "hadoop003";
        final int port = 7777;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // The whole line is the partitioning key (identity key selector).
        KeySelector<String, String> wholeLineAsKey = value -> value;

        lines.partitionCustom(new MyPartitioner(), wholeLineAsKey).print();

        env.execute();
    }
}