Logstash来自ES家族,是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。
Logstash的数据处理流水线由三个主要阶段完成:inputs –> filters –> outputs:
logstash安装启动都非常简单,这里就简单提一下关键步骤:
①、下载:https://artifacts.elastic.co/downloads/logstash/logstash-5.5.1.tar.gz
②、解压
③、启动
nohup ./bin/logstash -f config/logstash.conf >/dev/null 2>&1 &
# Tail log files from disk. (Fix: the original snippet was missing the
# opening "{" after "input", which makes the config fail to parse.)
input {
  file {
    # Glob patterns and explicit paths are both accepted.
    path => ["/var/log/*.log", "/var/log/message"]
    # Type tag so later filter/output stages can branch on it.
    type => "system"
    # Read existing file content from the start on first run
    # (the default is "end", i.e. only new lines).
    start_position => "beginning"
  }
}
# Receive events shipped by Beats agents (e.g. Filebeat) over the
# Beats protocol.
input {
  beats {
    # Listen on the conventional Beats port.
    port => 5044
  }
}
# Pull events from a Redis list used as a broker/buffer.
input {
  redis {
    host      => "127.0.0.1"
    port      => 6379
    # Consume with BLPOP from a list key.
    data_type => "list"
    key       => "logstash-list"
  }
}
# Consume two Kafka topics; each consumer stamps its events with a
# [@metadata][type] marker so the filter/output stages can route them.
# (@metadata is never serialized to the output, so the marker is free.)
input {
  kafka {
    bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
    topics            => "messages"
    group_id          => "logstash"
    # Messages on the topic are JSON documents.
    codec => json {
      charset => "UTF-8"
    }
    add_field => { "[@metadata][type]" => "messages" }
  }
  kafka {
    bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
    topics            => "mysql_slow_log"
    group_id          => "logstash"
    codec => json {
      charset => "UTF-8"
    }
    add_field => { "[@metadata][type]" => "mysql_slow_log" }
  }
}
网管这边主要用到kafka,这里着重介绍下:
更多介绍:https://www.elastic.co/guide/en/logstash/5.5/input-plugins.html
filter {
  # Only process events tagged as MySQL slow-log entries by the input stage.
  if [@metadata][type] == "mysql_slow_log" {
    grok {
      # Multiline match of the slow-log header plus the statement itself.
      # (Fix: the pattern had been hard-wrapped mid-token — "Lock_time" and
      # the named capture "(?<action>" were split across lines — which
      # breaks the regex; it must be a single string.)
      # NOTE(review): slow-log format differs between MySQL versions; this
      # pattern targets MySQL 5.5+ — verify against your server's output.
      match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" ]
      # The slow log records a hostname rather than an IP, so expose the
      # Beat-reported name as server_ip instead.
      add_field => [ "server_ip", "%{[beat][name]}" ]
      # Tag successful matches so unmatched events can be dropped below.
      add_tag => [ "matched" ]
    }
    # Discard anything grok could not parse.
    if ("matched" not in [tags]) {
      drop {}
    }
    date {
      # Convert the slow log's epoch timestamp into @timestamp.
      match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss" ]
      remove_field => [ "timestamp" ]
    }
    # Build a normalized copy of the SQL (string and numeric literals
    # stripped) so identical statements hash to the same fingerprint.
    mutate {
      add_field => { "sql_hash" => "%{query}" }
      gsub => [
        "sql_hash", "'.+?'", "",
        "sql_hash", "-?\d*\.{0,1}\d+", ""
      ]
    }
    fingerprint {
      method => "MD5"
      # MD5 of the normalized SQL; the digest lands in the default
      # "fingerprint" field. (Fix: the original used `key => ["sql_hash"]`;
      # in the fingerprint filter `key` is an HMAC secret and the field to
      # hash is selected with `source` — with `key` set it would HMAC the
      # default "message" field instead of the normalized SQL.)
      source => "sql_hash"
    }
    # Drop scratch and noise fields before shipping.
    mutate {
      remove_field => "sql_hash"
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "message"
    }
  }
}
有用的说明都在注释中,仔细看看吧~
# Handle web-server access logs (Nginx/Apache emit JSON lines already).
filter {
  if [@metadata][type] == "web_access_log" {
    # Escape "\x" sequences so JSON parsing tolerates URL-encoded
    # multibyte (e.g. Chinese) path segments.
    mutate {
      gsub => ["message", "\\x", "\\\x"]
    }
    # Skip HEAD requests entirely.
    if ( 'method":"HEAD' in [message] ) {
      drop {}
    }
    # Parse the JSON payload into top-level fields, then drop the raw
    # message and Beats bookkeeping fields.
    json {
      source => "message"
      remove_field => "message"
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "host"
    }
  }
}
大同小异,就不做注释了。
filter {
  # Parse standard syslog lines from /var/log/messages.
  if [@metadata][type] == "messages" {
    grok {
      # (Fix: the pattern had been hard-wrapped mid-word —
      # "message_content" was split across lines — breaking the regex.)
      match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }
      # Expose the Beat-reported name as the source IP.
      add_field => [ "ip", "%{[beat][name]}" ]
      add_tag => [ "matched" ]
    }
    # Drop events grok could not parse.
    if ("matched" not in [tags]) {
      drop {}
    }
    date {
      locale   => "en_US"
      timezone => "Asia/Shanghai"
      # (Fix: grok captures the syslog time into "message_timestamp",
      # not "timestamp" — matching on the wrong field meant the event
      # kept its ingestion time instead of the log's time.)
      match  => [ "message_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
      target => "@timestamp"
    }
    ruby {
      # (Fix: the 2.x "event['...']" accessor API was removed in
      # Logstash 5.x; use event.get/event.set.)
      # NOTE(review): @timestamp is serialized as UTC regardless, so this
      # local-time shift may be a no-op — confirm it is still needed.
      code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.getlocal))"
    }
    # Drop Beats bookkeeping fields.
    mutate {
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "host"
    }
  }
