首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于curator的延迟队列

基于curator的延迟队列

作者头像
用户1215919
发布于 2021-12-28 04:45:20
发布于 2021-12-28 04:45:20
40200
代码可运行
举报
文章被收录于专栏:大大的微笑大大的微笑
运行总次数:0
代码可运行

这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理

怎么使用
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!--dependency-->
<dependency>
	 <groupId>org.apache.curator</groupId>
	 <artifactId>curator-recipes</artifactId>
	 <version>4.0.1</version>
</dependency>

 <dependency>
	 <groupId>org.apache.curator</groupId>
	 <artifactId>curator-framework</artifactId>
	 <version>4.0.1</version>
</dependency>
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Processor {
    private final static CuratorFramework client;
    private final static DistributedDelayQueue<String> queue;

    static{
        ZookeeperConfig config = ZookeeperConfig.getConfig();
        // create client
        client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
                new ExponentialBackoffRetry(3000, 2));
        // build queue
        queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
                new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
                .buildDelayQueue();
        // 开启执行计划
        enable();
    }

    /**
     * 生产数据
     *
     * @param id
     * @param endTime
     * @throws Exception
     */
    public void producer(String id, Date endTime) throws Exception {
        queue.put(id, endTime.getTime());
    }


    private static void enable(){
        try {
            client.start();
            queue.start();
        } catch (Exception e) {
            logger.error("enable queue fail, exception:{}", e);
        }
    }

}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer<String> {
    @Override
    public byte[] serialize(String s) {
         return s.getBytes("utf-8");
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes);
    }
}

// consumer
AutoSubmitConsumer implements QueueConsumer<String> {

    @Override
    public void consumeMessage(String id)  {
        logger.info("consumeMessage, :{}", id);
      	// service processor.
        logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
    }

    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    }
}

是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?

猜想
  1. 是否持久化
  2. 是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
  1. 针对第一点,我们关闭zookeeper服务端和客户端后重新启动后之前的节点还存在所以是持久化节点
  2. 通过客户端工具连接zookeeper发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的

以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号

源码求证
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片段
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
 if ( !isProducerOnly )
        {
            service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call()
                        {
                            runLoop(); // step1
                            return null;
                        }
                    }
                );
        }
// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代码片段
while ( state.get() == State.STARTED  )
            {
                try
                {
                    ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                    currentVersion = data.version;
					// 诸如:
                    //queue-|2E1D86A3BB6|0000000019
                    //queue-|1712F752AA0|0000000036
                    //queue-|1712F76FF60|0000000035
		    // 拿到所有的子节点
                    List<String> children = Lists.newArrayList(data.children); 
                    // 根据过期时间排序
	            // step6
                    sortChildren(children); 
		    // 排序后
                    //queue-|1712F752AA0|0000000036
                    //queue-|1712F76FF60|0000000035
                    //queue-|2E1D86A3BB6|0000000019
                    if ( children.size() > 0 )
                    { //获取到期时间
                        maxWaitMs = getDelay(children.get(0));
                       
                        if ( maxWaitMs > 0 ) continue;
                    }
                    else  continue;
                   // 死循环不断轮询是否有满足条件的节点;
                   // 只要有满足条件的节点就将整个排序后的集合往下传递
                    processChildren(children, currentVersion); // step2
                }
               
            }
// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2对应的代码片段:
private void processChildren(List<String> children, long currentVersion)
    {
        final Semaphore processedLatch = new Semaphore(0);
        final boolean   isUsingLockSafety = (lockPath != null);
        int             min = minItemsBeforeRefresh;
        for ( final String itemNode : children )
        {
            if ( Thread.currentThread().isInterrupted() )
            {
                processedLatch.release(children.size());
                break;
            }

            if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
            {
                processedLatch.release();
                continue;
            }

            if ( min-- <= 0 )
            {
                if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
                {
                    processedLatch.release(children.size());
                    break;
                }
            }
	    // step3
            if ( getDelay(itemNode) > 0 )
            {
                processedLatch.release();
                continue;
            }
            //这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯
            executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            //是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中
                            if ( isUsingLockSafety )
                            {
                                
                                processWithLockSafety(itemNode, ProcessType.NORMAL);
                            }
                            else
                            {
			        // 看这里 step4
                                processNormally(itemNode, ProcessType.NORMAL);
                            }
                        }finally
                        {
                            processedLatch.release();
                        }
                    }
                }
            );
        }

        processedLatch.acquire(children.size());
    }

//  org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 对应step3处的代码片段
 protected long getDelay(String itemNode)
            {
                return getDelay(itemNode, System.currentTimeMillis());
            }
            
            private long getDelay(String itemNode, long sortTime)
            {  // 会从key上获取时间戳        
		// step5
                long epoch = getEpoch(itemNode); 
                return epoch - sortTime; // 计算过期时间
            }

// 对应step5处的代码
private static long getEpoch(String itemNode)
    {
	// itemNode -> queue-|时间戳|序号
        int     index2 = itemNode.lastIndexOf(SEPARATOR);
        int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
        if ( (index1 > 0) && (index2 > (index1 + 1)) )
        {
            try
            {
                String  epochStr = itemNode.substring(index1 + 1, index2);
                return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多
            }
        }
        return 0;
    }

// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 会根据延时时间排序
// step6处的代码片段
protected void sortChildren(List<String> children)
            {
                final long sortTime = System.currentTimeMillis();
                Collections.sort
                (
                    children,
                    new Comparator<String>()
                    {
                        @Override
                        public int compare(String o1, String o2)
                        {
                            long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                            return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                        }
                    }
                );
            }

// 对应step4处的代码片段
 private boolean processNormally(String itemNode, ProcessType type) throws Exception
    {
        try
        {
            String  itemPath = ZKPaths.makePath(queuePath, itemNode);
            Stat    stat = new Stat();

            byte[]  bytes = null;
            if ( type == ProcessType.NORMAL )
            {
                // 获取key对应的value
                bytes = client.getData().storingStatIn(stat).forPath(itemPath);
            }
            if ( client.getState() == CuratorFrameworkState.STARTED )
            {
               // 移除节点
                			client.delete().withVersion(stat.getVersion()).forPath(itemPath);
            }

            if ( type == ProcessType.NORMAL )
            {
	        //step7
                processMessageBytes(itemNode, bytes);
            }

            return true;
        }

        return false;
    }
//对应step7处代码,会回调我们的业务代码
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
    {
        ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
        MultiItem<T>                items;
        try
        {
		  // 根据我们定义的序列化器序列化
            items = ItemSerializer.deserialize(bytes, serializer);
        }

        for(;;)
        {
		 // 省略一部分代码
            try
            {
                consumer.consumeMessage(item); // 这里就会回调到我们的业务代码
            }
        }
        return resultCode;
    }
总结
  1. org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
  2. 如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
  3. 而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
一文搞懂 MySQL InnoDB架构 Buffer Pool、Change Buffer、自适应哈希索引、Log Buffer
书接上回,林渊重生使得 MySQL B+Tree 提前问世二十年,这次亲自操刀 InnoDB 架构引擎的设计,一个支持高并发读写、支持事务行级锁的划时代架构诞生...
码哥字节
2025/04/09
7.1K0
一文搞懂 MySQL InnoDB架构 Buffer Pool、Change Buffer、自适应哈希索引、Log Buffer
⑩⑧【MySQL】InnoDB架构、事务原理、MVCC多版本并发控制
MySQL5.5版本开始,默认使用InnoDB存储引擎,它擅长事务处理,具有崩溃恢复特性,在日常开发中使用非常广泛。下面是InnoDB架构图,左侧为内存结构,右侧为磁盘结构。
.29.
2023/11/21
3870
⑩⑧【MySQL】InnoDB架构、事务原理、MVCC多版本并发控制
【MySQL】一文带你理清InnoDB引擎的<内部架构>(内存结构,磁盘结构,后台线程)
YY的秘密代码小屋
2024/09/09
2850
【MySQL】一文带你理清InnoDB引擎的<内部架构>(内存结构,磁盘结构,后台线程)
详细了解 InnoDB 内存结构及其原理
之前写过一篇文章「简单了解InnoDB原理」,现在回过头看,其实里面只是把缓冲池(Buffer Pool),重做日志缓冲(Redo Log Buffer)、插入缓冲(Insert Buffer)和自适应哈希索引(Adaptive Hash Index)等概念简单的介绍了一下。
冬夜先生
2021/10/08
6350
【Mysql-InnoDB 系列】InnoDB 架构
封面图片来自:mysql官方文档,8.0版本,InnoDB Architecture。
程序员架构进阶
2021/03/05
1.4K0
【Mysql-InnoDB 系列】InnoDB 架构
MySQL InnoDB引擎
表空间是InnoDB存储引擎逻辑结构的最高层, 如果用户启用了参数 innodb_file_per_table(在8.0版本中默认开启) ,则每张表都会有一个表空间(xxx.ibd),一个mysql实例可以对应多个表空间,用于存储记录、索引等数据。
用户9615083
2022/12/25
1.5K0
MySQL InnoDB引擎
InnoDB 内存结构及其原理
InnoDB 是 MySQL 的默认存储引擎,以其强大的事务支持、崩溃恢复能力和高并发处理性能著称。要深入理解 InnoDB 的内存结构及其工作原理,有必要探讨其内存使用的各个方面,包括缓冲池(Buffer Pool)、内存分配、锁结构(Lock Structure)和其他内部内存使用机制。
Michel_Rolle
2024/07/31
3K0
【MySQL-23】万字总结<InnoDB引擎>——【逻辑存储结果&架构(内存结构,磁盘结构,后台线程)&事务原理&MVCC】
两次查询我们会得到如下两个ReadView (而RR可重复读级别下就只会有一个)
YY的秘密代码小屋
2024/09/09
2510
【MySQL-23】万字总结<InnoDB引擎>——【逻辑存储结果&架构(内存结构,磁盘结构,后台线程)&事务原理&MVCC】
MySQL高级10-InnoDB引擎存储架构
  表空间(Tablespace):一个mysql实例,及一个数据库实例,可以对应多个表空间(ibd文件),用于存储记录,索引等数据。
