Blog
flume-05 自定义Interceptor
pom.xml
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
TypeInterceptor.java
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TypeInterceptor implements Interceptor {
// 声明一个存放Event的集合
private List<Event> addHeaderEvent;
// 初始化
public void initialize() {
addHeaderEvent = new ArrayList<Event>();
}
// 单个Event操作
public Event intercept(Event event) {
// 获取Event中的头信息
Map<String, String> headers = event.getHeaders();
// 获取Event中的body信息
String body = new String(event.getBody());
// 根据body中是都包含hello来添加头信息
if (body.contains("hello")) {
headers.put("type","isword");
} else {
headers.put("type", "noword");
}
return event;
}
// 批量Event操作
public List<Event> intercept(List<Event> list) {
// 清空集合
addHeaderEvent.clear();
// 遍历events给每一个event添加头信息
for (Event event : list) {
addHeaderEvent.add(intercept(event));
}
return addHeaderEvent;
}
// 关闭
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new TypeInterceptor();
}
public void configure(Context context) {
}
}
}
打包jar文件放到flume/lib文件夹下
案例二:
package com.chajisong.interceptors;
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* 自定义拦截器 实现Interceptor接口 并实现其抽象方法
*/
public class CustomInterceptor implements Interceptor{
// 打印日志,便于测试方法的执行顺序
private static final Logger logger = LoggerFactory.getLogger(CustomInterceptor.class);
// 自定义拦截器参数,用来接收自定义拦截器flume配置参数
private static String param = "";
/**
* 拦截器构造方法,在自定义拦截器静态内部类的build方法中调用,用来创建自定义拦截器对象
*/
public CustomInterceptor() {
logger.info("-----自定义拦截器构造方法执行");
}
/**
* 该方法用来初始化拦截器,在拦截器的构造方法执行之后执行,也就是创建完拦截器对象之后执行
*/
public void initialize() {
logger.info("-----自定义拦截器的initialize方法执行");
}
/**
* 用来处理每一个event对象,该方法不会被系统自动调用,一般在 List<Event> intercept(List<Event> events) 内部调用
* @param event
* @return
*/
public Event intercept(Event event) {
logger.info("-----intercept(Event event)方法执行,处理单个event");
logger.info("-----接收到的自定义拦截器参数值param值为:" + param);
/*
这里编写event的处理代码
*/
String body = new String(event.getBody());
try {
CJSData data = JSON.parseObject(body, CJSData.class);
} catch (Exception e) {
logger.info("--脏数据--");
return null;
}
logger.info("--完整数据--");
return event;
}
/**
* 用来处理一批event对象集合,集合大小与flume启动会配置有关,和transactionCapacity大小保持一致
* 一般直接调用 Event intercept(Event event) 处理每一个event
* @param events
* @return
*/
public List<Event> intercept(List<Event> events) {
logger.info("-----intercept(List<Event> events)方法执行");
/*
这里编写对于event对象集合的处理代码,一般都是遍历event的对象集合,对于每一个event调用Event intercept(Event event)方法
然后根据返回值是否为null,将其添加到新的集合中
*/
List<Event> results = new ArrayList<Event>();
Event event;
for (Event e: events) {
event = intercept(e);
if (event != null) {
results.add(event);
}
}
return results;
}
/**
* 该方法主要用来销毁拦截器对象执行,一般是释放一些资源的处理
*/
public void close() {
logger.info("-----自定义拦截器close方法执行");
}
/**
* 通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实现其抽象方法
*/
public static class Builder implements Interceptor.Builder {
/**
* 该方法主要用来返回创建的自定义拦截器对象
* @return
*/
public Interceptor build() {
logger.info("-----build方法执行");
return new CustomInterceptor();
}
/**
* 用来接收flume配置自定义拦截器参数
* @param context 通过该对象可以获取flume配置自定义拦截器的参数
*/
public void configure(Context context) {
logger.info("-----configure方法执行");
/*
通过调用context对象的getString方法来获取flume配置自定义拦截器的参数,方法参数要和自定义拦截器配置中的参数保持一致
*/
param = context.getString("param");
}
}
}