Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >[Logstash-input-redis] 使用详解

[Logstash-input-redis] 使用详解

作者头像
用户1154259
发布于 2018-01-17 08:41:26
发布于 2018-01-17 08:41:26
1.6K00
代码可运行
举报
运行总次数:0
代码可运行

redis插件的完整配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
input {
    redis {
        batch_count => 1 #返回的事件数量,此属性仅在list模式下起作用。
        data_type => "list" #logstash redis插件工作方式
        key => "logstash-test-list" #监听的键值
        host => "127.0.0.1" #redis地址
        port => 6379 #redis端口号
        password => "123qwe" #如果有安全认证,此项为密码
        db => 0 #redis数据库的编号
        threads => 1 #启用线程数量
    }
}
output {
 stdout{}
}

工作流程

图不够专业,但是大致就如上图所示:

  • logstash启动redis插件
  • redis插件获取参数,进行校验工作
  • 判断监听模式(list,channel,pattern_channel等),根据不同的监听模式创建监听任务
  • 创建redis实例,绑定EVAL脚本;通过指定的redis模式,发送请求,监听数据
  • redis返回指定内容的数(可能是列表list,也可能是某个特定的频道中的数据)
  • 得到的数据,进行处理,返回给logstash
  • 如果发送了停止信号,则根据不同的模式,发送不同的命令退出redis。

源码剖析

首先是程序的自定义,这里设置了redis插件需要的参数,默认值,以及校验等。

然后注册Redis实例需要的信息,比如key的名字或者url等,可以看到默认的data_type是list模式。

程序运行的主要入口,根据不同的data_type,传递不同的实现方法,然后调用listener_loop执行循环监听

Listner_loop方法传递了两个参数,一个是监听器实现的方法,一个是处理的数据队列。循环是每秒钟执行一次,如果循环标识被设置,则退出。

上面的循环方法可以看到,是通过一个参数shutdown_requested来判断是否继续循环。该参数通过tear_down方法设置为true,然后根据不同的模式,指定不同的退出方式。 如果是list模式,则直接退出;如果是channel模式,则发送redis的unsubsribe命令退出;如果是pattern_channel,则发送punsubscribe退出。

在循环内部,判断是否已经创建了redis实例,如果没有创建,则调用connect方法创建;否则直接执行。

这里前一段是调用Redis的new方法,初始化一个redis实例。紧接着判断batch_count是否大于1,如果等于1,就什么也不做,然后返回redis。 如果batch_count大于1,那么就调用load_batch_script方法,加载Lua脚本,存储到redis中的lua脚本字典中,供后面使用。代码如下:

上面的代码应该是这个插件最难理解的部分了。为了弄清楚这段代码的工作,需要了解下面几个知识点:

  • lua脚本基本概念
  • Redis中的EVAL命令如何使用
  • 理解上面脚本的工作

首先,要想运行上面的脚本,必须是Redis2.6+的版本,才支持EVAL,否则会报错!EVAL命令与js中的差不多,就是可以把某一个字符串当做命令解析,其中字符串就包括lua脚本。这样有什么好处呢?

说白了,就是能一次性进行多个操作。比如我们可以在脚本中写入一连串的操作,这些操作会以原子模式,一次性在服务器执行完,在返回回来。

Lua脚本

关于lua脚本,其实没有详细研究的必要,但是一定要知道一个local和table的概念。local是创建本地的变量,这样就不会污染redis的数据。table是lua的一种数据结构,有点类似于json,可以存储数据。

EVAL命令

另外还要知道EVAL命令的使用方法,看下面这个命令,就好理解了! EVAL "return KEYS[1] KEYS[2] ARGV[1] ARGV[2];" 2 name:xing age:13 就会返回:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
name
age
xing
13

这段代码没有经过真正的操作,但是有助于理解就好!也就是说,EVAL后面跟着一段脚本,脚本后面跟着的就是参数,可以通过KEYS和ARGV数组获得,但是下标从1开始。

