Golang 自适应微服务自理背后的滑动窗口算法如何实现

2023-07-0219:53:34后端程序开发Comments968 views字数 3888阅读模式

学习一下 go-zero 框架封装的滑动窗口算法是如何实现的,RollingWindow 是一种用于计算具有时间间隔事件的桶的数据结构。该滑动窗口使用循环缓冲区来存储桶,每个桶代表一个时间间隔内的时间。例如:最近30秒请求成功数,请求总数。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

它适用于需要统计某些数字指标的场景,例如:自适应断流,熔断器。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

01 基本思路

将整个时间区间 totalInterval 划分为 n 个桶,每个桶的时间区间为 interval,桶使用长度为 n 的环形数组存放。假设 totalInterval = 10s,n = 40,那么 interval = totalInterval / n = 250 ms.文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

而 now 时刻增加指标的目标桶的索引为:(nowTime - lastTime) / interval % n。 在每次新增时,需要计算自上次更新时间 lastTime 后跨越了几个 bucket,更新环形缓冲区的偏移量,重置过期的桶。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

Golang 自适应微服务自理背后的滑动窗口算法如何实现 文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

02 数据结构

我们来看看 RollingWindow 定义的数据结构:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

type (  // RollingWindowOption 定义了一个函数类型,用于定制 RollingWindow。  RollingWindowOption func(rollingWindow *RollingWindow)
  // RollingWindow 定义了一个滑动窗口,用于计算时间间隔内的事件。  // 桶存储在一个循环缓冲区中。  RollingWindow struct {    lock          sync.RWMutex  // lock for updating offset    size          int           // number of buckets    win           *window       // circular buffer    interval      time.Duration // time interval    offset        int           // offset of the circular buffer    ignoreCurrent bool          // ignore current bucket if set    lastTime      time.Duration // start time of the last bucket  })

上述数据结构中,window 就是一个环形缓存区,下面看看 window 的数据结构:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

// window 定义了一个循环缓冲区,它包含了一组桶。type window struct {  buckets []*Bucket  size    int}
// Bucket 定义成功总数和请求总数type Bucket struct {  Sum   float64  Count int64}

Golang 自适应微服务自理背后的滑动窗口算法如何实现文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

在 window 中,需要多少个 Bucket 由开发者自行定义,在 go-zero 框架的熔断器实现 googlebreaker 中,则默认使用了 40 个 Bucket。而每一个桶中分别定义了两个参数:Sum 表示成功总数,Count 表示请求总数。在熔断器中做计算时,会将 Sum 累加和记为 accepts,Count 累加和记为 total,从而计算出了当前的错误率。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

03 window 和 Bucket 的实现

我们来看看 window 和 Bucket 提供了什么样的方法:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

func (b *Bucket) add(v float64) {  b.Sum += v  b.Count++}
func (b *Bucket) reset() {  b.Sum = 0  b.Count = 0}
func newWindow(size int) *window {  buckets := make([]*Bucket, size)  for i := 0; i < size; i++ {    buckets[i] = new(Bucket)  }  return &window{    buckets: buckets,    size:    size,  }}
// add 往执行的桶中添加值。func (w *window) add(offset int, v float64) {  w.buckets[offset%w.size].add(v)}
// reduce 运行 fn 在所有的桶上。func (w *window) reduce(start, count int, fn func(b *Bucket)) {  for i := 0; i < count; i++ {    fn(w.buckets[(start+i)%w.size])  }}
// resetBucket 重置指定的桶。func (w *window) resetBucket(offset int) {  w.buckets[offset%w.size].reset()}
  • Bucket:提供了添加和重置的方法文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

  • window:也提供了添加和重置桶的方法,并且提供了 Reduce 用于在所有桶上执行特定的操作。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

04 RollingWindow 的实现

我们来看看 RollingWindow 提供了什么样的方法:文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

// NewRollingWindow 返回一个 RollingWindow,它有 size 个桶和时间间隔,使用 opts 来定制 RollingWindow。func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow {  if size < 1 {    panic("size must be greater than 0")  }
  w := &RollingWindow{    size:          size,    win:           newWindow(size),    interval:      interval,    offset:        0,    ignoreCurrent: false,    lastTime:      timex.Now(),  }  for _, opt := range opts {    opt(w)  }  return w}
