singleflight
业务中在查询缓存的时候,并发场景下会存在多个请求同时获取一个缓存key数据的情况,如下图所示:3个Client分别请求到WebServer获取缓存,WebServer同样会处理3个请求到CacheServer获取数据。
graph LR
Client1 --> WebServer
Client2 --> WebServer
Client3 --> WebServer
WebServer --> CacheServer
WebServer --> CacheServer
WebServer --> CacheServer
为了提升性能,减少对CacheServer的请求压力,并发下可以只用其中一个请求去CacheServer读取缓存数据,其它的请求只需要等待并共享这一个请求获得的缓存数据即可。
graph LR
Client1 --> WebServer
Client2 --> WebServer
Client3 --> WebServer
WebServer --> CacheServer
Go源码内部的internal/singleflight
包便实现了这样的功能,https://github.com/golang/go/blob/master/src/internal/singleflight/singleflight.go
golang的扩展库中也有这样的一个实现: golang.org/x/sync/singleflight
。它们的代码逻辑大致一样。
示例
下面的示例模拟了10个go协程去获取缓存数据,这里使用一个随机数替代:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"golang.org/x/sync/singleflight"
)
func main() {
rand.Seed(time.Now().UnixNano())
sf := &singleflight.Group{}
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
i, err, shard := sf.Do("test", func() (interface{}, error) {
time.Sleep(time.Second)
return rand.Intn(10), nil
})
fmt.Println(i, err, shard)
}()
}
wg.Wait()
}
运行它可以得到输出:
1
2
3
4
5
6
7
8
9
10
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
可以看到10个go协程获取到了同样的随机数,也就是它们共享了返回结果。
Do的返回值分别是 val, err, shared。shared即是否共享了同一结果。
源码
上面的代码示例很简单,功能却强大。那么它是怎么实现的呢?源码也很简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
源码中主要的处理逻辑是:
- 通过一个map:
g.m
判断是否第一个请求的协程(注意map是非并发安全的,所以需要加锁) - 通过sync.WaitGroup阻塞其它的协程
第一个获取锁的请求,因为g.m
中key还不存在,会直接进入下面的逻辑:
1
2
3
4
5
6
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
- c.wg.Add(1) //waitgroup
- 设置
g.m[key]
- 释放锁
- 调用
g.doCall
获取数据
第一个请求释放锁后g.m
中已经存在key了,其它的请求,就会进入下面的代码逻辑, 通过waitgroupc.wg.Wait()
等待第一个请求得到的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
doCall是执行fn并获取返回结果的函数,fn执行完成后,写入返回结果,返回通过defer删除map g.m
中的key。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
...
defer func(){
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
delete(g.m, key)
}
}
...
c.val, c.err = fn()
...
}
以上便是singleflight源码中的实现逻辑,singleflight共提供了3个方法:
1
2
3
Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
DoChan(key string, fn func() (interface{}, error)) <-chan Result
Forget(key string)
- Do 通过参数key执行fn并获取结果。
- DoChan 和Do大致相同,只不过返回结果是一个 chan,我们可以通过chan获取数据,还可以进行超时等处理。
- Forget 忘记这个key,即删除map中的key,以便后续其它请求能继续处理这个key。(在
internal/singleflight/singleflight.go
中为Forget命名为ForgetUnshared)
总结
singleflight模式适合在并发场景下对同一个key合并处理一个请求,并共享返回的结果。除了合并处理缓存请求,还可以利用它处理缓存击穿的问题,若缓存的key过期了,可以防止大量请求同时打到数据库上,同样只需要一个请求去数据库获取并缓存数据即可。
singleflight源码简单易懂,singflight模式同样值得我们借鉴并应用到业务场景中去优化我们的代码。Go源码中也还有其它值得我们探索和学习的地方。