首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >kafka删除topic消息的四种方式[通俗易懂]

kafka删除topic消息的四种方式[通俗易懂]

作者头像
全栈程序员站长
发布于 2022-11-03 07:54:53
发布于 2022-11-03 07:54:53
13.7K01
代码可运行
举报
运行总次数:1
代码可运行

方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置delete.topic.enable=true

2.执行命令bin/kafka-topics.sh –delete –topic test –zookeeper zk:2181或者使用kafka-manager集群管理工具删除

注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked for deletion,加上配置,重启kafka,之前的topick就真正删除了。

方法二:设置删除策略(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete

# 注意:下面有两种配置,一种是基于时间的策略,另种是基于日志文件大小的策略,两种策略同是配置的话,只要满足其中种策略,则触发Log删除的操作。删除操作总是先删除最旧的日志
# 消息在Kafka中保存的时间,168小时之前的1og, 可以被删除掉,根据policy处理数据。
log.retention.hours=4

# 当剩余空间低于log.retention.bytes字节,则开始删除1og
log.retention.bytes=37580963840

# 每隔300000ms, logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除
log.retention.check.interval.ms=1000

方法三:手动删除法(不推荐)(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

前提:不允许更改server.properties配置

1.删除zk下面topic(test)

启动bin/zkCli.sh ls /brokers/topics rmr /brokers/topics/test ls /brokers/topics 查topic是否删除:bin/kafka-topics.sh –list –zookeeper zk:2181

2.删除各broker下topic数据,默认目录为/tmp/kafka-logs

方法四:偏移量(看起来你最友好,会程序的你推荐)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.censoft.kafkaAdmin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @author zy Zhang
* @version : 1.0
* @Description
* @since 2020/7/13 16:02
*/
public class DeleteReordsByOffset {
public static void main(String[] args) throws ClassNotFoundException {
// 1.创建kafkaAdminClient
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.27.111:9092");
AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
// 2.从数据库获取需要删除的消息
Class.forName("com.mysql.jdbc.Driver");
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
String url  = "jdbc:mysql://localhost:3306/test?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8";
String user = "root";
String password = "123456";
Connection conn = null;
Statement statement = null;
ResultSet res = null;
String sql = "SELECT Topic, KafkaPartition, UntilOffset FROM Kafka_Offset;";
try {
conn = DriverManager.getConnection(url, user, password);
statement = conn.createStatement();
res = statement.executeQuery(sql);
if (res != null) {
while (res.next()) {
String topic = res.getString("Topic");
Integer partition = res.getInt("KafkaPartition");
Long offset = res.getLong("UntilOffset");
TopicPartition topicPartition = new TopicPartition(topic, partition);
RecordsToDelete recordsToDelete1 = RecordsToDelete.beforeOffset(offset);
recordsToDelete.put(topicPartition, recordsToDelete1);
}
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 3.执行删除
DeleteRecordsResult result = kafkaAdminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
kafkaAdminClient.close();
}
}

2020-11-27 补充说明:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
		目前发现通过这种方法起到的效果是:
topic的起始偏移量会被定位到传入的recordsToDelete指定的位置
但是并没有将磁盘中存储的数据删除
如果我找到在磁盘删除的方法会继续更新,看下面

2020-11-30 补充说明: 我重新进行了测试,发现使用kafka-delete-records.sh或者kafkaAdminClient.deleteRecords()方法还有其他约束条件: 首先就是log文件自身有大小设置,对应配置文件中log.segment.bytes,在没有达到这个大小的时候是不会创建下一个log文件的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
eg: test-0下有三个log文件
00000000000000000000.log, 00000000000000000036.log, 00000000000000000136.log
我们修改起始偏移量=100
那么deleteLogStartOffsetBreachedSegments运行时会判定00000000000000000000.log是可以删除的

在原先测试时,log.segment.bytes=1G了,这造成了很难观测到数据从硬盘删除 本次测试,我将log.segment.bytes修改为了1M

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181323.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年10月16日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka分布式消息系统(搭建Kafka集群) - Part.3
在前面两篇文章中,我们了解了基本概念,也安装、配置好了zookeeper集群,在这篇文章中,我们将一步步搭建kafka集群。
张子阳
2018/09/29
7250
kafka删除topic方式
工作中因为各种原因,topic中消息堆积的太多或者kafka所在磁盘空间满了等。可能需要彻底清理一下kafka topic。 cd /opt/kafka/kafka_2.10-0.10.2.2/bin 列出所有topic: ./kafka-topics.sh –zookeeper ip:2181 -list 其实就是检查zk上节点的/brokers/topics子节点,打印出来。 创建topic ./kafka-topics.sh –zookeeper ip:2181 -create –topic my_topic –partitions 4 –replication-factor 1 线上环境将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),partitions和replication-factor是两个必备选项, 第一个参数是消息并行度的一个重要参数,第二个极大提高了topic的可用性,备份因子默认是1,相当于没有备份,其值不能大于broker个数, 否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。 –alter –config –deleteConfig。replication-factor参数用来指定需要多少个副本(连同leader在内),一般比较推荐设置为2或3。如果设置太少(比如1)导致可用性下降, 如果设置太大会影响Kafka的性能。 方式一: 配置delete.topic.enable=true 修改kafaka配置文件server.properties,添加delete.topic.enable=true,重启kafka。之后通过kafka命令行就可以直接删除topic 重启kafka nohup /usr/kafka/bin/kafka-server-start.sh /usr/kafka/config/server.properties >/dev/null 2>&1 & 通过命令行删除topic: ./kafka-topics.sh –zookeeper ip:2181 –topic my_topic –delete 方式二: 没有配置delete.topic.enable=true 1、通过命令行删除topic: ./kafka-topics.sh –zookeeper ip:2181 –topic my_topic –delete 因为kafaka配置文件中server.properties没有配置delete.topic.enable=true, 此时的删除并不是真正的删除,只是把topic标记为:marked for deletion 2、删除kafka存储目录(server.properties文件log.dirs配置,默认为”/tmp/kafka-logs”)相关topic目录。 方式三: 若想真正删除它,需要登录zookeeper客户端: cd /opt/kafka/zookeeper-3.4.13/bin chmod 755 ./* (可执行命令)
全栈程序员站长
2022/11/02
2.5K0
Kafka
优点: (1)解耦 (2)冗余(备份) (3)扩展性 (4)灵活性、峰值处理能力 (5)可恢复性(冗余) (6)顺序保证(队列) (7)缓冲(冗余) (8)异步通信(宕机)
matt
2022/10/25
5530
Kafka
kafka删除topic数据
https://blog.csdn.net/u013256816/article/details/80418297
py3study
2020/02/25
4.9K0
kafka 集群配置_kafka集群原理
kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信。
全栈程序员站长
2022/09/27
1.4K0
创建Topic原来还能这样玩,真绝了!!!(附视频)
需要注意的是–zookeeper后面接的是kafka的zk配置, 假如你配置的是localhost:2181/kafka 带命名空间的这种,不要漏掉了
石臻臻的杂货铺[同名公众号]
2021/08/12
1.8K0
kafka如何彻底删除topic及数据
删除kafka topic及其数据,严格来说并不是很难的操作。但是,往往给kafka 使用者带来诸多问题。项目组之前接触过多个开发者,发现都会偶然出现无法彻底删除kafka的情况。本文总结多个删除kafka topic的应用场景,总结一套删除kafka topic的标准操作方法。
mikealzhou
2018/05/19
21.1K13
kafka如何彻底删除topic及数据
[721]linux安装kafka
首先确保你的机器上安装了jdk,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用
周小董
2020/01/13
2.9K0
[721]linux安装kafka
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
(3)查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
鱼找水需要时间
2023/02/16
1.1K0
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
【消息队列 MQ 专栏】消息队列之 Kafka
Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:
芋道源码
2018/07/31
4.5K0
【消息队列 MQ 专栏】消息队列之 Kafka
kafka删除主题_kafka从头消费topic数据
转自https://www.cnblogs.com/xiaodf/p/10710136.html
全栈程序员站长
2022/11/03
7590
kafka学习笔记
搭建参考: https://www.cnblogs.com/luotianshuai/p/5206662.html
保持热爱奔赴山海
2019/09/17
6040
kafka学习笔记
Kafka原理和实践
本文从Kafka的基本概念、特点、部署和配置、监控和管理等方面阐述 Kafka 的实践过程。
杨振涛
2019/08/08
1.5K0
kafka集群搭建
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
Cyylog
2020/08/19
7990
大数据消息处理中间件之kafka win10快速部署
kafka官网:http://kafka.apache.org/documentation/
静谧星空TEL
2021/04/27
1.3K0
大数据消息处理中间件之kafka win10快速部署
kafka实战教程(python操作kafka),kafka配置文件详解
应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?
全栈程序员站长
2022/08/12
3.8K0
kafka实战教程(python操作kafka),kafka配置文件详解
Kafka快速入门
LEO:Log End Offset,待写入消息的offset,即最后一条消息的offset+1
兜兜转转
2023/03/06
4070
kafka删除topic中的数据_kafka删除数据
二、如果当前topic有使用过即有过传输过信息:并没有真正删除topic只是把这个topic标记为删除(marked for deletion)。想要彻底删除topic数据要经过下面两个步骤:
全栈程序员站长
2022/11/03
4.8K0
kafka介绍和常见操作
RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。
章工运维
2023/05/19
3470
消息队列——Kafka基本使用及原理分析
Kafka也是一款消息队列中间件,与ActiveMQ和RabbitMQ不同的是,它不是基于JMS和AMQP规范开发的,而是提供了类似JMS的特性,同时Kafka比较重量级,天然支持集群分布式搭建以及数据分片备份,由Scala和Java编写,因其高性能和高吞吐量的特点被广泛用于大数据的传输场景。简单而言,Kafka就是一款适用于大数据场景下的消息队列。
夜勿语
2020/09/07
1.8K0
相关推荐
Kafka分布式消息系统(搭建Kafka集群) - Part.3
更多 >
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档