Blog
kafka-07 kafka消费者
消费方式
- consumer采用pull(拉)模式从broker中读取数据
- 如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据
- 如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout(传入的时长参数timeout)
分区分配策略
一个consumer group中有多个consumer,一个topic有多个partition,kafka数据分配策略
- RoundRobin(轮询)
- 轮询保证消费均衡
- 多个主题一个消费组,使用TopicAndPartition类对象 将主题和分区 进行hash排序,变成排序数据,轮询插入数据,会出现未订阅该主题的消费者获取到数据
- 所以消费者组中消费的主题必须是一样的
- Range(默认)
- 按照主题划分,消费者订阅了该主题的才会发,订阅主题有3个分区2个消费 消费者组A 每次获取2个分区数据 B 获取1个数据 可能会出现消费不均衡现象
- 消费者组中的消费者个数发生变化时触发重新分配分区
offset的维护
group + topic + partition来确定offset
组添加消费者会继续消费,不会重头消费,offset对应的组不是消费者
0.9版本开始consumer默认将offset保存在kafka一个内置topic中,该topic为__consumer_offsets
- 修改配置文件consumer.properties
- 普通消费者可以消费系统的主题
exclude.internal.topics=false
读取offset
bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop001:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatt" --consumer.config config/consumer.properties --from-beginning
key(组+主题+分区),value(offset)
消费者组案例
需求:测试同一个消费者组的消费者,同一时刻只能有一个消费者消费
配置:
hadoop001 hadoop002
# consumer.properties
group.id=mygroup
启动消费者
bin/kafka-console-consumer.sh --topic first --bootstrap-server hadoop001:9092,hadoop002:9092,hadoop003:9092 --consumer.config config/consumer.properties
hadoop003 启动生产者
bin/kafka-console-producer.sh --topic first --broker-list hadoop001:9092,hadoop002:9092,hadoop003:9092