// Add 添加值到当前桶// 通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。func (rw *RollingWindow) Add(v float64) {  rw.lock.Lock()  defer rw.lock.Unlock()  // 滑动在此处进行  rw.updateOffset()  rw.win.add(rw.offset, v)}
// Reduce 运行 fn 在所有的桶上,如果 ignoreCurrent 被设置,则忽略当前桶。func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {  rw.lock.RLock()  defer rw.lock.RUnlock()
  var diff int  span := rw.span()  // 忽略当前桶,因为数据不完整  if span == 0 && rw.ignoreCurrent {    diff = rw.size - 1  } else {    diff = rw.size - span  // size - span 取出未过期的桶  }  if diff > 0 {    offset := (rw.offset + span + 1) % rw.size    // 从 offset 开始,取出 diff 个桶    rw.win.reduce(offset, diff, fn)  }}
// span 自上次更新 lastTime 跨越了几个 bucketfunc (rw *RollingWindow) span() int {  offset := int(timex.Since(rw.lastTime) / rw.interval)  if 0 <= offset && offset < rw.size {    return offset  }
  return rw.size}
// updateOffset 更新环形缓冲区的偏移量。func (rw *RollingWindow) updateOffset() {  span := rw.span()  if span <= 0 {    return  }
  offset := rw.offset  // 重置过期的桶  for i := 0; i < span; i++ {    rw.win.resetBucket((offset + i + 1) % rw.size)  }
  rw.offset = (offset + span) % rw.size  now := timex.Now()  // 更新时间,与时间间隔对齐  rw.lastTime = now - (now-rw.lastTime)%rw.interval}
  • NewRollingWindow:用于创建一个滑动窗口对象,并可以使用参数 size(桶的数量)和 interval (时间间隔)进行自定义设置。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

  • Add:用于向当前桶中添加一个值。通过 lastTime 和 nowTime 的标注,不断重置来实现窗口滑动,新的数据不断补上,从而实现滑动窗口的效果。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

  • Reduce:用于对所有桶运行指定的函数,如果设置了 ignoreCurrent,则会忽略当前桶。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

在熔断器中,Reduce 用于计算 accepts 和 total 值。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

func (b *googleBreaker) history() (accepts, total int64) {  b.stat.Reduce(func(b *collection.Bucket) {    accepts += int64(b.Sum)    total += b.Count  })
  return}
  • updateOffset:是一个比较核心的方法,滑动也就是在此处进行,用于 bucket 的更新,重置掉过期的 bucket,以及确定当前时间需要落在哪个 bucket。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

    • 确定当前时间相对于 bucket interval 的跨度,即调用 span 方法。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

    • 将跨度内的 bucket 都清空数据,因为已经过期了。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

    • 更新当前偏移量 offset,这也是即将要写入数据的 bucket。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

    • 更新 lastTime,给下一次移动做一个标识。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

05 总结

1. RollingWindow是一个滑动窗口,其中有 size 个桶,每个桶记录了一段时间内的事件次数,每个桶的时间间隔为 interval。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

2. 每个桶有一个sum和count,sum为成功总数,count为请求总数。文章源自菜鸟学院-https://www.cainiaoxueyuan.com/bc/49393.html

  • 本站内容整理自互联网,仅提供信息存储空间服务,以方便学习之用。如对文章、图片、字体等版权有疑问,请在下方留言,管理员看到后,将第一时间进行处理。
  • 转载请务必保留本文链接:https://www.cainiaoxueyuan.com/bc/49393.html

Comment

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定