Kafka 链路追踪

最近更新时间:2022-06-21 14:50:45

我的收藏
Kafka 的链路追踪能力通过 Spring Boot 的自动配置方式实现。

1.16.0-Edgware 版本说明

实现方式是向 bean 容器中各加入了一个 producerFactory 和 consumerFactory 类的 bean。他们各自分别重写了 createKafkaProducer 方法和 createKafkaConsumer 方法。使用该 producerFactory/consumerFactory 便具有了链路追踪的能力。建议使用 Spring 默认的 KafkaTemplate,无须自定义。
1.16.0-Edgware 版使用的 spring-kafka 版本必须在1.3.0以上,因为从这个版本开始 kafka-client 中增加了请求头,这才能方便的实现链路追踪。
<!--支持 kafka 使用调用链-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>

参见配置

#============== kafka ===================
# 指定 kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=

#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息 key 和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# 指定默认消费者 Group ID
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息 key 和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
其他更详细配置、使用方法详见 Demo 示例

1.16.0-Finchley 版本说明

实现方式是向 Spring 容器中各增加了一个对切面,分别作用于 ProducerFactory.createProducer 方法和 ConsumerFactory.createConsumer 方法的,使得通过他们返回的 KafkaProducer 和 KafkaConsumer 具有链路追踪的能力。
使用 1.16.0-Finchley 版本无须考虑 spring-kafka 版本问题,无须指定它的版本,因为底层依赖的 Finchley 版本的 spring cloud 会自动为它配置高于 1.3.0.RELEASE 版本的 spring-kafka。
<!--支持kafka使用调用链-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

参见配置

server:
servlet:
context-path: /
port: xxxxx
spring:
application:
name: xxxxxx
kafka:
bootstrap-servers: xxxxxx
#生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
producer:
#每批次发送消息的数量
batch-size: 16
#设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 0
#producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
buffer-memory: 33554432
#key序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#消费者的配置
consumer:
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 **latest, earliest, none**
auto-offset-reset: latest
#是否开启自动提交
enable-auto-commit: true
#自动提交的时间间隔
auto-commit-interval: 100
#key的解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#value的解码方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#在/usr/local/etc/kafka/consumer.properties中有配置
group-id: test-consumer-group
tsf:
swagger:
enabled: false
logging:
file: /tsf-demo-logs/${spring.application.name}/root.log
其他更详细配置、使用方法详见 Demo 示例

Kafka 批量消费消息场景

Kafka 消费单条消息时,接收的调用链信息在同个线程中自动传递。
Kafka 批量消费消息时,需要手动将调用链信息注入,从而继续传递调用链信息,方法如下:
@Autowired
private KafkaTracing kafkaTracing;

kafkaTracing.joinBatchReceiveSpan(consumerRecord);
说明
当前仅以下 SDK 版本及其之后的 SDK 版本支持:1.29.4-H、1.32.0-G、1.29.1-G、1.32.1-F、1.29.7-F。