Blog
flume-06 自定义Source
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
// 定义全局的前缀和后缀
private String prefix;
private String suffix;
public void configure(Context context) {
// 读取配置信息给前后缀赋值
// a1.source.r1.prefix
// a1.source.r1.suffix
context.getString("prefix");
context.getString("suffix", "log");
}
/*
1. 接收数据 for循环造数据
2. 封装Event
3. 将Event传给channel
*/
public Status process() throws EventDeliveryException {
for (int i = 0; i < 10; i++) {
// 构建Event对象
SimpleEvent event = new SimpleEvent();
// 给Event设置值
event.setBody((prefix + "--" + i + "--" + suffix).getBytes());
// 将Event传给channel
getChannelProcessor().processEvent(event);
}
return null;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
}
- getChannelProcessor().processEvent(event) 这里的processEvent函数源码
- ChannelProcessor -> Interceptor -> ChannelSelector (事务)
public void processEvent(Event event) {
// 拦截器处理数据
event = interceptorChain.intercept(event);
if (event == null) {
return;
}
// channel选择器 对设置的channel遍历发送数据
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();
reqChannel.put(event);
tx.commit();
} catch (Throwable t) {
tx.rollback();
...
}