Golang 自适应微服务自理背后的滑动窗口算法如何实现
学习一下 go-zero 框架封装的滑动窗口算法是如何实现的,RollingWindow 是一种用于计算具有时间间隔事件的桶的数据结构。该滑动窗口使用循环缓冲区来存储桶,每个桶代表一个时间间隔内的时间。例如:最近30秒请求成功数,请求总数。
它适用于需要统计某些数字指标的场景,例如:自适应断流,熔断器。
01 基本思路
将整个时间区间 totalInterval 划分为 n 个桶,每个桶的时间区间为 interval,桶使用长度为 n 的环形数组存放。假设 totalInterval = 10s,n = 40,那么 interval = totalInterval / n = 250 ms.
而 now 时刻增加指标的目标桶的索引为:(nowTime - lastTime) / interval % n。 在每次新增时,需要计算自上次更新时间 lastTime 后跨越了几个 bucket,更新环形缓冲区的偏移量,重置过期的桶。
02 数据结构
我们来看看 RollingWindow 定义的数据结构:
type (
// RollingWindowOption 定义了一个函数类型,用于定制 RollingWindow。
RollingWindowOption func(rollingWindow *RollingWindow)
// RollingWindow 定义了一个滑动窗口,用于计算时间间隔内的事件。
// 桶存储在一个循环缓冲区中。
RollingWindow struct {
lock sync.RWMutex // lock for updating offset
size int // number of buckets
win *window // circular buffer
interval time.Duration // time interval
offset int // offset of the circular buffer
ignoreCurrent bool // ignore current bucket if set
lastTime time.Duration // start time of the last bucket
}
)
上述数据结构中,window 就是一个环形缓存区,下面看看 window 的数据结构:
// window 定义了一个循环缓冲区,它包含了一组桶。
type window struct {
buckets []*Bucket
size int
}
// Bucket 定义成功总数和请求总数
type Bucket struct {
Sum float64
Count int64
}
在 window 中,需要多少个 Bucket 由开发者自行定义,在 go-zero 框架的熔断器实现 googlebreaker 中,则默认使用了 40 个 Bucket。而每一个桶中分别定义了两个参数:Sum 表示成功总数,Count 表示请求总数。在熔断器中做计算时,会将 Sum 累加和记为 accepts,Count 累加和记为 total,从而计算出了当前的错误率。
03 window 和 Bucket 的实现
我们来看看 window 和 Bucket 提供了什么样的方法:
func (b *Bucket) add(v float64) {
b.Sum += v
b.Count++
}
func (b *Bucket) reset() {
b.Sum = 0
b.Count = 0
}
func newWindow(size int) *window {
buckets := make([]*Bucket, size)
for i := 0; i < size; i++ {
buckets[i] = new(Bucket)
}
return &window{
buckets: buckets,
size: size,
}
}
// add 往执行的桶中添加值。
func (w *window) add(offset int, v float64) {
w.buckets[offset%w.size].add(v)
}
// reduce 运行 fn 在所有的桶上。
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i := 0; i < count; i++ {
fn(w.buckets[(start+i)%w.size])
}
}
// resetBucket 重置指定的桶。
func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}
-
Bucket:提供了添加和重置的方法
-
window:也提供了添加和重置桶的方法,并且提供了 Reduce 用于在所有桶上执行特定的操作。
04 RollingWindow 的实现
我们来看看 RollingWindow 提供了什么样的方法:
// NewRollingWindow 返回一个 RollingWindow,它有 size 个桶和时间间隔,使用 opts 来定制 RollingWindow。
func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow {
if size < 1 {
panic("size must be greater than 0")
}
w := &RollingWindow{
size: size,
win: newWindow(size),
interval: interval,
offset: 0,
ignoreCurrent: false,
lastTime: timex.Now(),
}
for _, opt := range opts {
opt(w)
}
return w
}
// Add 添加值到当前桶
// 通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
// 滑动在此处进行
rw.updateOffset()
rw.win.add(rw.offset, v)
}
// Reduce 运行 fn 在所有的桶上,如果 ignoreCurrent 被设置,则忽略当前桶。
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()
var diff int
span := rw.span()
// 忽略当前桶,因为数据不完整
if span == 0 && rw.ignoreCurrent {
diff = rw.size - 1
} else {
diff = rw.size - span // size - span 取出未过期的桶
}
if diff > 0 {
offset := (rw.offset + span + 1) % rw.size
// 从 offset 开始,取出 diff 个桶
rw.win.reduce(offset, diff, fn)
}
}
// span 自上次更新 lastTime 跨越了几个 bucket
func (rw *RollingWindow) span() int {
offset := int(timex.Since(rw.lastTime) / rw.interval)
if 0 <= offset && offset < rw.size {
return offset
}
return rw.size
}
// updateOffset 更新环形缓冲区的偏移量。
func (rw *RollingWindow) updateOffset() {
span := rw.span()
if span <= 0 {
return
}
offset := rw.offset
// 重置过期的桶
for i := 0; i < span; i++ {
rw.win.resetBucket((offset + i + 1) % rw.size)
}
rw.offset = (offset + span) % rw.size
now := timex.Now()
// 更新时间,与时间间隔对齐
rw.lastTime = now - (now-rw.lastTime)%rw.interval
}
-
NewRollingWindow:用于创建一个滑动窗口对象,并可以使用参数 size(桶的数量)和 interval (时间间隔)进行自定义设置。
-
Add:用于向当前桶中添加一个值。通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。
-
Reduce:用于对所有桶运行指定的函数,如果设置了 ignoreCurrent,则会忽略当前桶。
在熔断器中,Reduce 用于计算 accepts 和 total 值。
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count
})
return
}
-
updateOffset:是一个比较核心的方法,滑动也就是在此处进行,用于 bucket 的更新,重置掉过期的 bucket,以及确定当前时间需要落在哪个 bucket。
-
确定当前时间相对于 bucket interval 的跨度,即调用 span 方法。
-
将跨度内的 bucket 都清空数据,因为已经过期了。
-
更新当前偏移量 offset,这也是即将要写入数据的 bucket。
-
更新 lastTime,给下一次移动做一个标识。
-
05 总结
1. RollingWindow是一个滑动窗口,其中有 size 个桶,每个桶记录了一段时间内的事件次数,每个桶的时间间隔为 interval。
2. 每个桶有一个sum和count,sum为成功总数,count为请求总数。