Blog
kafka-13 自定义拦截器
Producer拦截器
Kafka0.10版本之后新增功能,用于实现clients端的定制化控制逻辑
实现ProducerInterceptor方法
- configure(configs)
- 获取配置信息和初始化数据时调用
- onSend(ProducerRecord)
- 该方法封装进kafkaProducer.send方法中,即在用户主线程中运行,在消息被序列化即计算分区前调用该方法
- 可以在该方法中对消息做任何操作,但最好保证不要修改所属的topic和分区,否则会影响目标分区的计算
- onAcknowledgement(RecordMetadata, Exception)
- 该方法会在消息从RecordAccumulator成功发送到KafkaBroker之后,或者在发送过程中失败时调用
- 通常在producer回调逻辑触发前
- 该方法运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
- close
案例
- TimeInterceptor
public class TimeInterceptor implements ProducerInterceptor {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord record) {
// 取出数据
String value = (String) record.value();
// 创建一个新的ProducerRecord对象并返回
return new ProducerRecord(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + value);
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
CounterInterceptor
public class CounterInterceptor implements ProducerInterceptor {
int success;
int error;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord record) {
// 返回数据
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
success++;
} else {
error++;
}
}
@Override
public void close() {
System.out.println("success: " + success);
System.out.println("error: " + error);
}
}
- InterceptorProducer
- 添加拦截器
ArrayList<String> interceptors = new ArrayList<>();
interceptors.add("com.learn.kafka.interceptor.TimeInterceptor");
interceptors.add("com.learn.kafka.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
public class InterceptorProducer {
public static void main(String[] args) {
//1. 创建kafka生产者的配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 设置拦截器
ArrayList<String> interceptors = new ArrayList<>();
interceptors.add("com.learn.kafka.interceptor.TimeInterceptor");
interceptors.add("com.learn.kafka.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 2.创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3.发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("first", "learn", "learn kafka " + i), ((metadata, exception) -> {
if (exception == null) {
System.out.println("分区:" + metadata.partition() + ", offset:" + metadata.offset());
}
}));
}
// 关闭资源
producer.close();
}
}
Producer中的close方法也会触发拦截器中的close方法