欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Go singlefight 源码详解|图解

Go singlefight 源码详解|图解

2025/1/19 7:32:05 来源:https://blog.csdn.net/weixin_45304503/article/details/139029476  浏览:    关键词:Go singlefight 源码详解|图解

写在前面

通俗的来说就是 singleflight 将相同的并发请求合并成一个请求,进而减少对下层服务的压力,通常用于解决缓存击穿的问题。

在这里插入图片描述

详解

基础结构

golang.org/x/sync/singleflight singleflight结构体:

type call struct {wg sync.WaitGroup// 这些字段在 WaitGroup 结束前写入一次// 只有在 WaitGroup 结束后才会被读取。val interface{}err error// 这些字段在 WaitGroup 结束前使用 singleflight 互斥锁进行读写// 在 WaitGroup 结束后读取但不写入。dups  intchans []chan<- Result
}

Group 代表分成多个工作组,形成一个命名空间,在这个命名空间中,各工作单元可以重复执行。

type Group struct {mu sync.Mutex       // 互斥锁m  map[string]*call // 懒加载
}

Result 保存 Do 方法的结果,以便在通道上传递。做异步处理。

type Result struct {Val    interface{}Err    errorShared bool
}

简单demo

func TestSingleFightExample(t *testing.T) {var group singleflight.Group// 模拟一个并发请求for i := 0; i < 5; i++ {go func(i int) {key := "example"tmp := i // 将tmp放进去val, err, _ := group.Do(key, func() (interface{}, error) {// 模拟一次耗时操作time.Sleep(time.Second)return fmt.Sprintf("result_%d", tmp), nil})if err != nil {fmt.Println("Error:", err)}fmt.Println("Value:", val)}(i)}// 等待所有请求完成time.Sleep(3 * time.Second)
}

结果:这是一个很随机的过程,0~4都有可能,主要看哪个协程最先进来。

Value: result_2
Value: result_2
Value: result_2
Value: result_2
Value: result_2

Do 执行函数:对同一个 key 多次调用的时候,在第一次调用没有执行完的时候, 只会执行一次 fn,其他的调用会阻塞住等待这次调用返回, shared 表示fn的结果是否被共享

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

DoChan 和 Do 类似,只是 DoChan 返回一个 channel,也就是同步与异步的区别

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

Forget:用于通知 Group 删除某个 key 这样后面继续这个 key 的调用的时候就不会在阻塞等待了

func (g *Group) Forget(key string){g.mu.Lock()if c, ok := g.m[key]; ok {c.forgotten = true}delete(g.m, key)g.mu.Unlock()
}

singleflight的本质是对某次函数调用的复用,只执行1次,并将执行期间相同的函数返回相同的结果。由此产生一个问题,如果实际执行的函数出了问题,比如超时,则在此期间的所有调用都会超时,由此需要一些额外的方法来控制。

在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发。

常见面试题

singleflight 是什么?什么时候用的?

缓存失效,合并请求的时候用的,这样我们就可以减少对DB的请求压力。
在这里插入图片描述

如果这个goruntine超时怎么办?

singleflight 内部使用 waitGroup 来让同一个 key 的除了第一个请求的后续所有请求都阻塞。直到第一个请求执行 func 返回后,其他请求才会返回。
这意味着,如果 func 执行需要很长时间,那么后面的所有请求都会被一直阻塞。
这时候我们可以使用 DoChan 结合ctx + select做超时控制

func TestSingleFightTimeout(t *testing.T) {ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)go doFly(ctx)time.Sleep(2 * time.Second)cancel() // 2秒后超时
}func doFly(ctx context.Context) {var g singleflight.Groupkey := "example"// 使用 DoChan 结合 select 做超时控制result := g.DoChan(key, func() (interface{}, error) {time.Sleep(5 * time.Second) // 模拟超时return "result", nil})select {case r := <-result:fmt.Println("r", r.Val)case <-ctx.Done():fmt.Println("done")return}
}

结果输出:

done

上述代码中,我们将主进程先sleep 2秒,然后再进行cancel,那么此时我们将会让DoChan这个方法 time.Sleep 5秒模拟超时。那么我们会发现函数过了2秒之后就会输出done

doChan方法具体是怎么实现的?

在DoChan方法中,有一个 go g.doCall(c, key, fn) 的操作,当一个 goroutine 来执行,并通过channel 来返回数据,这样外部可以自定义超时逻辑,防止因为 fn 的阻塞,导致大量请求都被阻塞。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok { // 如果没有这个keyc.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}} // 构造异步返回结构体,可以接参数进行超时c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn) // 异步执行return ch
}

如果请求失败了怎么办?

如果第一个请求失败了,那么后续所有等待的请求都会返回同一个 error。但实际上可以根据下游能支撑的 rps 定时 forget 这个 key,让更多的请求能有机会走到后续逻辑。

go func() {time.Sleep(100 * time.Millisecond)g.Forget(key)}()

比如1秒内有100个请求过来,正常是第一个请求能执行queryDB,后续99个都会阻塞。增加这个 Forget 之后,每 100ms 就能有一个请求执行 queryDB,相当于是多了几次尝试的机会,相对的也给DB造成了更大的压力,需要根据具体场景进去取舍。 因为有可能前几次是因为DB的抖动导致的查询失败,重试之后就能实现了。

参考链接

[1] https://pkg.go.dev/golang.org/x/sync/singleflight
[2] https://www.lixueduan.com/posts/go/singleflight
[3] https://juejin.cn/post/7093859835694809125

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com