前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >玩转Kafka的生产者——分区器与多线程

玩转Kafka的生产者——分区器与多线程

作者头像
Janti
发布2018-08-20 17:40:02
1.7K0
发布2018-08-20 17:40:02
举报
文章被收录于专栏:Janti

上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的常用API。其中包括生产者和消费者,

多线程生产者,多线程消费者,自定义分区等,当然还包括一些避坑指南。

 首发于个人网站:链接地址

准备工作

kafka版本:2.11-1.1.1

操作系统:centos7

java:jdk1.8

有了以上这些条件就OK了,具体怎么安装和启动Kafka这里就不强调了,可以看上一篇文章。

新建一个maven工程,需要的依赖如下:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

主题管理

kafka的核心就是主题,学会使用kafka的脚本创建主题,也需要学习使用Java API来创建主题。

Kafka将zookeeper的操作封装成一个ZkUtils类,通过AdminUtils类来调用ZkUtils,来实现Kafka中元数据的操作。

下面一个例子是使用AdminUtils来创建主题,并同时创建指定大小的分区数。

代码语言:javascript
复制
 1     // 连接配置
 2     private static final String ZK_CONNECT = "10.0.90.53:2181";
 3 
 4     // session过期时间
 5     private static final int SEESSION_TIMEOUT = 30 * 1000;
 6 
 7     // 连接超时时间
 8     private static final int CONNECT_TIMEOUT = 30 * 1000;
 9 
10     /**
11      * 创建主题
12      *
13      * @param topic 主题名称
14      * @param partition 分区数
15      * @param repilca 副本数
16      * @param properties 配置信息
17      */
18     public static void createTopic(String topic, int partition, int repilca, Properties properties) {
19         ZkUtils zkUtils = null;
20         try {
21             // 创建zkutil
22             zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled());
23             if (!AdminUtils.topicExists(zkUtils, topic)) {
24                 //主题不存在,则创建主题
25                 AdminUtils.createTopic(zkUtils, topic, partition, repilca, properties, AdminUtils.createTopic$default$6());
26             }
27         } catch (Exception e) {
28             e.printStackTrace();
29         } finally {
30             zkUtils.close();
31         }
32     }

执行该方法,创建主题,

在centos7中查看之前创建的主题:

代码语言:javascript
复制
bin/kafka-topics.sh --list --zookeeper localhost:2181  

删除主题:

代码语言:javascript
复制
/**
 * 删除主题
 *
 * @param topic
 */
public static void deleteTopic(String topic){
    ZkUtils zkUtils = null;
    try {
        zkUtils = ZkUtils.apply(ZK_CONNECT, SEESSION_TIMEOUT, CONNECT_TIMEOUT, JaasUtils.isZkSecurityEnabled());
        AdminUtils.deleteTopic(zkUtils,topic);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        zkUtils.close();
    }
}

生产者API

在掌握了创建和删除主题之后,接下来,学习Kafka的生产者API。

Kafka中的生产者,通过KafkaProducer这个类来实现的,在介绍这个类的使用之前,首先介绍kafka的配置项,这也是实际生产中比较关心的。

消息发送流程

实例化生产者时,有三个配置是必须指定的:

  • bootstrap.servers:配置连接代理列表,不必包含Kafka集群的所有代理地址,当连接上一个代理后,会从集群元数据信息中获取其他存活的代理信息。但为了保证能够成功连上Kafka集群,在多代理集群的情况下,建议至少配置两个代理。
  • (由于电脑配置有限,本文实验的是单机情况) key.serializer : 用于序列化消息Key的类
  • value.serializer :用于序列化消息值(Value)的类

向Kafka发送一个消息,基本上要经过以下的流程:

1.配置Properties对象,这个是必须的

2.实例化KafkaProducer对象

3.实例化ProducerRecord对象,每条消息对应一个ProducerRecord对象

4.调用KafkaProducer的send方法,发送消息。发送消息有两种,一种是带回调函数的(如果发送消息有异常,会在回调函数中返回),另一种是不带回调函数的。

KafkaProducer默认是异步发送消息,首先它会将消息缓存到消息缓冲区中,当缓存区累积到一定数量时,将消息封装成一个

RecordBatch,统一发送消息。也就是说,发送消息实质上分为两个阶段,第一将消息发送到消息缓冲区,第二执行网络I/O操作

5.关闭KafkaProducer,释放连接的资源。

了解以上的流程,那么接下来就实现Java版本的API。

代码实例

第一步:

新建一个消息实体类,模拟支付订单消息,包含消息的ID,商家名称,创建时间,备注。

代码语言:javascript
复制
public class OrderMessage {

    // 订单ID
    private String id;

    // 商家名称
    private String sName;

    // 创建时间
    private long createTime;

    // 备注
    private String remake;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getsName() {
        return sName;
    }

