消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:
// 1. 创建一个保存字符串的队列 QueuestringQueue = new LinkedList();
// 2. 往消息队列中放入消息 stringQueue.offer( "hello" );
// 3. 从消息队列中取出消息并打印 System.out.println(stringQueue.poll());
上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。
我们可以简单理解消息队列就是将需要传输的数据存放在队列中。
消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。
目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。
消息队列的应用场景
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
image.png
image.png
1.1.1.1 日志处理(大数据领域常见)
大型电商网站(淘宝、京东、国美、苏宁...)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。
image.png
image.png
image.png
image.png
image.png
点对点模式特点:
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
image.png
image.png
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
2. 以容错的持久化方式存储数据流
处理数据流
1. Publish and subscribe:发布与订阅
2. Store:存储
3. Process:处理
我们通常将Apache Kafka用在两类程序:
1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据
2. 构建实时流应用程序,以转换或响应数据流
image.png
上图,我们可以看到:
1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
4. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
Kafka比ActiveMQ牛逼得多
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。
image.png
image.png
image.png
image.png
image.png
目录名称 | 说明 |
---|---|
bin | Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等 |
config | Kafka的所有配置文件 |
libs | 运行Kafka所需要的所有JAR包 |
logs | Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息 |
site-docs | Kafka的网站帮助文件 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
基于1个分区1个副本的基准测试
测试步骤:
1. 启动Kafka集群
2. 创建一个1个分区1个副本的topic: benchmark
3. 同时运行生产者、消费者基准测试程序
4. 观察结果
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
<**repositories**><!-- 代码库 -->
<**repository**>
<**id**>central</**id**>
<**url**>http://maven.aliyun.com/nexus/content/groups/public//</**url**>
<**releases**>
<**enabled**>true</**enabled**>
</**releases**>
<**snapshots**>
<**enabled**>true</**enabled**>
<**updatePolicy**>always</**updatePolicy**>
<**checksumPolicy**>fail</**checksumPolicy**>
</**snapshots**>
</**repository**>
</**repositories**>
<**dependencies**>
<!-- kafka客户端工具 -->
<**dependency**>
<**groupId**>org.apache.kafka</**groupId**>
<**artifactId**>kafka-clients</**artifactId**>
<**version**>2.4.1</**version**>
</**dependency**>
<!-- 工具类 -->
<**dependency**>
<**groupId**>org.apache.commons</**groupId**>
<**artifactId**>commons-io</**artifactId**>
<**version**>1.3.2</**version**>
</**dependency**>
<!-- SLF桥接LOG4J日志 -->
<**dependency**>
<**groupId**>org.slf4j</**groupId**>
<**artifactId**>slf4j-log4j12</**artifactId**>
<**version**>1.7.6</**version**>
</**dependency**>
<!-- SLOG4J日志 -->
<**dependency**>
<**groupId**>log4j</**groupId**>
<**artifactId**>log4j</**artifactId**>
<**version**>1.2.16</**version**>
</**dependency**>
</**dependencies**>
<**build**>
<**plugins**>
<**plugin**>
<**groupId**>org.apache.maven.plugins</**groupId**>
<**artifactId**>maven-compiler-plugin</**artifactId**>
<**version**>3.7.0</**version**>
<**configuration**>
<**source**>1.8</**source**>
<**target**>1.8</**target**>
</**configuration**>
</**plugin**>
</**plugins**>
</**build**>
将log4j.properties配置文件放入到resources文件夹中
**log4j.rootLogger**=**INFO,stdout****
****log4j.appender.stdout**=**org.apache.log4j.ConsoleAppender****
****log4j.appender.stdout.layout**=**org.apache.log4j.PatternLayout****
****log4j.appender.stdout.layout.ConversionPattern**= **%5p - %m%n**
image.png
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"192.168.88.100:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
**try** {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));
// 调用一个Future.get()方法等待响应
future.get();
} **catch** (InterruptedException e) {
e.printStackTrace();
} **catch** (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
image.png
image.png
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
**try** {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ));
// 调用一个Future.get()方法等待响应
future.get();
} **catch** (InterruptedException e) {
e.printStackTrace();
} **catch** (ExecutionException e) {
e.printStackTrace();
}
}
// 5. 关闭生产者
producer.close();
}
}
**public class** KafkaProducerTest {
**public static void** main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = **new** Properties();
props.put( **"bootstrap.servers"** , **"node1.itcast.cn:9092"** );
props.put( **"acks"** , **"all"** );
props.put( **"key.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
props.put( **"value.serializer"** , **"org.apache.kafka.common.serialization.StringSerializer"** );
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = **new** KafkaProducer<String, String>(props);
// 3. 调用send发送1-100消息到指定Topic test
**for**(**int** i = 0; i < 100; ++i) {
// 一、同步方式
// 获取返回值Future,该对象封装了返回值
// Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
// future.get();
// 二、带回调函数异步方式
producer.send(**new** ProducerRecord<String, String>( **"test"** , **null**, i + **""** ), **new** Callback() {
@Override
**public void** onCompletion(RecordMetadata metadata, Exception exception) {
**if**(exception != **null**) {
System.***out***.println( **"** **发送消息出现异常** **"** );
}
**else** {
String topic = metadata.topic();
**int** partition = metadata.partition();
**long** offset = metadata.offset();
System.***out***.println( **"** **发送消息到** **Kafka** **中的名字为** **"** + topic + **"** **的主题,第** **"** + partition + **"** **分区,第** **"** + offset + **"** **条数据成功** **!"** );
}
}
});
}
// 5. 关闭生产者
producer.close();
}
}
image.png
image.png
image.png
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。
Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
生产者负责将数据推送给broker的topic
消费者负责从broker的topic中拉取数据,并自己进行处理
image.png
image.png
image.png
image.png
image.png
image.png
// 3. 发送1-100数字到Kafka的test主题中
**while**(**true**) {
**for** (**int** i = 1; i <= 100; ++i) {
// 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回
// 这样可以让消息发送变得更高效
producer.send(**new** ProducerRecord<>( **"test"** , i + **""** ));
}
Thread.*sleep*(3000);
}
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
*// 1. 创建消费者*
**public** **static** Consumer **<** String, String **>** createConsumer() {
*// 1. 创建Kafka消费者配置*
Properties **props** **=** **new** Properties();
**props**.setProperty( **"** **bootstrap.servers** **"** , **"** **node1.itcast.cn:9092** **"** );
**props**.setProperty( **"** **group.id** **"** , **"** **ods_user** **"** );
**props**.put( **"** **isolation.level** **"** , **"** **read_committed** **"** );
**props**.setProperty( **"** **enable.auto.commit** **"** , **"** **false** **"** );
**props**.setProperty( **"** **key.deserializer** **"** , **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );
**props**.setProperty( **"** **value.deserializer** **"** , **"** **org.apache.kafka.common.serialization.StringDeserializer** **"** );
*// 2. 创建Kafka消费者*
KafkaConsumer<String, String> **consumer** **=** **new** KafkaConsumer<>(props);
*// 3. 订阅要消费的主题*
**consumer**.subscribe(**Arrays**.asList( **"** **ods_user** **"** ));
**return** consumer;
}
编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。
image.png
image.png
实现步骤:
1. 调用之前实现的方法,创建消费者、生产者对象
2. 生产者调用initTransactions初始化事务
3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
(1) 生产者开启事务
(2) 消费者拉取消息
(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
(4) 生产消息到dwd_user topic中
(5) 提交偏移量到事务中
(6) 提交事务
(7) 捕获异常,如果出现异常,则取消事务
**public** **static** void main(String[] args) {
Consumer<String, String> **consumer** **=** createConsumer();
Producer<String, String> **producer** **=** createProducer();
*// 初始化事务*
**producer**.initTransactions();
**while**(true) {
**try** {
*// 1. 开启事务*
**producer**.beginTransaction();
*// 2. 定义Map结构,用于保存分区对应的offset*
Map<TopicPartition, OffsetAndMetadata> **offsetCommits** **=** **new** HashMap<>();
*// 2. 拉取消息*
ConsumerRecords<String, String> **records** **=** **consumer**.poll(**Duration**.ofSeconds(2));
**for** (ConsumerRecord<String, String> **record** **:** records) {
*// 3. 保存偏移量*
**offsetCommits**.put(**new** TopicPartition(**record**.topic(), **record**.partition()),
**new** OffsetAndMetadata(**record**.offset() + 1));
*// 4. 进行转换处理*
String[] **fields** **=** **record**.value().split( **"** **,** **"** );
fields[1] **=** fields[1].equalsIgnoreCase( **"** **1** **"** ) **?** **"** **男** **"** **:** **"** **女** **"** ;
String **message** **=** fields[0] **+** **"** **,** **"** **+** fields[1] **+** **"** **,** **"** **+** fields[2];
*// 5. 生产消息到dwd_user*
**producer**.send(**new** ProducerRecord<>( **"** **dwd_user** **"** , message));
}
*// 6. 提交偏移量到事务*
**producer**.sendOffsetsToTransaction(offsetCommits, **"** **ods_user** **"** );
*// 7. 提交事务*
**producer**.commitTransaction();
} **catch** (Exception **e**) {
*// 8. 放弃事务*
**producer**.abortTransaction();
}
}
}
image.png
image.png
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
1. 轮询分区策略
2. 随机分区策略
3. 按key分区分配策略
4. 自定义分区策略
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标 | 意义 |
---|---|
Brokers Spread | broker使用率 |
Brokers Skew | 分区是否倾斜 |
Brokers Leader Skew | leader partition是否存在倾斜 |
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** | 单分区单副本(ack=-1/all)**** |
---|---|---|---|
吞吐量 | 165875.991109/s每秒16.5W条记录 | 93092.533979/s每秒9.3W条记录 | 73586.766156 /s每秒7.3W调记录 |
吞吐速率 | 158.19 MB/sec | 88.78 MB/sec | 70.18 MB |
平均延迟时间 | 192.43 ms | 346.62 ms | 438.77 ms |
最大延迟时间 | 670.00 ms | 1003.00 ms | 1884.00 ms |
image.png
image.png
指标**** | 单分区单副本(ack=0)**** | 单分区单副本(ack=1)**** |
---|---|---|
吞吐量 | 165875.991109 records/sec每秒16.5W条记录 | 93092.533979 records/sec每秒9.3W条记录 |
吞吐速率 | 158.19 MB/sec每秒约160MB数据 | 88.78 MB/sec每秒约89MB数据 |
平均延迟时间 | 192.43 ms avg latency | 346.62 ms avg latency |
最大延迟时间 | 670.00 ms max latency | 1003.00 ms max latency |