gRPC service registration and discovery with etcd

gRPC etcd discovery

Posted by alovn on July 27, 2019

The gRPC package google.golang.org/grpc/naming provides two interfaces:

// Name resolution
// Resolver creates a Watcher for a target to track its resolution changes.
//
// Deprecated: please use package resolver.
type Resolver interface {
    // Resolve creates a Watcher for target.
    Resolve(target string) (Watcher, error)
}

// Service discovery: detects node changes
// Watcher watches for the updates on the specified target.
//
// Deprecated: please use package resolver.
type Watcher interface {
    // Next blocks until an update or error happens. It may return one or more
    // updates. The first call should get the full set of the results. It should
    // return an error if and only if Watcher cannot recover.
    Next() ([]*Update, error)
    // Close closes the Watcher.
    Close()
}
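
To make the Watcher contract concrete, here is a minimal in-memory sketch that uses only the standard library. The types Operation, Update, and Watcher are redeclared locally to mirror the interface quoted above, and memWatcher is a hypothetical name, not part of gRPC or etcd. It illustrates the documented behavior: the first Next call returns the full address set, and later calls block until a change arrives or the watcher is closed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Operation and Update are local stand-ins for the naming package types,
// redeclared so that this sketch compiles with the standard library only.
type Operation uint8

const (
	Add Operation = iota
	Delete
)

type Update struct {
	Op   Operation
	Addr string
}

// Watcher is a local copy of the interface quoted above.
type Watcher interface {
	Next() ([]*Update, error)
	Close()
}

// memWatcher is a hypothetical in-memory Watcher.
type memWatcher struct {
	initial []string
	first   bool
	changes chan []*Update
	done    chan struct{}
	once    sync.Once
}

func newMemWatcher(initial []string) *memWatcher {
	return &memWatcher{
		initial: initial,
		first:   true,
		changes: make(chan []*Update, 8),
		done:    make(chan struct{}),
	}
}

// Next returns the full set on the first call, then blocks until a batch of
// changes is pushed or the watcher is closed.
func (w *memWatcher) Next() ([]*Update, error) {
	if w.first {
		w.first = false
		ups := make([]*Update, 0, len(w.initial))
		for _, a := range w.initial {
			ups = append(ups, &Update{Op: Add, Addr: a})
		}
		return ups, nil
	}
	select {
	case ups := <-w.changes:
		return ups, nil
	case <-w.done:
		return nil, errors.New("watcher closed")
	}
}

// Close unblocks pending Next calls; it is safe to call more than once.
func (w *memWatcher) Close() { w.once.Do(func() { close(w.done) }) }

func main() {
	w := newMemWatcher([]string{"10.0.0.1:50051", "10.0.0.2:50051"})
	var _ Watcher = w // compile-time check that memWatcher satisfies Watcher

	ups, _ := w.Next()
	fmt.Println("initial addresses:", len(ups))

	// Simulate one node going away.
	w.changes <- []*Update{{Op: Delete, Addr: "10.0.0.1:50051"}}
	ups, _ = w.Next()
	fmt.Println("change:", ups[0].Op == Delete, ups[0].Addr)
	w.Close()
}
```

A real Watcher would feed the changes channel from a watch on the backing store instead of pushing updates by hand.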

The etcd v3 client already ships implementations of both Resolver and Watcher:

github.com/coreos/etcd/clientv3/naming

Registering the server in etcd

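// Assumed imports: fmt, log, time, google.golang.org/grpc/naming,
// github.com/coreos/etcd/clientv3, github.com/coreos/etcd/clientv3/concurrency,
// and etcdnaming "github.com/coreos/etcd/clientv3/naming".
config := clientv3.Config{
    Endpoints:   []string{"http://s1004.lab.org:2379"},
    DialTimeout: 3 * time.Second,
    //Username:    username,
    //Password:    password,
}

cli, err := clientv3.New(config)
if err != nil {
    panic(err)
}
defer cli.Close()

ttl := 2 // session lease TTL, in seconds
// Key under which the service is registered.
service := "/_grpc/service/hello-service"
// Address this server listens on; port is assumed to be a flag defined elsewhere.
addr := fmt.Sprintf("%s:%d", "localhost", *port)

// The session keeps a lease alive; keys bound to it disappear when the lease expires.
ss, err := concurrency.NewSession(cli, concurrency.WithTTL(ttl))
if err != nil {
    log.Fatalf("error: %s", err)
}
// Create the name resolver.
r := &etcdnaming.GRPCResolver{Client: cli}
// Register the server address under the service name, bound to the session lease.
if err = r.Update(cli.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil {
    log.Fatalf("error: %s", err)
}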
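
The registration above relies on two ideas: one key per server address under the service prefix, and a lease that removes the key unless it is kept alive. The sketch below simulates both in memory with the standard library only. The registry type, its methods, and the integer logical clock are hypothetical. The key layout (service + "/" + addr) and the JSON value are what the etcd resolver appears to write, so treat the exact layout as an assumption rather than a specification.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// update mirrors the exported fields of naming.Update (assumed shape).
type update struct {
	Op       uint8
	Addr     string
	Metadata interface{}
}

type entry struct {
	value   string
	expires int // logical time, in seconds, at which the key disappears
}

// registry is a hypothetical in-memory stand-in for etcd keys bound to a lease.
type registry struct {
	ttl  int
	keys map[string]entry
}

func newRegistry(ttl int) *registry {
	return &registry{ttl: ttl, keys: make(map[string]entry)}
}

// register stores service+"/"+addr with a JSON-encoded update and a fresh expiry.
func (r *registry) register(service, addr string, now int) (string, error) {
	v, err := json.Marshal(update{Op: 0, Addr: addr})
	if err != nil {
		return "", err
	}
	key := service + "/" + addr
	r.keys[key] = entry{value: string(v), expires: now + r.ttl}
	return key, nil
}

// keepAlive extends the expiry, like a lease keep-alive. It reports false
// (and drops the key) if the key had already expired.
func (r *registry) keepAlive(key string, now int) bool {
	e, ok := r.keys[key]
	if !ok || now >= e.expires {
		delete(r.keys, key)
		return false
	}
	e.expires = now + r.ttl
	r.keys[key] = e
	return true
}

// lookup returns the stored value if the key has not expired at time now.
func (r *registry) lookup(key string, now int) (string, bool) {
	e, ok := r.keys[key]
	if !ok || now >= e.expires {
		return "", false
	}
	return e.value, true
}

func main() {
	r := newRegistry(2) // TTL of 2 seconds, as in the example above
	key, err := r.register("/_grpc/service/hello-service", "localhost:50051", 0)
	if err != nil {
		panic(err)
	}
	v, _ := r.lookup(key, 1)
	fmt.Println(key, "=>", v)

	fmt.Println("keep-alive at t=1:", r.keepAlive(key, 1))
	_, ok := r.lookup(key, 3)
	fmt.Println("still registered at t=3:", ok)
}
```

With a TTL this short, a server that crashes drops out of discovery within a couple of seconds, at the cost of more frequent keep-alive traffic.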

Later, while browsing the etcd source on GitHub, I found that etcd already wraps service registration in a helper:

github.com/coreos/etcd/proxy/grpcproxy

With it, the code is shorter and more convenient:

etcdClient, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"http://s1004.lab.org:2379"},
    DialTimeout: 3 * time.Second,
})
if err != nil {
    panic(err)
}

// port is assumed to be a flag defined elsewhere.
addr := fmt.Sprintf("%s:%d", "localhost", *port) // TODO: use the machine's real IP
registTTL := 2 // lease TTL, in seconds
serviceName := "hello-service"
grpcproxy.Register(etcdClient, fmt.Sprintf("/_grpc/service/%s", serviceName), addr, registTTL)
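
A registration helper of this kind has to do more than a single write: if the lease is lost, the address must be registered again. The sketch below illustrates that re-registration loop with the standard library only. It is not the grpcproxy implementation; keepRegistered, registerFunc, and runDemo are hypothetical names, and the register hook stands in for whatever creates the session and writes the key.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// registerFunc is a hypothetical hook: it performs one registration and
// returns a channel that is closed when that registration is lost.
type registerFunc func() (<-chan struct{}, error)

// keepRegistered re-runs register whenever the previous registration is lost,
// until ctx is cancelled. The returned channel is closed when the loop exits.
func keepRegistered(ctx context.Context, register registerFunc, backoff time.Duration) <-chan struct{} {
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		for {
			lost, err := register()
			if err != nil {
				// Registration failed: wait before retrying, unless cancelled.
				select {
				case <-ctx.Done():
					return
				case <-time.After(backoff):
				}
				continue
			}
			select {
			case <-ctx.Done():
				return
			case <-lost:
				// Lease expired or session closed: register again.
			}
		}
	}()
	return donec
}

// runDemo simulates two lost registrations followed by a stable one and
// returns how many times register was called.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	stable := make(chan struct{})
	register := func() (<-chan struct{}, error) {
		calls++
		lost := make(chan struct{})
		if calls < 3 {
			close(lost) // simulate a registration that is lost immediately
		} else {
			close(stable) // the third registration stays up
		}
		return lost, nil
	}

	donec := keepRegistered(ctx, register, 10*time.Millisecond)
	<-stable // wait until the stable registration is in place
	cancel() // shut down, as a server would on exit
	<-donec
	return calls
}

func main() {
	fmt.Println("registrations:", runDemo())
}
```

Cancelling the context is what ends the loop, so a server would normally tie it to its own shutdown.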

Client-side service discovery

config := clientv3.Config{
    Endpoints:   []string{"http://s1004.lab.org:2379"},
    DialTimeout: 5 * time.Second,
    //Username:    username,
    //Password:    password,
}

cli, err := clientv3.New(config)
if err != nil {
    panic(err)
}
defer cli.Close()

// Resolve addresses through etcd and balance across them round-robin.
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)

// The dial target is the etcd key prefix of the service, not a host:port.
addr := "/_grpc/service/hello-service"

// With grpc.WithBlock, the context deadline bounds the dial, so the
// deprecated grpc.WithTimeout option is not needed.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(
    ctx,
    addr,
    grpc.WithInsecure(),
    grpc.WithBalancer(b),
    grpc.WithBlock(),
)
if err != nil {
    fmt.Printf("dial service(%s) by etcd resolver server error (%v)\n", addr, err)
    panic(err)
}
defer conn.Close()
// i is a request counter from the surrounding loop, which is not shown here.
request := &pb.HelloRequest{Greeting: fmt.Sprintf("send: %d", i)}
client := pb.NewHelloServiceClient(conn) // pass the *grpc.ClientConn itself
resp, err := client.SayHello(context.Background(), request)
fmt.Printf("resp: %+v, err: %+v\n", resp, err)
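
On the client side, the resolver feeds address updates to the balancer, which then rotates requests across the live addresses. The sketch below shows that idea with the standard library only. The roundRobin type and its apply and pick methods are hypothetical and far simpler than the gRPC balancer; the update type is a local stand-in for the Add/Delete updates a Watcher returns.

```go
package main

import (
	"fmt"
	"sync"
)

type op uint8

const (
	add op = iota
	del
)

// update is a local stand-in for a resolver update.
type update struct {
	Op   op
	Addr string
}

// roundRobin is a hypothetical picker that rotates over the known addresses.
type roundRobin struct {
	mu    sync.Mutex
	addrs []string
	next  int
}

// apply folds a batch of updates into the address list.
func (rr *roundRobin) apply(ups []update) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	for _, u := range ups {
		idx := -1
		for i, a := range rr.addrs {
			if a == u.Addr {
				idx = i
				break
			}
		}
		switch u.Op {
		case add:
			if idx < 0 {
				rr.addrs = append(rr.addrs, u.Addr)
			}
		case del:
			if idx >= 0 {
				rr.addrs = append(rr.addrs[:idx], rr.addrs[idx+1:]...)
			}
		}
	}
	// Keep the cursor inside the (possibly shorter) list.
	if rr.next >= len(rr.addrs) {
		rr.next = 0
	}
}

// pick returns the next address in rotation, or false if none is known.
func (rr *roundRobin) pick() (string, bool) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if len(rr.addrs) == 0 {
		return "", false
	}
	a := rr.addrs[rr.next]
	rr.next = (rr.next + 1) % len(rr.addrs)
	return a, true
}

func main() {
	rr := &roundRobin{}
	rr.apply([]update{
		{Op: add, Addr: "10.0.0.1:50051"},
		{Op: add, Addr: "10.0.0.2:50051"},
	})
	for i := 0; i < 3; i++ {
		a, _ := rr.pick()
		fmt.Println("request", i, "goes to", a)
	}

	// A node disappears from etcd; later picks skip it.
	rr.apply([]update{{Op: del, Addr: "10.0.0.1:50051"}})
	a, _ := rr.pick()
	fmt.Println("after delete:", a)
}
```

In the real client, these updates come from the etcd watcher, so a server whose lease expires stops receiving traffic without any client restart.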

Code examples

Code example 1

Code example 2