Blog
flume-03 入门案例
监控端口数据
- 需求
使用Flume监听一个端口,收集该端口数据,并打印到控制台
- 分析
安装netcat
sudo yum install -y nc
判断44444端口是否被占用
sudo netstat -tunlp | grep 44444
nc开启服务端
nc -lk 44444
nc开启客户端
nc localhost 44444
创建flume agent配置文件 netcat-flume-logger.conf
# agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source组件配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# sink组件配置
a1.sinks.k1.type = logger
# channel组件配置
a1.channels.c1.type = memory
# 缓冲容量1000个Event
a1.channels.c1.capacity = 1000
# 传输速率 一次传多少
a1.channels.c1.transactionCapacity = 100
# 组件关联
a1.sources.r1.channels = c1
# 一个sink对应一个channel
a1.sinks.k1.channel = c1
启动命令
bin/flume-ng agent
--name a1
--conf conf
--conf-file jobs/netcat-flume-logger.conf
-Dflume.root.logger=INFO,console
# 或者
bin/flume-ng agent -n a1 -c conf -f jobs/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
实时监控单个追加文件
- 需求
实时监控Hive日志,并上传到HDFS中
- 分析
实现监控一个文件 并将结果打印到终端
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
# /home/test.log 监控文件路径
a1.sources.r1.command = tail -f /home/test.log
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume想要将数据传输到HDFS必须有Hadoop相关jar包
hadoop3只需要替换guava包
- 将flume/lib/guava-11.0.2.jar替换为hadoop3/share/hadoop/hdfs/lib/guava-27.0-jre.jar
以下三者满足其一即滚动文件,设置0则无视当前项规则
- hdfs.rollInterval
- 多久生成一个新的文件 单位秒
- hdfs.rollSize
- 134217700 设置每个文件的滚动大小,一般设置块大小 略小一点
- hdfs.rollCount
- 一般关闭设置0 文件滚动与Event数量无关
其他:
- hdfs.batchSize
- 积攒多少个Event才flush到HDFS一次
- hdfs.filePrefix
- log 上传文件的前缀
- hdfs.useLocalTimeStamp
- true 是否使用本地时间戳
- hdfs.fileType
- DataStream设置文件类型,可支持压缩
控制一天一个日志文件夹
- hdfs.round
- true 是否按照时间滚动文件夹
- hdfs.roundValue
- 1 多少时间单位创建一个新的文件夹
- hdfs.roundUnit
- hour 重新定义时间单位
提示:可以使用-Dflume.root.logger=INFO,console来查看写入过程是否出错
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/test.log
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:8020/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
实时监控目录下多个新文件
- 需求
使用Flume监听整个目录的文件,并上传至HDFS
- 分析
Spooldir是监控新文件,而不是有文件新增内容
- spoolDir监控的目标文件夹 默认500ms扫描一次文件夹
- fileSuffix已同步完毕文件添加后缀标识
- includePattern 正则
- ignorePattern 正则 不监控的文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/upload
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:8020/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
实时监控目录下的多个追加文件
exec source适用于监控一个实时追加的文件,但不能保证数据不丢失
spooldir source能保证数据不丢失,且能够实现断点续传,但延迟高,不能实时监控
taildir技能实现断点续传,又可以保证数据不丢失,还能够进行实时监控
- 需求
使用flume监听整个目录的实施追加文件,并上传至hdfs
- 分析
使用logger sink 方便查看
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/test1/.*.txt
a1.sources.r1.positionFile = /home/hadoop/position/position.json
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1