Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布

python多进程-kafka异步消息处理

背景:

现有资源及框架是基于python语言编写;部分松耦合资源的串行导致效率较低;

思路:

核心思路:采用数据分片,将存在冲突的数据分到不同数据块上,通过数据块之间的并行及数据块内的串行,尽可能使中间过程并行;提高整体速度 ,就kafka而言,可以帮助解耦部分中间过程处理,下面给出python下生产者与消费者使用的demo,注意:消费者是采用的multiprocessing.Process非线程

import threading, logging, timeimport multiprocessingfrom kafka import KafkaConsumer, KafkaProducer# 生产者 基于线程class Producer(threading.Thread):def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() def stop(self): self.stop_event.set() def run(self): producer = KafkaProducer(bootstrap_servers='localhost:9092') while not self.stop_event.is_set(): producer.send('my-topic', b"test") producer.send('my-topic', b"Hola, mundo!") time.sleep(1) producer.close()

# 消费者 基于多进程class Consumer(multiprocessing.Process):def __init__(self): multiprocessing.Process.__init__(self) self.stop_event = multiprocessing.Event() def stop(self): self.stop_event.set() def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) while not self.stop_event.is_set(): for message in consumer: print(message) if self.stop_event.is_set(): break consumer.close()

#main方法中具体进行具体消息的生产及消费时,实例化生产/消费者后,依次调用.start() .stop() .join()方法

具体参见git:https://github.com/dpkp/kafka-python下的example.py文件

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190320A08N7V00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
首页
学习
活动
专区
圈层
工具
MCP广场