Kafka, Big Data

kafka-11 Producer API

Message sending flow

  • The Kafka producer sends messages asynchronously
  • Two threads are involved in the send path:
    • the main thread
    • the Sender thread
  • They communicate through a shared buffer, the RecordAccumulator
  • The main thread appends messages to the RecordAccumulator; the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers
  • batch.size: the Sender only sends a batch once batch.size bytes of data have accumulated
  • linger.ms: if a batch has not reached batch.size, the Sender waits at most linger.ms before sending it anyway

Asynchronous send API

  • Import the dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

KafkaProducer: the producer object that must be created in order to send data

ProducerConfig: provides the series of configuration parameters required

ProducerRecord: every message must be wrapped in a ProducerRecord object

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        // 1. Producer configuration
        Properties properties = new Properties();
        // Kafka cluster to connect to (broker list)
        properties.put("bootstrap.servers", "192.168.11.127:9092");
        // Ack level ("all" waits for all in-sync replicas)
        properties.put("acks", "all");
        // Number of retries
        properties.put("retries", 3);
        // Batch size: 16 KB
        properties.put("batch.size", 16384);
        // Max time to wait before sending an incomplete batch
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size: 32 MB
        properties.put("buffer.memory", 33554432);
        // Serializers for keys and values
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3. Send data
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("first", "learn kafka " + i));
        }

        // 4. Release resources (close() also flushes pending batches)
        producer.close();
    }
}

ProducerConfig is the class that defines constants for all producer configuration keys

Send with a callback

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallBackProducer {
    public static void main(String[] args) {
        // 1. Producer configuration, using the ProducerConfig constants
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data, with a callback that fires once the send completes
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "learn kafka " + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("partition: " + metadata.partition() + ", offset: " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        producer.close();
    }
}

Custom partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of partitions for this topic
        int numPartitions = cluster.partitionCountForTopic(topic);
        // Mask off the sign bit so the result is never negative
        // (hashCode() may return a negative value)
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
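A detail worth calling out in partition() above: String.hashCode() can be negative, so a plain % would yield an invalid (negative) partition index. Masking with Integer.MAX_VALUE clears the sign bit, which mirrors what Kafka's own default partitioner does. A broker-free sketch of the arithmetic (the class and key string here are illustrative, not part of the Kafka API):

```java
public class PartitionIndexDemo {
    // Non-negative partition index from an arbitrary key hash,
    // the same masking trick as in MyPartitioner.partition()
    static int toPartition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // "polygenelubricants" is a well-known string whose hashCode() is negative
        String key = "polygenelubricants";
        System.out.println(key.hashCode());        // negative
        System.out.println(key.hashCode() % 3);    // negative: invalid partition index
        System.out.println(toPartition(key, 3));   // always in [0, 3)
    }
}
```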

Configure the custom partitioner

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Register the custom partitioner class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.learn.kafka.partition.MyPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // A keyed record: the key "test" determines the partition via MyPartitioner
        ProducerRecord<String, String> record = new ProducerRecord<>("first", "test", "haha");
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });

        producer.close();
    }
}

Synchronous send API

Synchronous send means that after a message is sent, the current thread blocks until the ack is returned

Since the send method returns a Future object, we can call Future.get() on it to achieve the synchronous-send effect

RecordMetadata result = producer.send(record).get();

To guarantee ordering: use a single partition together with the synchronous API
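Putting these pieces together, a minimal synchronous-send sketch (assuming, as in the earlier examples, a broker reachable at hadoop001:9092 and an existing topic named first; the class name SyncProducer is made up for illustration):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            // get() blocks the main thread until the broker acks,
            // so each message is confirmed before the next one is sent
            RecordMetadata metadata = producer.send(new ProducerRecord<>("first", "learn kafka " + i)).get();
            System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
        }

        producer.close();
    }
}
```

Note that get() also rethrows any send failure as an ExecutionException, so errors surface immediately instead of being lost, at the cost of the throughput that batching normally provides.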