    public void setsName(String sName) {
        this.sName = sName;
    }

    public long getCreateTime() {
        return createTime;
    }

    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    public String getRemake() {
        return remake;
    }

    public void setRemake(String remake) {
        this.remake = remake;
    }

    @Override
    public String toString() {
        return "OrderMessage{" +
                "id='" + id + '\'' +
                ", sName='" + sName + '\'' +
                ", createTime=" + createTime +
                ", remake='" + remake + '\'' +
                '}';
    }
}

第二步:

这里简单的发送一个消息demo,按照上面的流程,生产者例子如下:

代码语言:javascript
复制
package kafka.producer;

import kafka.OrderMessage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

/**
 * kafka生产者
 */
public class ProducerSimpleDemo {
    static Properties properties = new Properties();

    //主题名称
    static String topic = "myTopic";

    //生产者
    static KafkaProducer<String, String> producer = null;

    //生产者配置
    static {
        properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092");
        properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String args[]) throws Exception {
        sendMsg();
    }

    /**
     * 发送消息
     *
     * @throws Exception
     */
    public static void sendMsg() throws Exception {
        ProducerRecord<String, String> record = null;
        try {
            // 循环发送一百条消息
            for (int i = 0; i < 10; i++) {
                // 构造待发送的消息
                OrderMessage orderMessage = new OrderMessage();
                orderMessage.setId(UUID.randomUUID().toString());
                long timestamp = System.nanoTime();
                orderMessage.setCreateTime(timestamp);
                orderMessage.setRemake("remind");
                orderMessage.setsName("test");
                // 实例化ProducerRecord
                record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString());
                producer.send(record, (metadata, e) -> {
                    // 使用回调函数
                    if (null != e) {
                        e.printStackTrace();
                    }
                    if (null != metadata) {
                        System.out.println(String.format("offset: %s, partition:%s, topic:%s  timestamp:%s", metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                    }
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

运行,结果就出现了,异常。

异常记录:

代码语言:javascript
复制
2018-07-30 18:05:10.755 DEBUG 10272 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Connection with localhost/127.0.0.1 disconnected
 
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_111]
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.8.0_111]
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148) [kafka-clients-0.10.1.1.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

可以看到报错第一句显示:Connection with localhost/127.0.0.1 disconnected

但是可以看到自己的配置是正确的。

这里需要在kafka中修改部分配置:

在配置kafka中,首先需要修改kafka的配置server.properties中的

代码语言:javascript
复制
 advertised.listeners=PLAINTEXT://:your.host.name:9092

翻译过来就是hostname和端口是用来建议给生产者和消费者使用的。

如果没有设置,将会使用listeners的配置,如果listeners也没有配置,将使用java.net.InetAddress.getCanonicalHostName()来获取这个hostname和port,对于ipv4,基本就是localhost了。

"PLAINTEXT"表示协议,可选的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效

也就是说如果你没有配置advertised.listeners,就使用listeners的配置通告给消息的生产者和消费者,这个过程是在生产者和消费者获取源数据(metadata)。

修改之后:

代码语言:javascript
复制
advertised.listeners=PLAINTEXT://10.0.90.53:9092

需要注意的是,如果Kafka有多个节点,那么需要每个节点都按照这个节点的实际hostname和port情况进行设置。

修改完毕,重启Kafka服务,开启消费者,接受消息,在服务器中输入:

代码语言:javascript
复制
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning

可以看到服务器中的消费者:

成功接收到消息。之前提到过在生产者有回调函数,来看看回调函数的输出:

代码语言:javascript
复制
offset: 0, partition:0, topic:myTopic  timestamp:1533199115840
offset: 1, partition:0, topic:myTopic  timestamp:1533199115850
offset: 2, partition:0, topic:myTopic  timestamp:1533199115850
offset: 3, partition:0, topic:myTopic  timestamp:1533199115850
offset: 4, partition:0, topic:myTopic  timestamp:1533199115850
offset: 5, partition:0, topic:myTopic  timestamp:1533199115850
offset: 6, partition:0, topic:myTopic  timestamp:1533199115850
offset: 7, partition:0, topic:myTopic  timestamp:1533199115852
offset: 8, partition:0, topic:myTopic  timestamp:1533199115852
offset: 9, partition:0, topic:myTopic  timestamp:1533199115852

打印出了偏移值,分区,主题,和时间戳。说明发送成功了。到此就完成第一个Helloworld操作了。

我们可以看到回调函数返回的消息,怎么都在一个分区中呢?下面来研究分区器。

自定义分区器

Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。

这个在单机上的提高,对于集群,Kafka使用了分区,将topic的消息分散到多个分区上,并保存在不同的机器上。

但是是否分区越多,效率越高呢?也不尽然!

1.每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

2.消费者和生产者都会为分区缓存消息,分区越多,缓存的消息就越多,占用的内存就越大。

3.降低高可用,Kafka是通过高可用来实现高可用性的。我们知道在集群中往往会有一个leader,假设集群中有10个Kafka进程,1个leader,9个follwer,如果一个leader挂了,那么就会重新选出一个leader,如果集群中有10000个分区,那么将要花费很长的时间,这对于高可用是有损耗的。

本身kafka有自己的分区策略的,如果未指定,就会使用默认的分区策略:

Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。如果Key相同的话,那么就会分配到统一分区。

Kafka提供了自定义的分区器,只要实现Partitioner接口即可,下面是自定义分区的例子:

代码语言:javascript
复制
package kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定义分区器
 */
public class PartitionUtil implements Partitioner {

    // 分区数
    private static final Integer PARTITION_NUM = 6;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (null == key){
            return 0;
        }
        String keyValue = String.valueOf(key);
        // key取模
        int partitionId = (int) (Long.valueOf(key.toString())%PARTITION_NUM);
        return partitionId;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

还是刚才分区的代码,只要在之前的配置中加上

代码语言:javascript
复制
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());

运行生产者,回调函数打印如下:

代码语言:javascript
复制
offset: 3, partition:5, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:3, topic:MyOrder  timestamp:1533205893202
offset: 6, partition:3, topic:MyOrder  timestamp:1533205894784
offset: 2, partition:2, topic:MyOrder  timestamp:1533205894785
offset: 4, partition:1, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:1, topic:MyOrder  timestamp:1533205894785
offset: 5, partition:0, topic:MyOrder  timestamp:1533205894784
offset: 6, partition:0, topic:MyOrder  timestamp:1533205894784
offset: 7, partition:0, topic:MyOrder  timestamp:1533205894785
offset: 8, partition:0, topic:MyOrder  timestamp:1533205894786

分区成功了,在实际生产过程中,可以根据项目的实际需要进行分区设计。

线程池生产者

在实际生产过程中,通常消息数量是比较多的,就可以考虑使用线程池。

使用线程池发送消息时,要考虑两点:1.需要结合实际情况,合理设计线程池的大小;2.使用线程池时,消息的发送是无序的,如果对消息的顺序有要求,不建议使用。

如果使用线程池,建议是只实例化一个KafkaProducer对象,这样性能最好。代码如下:

首先写一个线程类:

代码语言:javascript
复制
package kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 生产者线程
 */
public class ProducerThread implements Runnable {

    private KafkaProducer<String, String> producer = null;
    private ProducerRecord<String, String> record = null;

    public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, (metadata, e) -> {
            if (null != e) {
                e.printStackTrace();
            }
            if (null != metadata) {
                System.out.println("消息发送成功 :         "+String.format("offset: %s, partition:%s, topic:%s  timestamp:%s",
                        metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
            }
        });
    }

}

接着完成启动类,启动类中自定义了一个线程池,这里还是有一些遐思,就是没有自定义,线程创建工厂,没有指定创建的线程名称,在实际生产中,最好是自定义线程工厂。

代码如下:

代码语言:javascript
复制
package kafka.producer;

import kafka.OrderMessage;
import kafka.partition.PartitionUtil;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.*;

/**
 * 线程池生产者
 *
 * @author tangj
 * @date 2018/7/29 20:15
 */
public class ProducerDemo {
    static Properties properties = new Properties();

    static String topic = "MyOrder";

    static KafkaProducer<String, String> producer = null;

    // 核心池大小
    static int corePoolSize = 5;

    // 最大值
    static int maximumPoolSize = 20;

    // 无任务时存活时间
    static long keepAliveTime = 60;

    // 时间单位
    static TimeUnit timeUnit = TimeUnit.SECONDS;

    // 阻塞队列
    static BlockingQueue blockingQueue = new LinkedBlockingQueue();

    // 线程池
    static ExecutorService service = null;

    static {
        // 配置项
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.90.53:9092");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionUtil.class.getName());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(properties);
        // 初始化线程池
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue);
    }

    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 6; i++) {
            service.submit(createMsgTask());
        }
    }


    /**
     * 生产消息
     *
     * @return
     */
    public static ProducerThread createMsgTask() {
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setId(UUID.randomUUID().toString());
        long timestamp = System.nanoTime();
        orderMessage.setCreateTime(timestamp);
        orderMessage.setRemake("rem");
        orderMessage.setsName("test");
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, timestamp + "", orderMessage.toString());
        ProducerThread task = new ProducerThread(producer, record);
        return task;
    }

}

 总结

对于Kafka的分区器和多线程生成者,切记一点,一定要根据实际业务进行设计。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-08-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 准备工作
  • 主题管理
  • 生产者API
    • 消息发送流程
      • 代码实例
      • 自定义分区器
      • 线程池生产者
      •  总结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档