Kafka 是一个分布式流处理平台,用于构建实时数据管道和流应用。它能够高效地处理大量数据,并且具有良好的扩展性和容错性。Kafka 主题(Topic)是 Kafka 中消息的分类单位,生产者将消息发送到特定的主题,消费者从主题中读取消息。
Kafka 主题可以分为两种类型:
Kafka 适用于以下场景:
在 Python 中,可以使用 confluent_kafka
库来消费 Kafka 主题的消息。以下是一个示例代码,展示如何获取 Kafka 主题的最新消息:
from confluent_kafka import Consumer, KafkaException
def create_consumer(broker, group_id, topic):
conf = {
'bootstrap.servers': broker,
'group.id': group_id,
'auto.offset.reset': 'latest' # 从最新的消息开始消费
}
consumer = Consumer(conf)
consumer.subscribe([topic])
return consumer
def consume_messages(consumer, timeout=1.0):
try:
while True:
msg = consumer.poll(timeout)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
if __name__ == "__main__":
broker = 'localhost:9092' # Kafka 代理地址
group_id = 'my-group' # 消费者组ID
topic = 'my-topic' # 主题名称
consumer = create_consumer(broker, group_id, topic)
consume_messages(consumer)
通过以上步骤和示例代码,你应该能够成功获取 Kafka 主题的最新消息。如果遇到其他问题,请参考相关文档或社区支持。
领取专属 10元无门槛券
手把手带您无忧上云