kafka-14 Testing Consumers Joining and Leaving a Consumer Group
Consumer 1
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer01 {
    public static void main(String[] args) {
        // Create the consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Set the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("test01"));
        // Fetch data; poll() takes a Duration, e.g. Duration.ofMillis()
        // Poll in a loop, otherwise the JVM exits as soon as main() finishes
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("topic: " + consumerRecord.topic()
                        + " partition: " + consumerRecord.partition()
                        + " key: " + consumerRecord.key()
                        + " value: " + consumerRecord.value()
                        + " offset: " + consumerRecord.offset());
            }
        }
    }
}
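The Kafka consumer does not print anything by default when the group membership changes, so the join/leave itself can be easy to miss. As a minimal sketch of my own (not part of the original test; the class name MyConsumer01WithListener is illustrative, everything else reuses the same broker, group, and topic as above), either consumer could subscribe with a ConsumerRebalanceListener to print which partitions it gains or loses on each rebalance:

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyConsumer01WithListener {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe with a rebalance listener: Kafka calls these hooks whenever a
        // consumer joins or leaves the group and partitions are reassigned.
        consumer.subscribe(Arrays.asList("test01"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}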
Consumer 2
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer02 {
    public static void main(String[] args) {
        // Create the consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Same consumer group as MyConsumer01
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("test01"));
        // Fetch data; poll() takes a Duration, e.g. Duration.ofMillis()
        // Poll in a loop, otherwise the JVM exits as soon as main() finishes
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("topic: " + consumerRecord.topic()
                        + " partition: " + consumerRecord.partition()
                        + " key: " + consumerRecord.key()
                        + " value: " + consumerRecord.value()
                        + " offset: " + consumerRecord.offset());
            }
        }
    }
}
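To test the "leave" side you can simply kill one of the consumer processes and wait for the session timeout, but a cleaner way is to close the consumer so it leaves the group immediately. Below is a minimal sketch of my own (not from the original post; the class name MyConsumer02Graceful is illustrative) that calls consumer.wakeup() from a shutdown hook; poll() then throws WakeupException, the loop exits, and consumer.close() makes the consumer leave the group right away, triggering the rebalance for the surviving consumer:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class MyConsumer02Graceful {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        final Thread mainThread = Thread.currentThread();
        // On Ctrl+C, interrupt the blocking poll() so the consumer can close cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Arrays.asList("test01"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + " -> " + record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() makes poll() throw this exception.
        } finally {
            // close() leaves the group immediately instead of waiting for the
            // session timeout, so the remaining consumer picks up the partitions sooner.
            consumer.close();
        }
    }
}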
Producer
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer01 {
    public static void main(String[] args) {
        // 1. Create the producer configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data with a callback that reports where each record landed
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test01", "" + i, "i am data ..."), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("partition: " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });
        }
        // Close the producer (this also flushes any buffered records)
        producer.close();
    }
}
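With both consumers running in group "test", the partitions of test01 are split between MyConsumer01 and MyConsumer02; when one of them is stopped, a rebalance hands all partitions to the survivor, and when it is started again the partitions are split once more. One way to watch the membership change while the test runs is the Kafka AdminClient. This is a hedged sketch of my own (the class name CheckGroup is illustrative, not part of the original test):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;

public class CheckGroup {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // Describe the "test" group: one MemberDescription per live consumer.
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList("test"))
                    .all().get()
                    .get("test");
            for (MemberDescription member : description.members()) {
                System.out.println(member.clientId() + " @ " + member.host()
                        + " -> " + member.assignment().topicPartitions());
            }
        }
    }
}

Running it before and after stopping one consumer should show the member list shrinking from two entries to one, with the remaining member owning every partition of test01.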