目录
- 1、背景
- 2、下载
- 3、原理解释
- 4、代码示例
- 5、总结
1、背景
在处理同一时刻接口的并发请求时,常见的有这几种情况:一个请求正在执行,相同的其它请求等待顺序执行,使用互斥锁就能完成、一个请求正在执行,相同的其它请求都丢弃、一个请求正在执行,相同的其它请求等待拿取相同的结果。使用singleflight包就能达到一个请求正在执行,相同的其它请求过来等待第一个请求执行完,然后共享第一个请求的结果,在处理并发场景时非常好用。
2、下载
go get -u golang.org/x/sync/singleflight
3、原理解释
singleflight底层结构:
type Group struct {mu sync.Mutex //保护map对象m的并发安全m map[string]*call //key-请求的唯一标识,val-请求唯一标识对应要执行的函数
}
call底层结构:
type call struct {wg sync.WaitGroup //用来阻塞相同标识对应的请求中的第一个请求之外的请求val interface{} //第一个请求的执行结果err error //第一个请求返回的错误dups int //第一个请求之外的其它请求数chans []chan<- Result //请求结果写入通道
}
关键函数:
//
// Do
// @Description: 一个唯一标识对应的请求在执行过程中,相同唯一标识对应的请求会被阻塞,等待第一个请求执行完并共享结果
// @receiver g
// @param key 请求唯一标识
// @param fn 请求要执行的函数
// @return v 请求要执行函数返回的结果
// @return err 请求要执行的函数返回的错误
// @return shared 是否有多个请求共享结果
//
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock() //保护map对象m的并发安全if g.m == nil {g.m = make(map[string]*call) //初始化m对象}if c, ok := g.m[key]; ok { //map中key存在说明这个key对应的请求正在执行中,这次请求不是第一个请求c.dups++ //等待共享结果数+1g.mu.Unlock()c.wg.Wait() //阻塞等待第一个请求执行完if e, ok := c.err.(*panicError); ok {panic(e)} else if c.err == errGoexit {runtime.Goexit()}return c.val, c.err, true //返回第一个请求的执行结果}//第一个请求的处理逻辑c := new(call)c.wg.Add(1) //计数+1g.m[key] = c //唯一标识关联对应的函数对象g.mu.Unlock()g.doCall(c, key, fn) //执行请求对应的函数return c.val, c.err, c.dups > 0 //返回执行结果
}
上面Do函数中g.doCall函数也需要大概理解一下,就是会将key对应函数的执行结果写的call对象c里,然后清空wg计数,相同key对应的其它请求就会跳出c.wg.Wait()阻塞,直接从call对象c中读取第一个请求的执行结果和错误信息并返回,源码如下:
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {normalReturn := falserecovered := falsedefer func() { //fn函数执行完之后执行if !normalReturn && !recovered {c.err = errGoexit}g.mu.Lock()defer g.mu.Unlock()c.wg.Done() //释放计数if g.m[key] == c {delete(g.m, key) //删除此次请求的唯一标识相关信息}if e, ok := c.err.(*panicError); ok {if len(c.chans) > 0 {go panic(e)select {} } else {panic(e)}} else if c.err == errGoexit {// Already in the process of goexit, no need to call again} else {for _, ch := range c.chans { //执行结果写入通道ch <- Result{c.val, c.err, c.dups > 0}}}}()func() {defer func() {if !normalReturn {if r := recover(); r != nil {c.err = newPanicError(r)}}}()c.val, c.err = fn() //执行fn函数normalReturn = true}()if !normalReturn {recovered = true}
}
singleflight中还提供了与Do函数功能相同的函数DoChan函数,唯一区别就是将请求对应的函数执行结果放到通道中进行返回,这两函数一个用于同步场景,一个用于异步场景。还有一个Forget函数:
func (g *Group) Forget(key string) {g.mu.Lock()delete(g.m, key) //删除map中的key,相同key对应请求进来会重新执行,不等待第一个key对应请求的执行结果g.mu.Unlock()
}
4、代码示例
示例如下:
func main() {var singleFlight singleflight.Group //初始化一个单次执行对象var count uint64 //用于测试是否被修改//为了并发执行,这里测试唯一标识都为xxxgo func() {val1, _, shared1 := singleFlight.Do("xxx", func() (interface{}, error) {logger.Info("first count +1")atomic.AddUint64(&count, 1) //第一次执行,将count+1time.Sleep(5 * time.Second) //增加第一次执行时间return count, nil})//打印第一次执行结果logger.Info("first count info", zap.Any("val1", val1), zap.Bool("shared1", shared1), zap.Uint64("count", count))}()time.Sleep(2 * time.Second) //为了防止下面的Do函数先执行val2, _, shared2 := singleFlight.Do("xxx", func() (interface{}, error) {logger.Info("second count +1")atomic.AddUint64(&count, 1) //第2次执行count+1return count, nil})//打印第二次执行结果logger.Info("second count info", zap.Any("val2", val2), zap.Bool("shared2", shared2), zap.Uint64("count", count))
}
控制台输出:
$ go run ./singlefight_demo/main.go
[2025-01-09 17:08:11.169] | INFO | Goroutine:6 | [singlefight_demo/main.go:19] | first count +1
[2025-01-09 17:08:16.261] | INFO | Goroutine:6 | [singlefight_demo/main.go:28] | first count info | {"val1": 1, "shared1": true, "count": 1}
[2025-01-09 17:08:16.261] | INFO | Goroutine:1 | [singlefight_demo/main.go:41] | second count info | {"val2": 1, "shared2": true, "count": 1}
5、总结
看singleflight原码之后,要实现一个请求正在执行,相同的其它请求进来时直接报错的功能也很简单,将singleflight中等待第一个请求的逻辑改为直接返回错误就可以。