kafka-11 Producer API
Message sending process
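- The Kafka producer sends messages asynchronously.
- Sending a message involves two threads and one shared variable:
  - the main thread
  - the sender thread
  - the shared RecordAccumulator
- The main thread appends messages to the RecordAccumulator. The sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.
- batch.size: once the data accumulated in a batch reaches batch.size (in bytes), the sender sends that batch.
- linger.ms: if a batch does not reach batch.size in time, the sender sends it anyway after waiting linger.ms.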
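The following toy, single-partition model of the RecordAccumulator makes the hand-off between the two threads, and the roles of batch.size and linger.ms, more concrete. ToyAccumulator and its methods are hypothetical names and are not part of Kafka's API. To keep things simple, batchSize counts records, whereas the real batch.size is measured in bytes. The real accumulator also keeps a separate queue of batches for each partition. In this model, the main thread would call append() and the sender thread would poll drainReady().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of Kafka's RecordAccumulator for ONE partition.
// Assumption: batchSize counts records; the real batch.size is in bytes.
class ToyAccumulator {
    private final int batchSize;     // stands in for batch.size
    private final long lingerMs;     // stands in for linger.ms
    private final Deque<List<String>> batches = new ArrayDeque<>();
    private long openBatchCreatedMs; // creation time of the newest batch

    ToyAccumulator(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    // "Main thread" side: add the record to the newest batch,
    // starting a new batch if there is none or the newest one is full.
    synchronized void append(String record, long nowMs) {
        List<String> last = batches.peekLast();
        if (last == null || last.size() >= batchSize) {
            last = new ArrayList<>();
            batches.addLast(last);
            openBatchCreatedMs = nowMs;
        }
        last.add(record);
    }

    // "Sender thread" side: remove and return every batch that is either
    // full or has waited at least lingerMs since it was created.
    synchronized List<List<String>> drainReady(long nowMs) {
        List<List<String>> ready = new ArrayList<>();
        while (!batches.isEmpty()) {
            List<String> first = batches.peekFirst();
            boolean full = first.size() >= batchSize;
            // Only the newest batch can be non-full, so openBatchCreatedMs applies to it.
            boolean lingered = nowMs - openBatchCreatedMs >= lingerMs;
            if (!full && !lingered) {
                break;
            }
            ready.add(batches.pollFirst());
        }
        return ready;
    }
}
```

Suppose batchSize = 3 and lingerMs = 5, and seven records are appended at time 0. Calling drainReady(0) returns the two full batches immediately. The trailing one-record batch is returned only by drainReady(5), once the linger time has passed.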
Asynchronous send API
- Add the Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
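KafkaProducer: the producer object that sends the data
ProducerConfig: defines the names of the configuration parameters the producer needs
ProducerRecord: every message must be wrapped in a ProducerRecord object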
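import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        // 1. Build the producer configuration
        Properties properties = new Properties();
        // Kafka cluster to connect to (broker list)
        properties.put("bootstrap.servers", "192.168.11.127:9092");
        // Acknowledgement level; the key is "acks", not "ack"
        properties.put("acks", "all");
        // Number of retries
        properties.put("retries", 3);
        // Batch size: 16 KB
        properties.put("batch.size", 16384);
        // How long to wait for a batch to fill, in ms
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size: 32 MB
        properties.put("buffer.memory", 33554432);
        // Key and value serializers
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data; send() is asynchronous and returns immediately
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("first", "learn kafka " + i));
        }
        // 4. Close the producer; close() waits for previously sent records to complete
        producer.close();
    }
}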
ProducerConfig is the class that holds constants for all producer configuration keys.
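The following sketch keys the MyProducer settings by ProducerConfig constants instead of raw strings. With constants, a misspelled key is caught at compile time. A misspelled string key such as "ack", by contrast, only produces an "unknown config" warning in the log and is otherwise ignored. The class name ProducerProps is hypothetical, and the block assumes kafka-clients 2.4.1 is on the classpath.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper: the MyProducer settings, keyed by ProducerConfig constants.
class ProducerProps {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");            // key string is "acks"
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);      // 16 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32 MB
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
}
```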
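Asynchronous send with a callback
The callback runs once the send completes. On success, exception is null and metadata holds the partition and offset the record was written to.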
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallBackProducer {
    public static void main(String[] args) {
        // 1. Build the configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Send data; the lambda implements Callback.onCompletion(metadata, exception)
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "learn kafka " + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("partition: " + metadata.partition() + ", offset: " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // 4. Close the producer
        producer.close();
    }
}
Custom partitioner
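package com.learn.kafka.partition;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of partitions of the topic
        Integer numPartitions = cluster.partitionCountForTopic(topic);
        // Records without a key would cause a NullPointerException below; send them to partition 0
        if (key == null) {
            return 0;
        }
        // hashCode() can be negative, so clear the sign bit before taking the modulus
        return (key.toString().hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}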
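Reducing a key's hashCode() with % needs care. String.hashCode() can be negative, and Java's % keeps the sign of the dividend, so key.toString().hashCode() % n can yield a negative partition index. The hypothetical helper class PartitionMath below isolates that arithmetic so it can be checked without a cluster. Kafka's own DefaultPartitioner uses the same sign-bit mask for keyed records: it applies Utils.toPositive to a murmur2 hash of the key bytes. Math.abs is not a safe substitute, because Math.abs(Integer.MIN_VALUE) is still negative.

```java
// Hypothetical helper that isolates the hash-to-partition arithmetic.
final class PartitionMath {
    private PartitionMath() {}

    // Naive reduction: a negative hash gives a negative (invalid) partition index.
    static int naivePartition(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Clear the sign bit first, so the result is always in [0, numPartitions).
    static int safePartition(int hash, int numPartitions) {
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

For example, naivePartition(Integer.MIN_VALUE, 3) returns -2, while safePartition(Integer.MIN_VALUE, 3) returns 0.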
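Configuring the custom partitioner: pass its fully qualified class name through ProducerConfig.PARTITIONER_CLASS_CONFIG (partitioner.class).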
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Use the custom partitioner (fully qualified class name)
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.learn.kafka.partition.MyPartitioner");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Topic "first", key "test", value "haha"; MyPartitioner hashes the key
        ProducerRecord<String, String> record = new ProducerRecord<>("first", "test", "haha");
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });
        producer.close();
    }
}
Synchronous send API
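Synchronous sending means that after a message is sent, the current thread blocks until the ack comes back.
Because send() returns a Future<RecordMetadata>, calling Future.get() on it blocks until the send completes. This gives synchronous behavior: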
RecordMetadata result = producer.send(record).get();
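To guarantee ordering, write to a single partition (Kafka only orders records within a partition) and use the synchronous API, so that each record is acknowledged before the next one is sent.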
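The following hypothetical sketch shows the blocking behavior of Future.get() without a broker. FakeBroker stands in for the producer. Its send() returns a Future immediately and later completes it on a background thread with the next offset, roughly as KafkaProducer.send() returns a Future<RecordMetadata>. SyncSendDemo.sendAllSync() calls get() on each Future before sending the next record. Both class names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a producer/broker pair: assigns offsets on a background thread.
class FakeBroker implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // only touched by the io thread

    // Like KafkaProducer.send(): returns a Future right away; the result arrives later.
    Future<Long> send(String value) {
        return io.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(10); // simulated network round trip
            return nextOffset++;
        });
    }

    @Override
    public void close() {
        io.shutdown();
    }
}

class SyncSendDemo {
    // Synchronous sending: block on each Future before sending the next record.
    static List<Long> sendAllSync(FakeBroker broker, List<String> values)
            throws ExecutionException, InterruptedException {
        List<Long> offsets = new ArrayList<>();
        for (String v : values) {
            offsets.add(broker.send(v).get()); // blocks until the "ack" (offset) arrives
        }
        return offsets;
    }
}
```

Sending ["a", "b", "c"] through sendAllSync returns the offsets [0, 1, 2], one per record, in send order.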