前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka扩展内容

Kafka扩展内容

作者头像
matt
发布2022-10-25 16:00:28
3090
发布2022-10-25 16:00:28
举报
文章被收录于专栏:CSDN迁移CSDN迁移

Kafka扩展

一、重复消费配置

代码语言:javascript
复制
# 低级API:
props.put("group.id","01");
# offset自动重置,offset可能因为缓存删除,序号不一定从0开始
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

# 高级API:
consumer.seek(new TopicPartition(topic, partititon, offset));

消费者组id、topic和partition唯一确定一个offset。

可以查看_consumer_offsets这个Topic里的数据。

二、拦截器

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。

onSend(ProducerRecord)方法在消息被序列号及计算分区之前调用。 onAcknowledgement(RecordMetadata, Exception)在消息被应答或发送失败时调用。

代码语言:javascript
复制
// 配置文件添加拦截器链
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, list);
    
public class Interceptor implements ProducerInterceptor<String, String> {
    private int SUCCESS_CONT = 0;
    private int ERROR_CONT = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.key(), "new value");
        // return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            SUCCESS_CONT++;
        } else {
            ERROR_CONT++;
        }
    }

    @Override
    public void close() {
        System.out.println("success times:" + SUCCESS_CONT);
        System.out.println("error times:" + ERROR_CONT);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

三、Kafka Streams

轻量级(功能性弱) ,实时性(非微批次处理,窗口允许乱序数据,允许数据迟到),一条条数据处理。

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>
代码语言:javascript
复制
// Kafka Streams处理案例
public class Streams {
    public static void main(String[] args) {
        // 创建拓扑对象
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // 构建拓扑结构
        topologyBuilder.addSource("SOURCE", "first")
                .addProcessor("PROCESSOR", () -> new MyProcessor(){}, "SOURCE")
                .addSink("SINK", "second", "PROCESSOR");
                
        // 创建配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("application.id", "kafkaStreams");

        // 创建Kafka Streams对象
        KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder, props);

        // 开启流处理
        kafkaStreams.start();
    }
}


// TopologyBuilder.addProcessor中的ProssorSupplier返回的Processor类这里做定制
public class MyProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    // 初始化,上下文
    @Override
    public void init(ProcessorContext processorContext) {
        context = processorContext;
    }

    // 处理逻辑
    @Override
    public void process(byte[] bytes, byte[] bytes2) {
        // 获取一行数据
        String line = new String(bytes2);

        // 逻辑处理
        line = line.replaceAll(">>>", "");
        bytes2 = line.getBytes();

        // 写出数据
        context.forward(bytes, bytes2);
    }

    @Override
    public void punctuate(long l) {

    }

    @Override
    public void close() {

    }
}

四、Kafka与Flume比较

数据传输层。

Flume:cloudera公司研发,适合多个生产者,适合下游数据消费者不多的情况(费内存),适合数据安全性要求不高的操作,适合与Hadoop生态圈对接的操作。

Kafka:linkedin公司研发,适合数据下游消费众多的情况(缓存数据跟消费者个数无关),适合数据安全性较高的操作(数据在磁盘,备份),支持relication。 多个Agent后台数据,交由一个Agent汇总,对接Kafka,离线/实时两条线消费。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-02-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka扩展
  • 一、重复消费配置
  • 二、拦截器
  • 三、Kafka Streams
  • 四、Kafka与Flume比较
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档