Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >使用生成器把Kafka写入速度提高1000倍

使用生成器把Kafka写入速度提高1000倍

作者头像
青南
发布于 2018-08-31 06:11:05
发布于 2018-08-31 06:11:05
1.5K00
代码可运行
举报
文章被收录于专栏:未闻Code未闻Code
运行总次数:0
代码可运行

[如果代码显示有问题,请点击阅读原文]

通过本文你会知道Python里面什么时候用yield最合适。本文不会给你讲生成器是什么,所以你需要先了解Python的yield,再来看本文。

疑惑

多年以前,当我刚刚开始学习Python协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部可以随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import time
def consumer():
    product = None
    while True:
        if product is not None:
            print('consumer: {}'.format(product))
        product = yield None

def producer():
    c = consumer()
    next(c)
    for i in range(10):
        c.send(i)

start = time.time()
producer()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

运行效果如下图所示。

这些文章的说法,就像统一好了口径一样,说这样写可以减少线程切换开销,从而大大提高程序的运行效率。但是当年我始终想不明白,这种写法与直接调用函数有什么区别,如下图所示。

直到后来我需要操作Kafka的时候,我明白了使用yield的好处。

探索

为了便于理解,我会把实际场景做一些简化,以方便说明事件的产生发展和解决过程。事件的起因是我需要把一些信息写入到Kafka中,我的代码一开始是这样的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import time
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product):
    with topic.get_producer(delivery_reports=True) as producer:
        producer.produce(str(product).encode())
def feed():
    for i in range(10):
        consumer(i)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这段代码的运行效果如下图所示。

写入10条数据需要100秒,这样的龟速显然是有问题的。问题就出在这一句代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
with topic.get_producer(delivery_reports=True) as producer

获得Kafka生产者对象是一个非常耗费时间的过程,每获取一次都需要10秒钟才能完成。所以写入10个数据就获取十次生产者对象。这消耗的100秒主要就是在获取生产者对象,而真正写入数据的时间短到可以忽略不计。

由于生产者对象是可以复用的,于是我对代码作了一些修改:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
products = []
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    for i in range(10):
        products.append(i)
    consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

首先把所有数据存放在一个列表中,最后再一次性给consumer函数。在一个Kafka生产者对象中展开列表,再把数据一条一条塞入Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。

这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。

于是我又修改了代码。每100条数据保存一次,并清空暂存的列表:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer(product_list):
    with topic.get_producer(delivery_reports=True) as producer:        for product in product_list:
            producer.produce(str(product).encode())
def feed():
    products = []
    for i in range(1003):
        products.append(i)
        if len(products) >= 100:
            consumer(products)
            products = []
    if products:
        consumer(products)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

由于最后一轮循环可能无法凑够100条数据,所以feed函数里面,循环结束以后还需要判断products列表是否为空,如果不为空,还要再消费一次。这样的写法,在上面这段代码中,一共1003条数据,每100条数据获取一次生产者对象,那么需要获取11次生产者对象,耗时至少为110秒。

显然,要解决这个问题,最直接的办法就是减少获取Kafka生产者对象的次数并最大限度复用生产者对象。如果读者举一反三的能力比较强,那么根据开关文件的两种写法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 写法一

with open('test.txt', 'w', encoding='utf-8') as f:
    f.write('xxx')
# 写法二

f = open('test.txt', 'w', encoding='utf-8')
f.write('xxx')
f.close()

可以推测出获取Kafka生产者对象的另一种写法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# 写法二

producer = topic.get_producer(delivery_reports=True)
producer.produce(b'xxxx')
producer.close()

这样一来,只要获取一次生产者对象并把它作为全局变量就可以一直使用了。

然而,pykafka的官方文档中使用的是第一种写法,通过上下文管理器with来获得生产者对象。暂且不论第二种方式是否会报错,只从写法上来说,第二种方式必需要手动关闭对象。开发者经常会出现开了忘记关的情况,从而导致很多问题。而且如果中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然需要开发者手动关闭。

函数VS生成器

但是如果使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是yield派上用场的时候。

首先需要明白,使用yield以后,函数就变成了一个生成器。生成器与普通函数的不同之处可以通过下面两段代码来进行说明:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def funciton(i):
    print('进入')
    print(i)
    print('结束')
for i in range(5):
    funciton(i)

运行效果如下图所示。

函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。

而生成器可以从中间开始运行,从中间跳出。例如下面的代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def generator():
    print('进入')
    i = None
    while True:
        if i is not None:
            print(i)
        print('跳出')
        i = yield None

g = generator()
next(g)
for i in range(5):
    g.send(i)

运行效果如下图所示。

从图中可以看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据可以通过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据以后继续执行下一轮while循环,打印出被传进来的内容,然后到i = yield None的时候又跳出。如此反复。

所以回到最开始的Kafka问题。如果把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只需要获取一次Kafka生产者对象,然后就可以一直使用了?

根据这个逻辑,设计如下代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import timefrom pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'test']
def consumer():
    with topic.get_producer(delivery_reports=True) as producer:
        print('init finished..')
        next_data = ''
        while True:
            if next_data:
                producer.produce(str(next_data).encode())
            next_data = yield True

def feed():
    c = consumer()
    next(c)
    for i in range(1000):
        c.send(i)

start = time.time()
feed()
end = time.time()
print(f'直到把所有数据塞入Kafka,一共耗时:{end - start}秒')

