前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >项目三 flume 采集数据至hbase

项目三 flume 采集数据至hbase

原创
作者头像
弟大翻着洗
发布2024-09-28 15:39:52
330
发布2024-09-28 15:39:52
举报
文章被收录于专栏:人邮电数据采集与预处理

简介

flume采集数据至hbase有四个实例,本文章一一列举,各实例流程均差不多,区别基本上就是配置文件的编写。其中实例一流程较为详细,后面几个实例参考实例一流程

实例一

编写配置文件

  • 先在/opt/module/flume/conf/job目录下创建一个flume采集数据至hbase的配置文件
代码语言:shell
复制
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkerpiodic = 50
agent.sources.logfile-source.channels = file-channel

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test1
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

agent.sinks.hbase-sink.serializer.payloadcolumn = column-1
agent.sinks.hbase-sink.channel = file-channel
配置文件详解
  • 源(Source)
代码语言:txt
复制
agent.sources: 定义 Flume 代理的源,这里设置为 `logfile-source`。

agent.sources.logfile-source.type: 源的类型为 `exec`,表示它将执行一个命令来获取数据。

agent.sources.logfile-source.command: 指定要执行的命令,`tail -f /data/flume-hbase-test/mkhbasetable/data/test.log` 用来实时跟踪 `test.log` 文件,输出文件中新增的内容。

agent.sources.logfile-source.checkerpiodic: 这是一个拼写错误,应为 `checker.period`,它的作用是设定源的检查时长(单位为毫秒),决定多长时间检查源是否有新数据,这里设置为每 50 毫秒检查一次。

agent.sources.logfile-source.channels: 指定源与通道之间的连接,这里指向 `file-channel`,表示从这个源读取的数据会被发送到这个通道。
  • 通道(Channel)
代码语言:txt
复制
agent.channels: 定义 Flume 代理的通道,这里设置为 `file-channel`。

agent.channels.file-channel.type: 通道的类型为 `file`,表示使用文件系统存储数据。

agent.channels.file-channel.checkpointDir: 这是通道的检查点目录,用于存储通道的状态,以便在失败后恢复。

agent.channels.file-channel.dataDirs: 这是通道的数据存储目录,通道会把数据暂时存储在这里,直到 Sink 提取数据。
  • 接收器(Sink)
代码语言:txt
复制
agent.sinks: 定义 Flume 代理的接收器,这里设置为 `hbase-sink`。

agent.sinks.hbase-sink.type: 接收器的类型是 HBaseSink,意味着这个接收器将数据写入 HBase 数据库。

agent.sinks.hbase-sink.table: 指定要写入的 HBase 表名,这里是 `mikeal-hbase-table-test1`。

agent.sinks.hbase-sink.columnFamily: 指定 HBase 表的列族名,这里为 `familycloml`。需要确认该列族是否已经存在于 HBase 表中。

agent.sinks.hbase-sink.serializer: 指定如何序列化数据的类,这里使用了简单的 HBase 事件序列化器 `SimpleHbaseEventSerializer`。

agent.sinks.hbase-sink.serializer.payloadcolumn: 指定数据中应保存到 HBase 的列名,这里设置为 `column-1`,这个值会对应于 HBase 表的某一列。

agent.sinks.hbase-sink.channel: 指定这个接收器使用的通道,这里指向 `file-channel`,表示它从这个通道中读取数据。

创建相关路径和文件

代码语言:shell
复制
# 创建日志和数据存放路径
mkdir -p /data/flume-hbase-test/mkhbasetable/data/

# 创建通道检查点路径和通道数据暂存路径
mkdir -p /data/flume-hbase-test/data
mkdir -p /data/flume-hbase-test/checkpoint

# 创建空日志文件
cd /data/flume-hbase-test/mkhbasetable/data/
touch test.log

# 编辑模拟数据文件
vim data-test1.txt

134.3
726.9
262.0
902.8
665.8
153.2
618.3
333.4
985.7
201.2
970.3
234.8

hbase创建相关表

代码语言:shell
复制
# 依次启动Hadoop,zookeeper,hbase所有进程
allstart.sh

# 启动hbase shell
hbase shell

# 创建mikeal-hbase-table-test1表
create 'mikeal-hbase-table-test1', 'familycloml'

编写启动脚本

  • 该脚本方便地启动 Flume 任务而不需要手动输入所有命令,也可以确保 Flume 进程在后台持续运行,适合在生产环境中使用。
代码语言:shell
复制
# 切换到脚本路径
cd /opt/module/flume/job-shell

# 编辑启动脚本
vim test-flume-into-hbase-1

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试1--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-1.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell
复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-1

# 向日志文件添加数据
/data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log 

检测结果

实例二

编写配置文件

代码语言:shell
复制
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/test.log
agent.sources.logfile-source.checkPeriodic = 50

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test2
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = column-1

agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

配置文件详解

代码语言:txt
复制
实例二使用SimpleAsyncHbaseEventSerializer序列化模式来采集数据,其他模式基本和实例一一致

hbase创建相关表

代码语言:shell
复制
create 'mikeal-hbase-table-test2', 'familycloml'

编写启动脚本

代码语言:shell
复制
vim test-flume-into-hbase-2

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试2--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-2.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell
复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-2

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat data-test1.txt >> test.log 

检测结果

实例三

编写配置文件

