你能帮我在日志中输入RabbitMQ吗?我的应用程序将代码的版本发送到rabbitmq,然后将其存储在弹性堆栈中。在rabbitmq中为app创建了队列名: app_version_queue;类型:经典;持久:真
然后用该配置配置logstash:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# INPUT - PRODUCERS
key => "app_version_queue"
# OUTPUT - CONSUMER
# queue for logstash
queue => "logstash"
auto_delete => false
# Exchange for logstash
exchange => logstash
exchange_type => direct
durable => "true"
# No ack will boost your perf
ack => false
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
index => "app_version-%{+YYYY.MM.dd}"
}
}
这是可行的,但是现在,在RabbitMQ控制台中,我在表排队的消息中看到就绪: 914,444,Unacked: 0总计: 914,444
我的磁盘空间在三天内就满了。重新启动rabbitmq服务器后,所有空间都是空闲的。
更新:所有的原因,为什么我要这么做,我想从我想做的链app=>rabbit=>nifi=>elastic : app=>rabbit=>logstash=>elastic中删除
我试图阻止NIFI的发送,但是消息没有离开。
发布于 2021-02-11 08:13:44
听起来就像你已经创建了两次基础设施:
你需要的是--只有三样东西--
你拥有的是所有的
logs
(手动创建)的交换。app_version_queue
的队列(手动创建),没有任何消耗。logs
传递到app_version_queue
,然后app_version_queue
永远在那里。logstash
的交换(由LogStash创建),它没有向其发布消息。logstash
(由LogStash创建)的队列,LogStash从该队列中消费消息。logstash
交换到logstash
队列,它什么也不做,因为没有消息发布到该交换。logs
交换到logstash
队列的绑定(手动创建),它实际上是从应用程序传递消息。因此,对于这三种情况(交换、队列和绑定),您需要:
例如,您可以保留名称logs
和app_version_queue
,并手动创建所有内容。
那么您的LogStash应用程序应该如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Consume from existing queue
queue => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
另一方面,您可以只创建logs
交换,让LogStash创建队列和绑定,如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
exchange => "logs"
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
或者,您可以让LogStash创建所有这些,并确保应用程序发布到正确的exchange:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Create a new exchange; point your application to publish here!
exchange => "log_exchange"
exchange_type => "direct"
# Take a copy of all messages with the "app_version_queue" routing key from the new exchange
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
我可能会选择中间选项: exchange是应用程序的部署需求的一部分(如果不能在那里发布,它将产生错误),但是任何数量的队列都可能由于不同的原因绑定到它(在测试环境中根本不需要设置ElasticSearch )。
https://stackoverflow.com/questions/66154402
复制相似问题