# (Fix: the original snippet never closed the filter block.)
}
更多介绍:https://www.elastic.co/guide/en/logstash/5.5/filter-plugins.html
# Debug output: pretty-print every event to the console.
output {
  stdout {
    codec => rubydebug
  }
}
# Ship events to an Elasticsearch cluster.
output {
  elasticsearch {
    hosts => ["x.x.x.1:9200","x.x.x.2:9200","x.x.x.3:9200"]
    # One index per event type per day.
    index => "logstash-%{type}-%{+yyyy.MM.dd}"
    # Let Logstash push (and overwrite) the index template in ES.
    template_overwrite => true
  }
}
# Publish events to a Kafka topic.
output {
  kafka {
    topic_id => "hello"
    # Kafka broker address.
    bootstrap_servers => "x.x.x.x:9092"
    batch_size => 5
  }
}
# Route each event type to its own daily Elasticsearch index, using
# templates that already live in ES (Logstash does not manage them).
output {
  # System log (syslog) events.
  if [@metadata][type] == "messages" {
    elasticsearch {
      hosts => ["eshost:9200"]
      index => "messages-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-messages"
    }
  }
  # MySQL slow-query log events.
  if [@metadata][type] == "mysql_slow_log" {
    elasticsearch {
      hosts => ["eshost:9200"]
      index => "mysqlslowlog-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-mysqlslowlog"
    }
  }
  # Web access-log events.
  if [@metadata][type] == "web_access_log" {
    elasticsearch {
      hosts => ["eshost:9200"]
      index => "web_access_log-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-web_access_log"
    }
  }
}
更多介绍:https://www.elastic.co/guide/en/logstash/5.5/output-plugins.html
最后附上网管侧完整logstash配置,仅供参考:
# Inputs: one Kafka consumer per topic. Each stamps [@metadata][type]
# so the filter and output stages can route events (metadata fields are
# never serialized to the output).
input {
  kafka {
    bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
    topics            => "messages"
    group_id          => "logstash"
    codec => json {
      charset => "UTF-8"
    }
    add_field => { "[@metadata][type]" => "messages" }
  }
  kafka {
    bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
    topics            => "mysql_slow_log"
    group_id          => "logstash"
    codec => json {
      charset => "UTF-8"
    }
    add_field => { "[@metadata][type]" => "mysql_slow_log" }
  }
  kafka {
    bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
    topics            => "web_access_log"
    group_id          => "logstash"
    codec => json {
      charset => "UTF-8"
    }
    add_field => { "[@metadata][type]" => "web_access_log" }
  }
}
filter {
  # --- syslog (/var/log/messages) events -------------------------------
  if [@metadata][type] == "messages" {
    grok {
      # (Fix: the pattern had been hard-wrapped mid-word — "message_content"
      # was split across lines — breaking the regex.)
      match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }
      # Expose the Beat-reported name as the source IP.
      add_field => [ "ip", "%{[beat][name]}" ]
      add_tag => [ "matched" ]
    }
    if ("matched" not in [tags]) {
      drop {}
    }
    date {
      locale   => "en_US"
      timezone => "Asia/Shanghai"
      # (Fix: grok captures the syslog time into "message_timestamp", not
      # "timestamp" — the old field name never matched.)
      match  => [ "message_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
      target => "@timestamp"
    }
    ruby {
      # (Fix: the 2.x "event['...']" accessor API was removed in Logstash
      # 5.x; use event.get/event.set.)
      # NOTE(review): @timestamp is serialized as UTC regardless, so this
      # local-time shift may be a no-op — confirm it is still needed.
      code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.getlocal))"
    }
    mutate {
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "host"
    }
  }
  # --- MySQL slow-log events --------------------------------------------
  if [@metadata][type] == "mysql_slow_log" {
    grok {
      # (Fix: pattern re-joined onto one line — it had been hard-wrapped
      # mid-token, which breaks the regex.)
      # NOTE(review): pattern targets MySQL 5.5+ slow-log format — verify
      # against your server's output.
      match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" ]
      # The slow log records a hostname rather than an IP.
      add_field => [ "server_ip", "%{[beat][name]}" ]
      add_tag => [ "matched" ]
    }
    if ("matched" not in [tags]) {
      drop {}
    }
    date {
      # Convert the slow log's epoch timestamp into @timestamp.
      match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss" ]
      remove_field => [ "timestamp" ]
    }
    # Normalize the SQL (strip literals) so identical statements hash to
    # the same fingerprint.
    mutate {
      add_field => { "sql_hash" => "%{query}" }
      gsub => [
        "sql_hash", "'.+?'", "",
        "sql_hash", "-?\d*\.{0,1}\d+", ""
      ]
    }
    fingerprint {
      method => "MD5"
      # (Fix: `key` is an HMAC secret in the fingerprint filter; the field
      # to hash must be given via `source`, otherwise the default "message"
      # field is hashed instead.)
      source => "sql_hash"
    }
    mutate {
      remove_field => "sql_hash"
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "message"
    }
  }
  # --- web access-log events --------------------------------------------
  if [@metadata][type] == "web_access_log" {
    # Escape "\x" sequences so JSON parsing tolerates URL-encoded
    # multibyte path segments.
    mutate {
      gsub => ["message", "\\x", "\\\x"]
    }
    json {
      source => "message"
      remove_field => "message"
      remove_field => "[beat][hostname]"
      remove_field => "[beat][name]"
      remove_field => "@version"
      remove_field => "[beat][version]"
      remove_field => "input_type"
      remove_field => "offset"
      remove_field => "tags"
      remove_field => "type"
      remove_field => "host"
    }
  }
}
# Outputs: route each event type to its own daily Elasticsearch index,
# using pre-installed index templates (not managed by Logstash).
output {
  if [@metadata][type] == "messages" {
    elasticsearch {
      hosts => ["x.x.x.x:9200"]
      index => "messages-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-messages"
    }
  }
  if [@metadata][type] == "mysql_slow_log" {
    elasticsearch {
      hosts => ["x.x.x.x:9200"]
      index => "mysqlslowlog-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-mysqlslowlog"
    }
  }
  if [@metadata][type] == "web_access_log" {
    elasticsearch {
      hosts => ["x.x.x.x:9200"]
      index => "web_access_log-%{+YYYY.MM.dd}"
      manage_template => false
      template_name   => "template-web_access_log"
    }
  }
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。