我们知道 Storm 是一个常驻服务,消息源源不断的来,他源源不断的处理,那肯定在有些情况下会导致消息的不正确处理,比如worker进程挂掉了,那么正在被处理的消息很可能就会丢失掉,那么该如何解决这个问题呢?这时候我们就可以引入 ACK 机制了,当消息没有被正确处理时,可以通过 ACK机制 重新发送该消息进行处理。
当然,大多数时候,一条两条数据的异常,并不在我们的考虑范围内,所以并不是所有任务都要引入 ACK 机制
SpoutOutputCollector.emit(List<Object> tuple, Object messageId)
static class MySpout extends BaseRichSpout {
@Override
public void fail(Object msgId) {
//消息失败的时候回回调到这个方法
}
@Override
public void ack(Object msgId) {
//消息成功执行的时候回回调到这个方法
}
}
你可能已经发现这两个回调的参数是 msgId,而不是你发送的 message,所以这个时候需要我们自己在发送数据的时候维护一个缓存,在 ack 回调里面移除, 在 fail 里面重发。
OutputCollector.emit(Tuple anchor, List<Object> tuple)
Config.setNumAckers(1)
;IBasicBolt 接口下的 BaseBasicBolt
而不是 IRichBolt接口下的BaseRichBolt
,
该 Bolt 会自动进行 ACK 的发送 和 anchor的关联,这样就省得我们忘记添加ACK,使得ACK无法正确运行上面介绍了如何开启一个 ACK,实际上我们也看到了,ACK机制的控制是精确到了 message 的,比如我们Spout 发送这个 message 的时候不指定其 messageId,那么这个message 的数据流就不会被 ACK,
Config.setNumAckers(1)
,其实就是开启了一个 ACKER 的task,
ACKER 会负责跟踪 spout 发出的每一个数据流ack()
调用就是向 ACKER 汇报本次执行任务结果,ACKER 接受到数据之后会判断该任务是否执行完毕ack
方法让我们处理成功的逻辑Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
可以设置这个时间还) 没有收到 ACKER 的汇报,那么Spout就认为该任务失败了,会通过fail
方法告诉我们处理失败的逻辑看完这个原理,我想你大概也知道了这个 ACK机制 其实并无法保证数据有且只有一次被消费,他只能保证你的数据至少被消费一次,很可能会重复消费。
10^3^3 还是 10
输入Tuple
和 生成Tuple
,ACKER会将其本身的ack-val
异或 输入Tuple的id
再异或 生成Tuple的id
生成一个新的 ack-val
0^A1^B1
ack_val ^(A1^A2)
ack_val ^(B1^B2)
ack_val ^(B2^A2)
最后我们把ack_val值的变化都写在一个公式,大概就是:0^A1^B1^(A1^A2)^(B1^B2)^(B2^A2)
,因为异或的性质,不难得出 最后的结果为 0 , 而当ack_val 重新变为0,ACKER就认为你这个数据已经处理完成了
好吧,ACK的讲解就到这里了,不知道有没有讲清楚,不过实际运用中并没有太大的用处,也可能只是我目前用的不多,基于对一门技术的热情,还是稍稍深入研究了一下,如有不对,欢迎指错