目前市面上流行的日志解决方案主要为ELK方案,但随着使用过程中,日志不断增多,日志消费和存储阶段存在性能问题,导致写入延迟,kaibana使用上体验不优。
另外,如果日志不太规范,打开很多debug日志,环境如果也挺多的情况下,成本也居高不下,针对es存储来说费用上也是一个不小的数目。
针对上面的两个问题,通过植入自定义logstash filter插件,删除掉不需要的debug日志和其他异常日志,可以有效地缓解日志过多的问题,本文重点不讨论解决方案,主要描述自定义filter插件的使用方法,请看下文所述。
Logstash是一个具有实时管线能力的开源数据收集引擎。在ELK Stack技术栈中,通常选择更轻量级的Filebeat搜集日志,然后将日志输出到Logstash中进行加工处理,解析切割日志,再将处理后的日志输出到指定的目标(Elasticsearch、Kafka等)当中。
Logstash事件的处理管线是inputs->filters->outputs, 这三个阶段都可以自定义插件,下面主要介绍如何开发自定义需求最多的filter插件。
官方下载连接地址:https://www.elastic.co/cn/downloads/logstash
cd倒logstash根目录,使用bin/logstash-plugin生成filter插件模板,如下所示:
bin/logstash-plugin generate --type filter --name debug-drop --path vendor/localgems
其中vendor/localgems 可修改为其他路径。
查看filter插件的目录结构,如下:
查看lib/logstash/filters/debug-drop.rb 文件
Logstash依赖UTF-8编码,需要在插件代码开始处添加:
# encoding: utf-8
模板代码里面默认require
了"logstash/filters/base", 如果需要依赖其他代码或者gems就在这里添加。
插件名称配置
config_name "debug-drop"
#debug-drop 就是插件名称, 在Logstash配置的filter块中使用。
插件名称配置
config: message, :validate => :string, :default => "Hello World!"
# message 是插件test的可选参数,默认值是“Hello World!”
下面是参数的通用配置代码:
config: :variable_name, :validate => :variable_type, :default => "Default Value" :required => boolean, :deprecated => boolean, :obsolete => string
参数说明:
: varable_name: 参数名称
: validate: 验证参数类型,如:string,:password, :boolean, :number, :array,等
: required: 是否必须配置
: default: 默认值
: deprecated: 是否废弃
: obsolete: 声明该配置不再使用,通常提供升级方案
结合本文说明,定义几个hash变量
$debugStartTimeHash = Hash.new("debugStartTimeHash")
$debugEndTimeHash = Hash.new("debugEndTimeHash")
$debugNumHash = Hash.new("debugNumHash")
结合本文说明,定义几个默认参数
config :dropNum, :validate => :number, :default => 100
config :notDropNum, :validate => :number, :default => 200
config :dropTime, :validate => :number, :default => 3600
config :notDropTime, :validate => :number, :default => 7200
插件方法 Logstash插件必须实现两个方法:register
和filter
register
方法代码如下:
public
def register
@logger = self.logger
end
register
方法相当于初始化方法,不需要手动调用, 可以在这个方法里面调用配置变量,如@message,也可以初始化自己的实例变量。
filter
方法代码如下:
public
def filter(event)
if @message
event.set("message", @message)
end
msg = event.get("[event][original]")
service = event.get("[fields][document_type]")
puts event.get("[event][original]")
if msg.include?'DEBUG'
if $debugNumHash[service] == "debugNumHash"
$debugNumHash = {
service => 1
}
else
debugNums = $debugNumHash[service]
triggerTimes = Time.now.straftime("%Y-%m-%d %H:%M:%S")
$debugNumHash = {
service => debugNums + 1
}
end
@logger.info("trigger time: #{tiggerTimes}, nums: #{debugsNUms}")
if $debugStartTimeHash[service] == "debugStartTimeHash"
$debugStartTimeHash = {
service => Time.now.to_i
}
end
if $debugEndTimeHash[service] == "debugEndTimeHash"
$debugStartTimeHash = {
service => Time.now.to_i
}
end
if $debugEndTimeHash[service] == "debugEndTimeHash"
$debugEndTimeHash = {
service => 0
}
else
startTimes = $debugStartTimeHash[service]
endTime = Time.now.to_i
tilTimes = endTime - startTimes
endTimes = Time.now.strftime("%Y-%m-%d %H:%M:%S")
$debugEndTimeHash = {
service => tilTimes
}
end
@logger.info("trigger time: #{endTimes}, times: #{tilTimes}s")
end
$debugNumHash.each do |key, value|
if value > @dropNum
@logger.info("#{key} => Debug Log is Droped By over define DropNum...")
event.cancel()
elseif value > @notDropNum
@debugNumHash[service] = 0
end
end
$debugEndTimeHash.each do |key, value|
if value > @dropTime
endTimes = Time.now.strftime("%Y-%m-%d %H:%M:%")
@logger.info("#{key} => Debug Logs is Drop By over define DropTime...")
event.cancel()
elseif value > @notDropTime
$debugEndTimeHash = 0
end
end
filter_matched(event)
end
filter 方法是插件的数据处理逻辑,其中event变量封装了数据流,可以通过接口访问event中的内容,具体参考https://www.elastic.co/guide/en/logstash/8.2/event-api.html。最后一句调用了filter_matched, 这个方法用于保证Logstash的配置add——failed,remove_filed,add_tag和remove_tag会被正确执行。
filter逻辑说明
先定义全局Hash 获取msg内容,匹配debug,按照条数或者时间,创建service => value存入Hash中 根据定义的参数,数据和时间, event.cancel(), 当条数或者时间达到预定值,重置
cd到Logstash根目录下,在Gemfile添加如下配置:
gem "logstash-filter-debug-drop",:path => "vendor/localgems/logstash-filter-debug-drop"
cd到插件的根目录下,修改logstash-filter-debug-drop.gemspec中的如下三处地方,不然编译或者logstash启动直接报错。
修改logstash配置文件
/opt/logstash-8.8.2/config/logstash-sample.conf
input {
beats {
host => "0.0.0.0"
port => 5044
}
}
filter {
debug-drop {}
}
output {
file {
path => "/var/log/message.log"
}
}
通过从filebeat上接收文件,输出到本地文件中
配置启动文件
创建system/logstash.service启动文件时,需要添加–enable-local-plugin-development参数,不然启动报错,启动文件如下:
[Unit]
Description=logstash server daemon
Documentation=/opt/logstash-8.8.2/bin/logstash -help
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Envionment="BEAT_CONFIG_OPTS=-f /opt/logstsh-8.8.2/config/logstsh-sample.conf"
ExecStart=/opt/logstsh-8.8.2/bin/logstash --enable-local-plugin-development $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target
启动logstash
systemctl enable logstash
systemctl start logstash
journalctl -f -u logstash --no-pager