再来说说EVAL命令,它的执行过程如下:

  • 解析字符串脚本,根据校验和生成lua的方法
  • 把校验和和函数放入一个lua_script字典里面,之后就可以通过EVALSHA命令直接使用校验和执行函数。

有了这些理论基础以后,就可以看看上面的代码都做了什么了! 首先是获取参数,这个参数赋值给i;然后创建了一个对象res;紧接着调用llen命令,获得指定list的长度;如果list的长度大于i,则什么也不做;如果小于i,那么i就等于lenth;然后执行命令lpop,取出list中的元素,一共取i次,放入res中,最后返回。

说得通俗点,就是比较一下list元素个数与设置batch_count的值。如果batch_count为5,列表list中有5条以上的数据,那么直接取5条,一次性返回;否则取length条返回。

可以看到这段脚本的作用,就是让logstash一次请求,最多获得batch_count条事件,减小了服务器处理请求的压力。

讲完这段代码,可以看看不同的工作模式的实现代码了:

首先是list的代码,其实就是执行BLPOP命令,获取数据。如果在list模式中,还会去判断batch_count的值,如果是1直接退出;如果大于1,则使用evalsha命令调用之前保存的脚本方法。

至于channel和pattern_channel,就没啥解释的了,就是分别调用subscribe和psubsribe命令而已。

其实最难理解的,就是中间那段lua脚本~明白它的用处,redis插件也就不难理解了。

完整的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/inputs/threadable"
require "logstash/namespace"

# This input will read events from a Redis instance; it supports both Redis channels and lists.
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
# the channel commands used by Logstash are found in Redis v1.3.8+.
# While you may be able to make these Redis versions work, the best performance
# and stability will be found in more recent stable versions.  Versions 2.6.0+
# are recommended.
#
# For more information about Redis, see <http://redis.io/>
#
# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
# newer. Anything older does not support the operations used by batching.
#
class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
  config_name "redis"

  default :codec, "json"

  # The `name` configuration is used for logging in case there are multiple instances.
  # This feature has no real function and will be removed in future versions.
  config :name, :validate => :string, :default => "default", :deprecated => true

  # The hostname of your Redis server.
  config :host, :validate => :string, :default => "127.0.0.1"

  # The port to connect on.
  config :port, :validate => :number, :default => 6379

  # The Redis database number.
  config :db, :validate => :number, :default => 0

  # Initial connection timeout in seconds.
  config :timeout, :validate => :number, :default => 5

  # Password to authenticate with. There is no authentication by default.
  config :password, :validate => :password

  # The name of the Redis queue (we'll use BLPOP against this).
  # TODO: remove soon.
  config :queue, :validate => :string, :deprecated => true

  # The name of a Redis list or channel.
  # TODO: change required to true
  config :key, :validate => :string, :required => false

  # Specify either list or channel.  If `redis\_type` is `list`, then we will BLPOP the
  # key.  If `redis\_type` is `channel`, then we will SUBSCRIBE to the key.
  # If `redis\_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
  # TODO: change required to true
  config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false

  # The number of events to return from Redis using EVAL.
  config :batch_count, :validate => :number, :default => 1

  public
  def register
    require 'redis'
    @redis = nil
    @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"

    # TODO remove after setting key and data_type to true
    if @queue
      if @key or @data_type
        raise RuntimeError.new(
          "Cannot specify queue parameter and key or data_type"
        )
      end
      @key = @queue
      @data_type = 'list'
    end

    if not @key or not @data_type
      raise RuntimeError.new(
        "Must define queue, or key and data_type parameters"
      )
    end
    # end TODO

    @logger.info("Registering Redis", :identity => identity)
  end # def register

  # A string used to identify a Redis instance in log messages
  # TODO(sissel): Use instance variables for this once the @name config
  # option is removed.
  private
  def identity
    @name || "#{@redis_url} #{@data_type}:#{@key}"
  end

  private
  def connect
    redis = Redis.new(
      :host => @host,
      :port => @port,
      :timeout => @timeout,
      :db => @db,
      :password => @password.nil? ? nil : @password.value
    )
    load_batch_script(redis) if @data_type == 'list' && (@batch_count > 1)
    return redis
  end # def connect

  private
  def load_batch_script(redis)
    #A Redis Lua EVAL script to fetch a count of keys
    #in case count is bigger than current items in queue whole queue will be returned without extra nil values
    redis_script = <<EOF
          local i = tonumber(ARGV[1])
          local res = {}
          local length = redis.call('llen',KEYS[1])
          if length < i then i = length end
          while (i > 0) do
            local item = redis.call("lpop", KEYS[1])
            if (not item) then
              break
            end
            table.insert(res, item)
            i = i-1
          end
          return res
