Flume, 大数据

flume-06 自定义Source

import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // 定义全局的前缀和后缀
    private String prefix;
    private String suffix;

    public void configure(Context context) {
        // 读取配置信息给前后缀赋值
        // a1.source.r1.prefix
        // a1.source.r1.suffix
        context.getString("prefix");
        context.getString("suffix", "log");
    }

    /*
    1. 接收数据 for循环造数据
    2. 封装Event
    3. 将Event传给channel
     */
    public Status process() throws EventDeliveryException {
        for (int i = 0; i < 10; i++) {
            // 构建Event对象
            SimpleEvent event = new SimpleEvent();
            // 给Event设置值
            event.setBody((prefix + "--" + i + "--" + suffix).getBytes());
            // 将Event传给channel
            getChannelProcessor().processEvent(event);
        }
        return null;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}
  • getChannelProcessor().processEvent(event) 这里的processEvent函数源码
    • 处理流程：ChannelProcessor 先把 Event 交给 Interceptor（拦截器链）处理，再由 ChannelSelector 选出目标 Channel，最后在每个 Channel 各自的事务中写入 Event
public void processEvent(Event event) {
    // 拦截器处理数据
  event = interceptorChain.intercept(event);
  if (event == null) {
    return;
  }
    // channel选择器 对设置的channel遍历发送数据
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();

      reqChannel.put(event);

      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      ...
  }