📚 全文字数 : 6k+
⏳ 阅读时长 : 10min
📢 关键词 : 时间轮、双向链表、定时任务、Golang
时间轮(Timing Wheel)是George Varghese和Tony Lauck在1996年的论文【Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility】实现的,它在Linux内核中使用广泛,是Linux内核定时器的实现方法和基础之一。
相对于Go自带的 Timer、Ticker来说,时间轮算法是一种更加高效、适用更多场景的任务调度模型
不过,时间轮调度的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。
时间轮的运用其实是非常的广泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等组件中都存在时间轮的踪影
如果一个系统中存在着大量的调度任务,而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费cpu的资源并且很低效。
时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)。能够高效的管理各种延时任务,周期任务,通知任务等等
时间轮(Timing Wheel)是一种环形的数据结构,就像一个时钟可以分成很多格子(Slot) 底层采用数组实现,每个格子代表时间的间隔 (Interval),存放的是具体定时任务列表(TaskList),TaskList是一个环形双向链表,链表中的每个元素都是定时任务 Task。
上图中:是一个tick为12、interval等于1s的时间轮、当前currentTime = 3,slot = 3 的链表中存有4个任务项task
换个角度看TimimgWheel时间轮盘,你可以简单理解就是一个时钟表盘,指针每隔一段时间前进一格,走玩一圈是一个周期,需要执行的任务就放置在表盘的刻度处,当指针走到该位置时,就执行相应的任务。
新增一个5s的任务后,会怎么样呢
如果当前的指针 currentTime 指向的是3,此时如果插入一个5s的任务进来,那么新来的任务会存放到时间格8中。
上面讲的都是简单单级时间轮,如果时间跨度查过了时间轮的刻度slot、比如添加一个15秒之后执行的任务,单个轮盘就无法满足我们的需求。我们就要考虑使用别的方案了!
在确定如何实现之前,先捋一捋多级轮中时间和轮次的关系:
在单次轮中我们假设有m个slot,形成一个数组(环形队列的底层用数组表示),每个slot表示的时间间隔是 t,那么能个表示的时间范围就是 m * t,,获取slot上的tasklist,可以用slot[i]表示,i 是slot数组的下标。
如果是多级轮,我们可以理解为多个环形队列,假如一个两级轮分别是circle1和circle2,如果同样给circle 2 分配12个 slot,每个slot的时间间隔 t 表示1s。
此时circle中每个slot对应的时间范围应该是m * t了(circle1表示的时间范围),而整个二级时间轮能表示的时间范围就是 m * m * t。
为什么表达的时间范围是 m * m *t ?
因为circle2是基于circle1表达的时间范围 m * t ,而二级轮circle2的slot表达的时间是基于circle的,同样也是 m个slot,所以整个表示的范围就是 m * m * t
依次类推,如果slot的数量是60,那么这种方式像不像类似于【秒、分、时】这种时间的换算比,细品一下,其实就是这样的
那么如何实现者中多轮的方式呢?这里有两种方式去实现这个需求
1:多轮盘方式
多级时间轮的理念,更贴近了钟表,钟表分为时针、分针、秒针。每过60s,分针前进一格,每过60min,时针前进一格。
多级时间轮的理念就是分两级甚至更多级轮盘,当一级轮盘走一圈后,二级轮盘前进一格,二级轮盘走一圈后,三级轮盘前进一格,依次类推~
2:task关联circle
每个任务添加一个circle (圈数) 的参数,未超过时间轮单轮刻度那么圈数circle = 0,超过的话就通过计算得到circle的值。
例如某个任务需要16s之后执行,那么该任务的circle = 1,计算得到的表盘位置slot = 4,也就是该任务需要在表盘走一圈以后,在位置4处执行。如果表盘指针刚好前进到该slot,该处的任务列表中的circle都减1,直到slot = 4的任务链表中的任务circle = 0,才执行该任务。
相比较而言个人更倾向于基于在任务上添加circle参数的方式,这种方式需要控制的是任务上的circle的值,本文也是基于这种方式去实现的。
了解完时间轮的原理之后,我们看下如何去用Golang语言实现一个时间轮,下面的实现是基于单轮、结合任务circle实现层级轮扩展的时间轮实现方式。
梳理下时间轮算法的流程:
在有想法写之前,我们看下要达到什么样的目标和功能,然后基于这些点去做具体实现,我认为的应该是需要如下结构和方法去实现。
总共分为三个部分:数据结构、对外方法、内部实现
数据结构是时间轮的基石,整个时间轮的什么周期都是基于这几个结构去进行的,而对外方法是我们进行使用时需要用到的,内部实现是具体的业务逻辑,其中execute和getSlotAndCircle是整个实现方案的核心,后续也是我们讲代码实现的重点。
这里有两个基于用Golang实现的基础:
双向链表:基于Golang的标准库container/list中已经实现的双向链表来进行定时任务的底层存储
环状数据结构:这里的环状数据结构其实并不需要我们进行实际意义上的收尾相连形成一个环,这里是基于数组,然后利用求余运算来逐个下标遍历方式,实现一个在逻辑上符合环状方式的目的。
开始说代码实现吧!
时间轮XTimingWhee结构体字段用下图来表示:
核心字段如下:
currentSlot:当前轮盘位置
slots:用于初始化轮盘数组内的元素 -- 链表(基于数组实现逻辑上的环状结构)
taskRecords:这是个sync.Map结构主要是将任务key和添加到链表的element进行映射和管理,方便后续任务删除等进行判断操作
ticker:定时器,一般根据interval设定触发时间间隔,利用channel传递触发通知
stopCha : 关闭时间轮
而对于任务的管理主要是利用channel类型,主要是两个字段去处理:
addTaskCh:这个定时任务Task的核心结构,这个在后面会讲到
remoteTaskCh:传递任务的唯一标记即可
高能预警🔥🔥:这里说下为啥stopCh用空结构体,因为这个关闭场景只需通知型 channel,其不需要发送任何数据,不会带来额外的内存开销
Task任务结构
对任务结构的定义如下,和XTimingWheel来说少了不字段信息
type Task struct {
//任务key 唯一
key string
//执行的具体函数
job Job
//任务执行时间
executeAt time.Duration
//执行次数 -1:重复执行
times int
//轮盘位置
slot int
//圈数
circle int
}
注:circle = 0是才能执行该任务,表示轮次已经遍历完了,比如circle = 1的任务,需要遍历一层后,
circle 的值等于0时才能执行
Job是实际的执行函数,是的,这是个入参为key的函数,可以在我们实际实现函数的时候对唯一标记简单验证,对它的定义结构比较简单:
type Job func(key string)
在使用时间轮之前需要先进行初始化,这里初始化提供两个参数:
interval:轮盘之间每个槽位的时间间隔
slotNum:轮盘的槽位数
func NewXTimingWheel(interval time.Duration, slotNum int) (*XTimeWheel, error) {
这里还提供了一个初始化默认时间轮的方法 DefaultTimingWheel,该方式默认interval = 1, slotNum = 12
func DefaultTimingWheel() (*XTimeWheel, error) {
tw, _ := NewXTimingWheel(time.Second, DefaultSlotNum)
return tw, nil
}
继续看下NewXTimingWheel的实现,各种Channel的初始化,以及默认值的填充
func NewXTimingWheel(interval time.Duration, slotNum int) (*XTimeWheel, error) {
//参数判断
if interval <= 0 {
return nil, errors.New("minimum interval need one second")
}
if slotNum <= 0 {
return nil, errors.New("minimum slotNum need greater than zero")
}
t := &XTimeWheel{
interval: interval,
currentSlot: 0,
slotNum: slotNum,
slots: make([]*list.List, slotNum),
stopCh: make(chan struct{}),
removeTaskCh: make(chan string),
addTaskCh: make(chan *Task),
isRun: false,
}
t.start()
return t, nil
}
func (t *XTimeWheel) start() {
//判断时间轮时间在运行
if !t.isRun {
//根据slotNum初始化数组结构双向链表list
for i := 0; i < t.slotNum; i++ {
t.slots[i] = list.New()
}
//设置定时器时间间隔
t.ticker = time.NewTicker(t.interval)
t.mux.Lock()
t.isRun = true
//开启协程执行
go t.run()
t.mux.Unlock()
}
}
我们知道Channel遵循先入先出(First In First Out)的规则,可以保证收顺序。本实现同样我们任务的新增和删除任务等消息的传递和分发,结合for + select 多路复用方式监听多个 channel 的读写操作,作为一个简单的调度层。
代码如下:利用for作为常驻协程的方式实现
func (t *XTimeWheel) run() {
for {
select {
//关闭时间轮, 退出本函数
case <-t.stopCh:
return
//添加任务
case task := <-t.addTaskCh:
t.addTask(task)
//删除任务
case key := <-t.removeTaskCh:
t.removeTask(key)
//定时器信号
case <-t.ticker.C:
t.execute()
}
}
}
从设计的脑图和run方法实现中,可以估摸出任务的添加其实涉及两方,我们来捋一下整个流程:
func (t *XTimeWheel) AddTask(key string, job Job, executeAt time.Duration, times int) error {
if key == "" {
return errors.New("key is empty")
}
if executeAt < t.interval {
return errors.New("key is empty")
}
//sync.Map判断是否已添加过任务key
_, ok := t.taskRecords.Load(key)
if ok {
return errors.New("key of job already exists")
}
task := &Task{
key: key,
job: job,
times: times,
executeAt: executeAt,
}
//写入addTaskCh这个channel
t.addTaskCh <- task
return nil
}
内部关于AddTask的实现如下
func (t *XTimeWheel) addTask(task *Task) {
//计算slot和cirle的值
slot, circle := t.calSlotAndCircle(task.executeAt)
task.slot = slot
task.circle = circle
//追加到指定slot下标的list链表汇总
ele := t.slots[slot].PushBack(task)
//sync.Map保存key和链表中具体的任务
t.taskRecords.Store(task.key, ele)
}
其中对于slot、circle的计算是整个时间轮对于任务位置编排的核心,下个小节【计算slot和circle】会进行详细介绍。
时间轮每次任务添加之前都会先进行slot和circle的计算,为了确定任务的在轮次中的位置和圈数,这两个参数在任务调度的时候很重要,会基于当前位置currentSlot的双向链表进行轮次循环等操作
func (t *XTimeWheel) calSlotAndCircle(executeAt time.Duration) (slot, circle int) {
//延迟时间 秒
delay := int(executeAt.Seconds())
//当前轮盘表示的时间 秒
circleTime := len(t.slots) * int(t.interval.Seconds())
//计算圈数
circle = delay / circleTime
//计算延迟时间对应的slot步长
steps := delay / int(t.interval.Seconds())
//计算位置slot
slot = (t.currentSlot + steps) % len(t.slots)
return
}
通过前面初始化、计算slot和circle、任务添加之后我们的任务就已经添加到链表上了,等定时器触发到了任务所在slot后就到了执行阶段了,这里就是本文最关键的业务逻辑🔥🔥🔥--【执行任务】。
我们来简单回顾下,到目前位置,任务已经添加到指定位置对应的双向链表list中了,那么在执行阶段就是将list中的任务拿出来执行了,没错,这就是整个时间轮的内核,一起来看是如何执行的!
先看下代码,都有注释的,先把注释看完
func (t *XTimeWheel) execute() {
//取出当前slot下标对应的 list
taskList := t.slots[t.currentSlot]
//判断list是否为空
if taskList != nil {
//遍历list
for ele := taskList.Front(); ele != nil; {
//获取链表元素的定时任务
taskEle, _ := ele.Value.(*Task)
//判断任务circle (circle == 0才执行)
if taskEle.circle > 0 {
taskEle.circle--
//返回链表的下一个元素
ele = ele.Next()
continue
}
//执行任务函数
go taskEle.job(taskEle.key)
//删除key映射删除
t.taskRecords.Delete(taskEle.key)
//删除任务所在链表中元素
taskList.Remove(ele)
//执行固定次数任务
if taskEle.times-1 > 0 {
taskEle.times--
t.addTask(taskEle)
}
//重复任务
if taskEle.times == -1 {
t.addTask(taskEle)
}
ele = ele.Next()
}
}
//将当前位置slot往前自增
t.incrCurrentSlot()
}
看完代码注释的你应该清楚了吧,不清楚的话没事,小许还有总结!📝📝📝
上面我们把整体的流程和代码分析都整理了一遍了,看了大纲的同学都知道还有移除任务和停止时间轮这两块,具体代码就不贴出来啦。
我已把项目放到github上了:https://github.com/xiaoxucode/xtimingwheel
欢迎给个 star ⭐⭐
文末安利一波:
欢迎朋友们关注我的同名公众号📢📢:【小许code】,等你哦!🤣🤣
👨👩 朋友,希望本文对你有帮助~🌐
欢迎点赞 👍、收藏 💙、关注 💡 三连支持一下~🎈
🎈知道的越多,不知道的也越多,我是小许,下期见~🙇💻
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。