前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从零到一实现有趣的时间轮算法,你会了吗!

从零到一实现有趣的时间轮算法,你会了吗!

原创
作者头像
小许code
发布2023-11-08 10:49:15
2.6K2
发布2023-11-08 10:49:15
举报
文章被收录于专栏:小许code

简介

📚 全文字数 : 6k+

⏳ 阅读时长 : 10min

📢 关键词 : 时间轮、双向链表、定时任务、Golang

时间轮(Timing Wheel)是George Varghese和Tony Lauck在1996年的论文【Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility】实现的,它在Linux内核中使用广泛,是Linux内核定时器的实现方法和基础之一。

相对于Go自带的 Timer、Ticker来说,时间轮算法是一种更加高效、适用更多场景的任务调度模型

不过,时间轮调度的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。

时间轮的运用其实是非常的广泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等组件中都存在时间轮的踪影

解决了什么问题

如果一个系统中存在着大量的调度任务,而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费cpu的资源并且很低效。

时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)。能够高效的管理各种延时任务,周期任务,通知任务等等

原理

时间轮(Timing Wheel)是一种环形的数据结构,就像一个时钟可以分成很多格子(Slot) 底层采用数组实现,每个格子代表时间的间隔 (Interval),存放的是具体定时任务列表(TaskList),TaskList是一个环形双向链表,链表中的每个元素都是定时任务 Task。

单级轮

上图中:是一个tick为12、interval等于1s的时间轮、当前currentTime = 3,slot = 3 的链表中存有4个任务项task

换个角度看TimimgWheel时间轮盘,你可以简单理解就是一个时钟表盘,指针每隔一段时间前进一格,走玩一圈是一个周期,需要执行的任务就放置在表盘的刻度处,当指针走到该位置时,就执行相应的任务。

新增一个5s的任务后,会怎么样呢

如果当前的指针 currentTime 指向的是3,此时如果插入一个5s的任务进来,那么新来的任务会存放到时间格8中。

层级轮

上面讲的都是简单单级时间轮,如果时间跨度查过了时间轮的刻度slot、比如添加一个15秒之后执行的任务,单个轮盘就无法满足我们的需求。我们就要考虑使用别的方案了!

在确定如何实现之前,先捋一捋多级轮中时间和轮次的关系:

在单次轮中我们假设有m个slot,形成一个数组(环形队列的底层用数组表示),每个slot表示的时间间隔是 t,那么能个表示的时间范围就是 m * t,,获取slot上的tasklist,可以用slot[i]表示,i 是slot数组的下标。

如果是多级轮,我们可以理解为多个环形队列,假如一个两级轮分别是circle1和circle2,如果同样给circle 2 分配12个 slot,每个slot的时间间隔 t 表示1s。

此时circle中每个slot对应的时间范围应该是m * t了(circle1表示的时间范围),而整个二级时间轮能表示的时间范围就是 m * m * t。

为什么表达的时间范围是 m * m *t ?

因为circle2是基于circle1表达的时间范围 m * t ,而二级轮circle2的slot表达的时间是基于circle的,同样也是 m个slot,所以整个表示的范围就是 m * m * t

依次类推,如果slot的数量是60,那么这种方式像不像类似于【秒、分、时】这种时间的换算比,细品一下,其实就是这样的

那么如何实现者中多轮的方式呢?这里有两种方式去实现这个需求

1:多轮盘方式

多级时间轮的理念,更贴近了钟表,钟表分为时针、分针、秒针。每过60s,分针前进一格,每过60min,时针前进一格。

多级时间轮的理念就是分两级甚至更多级轮盘,当一级轮盘走一圈后,二级轮盘前进一格,二级轮盘走一圈后,三级轮盘前进一格,依次类推~

2:task关联circle

每个任务添加一个circle (圈数) 的参数,未超过时间轮单轮刻度那么圈数circle = 0,超过的话就通过计算得到circle的值。

例如某个任务需要16s之后执行,那么该任务的circle = 1,计算得到的表盘位置slot = 4,也就是该任务需要在表盘走一圈以后,在位置4处执行。如果表盘指针刚好前进到该slot,该处的任务列表中的circle都减1,直到slot = 4的任务链表中的任务circle = 0,才执行该任务。

