Kafka, 大数据

kafka-14 消费者加入、退出消费者组测试

消费者1

/**
 * Demo consumer that joins the group with id "test", subscribes to topic
 * "test01" and prints every record it receives to standard output.
 *
 * <p>Offsets are auto-committed (interval configured as 1000 ms). The polling
 * loop never terminates on its own; the consumer is closed only when an
 * exception escapes the loop.
 */
public class MyConsumer01 {
    /**
     * Configures the consumer, subscribes and polls forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the consumer configuration.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Set the consumer group id this consumer belongs to.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // try-with-resources: the consumer is closed even if poll() or the
        // record handling throws, instead of being leaked.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList("test01"));
            // Poll in a loop (timeout given as a Duration); without the loop
            // main would return after a single poll and the JVM would exit.
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("主题: " + consumerRecord.topic() + " 分区: " + consumerRecord.partition() + " Key: " + consumerRecord.key() + " Value: " + consumerRecord.value() + " Offset: " + consumerRecord.offset());
                }
            }
        }
    }
}

消费者2

/**
 * Second demo consumer using the same group id ("test") and topic ("test01")
 * as configured below; prints every record it receives to standard output.
 *
 * <p>Offsets are auto-committed (interval configured as 1000 ms). The polling
 * loop never terminates on its own; the consumer is closed only when an
 * exception escapes the loop.
 */
public class MyConsumer02 {
    /**
     * Configures the consumer, subscribes and polls forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the consumer configuration.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Set the consumer group id this consumer belongs to.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // try-with-resources: the consumer is closed even if poll() or the
        // record handling throws, instead of being leaked.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList("test01"));
            // Poll in a loop (timeout given as a Duration); without the loop
            // main would return after a single poll and the JVM would exit.
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("主题: " + consumerRecord.topic() + " 分区: " + consumerRecord.partition() + " Key: " + consumerRecord.key() + " Value: " + consumerRecord.value() + " Offset: " + consumerRecord.offset());
                }
            }
        }
    }
}

生产者

/**
 * Demo producer that sends ten string records (keys "0".."9", fixed value)
 * to topic "test01" and reports the outcome of each send from the callback.
 */
public class MyProducer01 {
    /**
     * Configures the producer, sends the records and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Build the producer configuration.
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        // Use the ProducerConfig constants rather than raw key strings so a
        // typo becomes a compile error.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer; try-with-resources closes it even when a
        //    send() call throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // 3. Send the records.
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("test01", String.valueOf(i), "i am data ..."), (metadata, exception) -> {
                    if (exception != null) {
                        // Report failed sends instead of silently dropping them.
                        System.err.println("发送失败: " + exception);
                        return;
                    }
                    System.out.println("分区:" + metadata.partition() + ", offset:" + metadata.offset());
                });
            }
        }
    }
}