Kafka, 大数据

kafka-07 kafka消费者

消费方式

  • consumer采用pull(拉)模式从broker中读取数据
  • 如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据
  • 如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout(传入的时长参数timeout)

分区分配策略

一个consumer group中有多个consumer,一个topic有多个partition,kafka数据分配策略

  • RoundRobin(轮询)
    • 轮询保证消费均衡
    • 多个主题一个消费组,使用TopicAndPartition类对象 将主题和分区 进行hash排序,变成排序数据,轮询插入数据,会出现未订阅该主题的消费者获取到数据
    • 所以消费者组中消费的主题必须是一样的
  • Range(默认)
    • 按照主题划分,消费者订阅了该主题的才会发,订阅主题有3个分区2个消费 消费者组A 每次获取2个分区数据 B 获取1个数据 可能会出现消费不均衡现象
  • 消费者组中的消费者个数发生变化时触发重新分配分区

offset的维护

group + topic + partition来确定offset

组添加消费者会继续消费,不会重头消费,offset对应的组不是消费者

0.9版本开始consumer默认将offset保存在kafka一个内置topic中,该topic为__consumer_offsets

  • 修改配置文件consumer.properties
    • 普通消费者可以消费系统的主题
exclude.internal.topics=false

读取offset

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop001:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatt" --consumer.config config/consumer.properties --from-beginning

key(组+主题+分区),value(offset)

消费者组案例

需求:测试同一个消费者组的消费者,同一时刻只能有一个消费者消费

配置:

hadoop001 hadoop002

# consumer.properties
group.id=mygroup

启动消费者

bin/kafka-console-consumer.sh --topic first --bootstrap-server hadoop001:9092,hadoop002:9092,hadoop003:9092 --consumer.config config/consumer.properties

hadoop003 启动生产者

bin/kafka-console-producer.sh --topic first --broker-list hadoop001:9092,hadoop002:9092,hadoop003:9092