单机upd的flume source的配置,100+M/s数据量,10w qps flume就开始大量丢包,因此很多公司在搭建系统时,抛弃了Flume,自己研发传输系统,但是往往会参考Flume的Source-Channel-Sink模式。
一些公司在Flume工作过程中,会对业务日志进行监控,例如Flume agent中有多少条日志,Flume到Kafka后有多少条日志等等,如果数据丢失保持在1%左右是没有问题的,当数据丢失达到5%左右时就必须采取相应措施。
采集层主要可以使用Flume、Kafka两种技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API。
Kafka:Kafka是一个可持久化的分布式的消息队列。
Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
如果需要向HDFS写入数据,Flume需要安装在Hadoop集群上,否则会找不到HDFS文件系统。
Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。
Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果需要一个高可靠行的管道,那么使用Kafka是个更好的选择。
Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
可选择TaildirSource和KafkaChannel,并配置日志校验拦截器
TailDirSource相比ExecSource、SpoolingDirectorySource的优势:
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。(Apache1.7、CDH1.6版本开始存在)
taildir挂了不会丢数(断点续传),但是有可能数据重复,生产环境通常不处理重复数据,出现重复的概率比较低。处理会影响传输效率。可以在下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis;如果需要在Flume处理则可以在taildirsource里面增加自定义事务。
taildir source不支持递归遍历文件夹读取文件。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
采用Kafka Channel,可以省去Sink,提高效率。
日志采集Flume关键配置如下:
优点:Nginx的日志格式是固定的,但是缺少sessionid,通过logger4j采集的日志是带有sessionid的,而session可以通过redis共享,保证了集群日志中的同一session落到不同的tomcat时,sessionId还是一样的,而且logger4j的方式比较稳定,不会宕机。
缺点:不够灵活,logger4j的方式和项目结合过于紧密,而flume的方式比较灵活,拔插式比较好,不会影响项目性能。
Flume采集日志是通过流的方式直接将日志收集到存储层,而kafka是将缓存在kafka集群,待后期可以采集到存储层。
Flume采集中间停了,可以采用文件的方式记录之前的日志,而kafka是采用offset的方式记录之前的日志。
1)source:用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。
Source输入端类型有Avro、Thrift、exec、netcat等,企业中最常用的还是采集日志文件。
2)channel:用于桥接Sources和Sinks,类似于一个队列。
① Channel 被设计为 Event 中转临时缓冲区,存储 Source 收集并且没有被Sink 读取的 Event,为平衡 Source 收集和 Sink 读取的速度,可视为 Flume内部的消息队列。
② Channel 线程安全并且具有事务性,⽀持 Source 写失败写,和 Sink 读失败重复读的操作。常⻅的类型包括 Memory Channel, File Channel,Kafka Channel。
3)sink:从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。
Source到Channel是Put事务,Channel到Sink是Take事务
(1)File Channel
数据存储于磁盘,优势:可靠性高;劣势:传输速度低
默认容量:100万event
注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
(2)Memory Channel
数据存储于内存,优势:传输速度快;劣势:可靠性差
默认容量:100个event
(3)Kafka Channel
数据存储于Kafka,基于磁盘;
优势:可靠性高;
传输速度快 Kafka Channel 大于Memory Channel + Kafka Sink 原因省去了Sink阶段
(4)Kafka Channel哪个版本产生的?
Flume1.6 版本产生=》并没有火;因为有bug:event(header body ) ture 和false 控制是否包含header信息,很遗憾,都不起作用。增加了额外清洗的工作量。Flume1.7解决了这个问题,开始火了。
(5)生产环境如何选择
如果下一级是Kafka,优先选择Kafka Channel
如果是金融、对钱要求准确的公司,选择File Channel
如果就是普通的日志,通常可以选择Memory Channel
时间(半个小时) – hdfs.rollInterval=1800
大小128m – hdfs.rollSize=134217728
event个数(0禁止)-- hdfs.rollCount =0
(1)ETL拦截器:主要是用来判断json是否完整。没有做复杂的清洗操作主要是防止过多的降低传输速率。
(2)时间戳拦截器:主要是解决零点漂移问题
Source 将 Event 写⼊到 Channel 之前可以使⽤拦截器对 Event 进⾏各种形式的处理, Source 和 Channel 之间可以有多个拦截器,不同拦截器使⽤不同的规则处理 Event,包括时间、主机、 UUID、正则表达式等多种形式的拦截器。
自定义拦截器步骤:
(1)实现 Interceptor
(2)重写四个方法
initialize 初始化
public Event intercept(Event event) 处理单个Event
public List intercept(List events) 处理多个Event,在这个方法中调用Event intercept(Event event)
close方法
(3)静态内部类,实现Interceptor.Builder
ETL拦截器可以不用;需要在下一级Hive的dwd层和SparkSteaming里面处理,时间戳拦截器建议使用。 如果不用需要采用延迟15-20分钟处理数据的方式,比较麻烦。
Source 发送的 Event 通过 Channel 选择器来选择以哪种⽅式写⼊到 Channel中, Flume 提供三种类型 Channel 选择器,分别是复制、复⽤和⾃定义选择器。
1. 复制选择器(Replicating:默认选择器): ⼀个 Source 以复制的⽅式将⼀个 Event 同时写⼊到多个Channel 中,不同的 Sink 可以从不同的 Channel 中获取相同的 Event,⽐如⼀份⽇志数据同时写 Kafka 和 HDFS,⼀个 Event 同时写⼊两个Channel,然后不同类型的 Sink 发送到不同的外部存储。(将数据发往下一级所有通道)
2. 复⽤选择器(Multiplexing): 需要和拦截器配合使⽤,根据 Event 的头信息中不同键值数据来判断 Event 应该写⼊哪个 Channel 中。(选择性发往指定通道)
1)采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。主要是内存不够导致的。
2)解决办法?
(1)自身:flume默认内存2000m。考虑增加flume内存,在flume-env.sh配置文件中修改flume内存为 4-6g
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
(2)找朋友:增加服务器台数
搞活动 618 =》增加服务器=》用完在退出
日志服务器配置:8-16g内存、磁盘8T
⽬的是为了提⾼整个系统的容错能⼒和稳定性。简单配置就可以轻松实现,⾸先需要设置 Sink 组,同⼀个 Sink 组内有多个⼦ Sink,不同 Sink 之间可以配置成负载均衡或者故障转移。