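This article walks through four examples of collecting data into HBase with Flume. The workflow is almost identical in each; the main difference is the configuration file. Example 1 is described step by step, and the later examples follow the same procedure.
Example 1 configuration (/opt/module/flume/conf/job/test-flume-into-hbase-1.conf, the file the start script below passes to -f):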
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkerpiodic = 50
agent.sources.logfile-source.channels = file-channel
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test1
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = column-1
agent.sinks.hbase-sink.channel = file-channel
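agent.sources: declares the agent's sources; here there is one, `logfile-source`.
agent.sources.logfile-source.type: the source type is `exec`, meaning it runs a command and turns each line of the command's output into an event.
agent.sources.logfile-source.command: the command to run. `tail -f /data/flume-hbase-test/mkhbasetable/data/test.log` follows `test.log` in real time and prints whatever is appended to it.
agent.sources.logfile-source.checkerpiodic: the exec source has no parameter by this name (nor under the spellings `checkPeriodic` and `checkperiodic` used in the later examples), so Flume ignores it. There is no polling interval to set: `tail -f` pushes new lines as they are written, and how events are batched into the channel is controlled by the exec source's `batchSize` and `batchTimeout` parameters.
agent.sources.logfile-source.channels: connects the source to a channel, here `file-channel`; events read by this source are sent to that channel.
agent.channels: declares the agent's channels, here `file-channel`.
agent.channels.file-channel.type: the channel type is `file`, i.e. events are buffered on the local file system.
agent.channels.file-channel.checkpointDir: the channel's checkpoint directory, which stores the channel state so it can recover after a failure.
agent.channels.file-channel.dataDirs: the channel's data directory, where events are kept until the sink takes them.
agent.sinks: declares the agent's sinks, here `hbase-sink`.
agent.sinks.hbase-sink.type: the sink type is HBaseSink, so this sink writes events into HBase.
agent.sinks.hbase-sink.table: the HBase table to write to, here `mikeal-hbase-table-test1`.
agent.sinks.hbase-sink.columnFamily: the column family to write to, here `familycloml`. The table and this column family must already exist in HBase; the sink does not create them.
agent.sinks.hbase-sink.serializer: the class that converts events into HBase operations; here the basic `SimpleHbaseEventSerializer`.
agent.sinks.hbase-sink.serializer.payloadColumn: the column (qualifier) inside the column family that receives the event body, here `column-1`. The key is case-sensitive; if it is spelled differently (for example `payloadcolumn`), the serializer falls back to its default column name `pCol`.
agent.sinks.hbase-sink.channel: the channel this sink reads from, here `file-channel`.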
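To make the effect of SimpleHbaseEventSerializer concrete, here is a minimal Python sketch of the operations it generates for one event. It assumes the serializer's default settings as I understand them from the Flume source (rowPrefix `default`, row key suffix `uuid`, incrementRow `incRow`, incrementColumn `iCol`); the names `HBaseOps` and `simple_serialize` are hypothetical, and nothing here talks to HBase.

```python
import uuid
from dataclasses import dataclass


@dataclass
class HBaseOps:
    """Hypothetical container for the operations generated for one event."""
    put_row: str    # row key of the Put
    put_cell: tuple  # (column family, qualifier, value)
    inc_row: str    # row whose counter is incremented
    inc_cell: tuple  # (column family, qualifier, increment amount)


def simple_serialize(body: bytes, column_family: str, payload_column: str,
                     row_prefix: str = "default",
                     increment_row: str = "incRow",
                     increment_column: str = "iCol") -> HBaseOps:
    # Assumed default: row key = prefix + random UUID (suffix = uuid)
    row_key = row_prefix + str(uuid.uuid4())
    # The whole event body goes into a single cell <family>:<payloadColumn>
    put_cell = (column_family, payload_column, body)
    # Every event also bumps a counter cell on one fixed row
    inc_cell = (column_family, increment_column, 1)
    return HBaseOps(row_key, put_cell, increment_row, inc_cell)


if __name__ == "__main__":
    ops = simple_serialize(b"134.3", "familycloml", "column-1")
    print(ops.put_row)                 # 'default' followed by a random UUID
    print(ops.put_cell)                # ('familycloml', 'column-1', b'134.3')
    print(ops.inc_row, ops.inc_cell)   # incRow ('familycloml', 'iCol', 1)
```

If this matches your Flume version, a `scan 'mikeal-hbase-table-test1'` after loading data should show one row per appended line plus an extra `incRow` row holding the event counter.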
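# Create the directory for the log file and the mock data
mkdir -p /data/flume-hbase-test/mkhbasetable/data/
# Create the channel checkpoint directory and data directory
mkdir -p /data/flume-hbase-test/data
mkdir -p /data/flume-hbase-test/checkpoint
# Create an empty log file for tail -f to follow
cd /data/flume-hbase-test/mkhbasetable/data/
touch test.log
# Create the mock data file with the following contents
vim data-test1.txt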
134.3
726.9
262.0
902.8
665.8
153.2
618.3
333.4
985.7
201.2
970.3
234.8
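Typing these values into vim works fine. As an alternative, the hypothetical Python helper below prints any number of similar values (one decimal place, roughly 100 to 1000, matching the range of the sample above). The script name and value range are my own assumptions, not part of the original setup.

```python
import random
from typing import List, Optional


def make_mock_values(n: int, low: float = 100.0, high: float = 1000.0,
                     seed: Optional[int] = None) -> List[str]:
    """Return n random readings formatted with one decimal place."""
    rng = random.Random(seed)  # a seeded generator gives reproducible data
    return [f"{rng.uniform(low, high):.1f}" for _ in range(n)]


if __name__ == "__main__":
    # One value per line; redirect into the mock data file, e.g.
    #   python3 make_mock.py > data-test1.txt
    print("\n".join(make_mock_values(12)))
```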
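# Start all Hadoop, ZooKeeper and HBase processes in order (allstart.sh is a custom start script on the author's cluster)
allstart.sh
# Start the HBase shell
hbase shell
# Create the table mikeal-hbase-table-test1 with the column family familycloml
create 'mikeal-hbase-table-test1', 'familycloml'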
# Leave the HBase shell (exit), then change to the script directory
cd /opt/module/flume/job-shell
# Create the start script
vim test-flume-into-hbase-1
#!/bin/bash
echo " --------Starting master: collecting log data into HBase, test 1--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-1.conf >/dev/null 2>&1 &
# Make the scripts executable
chmod 777 ./*
# Change to the script directory
cd /opt/module/flume/job-shell
# Start the Flume collection script
./test-flume-into-hbase-1
# Append the mock data to the log file
cd /data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log
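The exec source does not read test.log itself; it reads the standard output of `tail -f`, which keeps following the file and emits data written past its end. That is why appending with `>>` is enough to trigger collection. The Python sketch below illustrates the same follow-from-an-offset idea with a hypothetical helper `read_appended_lines`; it is an illustration of the technique, not how Flume or tail is implemented.

```python
from pathlib import Path
from typing import List, Tuple


def read_appended_lines(path: Path, offset: int) -> Tuple[List[str], int]:
    """Return the complete lines written after byte `offset` and the new offset.

    A trailing partial line (no newline yet) is not consumed, so it is
    returned on a later call once the writer finishes it.
    """
    with path.open("rb") as f:
        f.seek(offset)            # skip everything already seen
        chunk = f.read()
    end = chunk.rfind(b"\n") + 1  # 0 when no complete line is available
    lines = [raw.decode("utf-8") for raw in chunk[:end].splitlines()]
    return lines, offset + end


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as d:
        log = Path(d) / "test.log"
        log.touch()                    # like `touch test.log`
        pos = log.stat().st_size       # start following at the current end
        with log.open("a") as f:       # like `cat data-test1.txt >> test.log`
            f.write("134.3\n726.9\n262.")
        new, pos = read_appended_lines(log, pos)
        print(new)  # ['134.3', '726.9']; '262.' waits for its newline
```

One difference worth knowing: when `tail -f` starts, it first prints the last 10 lines already in the file, so restarting the agent against a non-empty test.log may collect those lines a second time.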
Example 2 configuration (/opt/module/flume/conf/job/test-flume-into-hbase-2.conf):
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkPeriodic = 50
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test2
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = column-1
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel
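Example 2 uses AsyncHBaseSink with the SimpleAsyncHbaseEventSerializer; everything else is essentially the same as Example 1. Because Examples 1 to 3 all use the same channel checkpoint and data directories, stop the previous agent before starting the next one, since a file channel locks its directories.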
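# In the HBase shell, create the table for Example 2
create 'mikeal-hbase-table-test2', 'familycloml'
# Back in the OS shell, create the start script in the script directory
cd /opt/module/flume/job-shell
vim test-flume-into-hbase-2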
#!/bin/bash
echo " --------Starting master: collecting log data into HBase, test 2--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-2.conf >/dev/null 2>&1 &
# Make the scripts executable
chmod 777 ./*
# Change to the script directory
cd /opt/module/flume/job-shell
# Start the Flume collection script
./test-flume-into-hbase-2
# Append the mock data to the log file
cd /data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log
Example 3 configuration (/opt/module/flume/conf/job/test-flume-into-hbase-3.conf):
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test3
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink.serializer.colNames = ip, time, url
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel
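agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer: use the regex serializer, which splits each event with a regular expression and writes each capture group to its own column.
agent.sinks.hbase-sink.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+): the regular expression that extracts the fields. In a properties file `\\` stands for a single backslash, so the pattern Flume compiles is `(\d+\.\d+\.\d+\.\d+) \[(.*?)\] \w+ (.+)`: an IPv4 address, the text inside square brackets, one word (the HTTP method, matched but not captured), and the rest of the line.
agent.sinks.hbase-sink.serializer.colNames = ip, time, url: the column names assigned to the capture groups, in order, i.e. the HBase columns the parsed fields are written to; here ip, time and url.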
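Before starting the agent, it helps to check that the regex really splits the sample nginx lines used in this example. The Python sketch below applies the same pattern with `re.fullmatch`, on the assumption (from my reading of the Flume source) that RegexHbaseEventSerializer requires the whole event body to match and writes nothing when it does not, or when the group count differs from the number of column names. `parse_event` is a hypothetical helper. It strips spaces around each column name; I am not certain every Flume version does the same, so writing `colNames = ip,time,url` without spaces is the safer choice.

```python
import re
from typing import Dict, Optional

# The pattern after properties-file unescaping ("\\d" in the .conf -> "\d")
CONF_REGEX = r"(\d+\.\d+\.\d+\.\d+) \[(.*?)\] \w+ (.+)"
COL_NAMES = "ip, time, url"


def parse_event(body: str, regex: str = CONF_REGEX,
                col_names: str = COL_NAMES) -> Optional[Dict[str, str]]:
    """Map regex groups to column names, or return None if no cells would be written."""
    # Assumption: the whole body must match (DOTALL, as the serializer is assumed to use)
    m = re.fullmatch(regex, body, re.DOTALL)
    names = [n.strip() for n in col_names.split(",")]
    if m is None or len(m.groups()) != len(names):
        return None
    return dict(zip(names, m.groups()))


if __name__ == "__main__":
    samples = [
        "192.168.1.1 [27/Sep/2024:10:28:00 -0400] GET /path/to/resource?param=value HTTP/1.1",
        "10.0.0.2 [27/Sep/2024:10:32:31 -0400] POST /api/v1/data HTTP/1.1",
        "172.16.0.3 [27/Sep/2024:10:34:45 -0400] DELETE /api/v2/resource HTTP/1.1",
    ]
    for line in samples:
        print(parse_event(line))
```

Note that with this regex the `url` column also carries the trailing protocol (for example `/api/v1/data HTTP/1.1`), while the HTTP method is not stored at all.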
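# In the HBase shell, create the table for Example 3
create 'mikeal-hbase-table-test3', 'familycloml'
# Back in the OS shell, create an empty nginx.log (tail -f exits if the file does not exist) and the mock nginx data
cd /data/flume-hbase-test/mkhbasetable/data
touch nginx.log
vim nginx-data.txt
192.168.1.1 [27/Sep/2024:10:28:00 -0400] GET /path/to/resource?param=value HTTP/1.1
10.0.0.2 [27/Sep/2024:10:32:31 -0400] POST /api/v1/data HTTP/1.1
172.16.0.3 [27/Sep/2024:10:34:45 -0400] DELETE /api/v2/resource HTTP/1.1
# Create the start script in the script directory
cd /opt/module/flume/job-shell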
vim test-flume-into-hbase-3
#!/bin/bash
echo " --------Starting master: collecting log data into HBase, test 3--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-3.conf >/dev/null 2>&1 &
# Make the scripts executable
chmod 777 ./*
# Change to the script directory
cd /opt/module/flume/job-shell
# Start the Flume collection script
./test-flume-into-hbase-3
# Append the mock data to the log file
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log
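Example 4 configuration (/opt/module/flume/conf/job/test-flume-into-hbase-multi-position.conf):
# Flume component lists are space-separated; a comma would become part of the component name
agent.sources = logfile-source-1 logfile-source-2
agent.channels = file-channel-1 file-channel-2
agent.sinks = hbase-sink-1 hbase-sink-2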
agent.sources.logfile-source-1.type = exec
agent.sources.logfile-source-1.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source-1.checkperiodic = 50
agent.sources.logfile-source-2.type = exec
agent.sources.logfile-source-2.command = tail -f /data/flume-hbase-test/mkhbasetable/data/tomcat.log
agent.sources.logfile-source-2.checkperiodic = 50
agent.channels.file-channel-1.type = file
agent.channels.file-channel-1.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel-1.dataDirs = /data/flume-hbase-test/data
agent.channels.file-channel-2.type = file
agent.channels.file-channel-2.checkpointDir = /data/flume-hbase-test/checkpoint2
agent.channels.file-channel-2.dataDirs = /data/flume-hbase-test/data2
agent.sinks.hbase-sink-1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-1.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-1.columnFamily = familycloml1
agent.sinks.hbase-sink-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink-1.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink-1.serializer.colNames = ip, time, url
agent.sinks.hbase-sink-2.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-2.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-2.columnFamily = familycloml2
agent.sinks.hbase-sink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink-2.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink-2.serializer.colNames = ip, time, url
agent.sources.logfile-source-1.channels = file-channel-1
agent.sinks.hbase-sink-1.channel = file-channel-1
agent.sources.logfile-source-2.channels = file-channel-2
agent.sinks.hbase-sink-2.channel = file-channel-2
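This configuration builds on the first three examples by adding a second pipeline: nginx.log flows through logfile-source-1 and file-channel-1 into the column family familycloml1, and tomcat.log flows through logfile-source-2 and file-channel-2 into familycloml2 of the same table. The second file channel needs its own checkpoint and data directories.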
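With two sources, two channels and two sinks, it is easier to wire something incorrectly. The hypothetical Python checker below reads a Flume properties file and reports listed components without a `.type`, sources pointing at undeclared channels, and sinks with a missing or unknown channel. It splits component lists on whitespace, following the examples in the Flume user guide, so a list written as `s1, s2` shows up as a component literally named `s1,`. This is my own sketch, not a Flume tool, and it checks wiring only.

```python
import sys
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal .properties reader: 'key = value' lines; blank and '#' lines skipped."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_wiring(props: Dict[str, str], agent: str) -> List[str]:
    """Return wiring problems for `agent` (the name passed to flume-ng -n)."""
    def listed(key: str) -> List[str]:
        # Component lists are whitespace-separated
        return props.get(key, "").split()

    problems: List[str] = []
    channels = set(listed(f"{agent}.channels"))
    if not channels:
        problems.append(f"no channels declared for agent '{agent}'")
    # Every listed component needs its own .type property
    for kind in ("sources", "channels", "sinks"):
        for name in listed(f"{agent}.{kind}"):
            if f"{agent}.{kind}.{name}.type" not in props:
                problems.append(f"{kind[:-1]} '{name}' has no .type")
    # Sources may feed several channels; each must be declared
    for src in listed(f"{agent}.sources"):
        for ch in listed(f"{agent}.sources.{src}.channels"):
            if ch not in channels:
                problems.append(f"source '{src}' -> unknown channel '{ch}'")
    # Each sink reads from exactly one declared channel
    for sink in listed(f"{agent}.sinks"):
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in channels:
            problems.append(f"sink '{sink}' -> unknown channel '{ch}'")
    return problems


if __name__ == "__main__" and len(sys.argv) == 3:
    # usage: python3 check_wiring.py <conf-file> <agent-name>
    with open(sys.argv[1], encoding="utf-8") as f:
        found = check_wiring(parse_properties(f.read()), sys.argv[2])
    for problem in found or ["OK"]:
        print(problem)
```

For this example you would run something like `python3 check_wiring.py /opt/module/flume/conf/job/test-flume-into-hbase-multi-position.conf agent`, where `agent` must match the `-n agent` argument in the start script.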
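# Create the checkpoint and data directories for the second channel
mkdir -p /data/flume-hbase-test/checkpoint2
mkdir -p /data/flume-hbase-test/data2
# Create an empty tomcat.log so that tail -f has a file to follow
touch /data/flume-hbase-test/mkhbasetable/data/tomcat.log
# In the HBase shell, create the table with two column families
create 'mikeal-hbase-table-test-multi-position', 'familycloml1', 'familycloml2'
# Back in the OS shell, create the start script in the script directory
cd /opt/module/flume/job-shell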
vim test-flume-into-hbase-multi-position
#!/bin/bash
echo " --------Starting master: collecting log data into HBase, test 4--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-multi-position.conf >/dev/null 2>&1 &
# Make the scripts executable
chmod 777 ./*
# Change to the script directory
cd /opt/module/flume/job-shell
# Start the Flume collection script
./test-flume-into-hbase-multi-position
# Append the mock data to both log files
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log && cat nginx-data.txt >> tomcat.log
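Originality statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.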