gRPC 的 naming 包(google.golang.org/grpc/naming)提供了两个接口:
// Resolver creates a Watcher for a target to track its resolution changes.
// It is the name-resolution interface of the naming package.
//
// Deprecated: please use package resolver.
type Resolver interface {
// Resolve creates a Watcher for target. The returned Watcher is what
// reports the target's resolution updates.
Resolve(target string) (Watcher, error)
}
// Watcher watches for the updates on the specified target.
// It is the service-discovery interface: it detects changes in the set of
// nodes behind a target.
//
// Deprecated: please use package resolver.
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
// updates. The first call should get the full set of the results. It should
// return an error if and only if Watcher cannot recover.
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
}
etcd3 已经默认实现了 Resolver 和 Watcher,位于以下包中:
github.com/coreos/etcd/clientv3/naming
服务端注册到 etcd:
// Connect to the etcd cluster that acts as the service registry.
config := clientv3.Config{
	Endpoints:   []string{"http://s1004.lab.org:2379"},
	DialTimeout: 3 * time.Second,
	//Username: username,
	//Password: password,
}
cli, err := clientv3.New(config)
if err != nil {
	panic(err)
}
defer cli.Close()

// ttl is the session lease TTL in seconds.
ttl := 2
// service is the etcd key prefix (the gRPC target name) that clients resolve.
service := "/_grpc/service/hello-service"
// addr is the host:port this gRPC server listens on; it is the value stored
// under the service key. Assumes port is the server's listen-port flag
// defined elsewhere in the program.
addr := fmt.Sprintf("%s:%d", "localhost", *port) // TODO: get ip

// The session owns a lease; attaching the registration to that lease ties
// the entry's lifetime to the session.
ss, err := concurrency.NewSession(cli, concurrency.WithTTL(ttl))
if err != nil {
	log.Fatalf("error: %s", err)
}

// Create the etcd-backed name resolver.
r := &etcdnaming.GRPCResolver{Client: cli}

// Register this server's address under the service name.
if err = r.Update(cli.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil {
	log.Fatalf("error: %s", err)
}
后来翻看 etcd 在 GitHub 上的源码,发现 etcd 已经对服务注册进行了封装,位于以下包中:
github.com/coreos/etcd/proxy/grpcproxy
代码编写也更简短方便:
etcdClient, err:= clientv3.New(clientv3.Config{
Endpoints: []string{"http://s1004.lab.org:2379"},
DialTimeout: time.Second * 3,
})
if err != nil {
panic(err)
}
addr := fmt.Sprintf("%s:%d", "localhost", *port)//TODO: get ip
registTTL := 2
serviceName := "hello-service"
grpcproxy.Register(etcdClient, fmt.Sprintf("/_grpc/service/%s", serviceName), addr, registTTL)
客户端服务发现:
// Connect to the etcd cluster that holds the service registrations.
config := clientv3.Config{
	Endpoints:   []string{"http://s1004.lab.org:2379"},
	DialTimeout: 5 * time.Second,
	//Username: username,
	//Password: password,
}
cli, err := clientv3.New(config)
if err != nil {
	panic(err)
}
// Deferred first so it runs last: the resolver below uses this client for
// as long as the gRPC connection is open.
defer cli.Close()

// Resolve the service name through etcd and balance across the resolved
// addresses round-robin.
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)

addr := "/_grpc/service/hello-service"

// The context deadline alone bounds the blocking dial; the deprecated
// grpc.WithTimeout option is not needed alongside DialContext.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(
	ctx,
	addr,
	grpc.WithInsecure(),
	grpc.WithBalancer(b),
	grpc.WithBlock(),
)
if err != nil {
	fmt.Printf("dial service(%s) by etcd resolver server error (%v)", addr, err.Error())
	panic(err)
}
defer conn.Close()

// NOTE(review): i is not defined in this snippet; presumably it is the
// counter of an enclosing request loop. Confirm against the full program.
request := &pb.HelloRequest{Greeting: fmt.Sprintf("send: %d", i)}
// The generated client constructor takes the *grpc.ClientConn itself.
client := pb.NewHelloServiceClient(conn)
resp, err := client.SayHello(context.Background(), request)
fmt.Printf("resp: %+v, err: %+v\n", resp, err)