首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
您找到你想要的搜索结果了吗?
是的
没有找到

消费者组consumer group详解-Kafka从入门到精通(九)

上篇文章说了,kafka可以通过实现partitioner自定义分区,producer拦截器,拦截器是在producer发送消息之后,回调之前调用,里面主要重写两个方法,一个是onSend,可以重新定义发送的消息,一个是在回调之前调用,onAcknowledgement在回调之前调用,可以记录发送成功或者失败的消息数量。无消息丢失配置,首先保证一个问题,消息不会丢失,要acks设置为all或者-1,这样send回调才会生效,这时候还会存在一个问题,当网络瞬时故障时候,会出现乱序发送,乱序的出现是因为retries重试,这时候必须只能在同一时刻在同一个broker只能发送一次,max.in.flight.request.per.connection。还有参数replication.factory三备份原则,Min.insync.replica至少写入多少副本。

03

python的协程使用

# 9.py #code=utf-8 # python的协程使用 ''' 所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。 Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。 Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。 由于gevent是基于IO切换的协程,所以最神奇的是,我们编写的Web App代码,不需要引入gevent的包,也不需要改任何代码,仅仅在部署的时候,用一个支持gevent的WSGI服务器,立刻就获得了数倍的性能提升。 ''' import time def consumer(): r = '' while True: n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) time.sleep(1) r = '200 ok' def produce(c): c.next() n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c) ''' 上面程序逻辑是: 首先调用c.next()启动生成器; 然后,一旦生产了东西,通过c.send(n)切换到consumer执行; consumer通过yield拿到消息,处理,又通过yield把结果传回; produce拿到consumer处理的结果,继续生产下一条消息; produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 ''' ''' 执行结果是 [PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 ok [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 ok [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 ok [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 ok [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 ok '''

02
领券