Flume, 大数据

flume-05 自定义Interceptor

pom.xml

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

TypeInterceptor.java

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {
    // 声明一个存放Event的集合
    private List<Event> addHeaderEvent;

    // 初始化
    public void initialize() {
        addHeaderEvent = new ArrayList<Event>();
    }
    // 单个Event操作
    public Event intercept(Event event) {
        // 获取Event中的头信息
        Map<String, String> headers = event.getHeaders();
        // 获取Event中的body信息
        String body = new String(event.getBody());
        // 根据body中是都包含hello来添加头信息
        if (body.contains("hello")) {
            headers.put("type","isword");
        } else {
            headers.put("type", "noword");
        }
        return event;
    }
    // 批量Event操作
    public List<Event> intercept(List<Event> list) {
        // 清空集合
        addHeaderEvent.clear();
        // 遍历events给每一个event添加头信息
        for (Event event : list) {
            addHeaderEvent.add(intercept(event));
        }
        return addHeaderEvent;
    }
    // 关闭
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        public Interceptor build() {
            return new TypeInterceptor();
        }

        public void configure(Context context) {

        }
    }
}

打包jar文件放到flume/lib文件夹下

案例二:

package com.chajisong.interceptors;

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * 自定义拦截器 实现Interceptor接口 并实现其抽象方法
 */
public class CustomInterceptor implements Interceptor{
    // 打印日志,便于测试方法的执行顺序
    private static final Logger logger = LoggerFactory.getLogger(CustomInterceptor.class);
    // 自定义拦截器参数,用来接收自定义拦截器flume配置参数
    private static String param = "";

    /**
     * 拦截器构造方法,在自定义拦截器静态内部类的build方法中调用,用来创建自定义拦截器对象
     */
    public CustomInterceptor() {
        logger.info("-----自定义拦截器构造方法执行");
    }

    /**
     * 该方法用来初始化拦截器,在拦截器的构造方法执行之后执行,也就是创建完拦截器对象之后执行
     */
    public void initialize() {
        logger.info("-----自定义拦截器的initialize方法执行");
    }

    /**
     * 用来处理每一个event对象,该方法不会被系统自动调用,一般在 List<Event> intercept(List<Event> events) 内部调用
     * @param event
     * @return
     */
    public Event intercept(Event event) {
        logger.info("-----intercept(Event event)方法执行,处理单个event");
        logger.info("-----接收到的自定义拦截器参数值param值为:" + param);
        /*
        这里编写event的处理代码
         */
        String body = new String(event.getBody());
        try {
            CJSData data = JSON.parseObject(body, CJSData.class);
        } catch (Exception e) {
            logger.info("--脏数据--");
            return null;
        }
        logger.info("--完整数据--");
        return event;
    }

    /**
     * 用来处理一批event对象集合,集合大小与flume启动会配置有关,和transactionCapacity大小保持一致
     * 一般直接调用 Event intercept(Event event) 处理每一个event
     * @param events
     * @return
     */
    public List<Event> intercept(List<Event> events) {
        logger.info("-----intercept(List<Event> events)方法执行");
        /*
        这里编写对于event对象集合的处理代码,一般都是遍历event的对象集合,对于每一个event调用Event intercept(Event event)方法
        然后根据返回值是否为null,将其添加到新的集合中
         */
        List<Event> results = new ArrayList<Event>();
        Event event;
        for (Event e: events) {
            event = intercept(e);
            if (event != null) {
                results.add(event);
            }
        }
        return results;
    }

    /**
     * 该方法主要用来销毁拦截器对象执行,一般是释放一些资源的处理
     */
    public void close() {
        logger.info("-----自定义拦截器close方法执行");
    }

    /**
     * 通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实现其抽象方法
     */
    public static class Builder implements Interceptor.Builder {
        /**
         * 该方法主要用来返回创建的自定义拦截器对象
         * @return
         */
        public Interceptor build() {
            logger.info("-----build方法执行");
            return new CustomInterceptor();
        }

        /**
         * 用来接收flume配置自定义拦截器参数
         * @param context 通过该对象可以获取flume配置自定义拦截器的参数
         */
        public void configure(Context context) {
            logger.info("-----configure方法执行");
            /*
            通过调用context对象的getString方法来获取flume配置自定义拦截器的参数,方法参数要和自定义拦截器配置中的参数保持一致
             */
            param = context.getString("param");
        }
    }
}