Golang 自适应微服务自理背后的滑动窗口算法如何实现
学习一下 go-zero 框架封装的滑动窗口算法是如何实现的,RollingWindow 是一种用于计算具有时间间隔事件的桶的数据结构。该滑动窗口使用循环缓冲区来存储桶,每个桶代表一个时间间隔内的时间。例如:最近30秒请求成功数,请求总数。
它适用于需要统计某些数字指标的场景,例如:自适应断流,熔断器。
01 基本思路
将整个时间区间 totalInterval 划分为 n 个桶,每个桶的时间区间为 interval,桶使用长度为 n 的环形数组存放。假设 totalInterval = 10s,n = 40,那么 interval = totalInterval / n = 250 ms.
而 now 时刻增加指标的目标桶的索引为:(nowTime - lastTime) / interval % n。 在每次新增时,需要计算自上次更新时间 lastTime 后跨越了几个 bucket,更新环形缓冲区的偏移量,重置过期的桶。
02 数据结构
我们来看看 RollingWindow 定义的数据结构:
type (// RollingWindowOption 定义了一个函数类型,用于定制 RollingWindow。RollingWindowOption func(rollingWindow *RollingWindow)// RollingWindow 定义了一个滑动窗口,用于计算时间间隔内的事件。// 桶存储在一个循环缓冲区中。RollingWindow struct {lock sync.RWMutex // lock for updating offsetsize int // number of bucketswin *window // circular bufferinterval time.Duration // time intervaloffset int // offset of the circular bufferignoreCurrent bool // ignore current bucket if setlastTime time.Duration // start time of the last bucket})
上述数据结构中,window 就是一个环形缓存区,下面看看 window 的数据结构:
// window 定义了一个循环缓冲区,它包含了一组桶。type window struct {buckets []*Bucketsize int}// Bucket 定义成功总数和请求总数type Bucket struct {Sum float64Count int64}

在 window 中,需要多少个 Bucket 由开发者自行定义,在 go-zero 框架的熔断器实现 googlebreaker 中,则默认使用了 40 个 Bucket。而每一个桶中分别定义了两个参数:Sum 表示成功总数,Count 表示请求总数。在熔断器中做计算时,会将 Sum 累加和记为 accepts,Count 累加和记为 total,从而计算出了当前的错误率。
03 window 和 Bucket 的实现
我们来看看 window 和 Bucket 提供了什么样的方法:
func (b *Bucket) add(v float64) {b.Sum += vb.Count++}func (b *Bucket) reset() {b.Sum = 0b.Count = 0}func newWindow(size int) *window {buckets := make([]*Bucket, size)for i := 0; i < size; i++ {buckets[i] = new(Bucket)}return &window{buckets: buckets,size: size,}}// add 往执行的桶中添加值。func (w *window) add(offset int, v float64) {w.buckets[offset%w.size].add(v)}// reduce 运行 fn 在所有的桶上。func (w *window) reduce(start, count int, fn func(b *Bucket)) {for i := 0; i < count; i++ {fn(w.buckets[(start+i)%w.size])}}// resetBucket 重置指定的桶。func (w *window) resetBucket(offset int) {w.buckets[offset%w.size].reset()}
-
Bucket:提供了添加和重置的方法
-
window:也提供了添加和重置桶的方法,并且提供了 Reduce 用于在所有桶上执行特定的操作。
04 RollingWindow 的实现
我们来看看 RollingWindow 提供了什么样的方法:
// NewRollingWindow 返回一个 RollingWindow,它有 size 个桶和时间间隔,使用 opts 来定制 RollingWindow。func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow {if size < 1 {panic("size must be greater than 0")}w := &RollingWindow{size: size,win: newWindow(size),interval: interval,offset: 0,ignoreCurrent: false,lastTime: timex.Now(),}for _, opt := range opts {opt(w)}return w}// Add 添加值到当前桶// 通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。func (rw *RollingWindow) Add(v float64) {rw.lock.Lock()defer rw.lock.Unlock()// 滑动在此处进行rw.updateOffset()rw.win.add(rw.offset, v)}// Reduce 运行 fn 在所有的桶上,如果 ignoreCurrent 被设置,则忽略当前桶。func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {rw.lock.RLock()defer rw.lock.RUnlock()var diff intspan := rw.span()// 忽略当前桶,因为数据不完整if span == 0 && rw.ignoreCurrent {diff = rw.size - 1} else {diff = rw.size - span // size - span 取出未过期的桶}if diff > 0 {offset := (rw.offset + span + 1) % rw.size// 从 offset 开始,取出 diff 个桶rw.win.reduce(offset, diff, fn)}}// span 自上次更新 lastTime 跨越了几个 bucketfunc (rw *RollingWindow) span() int {offset := int(timex.Since(rw.lastTime) / rw.interval)if 0 <= offset && offset < rw.size {return offset}return rw.size}// updateOffset 更新环形缓冲区的偏移量。func (rw *RollingWindow) updateOffset() {span := rw.span()if span <= 0 {return}offset := rw.offset// 重置过期的桶for i := 0; i < span; i++ {rw.win.resetBucket((offset + i + 1) % rw.size)}rw.offset = (offset + span) % rw.sizenow := timex.Now()// 更新时间,与时间间隔对齐rw.lastTime = now - (now-rw.lastTime)%rw.interval}
-
NewRollingWindow:用于创建一个滑动窗口对象,并可以使用参数 size(桶的数量)和 interval (时间间隔)进行自定义设置。
-
Add:用于向当前桶中添加一个值。通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。
-
Reduce:用于对所有桶运行指定的函数,如果设置了 ignoreCurrent,则会忽略当前桶。
在熔断器中,Reduce 用于计算 accepts 和 total 值。
func (b *googleBreaker) history() (accepts, total int64) {b.stat.Reduce(func(b *collection.Bucket) {accepts += int64(b.Sum)total += b.Count})return}
-
updateOffset:是一个比较核心的方法,滑动也就是在此处进行,用于 bucket 的更新,重置掉过期的 bucket,以及确定当前时间需要落在哪个 bucket。
-
确定当前时间相对于 bucket interval 的跨度,即调用 span 方法。
-
将跨度内的 bucket 都清空数据,因为已经过期了。
-
更新当前偏移量 offset,这也是即将要写入数据的 bucket。
-
更新 lastTime,给下一次移动做一个标识。
-
05 总结
1. RollingWindow是一个滑动窗口,其中有 size 个桶,每个桶记录了一段时间内的事件次数,每个桶的时间间隔为 interval。
2. 每个桶有一个sum和count,sum为成功总数,count为请求总数。




