Kafka, 大数据

kafka-13 自定义拦截器

Producer拦截器

Kafka0.10版本之后新增功能,用于实现clients端的定制化控制逻辑

实现ProducerInterceptor方法

  • configure(configs)
    • 获取配置信息和初始化数据时调用
  • onSend(ProducerRecord)
    • 该方法封装进kafkaProducer.send方法中,即在用户主线程中运行,在消息被序列化即计算分区前调用该方法
    • 可以在该方法中对消息做任何操作,但最好保证不要修改所属的topic和分区,否则会影响目标分区的计算
  • onAcknowledgement(RecordMetadata, Exception)
    • 该方法会在消息从RecordAccumulator成功发送到KafkaBroker之后,或者在发送过程中失败时调用
    • 通常在producer回调逻辑触发前
    • 该方法运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
  • close

案例

  • TimeInterceptor
public class TimeInterceptor implements ProducerInterceptor {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // 取出数据
        String value = (String) record.value();
        // 创建一个新的ProducerRecord对象并返回
        return new ProducerRecord(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + value);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }
}

CounterInterceptor

public class CounterInterceptor implements ProducerInterceptor {
    int success;
    int error;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // 返回数据
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close() {
        System.out.println("success: " + success);
        System.out.println("error: " + error);
    }
}
  • InterceptorProducer
  • 添加拦截器
ArrayList<String> interceptors = new ArrayList<>();
interceptors.add("com.learn.kafka.interceptor.TimeInterceptor");
interceptors.add("com.learn.kafka.interceptor.CounterInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
public class InterceptorProducer {
    public static void main(String[] args) {
        //1. 创建kafka生产者的配置信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 设置拦截器
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.learn.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.learn.kafka.interceptor.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 2.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3.发送数据
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "learn", "learn kafka " + i), ((metadata, exception) -> {
                if (exception == null) {
                    System.out.println("分区:" + metadata.partition() + ", offset:" + metadata.offset());
                }
            }));
        }
        // 关闭资源
        producer.close();
    }
}

Producer中的close方法也会触发拦截器中的close方法

About 蓝染君

喜爱编程开发的程序猿