EOF
    @redis_script_sha = redis.script(:load, redis_script)
  end

  private
  def queue_event(msg, output_queue)
    begin
      @codec.decode(msg) do |event|
        decorate(event)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
        output_queue << event
      end
    rescue LogStash::ShutdownSignal => e
      # propagate up
      raise(e)
    rescue => e # parse or event creation error
      @logger.error("Failed to create event", :message => msg, :exception => e, :backtrace => e.backtrace);
    end
  end

  private
  def list_listener(redis, output_queue)

    item = redis.blpop(@key, 0, :timeout => 1)
    return unless item # from timeout or other conditions

    # blpop returns the 'key' read from as well as the item result
    # we only care about the result (2nd item in the list).
    queue_event(item[1], output_queue)

    # If @batch_count is 1, there's no need to continue.
    
    return if @batch_count == 1
    
    begin
      redis.evalsha(@redis_script_sha, [@key], [@batch_count-1]).each do |item|
        queue_event(item, output_queue)
      end

      # Below is a commented-out implementation of 'batch fetch'
      # using pipelined LPOP calls. This in practice has been observed to
      # perform exactly the same in terms of event throughput as
      # the evalsha method. Given that the EVALSHA implementation uses
      # one call to Redis instead of N (where N == @batch_count) calls,
      # I decided to go with the 'evalsha' method of fetching N items
      # from Redis in bulk.
      #redis.pipelined do
        #error, item = redis.lpop(@key)
        #(@batch_count-1).times { redis.lpop(@key) }
      #end.each do |item|
        #queue_event(item, output_queue) if item
      #end
      # --- End commented out implementation of 'batch fetch'
    rescue Redis::CommandError => e
      if e.to_s =~ /NOSCRIPT/ then
        @logger.warn("Redis may have been restarted, reloading Redis batch EVAL script", :exception => e);
        load_batch_script(redis)
        retry
      else
        raise e
      end
    end
  end

  private
  def channel_listener(redis, output_queue)
    redis.subscribe @key do |on|
      on.subscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.message do |channel, message|
        queue_event message, output_queue
      end

      on.unsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  private
  def pattern_channel_listener(redis, output_queue)
    redis.psubscribe @key do |on|
      on.psubscribe do |channel, count|
        @logger.info("Subscribed", :channel => channel, :count => count)
      end

      on.pmessage do |ch, event, message|
        queue_event message, output_queue
      end

      on.punsubscribe do |channel, count|
        @logger.info("Unsubscribed", :channel => channel, :count => count)
      end
    end
  end

  # Since both listeners have the same basic loop, we've abstracted the outer
  # loop.
  private
  def listener_loop(listener, output_queue)
    while !@shutdown_requested
      begin
        @redis ||= connect
        self.send listener, @redis, output_queue
      rescue Redis::BaseError => e
        @logger.warn("Redis connection problem", :exception => e)
        # Reset the redis variable to trigger reconnect
        @redis = nil
        sleep 1
      end
    end
  end # listener_loop

  public
  def run(output_queue)
    if @data_type == 'list'
      listener_loop :list_listener, output_queue
    elsif @data_type == 'channel'
      listener_loop :channel_listener, output_queue
    else
      listener_loop :pattern_channel_listener, output_queue
    end
  rescue LogStash::ShutdownSignal
    # ignore and quit
  end # def run

  public
  def teardown
    @shutdown_requested = true

    if @redis
      if @data_type == 'list'
        @redis.quit rescue nil
      elsif @data_type == 'channel'
        @redis.unsubscribe rescue nil
        @redis.connection.disconnect
      elsif @data_type == 'pattern_channel'
        @redis.punsubscribe rescue nil
        @redis.connection.disconnect
      end
      @redis = nil
    end
  end
