前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >项目三 Flume 采集日志数据至 hdfs

项目三 Flume 采集日志数据至 hdfs

原创
作者头像
弟大翻着洗
修改2024-09-28 13:16:12
360
修改2024-09-28 13:16:12
举报
文章被收录于专栏:人邮电数据采集与预处理

简介

  • Flume 是一个用于收集、聚合和传输大量日志数据的分布式系统。它通常与 Hadoop 生态系统中的 HDFS(Hadoop Distributed File System)结合使用,能够将数据存储到 HDFS 中。
  • 通过以下配置,Flume 能够高效、实时地将日志数据从本地目录采集并存储到 HDFS 中,便于后续的数据分析和处理。
  • Flume 的配置文件通常包含以下几个部分:sources(数据来源)、sinks(数据去向)、channels(数据通道)。

工作流启动

  • 先在/opt/module/flume/conf/job目录下创建一个flume采集数据至hdfs的配置文件
代码语言:shell
复制
# 切换到job目录
cd /opt/module/flume/conf/job

# 编辑配置文件
vim hdfs.conf

hdfsAgent.sources = hdfsSource
hdfsAgent.sinks = hdfsSinks
hdfsAgent.channels = hdfsChannel

hdfsAgent.sources.hdfsSource.type = spooldir
hdfsAgent.sources.hdfsSource.channels = hdfsChannel
hdfsAgent.sources.hdfsSource.spoolDir = /opt/module/flume/conf/data/hdfs
hdfsAgent.sources.hdfsSource.fileHeader = true

hdfsAgent.sinks.hdfsSinks.type = hdfs
hdfsAgent.sinks.hdfsSinks.hdfs.path = hdfs://master:8020/flume/events/%y-%m-%d/%H%M/%S
hdfsAgent.sinks.hdfsSinks.hdfs.filePrefix = events
hdfsAgent.sinks.hdfsSinks.hdfs.fileSuffix = log

hdfsAgent.sinks.hdfsSinks.hdfs.round = true
hdfsAgent.sinks.hdfsSinks.hdfs.roundValue = 10
hdfsAgent.sinks.hdfsSinks.hdfs.roundUnit = minute

hdfsAgent.sinks.hdfsSinks.hdfs.minBlockReplicas = 1
hdfsAgent.sinks.hdfsSinks.hdfs.rollInterval = 0
hdfsAgent.sinks.hdfsSinks.hdfs.rollSize = 134217728
hdfsAgent.sinks.hdfsSinks.hdfs.rollCount = 0
hdfsAgent.sinks.hdfsSinks.hdfs.idleTimeout = 60

hdfsAgent.sinks.hdfsSinks.hdfs.fileType = DataStream
hdfsAgent.sinks.hdfsSinks.hdfs.useLocalTimeStamp = true

hdfsAgent.channels.hdfsChannel.type = memory
hdfsAgent.channels.hdfsChannel.capacity = 1000
hdfsAgent.channels.hdfsChannel.transactionCapacity = 100

hdfsAgent.sources.hdfsSource.channels = hdfsChannel
hdfsAgent.sinks.hdfsSinks.channel = hdfsChannel

配置文件参数详解

  • 数据源(Sources)
代码语言:txt
复制
hdfsAgent.sources = hdfsSource
这里定义了一个名为 hdfsSource 的数据源。

hdfsAgent.sources.hdfsSource.type = spooldir
数据源的类型是 spooldir,意味着 Flume 将从一个指定的目录中监听并收集文件。

hdfsAgent.sources.hdfsSource.channels = hdfsChannel
该数据源将使用名为 hdfsChannel 的通道来传输数据。

hdfsAgent.sources.hdfsSource.spoolDir = /opt/module/flume/conf/data/hdfs
这是 Flume 监听的目录路径,它会查看这个目录中新增加的文件。

hdfsAgent.sources.hdfsSource.fileHeader = true
这表示 Flume 会在采集的文件中包含文件头信息,通常用于记录元数据。
  • 数据去向(Sinks)
代码语言:txt
复制
hdfsAgent.sinks = hdfsSinks
这里定义了一个名为 hdfsSinks 的数据去向。

hdfsAgent.sinks.hdfsSinks.type = hdfs
数据去向的类型是 HDFS,表示数据将被写入到 HDFS 中。

hdfsAgent.sinks.hdfsSinks.hdfs.path = hdfs://master:8020/flume/events/%y-%m-%d/%H%M/%S
这是数据存储在 HDFS 中的路径格式,包括日期和时间。Flume 会根据采集时间自动在这个路径中创建目录。

hdfsAgent.sinks.hdfsSinks.hdfs.filePrefix = events
输出文件的前缀是 “events”。