这一次直接插入1000条数据,总共只需要10秒钟,相比于每插入一次都获取一次Kafka生产者对象的方法,效率提高了1000倍。运行效果如下图所示。

后记

读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。但是第一段代码,也就是网上很多人讲yield的时候举的生产者-消费者的例子之所以会让人觉得毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差别。而我最后这一段代码,它的消费者分成两个部分,第一部分是获取Kafka生产者对象,这个过程非常耗时;第二部分是把数据通过Kafka生产者对象插入Kafka,这一部分运行速度极快。在这种情况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优势。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-04-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 未闻Code 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
❤️[前端学习]大数据全栈工程师之一文快速上手vue3❤️
大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。暑假给自己在家附近找了份实习,作为初级码农,啥都得懂点,于
Maynor
2021/08/25
1.6K0
vue3知识点:reactive对比ref
答案:请看官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
刘大猫
2024/10/31
1340
Vue3学习笔记
①性能的提升 打包大小减少41%、初次渲染快55%,更新渲染快133%,内存减少54%…… ②源码的升级
玖柒的小窝
2021/09/29
8840
Vue3学习笔记
vue3知识点:ref函数
答案:请看官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
刘大猫
2024/10/31
2740
vue3知识点:Vue3.0中的响应式原理和 vue2.x的响应式
答案:请看官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
刘大猫
2024/10/31
1320
升级 Vue3 大幅提升开发运行效率
作者:louiszhai,腾讯 IEG 前端开发工程师 Vue3 性能提升了 1.3~2 倍,SSR 性能提升了 2~3 倍,升级 Vue3 正是当下。 背景 原计划 2019 年发布的 Vue3,又经过一年的再次打磨,终于于去年 9 月正式发布。随后,不少 UI 组件库都积极参与适配,去年 12 月,Element-plus(Element-ui 官方升级版)也发布了 beta 版。 由于项目中用到了 Element-ui 组件,组件库未适配的情况下,不敢贸然升级 Vue3。Element-plus
腾讯技术工程官方号
2021/04/27
2K0
vue3知识点:setup
答案:请看官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
刘大猫
2024/10/31
1350
[科普文] Vue3 到底更新了什么?
Vue3 已经发布一段时间了,这个版本从底层实现到上层 API 设计都发生了非常大的变化,但具体改变了些什么呢?一起简单盘点下:
Tecvan
2022/03/31
1.1K0
[科普文] Vue3 到底更新了什么?
Vue3基础
官方文档:https://cli.vuejs.org/zh/guide/creating-a-project.html#vue-create
六个周
2022/10/28
1K0
Vue3基础
再遇vue之vue3新特性
首先说明一下,vue2和vue3是Vue.js的两个主要版本。目前vue3已经更新到3.3.4的版本了
用户6297767
2023/11/21
5640
再遇vue之vue3新特性
基于 Vue3 和 TS4 项目大量实践后的总结
Vue3出来已经有一段时间了,在团队中,也进行了大量的业务实践,也有了一些自己的思考。
落落落洛克
2021/12/01
6230
vue3知识点:reactive函数
答案:请看官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
刘大猫
2024/10/31
1710
vue3笔记1 组件配置API
vue2中的数据,计算属性,方法,生命周期钩子等都可以在setup函数中构造 setup函数返回:
路过君
2022/04/13
4120
Vue3 学习笔记 —— (一)深入理解组合式 API
Vue3 是向下兼容 Vue2 API 的,但是 Vue3 中提供了一种全新的 Composition API
Gorit
2021/12/08
7620
Vue3 学习笔记 —— (一)深入理解组合式 API
Vue3 中有哪些值得深究的知识点?
众所周知,前端技术一直更新很快,这不 vue3 也问世这么久了,今天就来给大家分享下vue3中值得注意的知识点。喜欢的话建议收藏,点个关注!
玖柒的小窝
2021/12/07
1.1K0
Vue3 中有哪些值得深究的知识点?
Vue3学习笔记 —— Composition API 快速入门
对比 Vue2 中 OPTIONS API,data 和 methods 的定义如下所示
Gorit
2021/12/08
5480
Vue3学习笔记 —— Composition API 快速入门
Vue3学习笔记(二)——组合式API(Composition API)
官方文档: https://v3.cn.vuejs.org/guide/composition-api-introduction.html
张果
2022/10/31
4.5K0
Vue3学习笔记(二)——组合式API(Composition API)
一口气复习完 Vue3 相关基础知识点
看完你就基本可以上手搞开发了,本文适合Vue初学者,或者Vue2迁移者,当然还是建议Vue3官网完全过一遍。不适合精通原理,源码的大佬们。
前端达人
2021/09/09
2.3K0
一口气复习完 Vue3 相关基础知识点
vue3简易入门剖析
,发音同 “veet”)是一种新型前端构建工具,能够显著提升前端开发体验。它主要由两部分组成:
张哥编程
2024/12/13
4570
【初学者笔记】整理的一些Vue3知识点
拒绝标题党,哈哈哈,看完你就基本可以上手搞开发了,本文适合Vue初学者,或者Vue2迁移者,当然还是建议Vue3官网完全过一遍。不适合精通原理,源码的大佬们。
一尾流莺
2022/12/10
2.4K0
【初学者笔记】整理的一些Vue3知识点
相关推荐
❤️[前端学习]大数据全栈工程师之一文快速上手vue3❤️
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验