相比较而言个人更倾向于基于在任务上添加circle参数的方式,这种方式需要控制的是任务上的circle的值,本文也是基于这种方式去实现的。

实现

了解完时间轮的原理之后,我们看下如何去用Golang语言实现一个时间轮,下面的实现是基于单轮、结合任务circle实现层级轮扩展的时间轮实现方式。

梳理下时间轮算法的流程:

  • 建立环状数据结构(底层数组实现)
  • 确定数据结构的刻度(slot)、每个刻度对应的时间 interval、单圈能表示的时间等于 slot * interval
  • 创建任务,根据任务的延迟执行时间,确定刻度slot和圈数circle
  • 在刻度上挂上要执行的定时任务链表
  • 执行对应刻度(slot)链表上的任务

设计思路

在有想法写之前,我们看下要达到什么样的目标和功能,然后基于这些点去做具体实现,我认为的应该是需要如下结构和方法去实现。

总共分为三个部分:数据结构、对外方法、内部实现

数据结构是时间轮的基石,整个时间轮的什么周期都是基于这几个结构去进行的,而对外方法是我们进行使用时需要用到的,内部实现是具体的业务逻辑,其中execute和getSlotAndCircle是整个实现方案的核心,后续也是我们讲代码实现的重点。

这里有两个基于用Golang实现的基础:

双向链表:基于Golang的标准库container/list中已经实现的双向链表来进行定时任务的底层存储

环状数据结构:这里的环状数据结构其实并不需要我们进行实际意义上的收尾相连形成一个环,这里是基于数组,然后利用求余运算来逐个下标遍历方式,实现一个在逻辑上符合环状方式的目的。

开始说代码实现吧!

核心数据结构

时间轮XTimingWhee结构体字段用下图来表示:

核心字段如下:

currentSlot:当前轮盘位置

slots:用于初始化轮盘数组内的元素 -- 链表(基于数组实现逻辑上的环状结构)

taskRecords:这是个sync.Map结构主要是将任务key和添加到链表的element进行映射和管理,方便后续任务删除等进行判断操作

ticker:定时器,一般根据interval设定触发时间间隔,利用channel传递触发通知

stopCha : 关闭时间轮

而对于任务的管理主要是利用channel类型,主要是两个字段去处理:

addTaskCh:这个定时任务Task的核心结构,这个在后面会讲到

remoteTaskCh:传递任务的唯一标记即可

高能预警🔥🔥:这里说下为啥stopCh用空结构体,因为这个关闭场景只需通知型 channel,其不需要发送任何数据,不会带来额外的内存开销

Task任务结构

对任务结构的定义如下,和XTimingWheel来说少了不字段信息

代码语言:javascript
复制
type Task struct {
    //任务key 唯一
    key       string
    //执行的具体函数
    job       Job
    //任务执行时间
    executeAt time.Duration
    //执行次数 -1:重复执行
    times     int
    //轮盘位置
    slot      int
    //圈数
    circle    int
}
  • key:任务的唯一ID,需要全局唯一
  • job:定义的具体实际执行函数
  • executeAt:任务的延迟执行时间
  • times:执行次数,默认值是-1表示重复执行,如果是1表示执行一次
  • slot:轮盘位置,可以通过它用下标的方式访问环形数组
  • circle:圈数也可以说成轮次,如果添加的任务没有超过轮盘数,那么该值是0,如果超过:slotNum等于12添加15s执行的任务,那么circle = 1,依次类推。

注:circle = 0是才能执行该任务,表示轮次已经遍历完了,比如circle = 1的任务,需要遍历一层后,

circle 的值等于0时才能执行

Job是实际的执行函数,是的,这是个入参为key的函数,可以在我们实际实现函数的时候对唯一标记简单验证,对它的定义结构比较简单:

代码语言:javascript
复制
type Job func(key string)

timewheel初始化

在使用时间轮之前需要先进行初始化,这里初始化提供两个参数:

interval:轮盘之间每个槽位的时间间隔

slotNum:轮盘的槽位数

