假设有这么一个场景: 假设100w个Uber司机,司机客户端每隔10分钟上报一次数据,如果十分钟没有上报数据,服务端会将这个司机设置为离线状态,不给他派单。 我们应该如何实现这个功能? 通常的情况下,我们会使用redis的zset等第三方组件来实现这个功能,但是如果不使用第三方组件呢?只使用内存呢?大概有这么几种方案:
var (
userMap sync.Map
timeout = 10 * time.Minute
)
funccheckTimeout() {
now := time.Now().Unix()
userMap.Range(func(key, value any)bool {
uid := key.(string)
lastTime := value.(int64)
if now-lastTime > int64(timeout) {
fmt.Printf("User %s timed out. Last reported at %d\n", uid, lastTime)
userMap.Delete(uid)
}
returntrue
})
}
缺点:这种方式效率不高,因为需要定期轮询整个Map,时间复杂度较高。
另一种方案是为每个司机分配一个Goroutine来管理其心跳超时。
timer := time.NewTimer(timeout * time.Second)
for {
select {
case <-heartbeat: // 收到心跳信号
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout * time.Second)
case <-timer.C: // 超时
fmt.Printf("Driver %s timed out.\n", uid)
break
}
}
userMap.Delete(uid)
缺点:虽然不需要轮询,但每个Goroutine都有栈空间,当司机数量非常多时,内存消耗会很大。此外,Timer的创建和删除时间复杂度为O(log n),效率有待提升。 前面铺垫了这么久,终于轮到我们的主角了,时间轮。
时间轮是一个比较有趣的算法,他最早刊登在George Varghese和Tony Lauck的论文里。 时间轮算法的核心逻辑是:
所以我们可以使用时间轮算法来实现功能。我们可以将interval设置成1s, 时间轮的slots为600,司机上报数据的时候,将记录插入到position-1的slot里面。 一个简单的Demo:
package main
import (
"log"
"sync"
"time"
)
type Driver struct {
uid string
expireAt int64
}
type TimeWheel struct {
drivers map[string]*Driver // all drivers
slots [][]*Driver // time wheel solts
position int // current position
mu sync.Mutex //
ticker *time.Ticker //
stop chan struct{} //
interval int
slotCount int
timeoutSecs int
}
func NewTimeWheel(interval, slotCount, timeoutSecs int) *TimeWheel {
tw := &TimeWheel{
interval: interval,
slotCount: slotCount,
timeoutSecs: timeoutSecs,
drivers: make(map[string]*Driver),
slots: make([][]*Driver, slotCount),
position: 0,
ticker: time.NewTicker(time.Duration(interval) * time.Second),
stop: make(chan struct{}),
}
for i := 0; i < slotCount; i++ {
tw.slots[i] = make([]*Driver, 0)
}
go tw.run()
return tw
}
func (tw *TimeWheel) Add(uid string) {
tw.mu.Lock()
defer tw.mu.Unlock()
expireAt := time.Now().Unix() + int64(tw.timeoutSecs)
driver := &Driver{
uid: uid,
expireAt: expireAt,
}
tw.drivers[uid] = driver
slot := tw.GetSlot(-1)
tw.slots[slot] = append(tw.slots[slot], driver)
log.Printf("time:%d,Driver %s added to slot %d\n", time.Now().Unix(), uid, slot)
}
func (tw *TimeWheel) GetSlot(index int) int {
return (tw.position + index + tw.slotCount) % tw.slotCount
}
func (tw *TimeWheel) run() {
for {
select {
case <-tw.ticker.C:
log.Printf("tick, position:%d\n", tw.position)
tw.mu.Lock()
expired := tw.slots[tw.position]
tw.slots[tw.position] = make([]*Driver, 0)
tw.position = (tw.position + 1) % tw.slotCount
tw.mu.Unlock()
for _, driver := range expired {
if time.Now().Unix() >= driver.expireAt {
tw.mu.Lock()
delete(tw.drivers, driver.uid)
tw.mu.Unlock()
log.Printf("Driver %s timeout\n", driver.uid)
}
}
case <-tw.stop:
return
}
}
}
func (tw *TimeWheel) Stop() {
close(tw.stop)
tw.ticker.Stop()
}
所以时间轮特别适合以下场景的任务:
将所有任务的换算为多少秒或毫秒(Interval)后到期,维护一个最大过期值(Interval)长度的数组。比如有10个任务,分别是1s,3s,100s 后到期,就建一个100长度的数组,数组的index就是每个任务的过期值(Interval),当前时间作为第一个元素,那么第二个元素就是1s 后到期的任务,第三个是2s 后到期的任务,依次类推。当前时间随着时钟的前进(tick),逐步发现过期的任务。
简单时间轮虽然很完美,所有的操作时间复杂度都是O(1),但是当任务最大到期时间值非常大时,比如100w,构建这样一个数组是非常耗费内存的。可以改进一下,仍然使用时间轮,但是是用hash的方式将所有任务放到一定大小的数组内。 这个数组长度可以想象为时间轮的格子数量,轮盘大小(W)。 hash的数值仍然是每个任务的到期值(Interval),最简单的是轮盘大小(W)取值为2的幂次方,Interval哈希W后取余,余数作为轮盘数组的index,数组每个元素可能会有多个任务,把这些任务按照过期的绝对时间排序,这样就形成了一个链表,或者叫做时间轮上的一个桶。 但是Hash有序时间轮 还是有一个问题: 因为只使用了一个时间轮,处理每一格的定时任务列表的时间复杂度是 O(n),如果定时任务数量很大,分摊到每一格的定时任务列表就会很长,这样的处理性能显然是让人无法接受的。
层级时间轮通过使用多个时间轮,并且对每个时间轮采用不同的 u,可以有效地解决简单时间轮及其变体实现的问题。 参考 Kafka 的 Purgatory 中的层级时间轮实现:
总结一下几种算法的性能。
算法 | 添加任务复杂度 | 删除任务复杂度 | 内存开销 | 适用场景 |
---|---|---|---|---|
Single Timer | O(1)O(1) | O(1)O(1) | 低 | 适用于任务数量少、精度要求高的场景。 |
Multi Timer | O(logn)O(\log n) | O(logn)O(\log n) | 中 | 适用于任务数量中等、任务间相互独立的场景,但内存开销较高。 |
Simple Timing Wheel | O(1)O(1) | O(n)O(n) | 高(大数组) | 任务分布均匀、到期时间精度要求较低的场景。 |
Hash Timing Wheel | O(1)O(1) | O(n)O(n) | 中 | 任务数量较多、分布不均匀但对精度容忍较高的场景。 |
Hierarchical Timing Wheel | O(1)O(1) | O(logn)O(\log n) | 低到中 | 适用于大规模任务、层级管理复杂任务、需要较长生命周期的任务调度场景(如 Kafka 和 Netty)。 |