首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

LogStash采集数据到kafka

【背景】

当前大数据日志采集框架中,最火的莫过于 Flume 和 LogStash。

但 Flume 存在缺陷,

假如 Flume 如果宕机,重新启动后,

再次采集的数据可能存在数据丢失或者数据重复采集的问题。

如果要解决这个问题,需要自己去想办法维护偏移量。

有没有更简便的解决方案呢?

当然有,就是使用 LogStash。

【简介】

LogStash 是当前比较火的 ELK 技术栈成员之一,

它能够保证数据不重复消费,不丢失。

ELK:ElasticSearch,Logstash,Kibana

Logstash 有三个组件:

input:读取数据源,相当于 flume 的 source;

filter:可以过滤整理数据;

output:将数据写入到某种存储介质中。

【实战】

下载 Logstash

https://www.elastic.co/downloads/logstash

这里下载的是 2.3.1 版本:logstash-2.3.1.tar.gz

如今共有三个节点,其主机名和ip为:

mini5 192.168.87.155

mini6 192.168.87.156

mini7 192.168.87.157

将 Logstash 安装包上传到 mini5,解压:

[hadoop@mini5 ~]$ tar -zxvf logstash-2.3.1.tar.gz -C apps/

配置文件:

我们要采集文件日志写入到kafka。

[hadoop@mini5 ~]$ cd apps/logstash-2.3.1/

[hadoop@mini5 logstash-2.3.1]$ vi flow-kafka.conf

input {

file {

path => "/var/nginx_logs/*.log"

discover_interval => 5

start_position => "beginning"

}

}

output {

kafka {

topic_id => "accesslog"

codec => plain {

format => "%"

charset => "UTF-8"

}

bootstrap_servers => "mini5:9092,mini6:9092,mini7:9092"

}

}

注1:

path => "/var/nginx_logs/*.log",

该采集文件路径为专题5埋点采集日志的文件,

注2:

bootstrap_servers => "mini5:9092,mini6:9092,mini7:9092",

该配置为配置 kafka 的 broker 地址,关于 kafka 具体可以参见 0#2:

注3:

topic_id 为将要输入到 kafka 的 topic,此处设为 accesslog,

则在 kafka 集群中要先创建相应的 topic。

启动 ZooKeeper:

[hadoop@mini5 ~]$ startzk.sh

startzk.sh 脚本内容:

#/bin/sh

echo "start zkServer..."

for i in 5 6 7

do

ssh mini$i "source /etc/profile;/home/hadoop/apps/zookeeper-3.4.6/bin/zkServer.sh start"

done

启动 Kafka

[hadoop@mini5 ~]$ startKafkaServers.sh

startKafkaServers.sh 脚本内容:

#/bin/sh

KAFKA_HOME=/export/servers/kafka

KAFKA_LOG=/export/servers/log

echo "start kafkaServers..."

for i in 5 6 7

do

ssh mini$i "source /etc/profile;cd $KAFKA_HOME;bin/kafka-server-start.sh config/server.properties >> $KAFKA_LOG/kafka.log 2>&1 &"

echo "mini$i kafkaServer has started..."

done

创建主题

[hadoop@mini6 ~]$ cd /export/servers/kafka

[hadoop@mini6 kafka]$ bin/kafka-topics.sh --create --zookeeper mini5:2181,mini6:2181,mini7:2181 --replication-factor 3 --partitions 3 --topic accesslog

查看主题是否已创建

[hadoop@mini6 kafka]$ bin/kafka-topics.sh --list --zookeeper mini5

启动 Logstash

[hadoop@mini5 logstash-2.3.1]$ bin/logstash agent -f flow-kafka.conf

监视 mini5 收集到的日志

[hadoop@mini5 ~]$ cd /var/nginx_logs/

[hadoop@mini5 nginx_logs]$ tail -F track.log

打开 kafka 消费者客户端查看日志

[hadoop@mini6 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server mini5:9092 --topic accesslog --from-beginning

浏览相关网页

http://mini6

点击

1.html

发现监视的文件 track.log 增加了新的记录

同时kafka 客户端也消费到新的数据了

说明 Logstash 采集数据到 kafka 成功!

———————————分割线———————————

阅后来撩啊:

未关注的,文章开头右上角,请亲戳"择码记"关注哦~

或者微信搜索公众号“择码记”进行关注~

觉得文章不错,请亲点赞哦~

觉得可能对别人有帮助,请亲分享哦~

觉得为您共享了价值,请壕打赏哦~

版本所有:Gavin Hawk

谢谢亲支持哟!

另:

感谢您的阅读~

择码记将持续发布高质量的文章,诚意满满,干货满满,以期对您有所帮助!

若您要查看章节,请直接输入“1”,即可跳转到第一章。以此类推。

有些章节分上下两篇,如第三章有“3_1”和"3_2",

则输入“3_1”即可出来上篇,以此类推。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20171213G063G200?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券