代码语言:shell
复制
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table-test3
agent.sinks.hbase-sink.columnFamily = familycloml
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink.serializer.colNames = ip, time, url

agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel

配置文件详解

代码语言:txt
复制
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer: 指定使用正则表达式序列化器将事件序列化为 HBase 可接受的格式。

agent.sinks.hbase-sink.serializer.regex =(\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+): 定义正则表达式,将相关数据提取

agent.sinks.hbase-sink.serializer.colNames = ip, time, url: 指定要提取的列名,即将从日志中解析出的数据对应到 HBase 表的列。这里是 ip、time 和 url。

hbase创建相关表

代码语言:shell
复制
create 'mikeal-hbase-table-test3', 'familycloml'

编辑日志文件所需要测试的数据

代码语言:txt
复制
vim nginx-data.txt

192.168.1.1 [27/Sep/2024:10:28:00 -0400] GET /path/to/resource?param=value HTTP/1.1
10.0.0.2 [27/Sep/2024:10:32:31 -0400] POST /api/v1/data HTTP/1.1
172.16.0.3 [27/Sep/2024:10:34:45 -0400] DELETE /api/v2/resource HTTP/1.1

编写启动脚本

代码语言:shell
复制
cd /data/flume-hbase-test/mkhbasetable/data

vim test-flume-into-hbase-3

#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试3--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-3.conf >/dev/null 2>&1 &

# 添加权限
chmod 777 ./*

启动流程

代码语言:shell
复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-3

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log 

检测结果

实例四

编写配置文件

代码语言:shell
复制
agent.sources = logfile-source-1, logfile-source-2
agent.channels = file-channel-1, file-channel-2
agent.sinks = hbase-sink-1, hbase-sink-2

agent.sources.logfile-source-1.type = exec
agent.sources.logfile-source-1.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source-1.checkperiodic = 50

agent.sources.logfile-source-2.type = exec
agent.sources.logfile-source-2.command = tail -f /data/flume-hbase-test/mkhbasetable/data/tomcat.log
agent.sources.logfile-source-2.checkperiodic = 50

agent.channels.file-channel-1.type = file
agent.channels.file-channel-1.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel-1.dataDirs = /data/flume-hbase-test/data

agent.channels.file-channel-2.type = file
agent.channels.file-channel-2.checkpointDir = /data/flume-hbase-test/checkpoint2
agent.channels.file-channel-2.dataDirs = /data/flume-hbase-test/data2

agent.sinks.hbase-sink-1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-1.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-1.columnFamily = familycloml1
agent.sinks.hbase-sink-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink-1.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+) 
agent.sinks.hbase-sink-1.serializer.colNames = ip, time, url

agent.sinks.hbase-sink-2.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-2.table = mikeal-hbase-table-test-multi-position
agent.sinks.hbase-sink-2.columnFamily = familycloml2
agent.sinks.hbase-sink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

agent.sinks.hbase-sink-2.serializer.regex = (\\d+\\.\\d+\\.\\d+\\.\\d+) \\[(.*?)\\] \\w+ (.+)
agent.sinks.hbase-sink-2.serializer.colNames = ip, time, url

agent.sources.logfile-source-1.channels = file-channel-1
agent.sinks.hbase-sink-1.channel = file-channel-1

agent.sources.logfile-source-2.channels = file-channel-2
agent.sinks.hbase-sink-2.channel = file-channel-2

配置文件详解

代码语言:txt
复制
配置在前面三个实例的基础上加上双通道模式

创建相关路径

代码语言:shell
复制
# 创建通道检查点路径和通道数据暂存路径
mkdir -p /data/flume-hbase-test/checkpoint2
mkdir -p /data/flume-hbase-test/data2

hbase创建相关表

代码语言:shell
复制
create 'mikeal-hbase-table-test-multi-position', 'familycloml1', 'familycloml2'

编写启动脚本

代码语言:shell
复制
cd /data/flume-hbase-test/mkhbasetable/data

vim test-flume-into-hbase-multi-position 


#!/bin/bash

echo " --------启动 master 采集日志数据至Hbase 测试4--------"
nohup /opt/module/flume/bin/flume-ng agent -n agent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/test-flume-into-hbase-multi-position.conf >/dev/null 2>&1 &


# 添加权限
chmod 777 ./*

启动流程

代码语言:shell
复制
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
test-flume-into-hbase-multi-position 

# 向日志文件添加数据
cd /data/flume-hbase-test/mkhbasetable/data
cat nginx-data.txt >> nginx.log && cat nginx-data.txt >> tomcat.log

检测结果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 实例一
    • 编写配置文件
      • 配置文件详解
    • 创建相关路径和文件
      • hbase创建相关表
        • 编写启动脚本
          • 启动流程
          • 检测结果
          • 实例二
            • 编写配置文件
              • 配置文件详解
                • hbase创建相关表
                  • 编写启动脚本
                    • 启动流程
                      • 检测结果
                      • 实例三
                        • 编写配置文件
                          • 配置文件详解
                            • hbase创建相关表
                              • 编辑日志文件所需要测试的数据
                                • 编写启动脚本
                                  • 启动流程
                                    • 检测结果
                                    • 实例四
                                      • 编写配置文件
                                        • 配置文件详解
                                          • 创建相关路径
                                            • hbase创建相关表
                                              • 编写启动脚本
                                                • 启动流程
                                                  • 检测结果
                                                  相关产品与服务
                                                  领券
                                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档