Se7eN_HOU
2023/09/20
4230
MySQL高级10-InnoDB引擎存储架构
给你汇总了MySQL各个版本的体系结构图
InnoDB是MySQL中最重要的存储引擎之一,它的架构设计旨在提供高可靠性和高性能。以下是InnoDB架构的简要介绍:
俊才
2024/03/21
5230
给你汇总了MySQL各个版本的体系结构图
MySQL Innodb和Myisam
InnoDB是一种兼顾高可靠性和高性能的通用存储引擎,架构分为两块:内存中的结构和磁盘上的结构。InnoDB 使用日志先行策略,将数据修改先在内存中完成,并且将事务记录成重做日志(Redo Log),转换为顺序IO高效的提交事务。
默存
2022/06/24
2K0
MySQL Innodb和Myisam
MySQL底层概述—1.InnoDB内存结构
Buffer Pool是缓冲池的意思。Buffer Pool的作用是缓存表数据与索引数据,减少磁盘IO,提升效率。
东阳马生架构
2025/02/10
2350
Innodb是如何运转的
Master Thread是非常核心的后台线程,主要负责脏页的刷新,合并插入缓冲,UNDO页回收等。
大忽悠爱学习
2022/12/07
3980
Innodb是如何运转的
MySQL四:InnoDB的存储结构
「MySQL存储引擎最大的特点就是【插件化】,可以根据自己的需求使用不同的存储引擎,innodb存储引擎支持行级锁以及事务特性,也是多种场合使用较多的存储引擎。」
云扬四海
2022/09/26
9570
你真的了解MySQL吗(从MySQL基础架构深入探究)
MySQL,关系型数据库,我们在开发过程中经常使用,谈及事务,我们会想到MVCC机制+锁机制+日志,谈及事务隔离级别,读未提交、读已提交、可重复读、串行化我们再熟悉不过,但是我们在经常使用MySQL的时候,真的了解过它的底层架构吗,本文旨在总底层架构分析,探究一些我们日常会遇到的问题
潋湄
2025/02/16
5042
你真的了解MySQL吗(从MySQL基础架构深入探究)
关系型数据库 MySQL 之 InnoDB 体系结构
InnoDB 存储引擎是 MySQL 5.5 版本后的默认存储引擎,支持事务 ACID,回滚,系统崩溃恢复能力及多版本并发控制的事务安全,主要用于 OLTP 数据库业务场景;支持自增长列(auto_increment);支持外键约束(foreign key);支持 MVCC 的行级锁;使用 Btree 索引;如果你还没有看到前面一文介绍 MySQL 体系结构,那么推荐戳此查看[MySQL 体系结构详解],介绍完 MySQL 体系结构,下面来一起学习 InnoDB 体系结构。
JiekeXu之路
2019/08/15
1.4K0
【Mysql】详解InnoDB存储引擎以及binlog,redelog,undolog+MVCC
在Mysql中,InnoDB存储引擎是默认的,也是我们最常用的一个存储引擎,其中分为内存结构和磁盘结构两大部分,整体架构图如下:
用户11369558
2025/05/18
2710
【Mysql】详解InnoDB存储引擎以及binlog,redelog,undolog+MVCC
MySql InnoDB 存储引擎表优化
适时的使用 OPTIMIZE TABLE 语句来重组表,压缩浪费的表空间。这是在其它优化技术不可用的情况下最直接的方法。OPTIMIZE TABLE 语句通过拷贝表数据并重建表索引,使得索引数据更加紧凑,减少空间碎片。语句的执行效果会因表的不同而不同。过大的表或者过大的索引及初次添加大量数据的情况下都会使得这一操作变慢。
WindWant
2023/06/27
5520
【精华】洞悉MySQL底层架构:游走在缓冲与磁盘之间
提起MySQL,其实网上已经有一大把教程了,为什么我还要写这篇文章呢,大概是因为网上很多网站都是比较零散,而且描述不够直观,不能对MySQL相关知识有一个系统的学习,导致不能形成知识体系。为此我撰写了这篇文章,试图让这些底层架构相关知识更加直观易懂:
cxuan
2020/06/12
2K1
【MySQL常见疑难杂症】InnoDB存储引擎体系结构
● Buffer Pool:缓冲池是InnoDB在启动时分配的一个内存区域,用于InnoDB在访问数据时缓存表和索引数据。利用缓冲池,可以合并一些对经常访问的数据的操作,直接从内存中处理,加快了处理速度。通常,在专用数据库服务器上,可以将80%的物理内存分配给InnoDB缓冲池。为了提高缓存管理的效率,使用页面链表的方式+LRU(最近最少使用)算法进行管理。
samRsa
2025/02/21
670
【MySQL常见疑难杂症】InnoDB存储引擎体系结构
推荐阅读
相关推荐
一文搞懂 MySQL InnoDB架构 Buffer Pool、Change Buffer、自适应哈希索引、Log Buffer
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档