前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >【赵渝强老师】Kafka生产者的消息发送方式

【赵渝强老师】Kafka生产者的消息发送方式

原创
作者头像
赵渝强老师
发布2025-02-23 11:44:56
发布2025-02-23 11:44:56
660
举报
文章被收录于专栏:大数据技术大数据技术

Kafka生产者有三种方式进行消息的发送,这三种方式区别在于对于消息是否正常到达的处理。视频讲解如下:

下面分别介绍生产者的这三种消息发送方式。

第一种:fire-and-forget

该方式把消息发送给Kafka的Broker之后不关心其是否正常到达。在大多数情况下消息会正常到达,即使出错了生产者也会自动重试。但这种方式可能造成Kafka Broker没有接收到生产者的消息。因此这种方式适用于允许消息的丢失、并对吞吐量要求大的场景,比如用户点击日志上传。代码如下:

代码语言:java
复制
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");

		// 发送方式:fire-and-forget
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		for (int i = 0; i < 10; i++) {
			producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i));
			Thread.sleep(1000);
		}
		producer.close();
	}
}

第二种:同步发送

生产者使用send方法发送一条消息,该方法会返回一个Future对象。调用该对象的get方法可以阻塞当前线程并等待返回。这种方式适用对消息可靠性要求高的场景,比如支付的场景。在这种场景下要求消息不可丢失,如果丢失了将回滚相关的业务操作。代码如下:

代码语言:java
复制
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerDemo {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");

		// 发送方式:同步发送
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		for (int i = 0; i < 10; i++) {
			RecordMetadata metadata = 
					producer.send(new ProducerRecord<String, String>
					             ("mytopic1", "key" + i, "value" + i)).get();
			System.out.println("同步消息发送成功:" + i);
		}
		producer.close();
	}
}

第三种:异步发送

生产者使用send方法发送一条消息时指定回调函数,在Kafka Broker返回结果时调用。这个回调函数可以进行错误日志的记录或者重试。这种方式牺牲了一部分可靠性,但是吞吐量会比同步发送高很多。代码如下:

代码语言:java
复制
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerDemo {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
				"org.apache.kafka.common.serialization.StringSerializer");

		// 发送方式:异步发送
		Producer<String, String> producer = new KafkaProducer(props);
		for (int i = 0; i < 10; i++) {
			producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i),new Callback() {
				
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					// 回调函数
	                if(exception != null) {
	                    System.out.println("消息异步发送出现错误!!!");
	                    exception.printStackTrace();
	                } else {
	                    System.out.println("消息异步发送成功。 " +
	                           "Topic: " + metadata.topic() + 
	                           ", Partition: " + metadata.partition() +
	                           ", Offset: " + metadata.offset());
	                }
				}
			});
		}
		producer.close();
	}
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第一种:fire-and-forget
  • 第二种:同步发送
  • 第三种:异步发送
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档