end # class LogStash::Inputs::Redis
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015-10-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
[logstash-input-redis]插件使用详解
Redis插件参数配置详解 最小化配置 input { redis { data_type => "list" #logstash redis插件工作方式 key => "logstash-test-list" #监听的键值 host => "127.0.0.1" #redis地址 port => 6379 #redis端口号 } } output { stdout{} } 详细配置 input { redis { batch_count => 1 #EVAL命令返回的事件数目 data_type
用户1154259
2018/01/17
1.5K0
[logstash-input-redis]插件使用详解
logstash之input配置redis类型详解
用途 监控redis数据 配置示例 input {     redis {         data_type => "list"         key => "logstash-demo"         host => "127.0.0.1"         port => 6379         threads => 5     } } output { stdout { codec => rubydebug } } 启动 bin/logstash -f /etc/logstash/con
苦咖啡
2018/04/28
6410
logstash配置output到redis
input { stdin { type => 'demo-stdin' add_field => {"test" => "hello"} codec => "plain" tags => ["stdin-test"] } } output { redis { data_type => "list" key => "logstash-demo" host => "127.
苦咖啡
2018/04/28
1.9K0
Redis入坟(二)高级特性,发布订阅、事务、Lua脚本
前面我们说通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。
源码之路
2020/09/04
9500
Redis入坟(二)高级特性,发布订阅、事务、Lua脚本
ELK学习笔记之Logstash详解
官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。
Jetpropelledsnake21
2018/12/05
5.7K0
ELK学习笔记之Logstash详解
Redis 如何实现延时任务队列
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
Tinywan
2024/05/11
8170
Redis 如何实现延时任务队列
Redis学习(二)
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Vincent-yuan
2021/05/11
7460
Redis学习(二)
亿级流量多级缓存 - Lua整合Redis/Nginx
Lua 是由巴西里约热内卢天主教大学(Pontifical Catholic University of Rio de Janeiro)里的一个研究小组于1993年开发的一种轻量、小巧的脚本语言,用标准 C 语言编写,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Parker
2020/07/21
1.3K0
使用Filebeat和Logstash集中归档日志
方 案 Filebeat->Logstash->Files Filebeat->Redis->Logstash->Files Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana) 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案) 注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash;或者直接写入ES(不存在乱序问题)、通过Flink输出到文件 部 署 系统环境 Debian8 x6
程序员同行者
2018/07/02
1.8K0
Redis Lua脚本大学教程
前面我们已经把Redis Lua相关的基础都介绍过了,如果你可以编写一些简单的Lua脚本,恭喜你已经可以从Lua中学毕业了。
Jackeyzhe
2020/03/11
1.1K0
Redis Lua脚本大学教程
快速搭建ELK7.5版本的日志分析系统--ELK实战篇
现在索引也可以创建了,现在可以来输出nginx、apache、message、secrue的日志到前台展示(Nginx有的话直接修改,没有自行安装)
用户6641876
2020/02/19
8700
用Lua定制Redis命令
前言 Redis作为一个非常成功的数据库,提供了非常丰富的数据类型和命令,使用这些,我们可以轻易而高效地完成很多缓存操作,可是总有一些比较特殊的问题或需求需要解决,这时候可能就需要我们自己定制自己的 Redis 数据结构和命令。 文章欢迎转载,请尊重作者劳动成果,带上原文链接:http://www.cnblogs.com/zhenbianshu/p/8416162.html ---- Redis命令问题 线程安全问题 我们都知道 Redis 是单线程的,可是它怎么会有 线程安全 问题呢? 我们正常理解的线程
枕边书
2018/03/30
1.5K2
用Lua定制Redis命令
Node.js 中实践 Redis Lua 脚本
Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。由于 Lua 语言具备原子性,其在执行的过程中不会被其它程序打断,对于并发下数据的一致性是有帮助的。
五月君
2019/11/10
4.6K1
【全文检索_11】Logstash 基本使用
  Logstash 是由 JRuby 编写的,使用基于消息的简单架构,在 JVM 上运行(本篇博客主要介绍 Logstash 基本使用,介绍请见 ☞【全文检索_09】Logstash 基本介绍)。Logstash 的事件处理流水线有三个主要角色完成:inputs → filters → outputs。必须定义这些过程的配置才能使用 Logstash,尽管不是每一个都必须的。在过滤器的部分,它可以对数据源的数据进行分析,丰富,处理等等,但是我们可以不使用过滤器。在输出的部分,我们可以有多于一个以上的输出。
Demo_Null
2021/03/02
8340
【全文检索_11】Logstash 基本使用
大流量架构(一)之REDIS篇
Lua 是由巴西里约热内卢天主教大学(Pontifical Catholic University of Rio de Janeiro)里的一个研究小组于1993年开发的一种轻量、小巧的脚本语言,用标准 C 语言编写,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
茶半香初
2021/11/26
6030
大流量架构(一)之REDIS篇
[logstash-input-http] 插件使用详解
插件介绍 Http插件是2.0版本才出现的新插件,1.x是没有这个插件的。这个插件可以帮助logstash接收其他主机或者本机发送的http报文。 插件的原理很简单,它自己启动了一个ruby的服务器,用于接收Http请求。然后会把host(IP地址)和header相关的信息添加到event中。 下面就看看这个插件如何使用吧! 基本配置 先看看默认的配置吧! http {} 简单到心碎啊!其实有很多参数都是默认的... 上面的配置其实相当于: http{ host => "0.0.0.0"
用户1154259
2018/01/17
2.7K0
使用Redis实现延时任务(二)
前一篇文章通过Redis的有序集合Sorted Set和调度框架Quartz实例一版简单的延时任务,但是有两个相对重要的问题没有解决:
Throwable
2020/06/23
1.1K0
使用Redis实现延时任务(二)
Redis事务的灵活应用与异步连接的优化策略
事务是指用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
Lion 莱恩呀
2024/11/07
1230
Redis事务的灵活应用与异步连接的优化策略
Redis:18---常用功能之(Lua脚本)
为了保证多条命令组合的原子性,Redis提供了简单的事务功能以及集成Lua脚本来解决这个问题,本文介绍Lua,事务已经在前一篇文章介绍过了 一、Lua概述 Lua语言是在1993年由巴西一个大学研究小组发明,其设计目标是作为嵌入式程序移植到其他应用程序,它是由C语言实现的,虽然简单小巧但是功能强大 所以许多应用都选用它作为脚本语言,尤其是在游戏领域,例如大名鼎鼎的暴雪公司将Lua语言引入到“魔兽世界”这款游戏中,Rovio公司将 Lua语言作为“愤怒的小鸟”这款火爆游戏的关卡升级引擎,Web服务器Nginx
用户3479834
2021/02/03
1K0
Redis:18---常用功能之(Lua脚本)
高性能伪事务之Lua in Redis
Redis2.6加入了对Lua脚本的支持。Lua脚本可以被用来扩展Redis的功能,并提供更好的性能。
sunsky
2020/08/20
2.4K0
相关推荐
[logstash-input-redis]插件使用详解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验