代码语言:javascript
复制
func NewXTimingWheel(interval time.Duration, slotNum int) (*XTimeWheel, error) {

这里还提供了一个初始化默认时间轮的方法 DefaultTimingWheel,该方式默认interval = 1, slotNum = 12

代码语言:javascript
复制
func DefaultTimingWheel() (*XTimeWheel, error) {
	tw, _ := NewXTimingWheel(time.Second, DefaultSlotNum)
	return tw, nil
}

继续看下NewXTimingWheel的实现,各种Channel的初始化,以及默认值的填充

代码语言:javascript
复制
func NewXTimingWheel(interval time.Duration, slotNum int) (*XTimeWheel, error) {
        //参数判断
	if interval <= 0 {
		return nil, errors.New("minimum interval need one second")
	}
	if slotNum <= 0 {
		return nil, errors.New("minimum slotNum need greater than zero")
	}
	t := &XTimeWheel{
		interval:     interval,
		currentSlot:  0,
		slotNum:      slotNum,
		slots:        make([]*list.List, slotNum),
		stopCh:       make(chan struct{}),
		removeTaskCh: make(chan string),
		addTaskCh:    make(chan *Task),
		isRun:        false,
	}
	t.start()
	return t, nil
}

func (t *XTimeWheel) start() {
        //判断时间轮时间在运行
	if !t.isRun {
                //根据slotNum初始化数组结构双向链表list
		for i := 0; i < t.slotNum; i++ {
			t.slots[i] = list.New()
		}
                //设置定时器时间间隔
		t.ticker = time.NewTicker(t.interval)
		t.mux.Lock()
		t.isRun = true
                //开启协程执行
		go t.run()
		t.mux.Unlock()
	}
}

任务监听和分发

我们知道Channel遵循先入先出(First In First Out)的规则,可以保证收顺序。本实现同样我们任务的新增和删除任务等消息的传递和分发,结合for + select 多路复用方式监听多个 channel 的读写操作,作为一个简单的调度层。

代码如下:利用for作为常驻协程的方式实现

代码语言:javascript
复制
func (t *XTimeWheel) run() {
	for {
	select {
                //关闭时间轮, 退出本函数
		case <-t.stopCh:
			return
                //添加任务
		case task := <-t.addTaskCh:
			t.addTask(task)
                //删除任务
		case key := <-t.removeTaskCh:
			t.removeTask(key)
                //定时器信号
		case <-t.ticker.C:
			t.execute()
		}
	}
}

添加任务

从设计的脑图和run方法实现中,可以估摸出任务的添加其实涉及两方,我们来捋一下整个流程:

  • 对外方法AddTask,这里主要进行参数验证,然后讲Task写入addTaskCh
  • 处理channel中的添加任务信号事件
  • 执行内部具体任务添加实现,涉及slot和circle计算和将任务追加到指定slot的双向链表list中
代码语言:javascript
复制
func (t *XTimeWheel) AddTask(key string, job Job, executeAt time.Duration, times int) error {
	if key == "" {
		return errors.New("key is empty")
	}
	if executeAt < t.interval {
		return errors.New("key is empty")
	}
        //sync.Map判断是否已添加过任务key
	_, ok := t.taskRecords.Load(key)
	if ok {
		return errors.New("key of job already exists")
	}
	task := &Task{
		key:       key,
		job:       job,
		times:     times,
		executeAt: executeAt,
	}
  //写入addTaskCh这个channel
	t.addTaskCh <- task
	return nil
}

内部关于AddTask的实现如下

代码语言:javascript
复制
func (t *XTimeWheel) addTask(task *Task) {
        //计算slot和cirle的值
        slot, circle := t.calSlotAndCircle(task.executeAt)
        task.slot = slot
        task.circle = circle
        //追加到指定slot下标的list链表汇总
        ele := t.slots[slot].PushBack(task)
        //sync.Map保存key和链表中具体的任务
        t.taskRecords.Store(task.key, ele)
}

其中对于slot、circle的计算是整个时间轮对于任务位置编排的核心,下个小节【计算slot和circle】会进行详细介绍。

计算slot和circle

时间轮每次任务添加之前都会先进行slot和circle的计算,为了确定任务的在轮次中的位置和圈数,这两个参数在任务调度的时候很重要,会基于当前位置currentSlot的双向链表进行轮次循环等操作

代码语言:javascript
复制
func (t *XTimeWheel) calSlotAndCircle(executeAt time.Duration) (slot, circle int) {
        //延迟时间 秒
        delay := int(executeAt.Seconds())
        //当前轮盘表示的时间 秒
        circleTime := len(t.slots) * int(t.interval.Seconds())
        //计算圈数
        circle = delay / circleTime
        //计算延迟时间对应的slot步长
        steps := delay / int(t.interval.Seconds())
        //计算位置slot
        slot = (t.currentSlot + steps) % len(t.slots)
        return
}

执行任务

通过前面初始化、计算slot和circle、任务添加之后我们的任务就已经添加到链表上了,等定时器触发到了任务所在slot后就到了执行阶段了,这里就是本文最关键的业务逻辑🔥🔥🔥--【执行任务】。

我们来简单回顾下,到目前位置,任务已经添加到指定位置对应的双向链表list中了,那么在执行阶段就是将list中的任务拿出来执行了,没错,这就是整个时间轮的内核,一起来看是如何执行的!

先看下代码,都有注释的,先把注释看完

代码语言:javascript
复制
func (t *XTimeWheel) execute() {
        //取出当前slot下标对应的 list
	taskList := t.slots[t.currentSlot]
        //判断list是否为空
	if taskList != nil {
                //遍历list
		for ele := taskList.Front(); ele != nil; {
                    //获取链表元素的定时任务
                    taskEle, _ := ele.Value.(*Task)
                    //判断任务circle (circle == 0才执行)
                    if taskEle.circle > 0 {
			taskEle.circle--
                         //返回链表的下一个元素
			ele = ele.Next()
			continue
			}
                    //执行任务函数
                    go taskEle.job(taskEle.key)
            
                    //删除key映射删除
                    t.taskRecords.Delete(taskEle.key)
                    //删除任务所在链表中元素
                    taskList.Remove(ele)
            
                    //执行固定次数任务
                    if taskEle.times-1 > 0 {
			taskEle.times--
			t.addTask(taskEle)
                    }
                    //重复任务
                    if taskEle.times == -1 {
                        t.addTask(taskEle)
                    }
                    ele = ele.Next()
		}
	}
        //将当前位置slot往前自增
	t.incrCurrentSlot()
}

看完代码注释的你应该清楚了吧,不清楚的话没事,小许还有总结!📝📝📝

  1. 每当接收到ticker定时器信号时,会根据currentSlot,通过数组下标方式获取slots的任务list
  2. 如果currentSlot的list部位空,那么进行遍历 list,从而获取 list 中的元素--待执行定时任务
  3. 获取到待执行任务,会对任务中的参数 circle进行校验
  • circle > 0的任务,还未到达执行时间,需要将该circle 减1,等后续轮次再处理
  • circle = 0的任务,在当前轮次可被执行,开启一个协程,执行任务对应的函数
  1. 任务执行后删除当前任务
  2. 当任务的times参数大于0时,每执行一次,times减1,而times = -1的任务为重复执行任务
  3. 当前currentSlot的任务遍历执行完毕,进入下一个轮盘位置
  4. 等待下次ticker定时信号触发

其他

上面我们把整体的流程和代码分析都整理了一遍了,看了大纲的同学都知道还有移除任务和停止时间轮这两块,具体代码就不贴出来啦。

我已把项目放到github上了:https://github.com/xiaoxucode/xtimingwheel

欢迎给个 star ⭐⭐

文末安利一波:

欢迎朋友们关注我的同名公众号📢📢:【小许code】,等你哦!🤣🤣

👨👩 朋友,希望本文对你有帮助~🌐

欢迎点赞 👍、收藏 💙、关注 💡 三连支持一下~🎈

🎈知道的越多,不知道的也越多,我是小许,下期见~🙇💻

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
    • 解决了什么问题
    • 原理
      • 单级轮
        • 层级轮
        • 实现
          • 设计思路
            • 核心数据结构
              • timewheel初始化
                • 任务监听和分发
                  • 添加任务
                    • 计算slot和circle
                      • 执行任务
                        • 其他
                        相关产品与服务
                        腾讯云代码分析
                        腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,助力维护团队卓越代码文化。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档