hdfsAgent.sinks.hdfsSinks.hdfs.fileSuffix = log
输出文件的后缀是 “.log”。

hdfsAgent.sinks.hdfsSinks.hdfs.round = true
表示启用滚动机制,根据时间将文件进行分割。

hdfsAgent.sinks.hdfsSinks.hdfs.roundValue = 10
每10分钟触发一次数据滚动。

hdfsAgent.sinks.hdfsSinks.hdfs.roundUnit = minute
这里指定了时间单位为分钟。

hdfsAgent.sinks.hdfsSinks.hdfs.minBlockReplicas = 1
最小副本数为 1,表示写入 HDFS 时会有一个数据副本。

hdfsAgent.sinks.hdfsSinks.hdfs.rollInterval = 0
设置为0表示按照时间不会强制滚动,主要通过 round 机制来滚动文件。

hdfsAgent.sinks.hdfsSinks.hdfs.rollSize = 134217728
文件大小达到 128MB 时会进行滚动。

hdfsAgent.sinks.hdfsSinks.hdfs.rollCount = 0
设置为0表示不限制记录数的滚动。

hdfsAgent.sinks.hdfsSinks.hdfs.idleTimeout = 60
如果没有数据流入,经过60秒后文件将被关闭。

hdfsAgent.sinks.hdfsSinks.hdfs.fileType = DataStream
此设置指定文件类型为数据流,意味着数据将以流的形式写入。

hdfsAgent.sinks.hdfsSinks.hdfs.useLocalTimeStamp = true
启用本地时间戳作为文件的时间戳。
  • 数据通道(Channels)
代码语言:txt
复制
hdfsAgent.channels.hdfsChannel.type = memory
通道类型为内存通道,意味着数据在内存中进行传输。

hdfsAgent.channels.hdfsChannel.capacity = 1000
内存通道能容纳最多1000条事件。

hdfsAgent.channels.hdfsChannel.transactionCapacity = 100
每次事务中最多处理100条事件。

创建相关目录

代码语言:shell
复制
# hdfs上创建/flume/events目录
hadoop fs -mkdir -p /flume/events
# 添加权限
hadoop fs -chmod 777 -R /flume/*

# 创建日志文件路径
mkdir -p /opt/module/flume/conf/data/hdfs

模拟日志生成脚本

  • 这个脚本的作用是生成模拟的日志文件,并将其放入指定的目录中,以便于用作数据测试或进行数据采集
  • 创建一个用于存放日志文件的目录。
  • 生成 5 个模拟的日志文件,每个文件中都包含一条带有时间戳的日志记录。
  • 每个日志文件的生成会有 1 秒的间隔,模拟实际的日志生成过程。
代码语言:shell
复制
# 创建存放脚本的目录
mkdir -p /opt/module/flume/job-shell
# 切换到该目录下
/opt/module/flume/job-shell

# 编辑脚本
vim logData_To_Hdfs

#!/bin/bash

# 设置保存路径
SPOOL_DIR="/opt/module/flume/conf/data/hdfs"

# 创建 spoolDir 路径(如不存在)
mkdir -p "${SPOOL_DIR}"

# 生成 5 个随机日志文件
for i in {1..5}; do
    LOG_FILE="${SPOOL_DIR}/logfile_${i}.log"
    echo "INFO: This is a simulated log entry $(date '+%Y-%m-%d %H:%M:%S')" > "${LOG_FILE
    echo "Generated ${LOG_FILE}"
    sleep 1 # 模拟生成的间隔
done


# 添加权限
chmod 777 ./*

flume启动脚本

  • 该脚本方便地启动 Flume 任务而不需要手动输入所有命令,也可以确保 Flume 进程在后台持续运行,适合在生产环境中使用。
代码语言:shell
复制
cd /opt/module/flume/job-shell

vim hdfs

#!/bin/bash

echo " --------启动 master 采集日志数据至HDFS --------"
nohup /opt/module/flume/bin/flume-ng agent -n hdfsAgent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/hdfs.conf >/dev/null 2>&1 &

# 添加权限
chmod 777 ./*

启动流程

代码语言:shell
复制
# 先启动Hadoop所有进程
allstart.sh

# 切换到脚本启动路径下
cd /opt/module/flume/job-shell

# 启动flume采集脚本
hdfs

# 启动日志文件生成脚本
logData_To_Hdfs
  • 启动flume采集脚本
  • 启动日志文件生成脚本
  • 查看其中一个日志文件内容

检测结果

  • 命令查看文件采集结果hadoop fs -ls -R /flume
  • 文件系统查看结果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 工作流启动
    • 配置文件参数详解
    • 创建相关目录
    • 模拟日志生成脚本
    • flume启动脚本
    • 启动流程
    • 检测结果
    相关产品与服务
    数据保险箱
    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档