Golang并发编程之singleflight

Golang singleflight

Posted by alovn on April 24, 2022

singleflight

业务中在查询缓存的时候,并发场景下会存在多个请求同时获取一个缓存key数据的情况,如下图所示:3个Client分别请求到WebServer获取缓存,WebServer同样会处理3个请求到CacheServer获取数据。

graph LR
  Client1 --> WebServer
  Client2 --> WebServer
  Client3 --> WebServer
  WebServer --> CacheServer
  WebServer --> CacheServer
  WebServer --> CacheServer

为了提升性能,减少对CacheServer的请求压力,并发下可以只用其中一个请求去CacheServer读取缓存数据,其它的请求只需要等待并共享这一个请求获得的缓存数据即可。

graph LR
  Client1 --> WebServer
  Client2 --> WebServer
  Client3 --> WebServer
  WebServer --> CacheServer

Go源码内部的internal/singleflight包便实现了这样的功能,https://github.com/golang/go/blob/master/src/internal/singleflight/singleflight.go

golang的扩展库中也有这样的一个实现: golang.org/x/sync/singleflight。它们的代码逻辑大致一样。

示例

下面的示例模拟了10个go协程去获取缓存数据,这里使用一个随机数替代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"

    "golang.org/x/sync/singleflight"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    sf := &singleflight.Group{}
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            i, err, shard := sf.Do("test", func() (interface{}, error) {
                time.Sleep(time.Second)
                return rand.Intn(10), nil
            })
            fmt.Println(i, err, shard)
        }()
    }
    wg.Wait()
}

运行它可以得到输出:

1
2
3
4
5
6
7
8
9
10
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true
3 <nil> true

可以看到10个go协程获取到了同样的随机数,也就是它们共享了返回结果。

Do的返回值分别是 val, err, shared。shared即是否共享了同一结果。

源码

上面的代码示例很简单,功能却强大。那么它是怎么实现的呢?源码也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
    
        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()
    
    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

源码中主要的处理逻辑是:

  1. 通过一个map:g.m 判断是否第一个请求的协程(注意map是非并发安全的,所以需要加锁)
  2. 通过sync.WaitGroup阻塞其它的协程

第一个获取锁的请求,因为g.m中key还不存在,会直接进入下面的逻辑:

1
2
3
4
5
6
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

g.doCall(c, key, fn)
  1. c.wg.Add(1) //waitgroup
  2. 设置g.m[key]
  3. 释放锁
  4. 调用g.doCall 获取数据

第一个请求释放锁后g.m中已经存在key了,其它的请求,就会进入下面的代码逻辑, 通过waitgroupc.wg.Wait() 等待第一个请求得到的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
g.mu.Lock()
if g.m == nil {
    g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
    c.dups++
    g.mu.Unlock()
    c.wg.Wait()

    if e, ok := c.err.(*panicError); ok {
        panic(e)
    } else if c.err == errGoexit {
        runtime.Goexit()
    }
    return c.val, c.err, true
}

doCall是执行fn并获取返回结果的函数,fn执行完成后,写入返回结果,返回通过defer删除map g.m 中的key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    ...
    defer func(){
        c.wg.Done()
        g.mu.Lock()
        defer g.mu.Unlock()
        if !c.forgotten {
            delete(g.m, key)
        }
    }
    ...
    c.val, c.err = fn()
    ...
}

以上便是singleflight源码中的实现逻辑,singleflight共提供了3个方法:

1
2
3
Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) 
DoChan(key string, fn func() (interface{}, error)) <-chan Result
Forget(key string) 
  1. Do 通过参数key执行fn并获取结果。
  2. DoChan 和Do大致相同,只不过返回结果是一个 chan,我们可以通过chan获取数据,还可以进行超时等处理。
  3. Forget 忘记这个key,即删除map中的key,以便后续其它请求能继续处理这个key。(在internal/singleflight/singleflight.go中为Forget命名为ForgetUnshared)

总结

singleflight模式适合在并发场景下对同一个key合并处理一个请求,并共享返回的结果。除了合并处理缓存请求,还可以利用它处理缓存击穿的问题,若缓存的key过期了,可以防止大量请求同时打到数据库上,同样只需要一个请求去数据库获取并缓存数据即可。

singleflight源码简单易懂,singflight模式同样值得我们借鉴并应用到业务场景中去优化我们的代码。Go源码中也还有其它值得我们探索和学习的地方。