This article analyzes how Kratos wraps gRPC's service-discovery interfaces, and how that wrapping is used in a real project.
0x00 A Second Look at gRPC's Resolver Interfaces
1.1 The Three Interfaces the resolver Package Exposes
As covered in the previous article, gRPC's built-in service-governance support exposes three service-discovery interfaces to developers: `resolver.Builder`, `resolver.ClientConn`, and `resolver.Resolver` (see the relevant code). Once a developer implements these three interfaces, the client can fetch the server list for the specified scheme, notify the balancer, and establish long-lived RPC connections to those servers.
- resolver.Builder
- resolver.ClientConn
- resolver.Resolver
- resolver.Builder

`Builder` is what gRPC uses internally to create implementations of the `Resolver` interface. Note that its declared `Build()` method takes the `ClientConn` interface as a parameter: as the previous analysis showed, the `ClientConn` struct is a pivotal structure whose member `conns map[*addrConn]struct{}` maintains the full list of servers obtained from the registry.

```go
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
```
- resolver.ClientConn

In the `ClientConn` interface, `UpdateState` takes a `State` struct and `NewAddress` takes a slice of `Address`. Looking at the code, `State` contains `Addresses []Address // Resolved addresses for the target`, which shows that the resolver's job is to hand the list of `Address` objects produced by service discovery to the `ClientConn` object.

```go
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
	// UpdateState updates the state of the ClientConn appropriately.
	UpdateState(State)
	// NewAddress is called by resolver to notify ClientConn a new list
	// of resolved addresses.
	// The address list should be the complete list of resolved addresses.
	//
	// Deprecated: Use UpdateState instead.
	NewAddress(addresses []Address)
	// NewServiceConfig is called by resolver to notify ClientConn a new
	// service config. The service config should be provided as a json string.
	//
	// Deprecated: Use UpdateState instead.
	NewServiceConfig(serviceConfig string)
}
```
- resolver.Resolver

`Resolver` provides `ResolveNow`, which gRPC calls when it wants the resolver to attempt name resolution again.

```go
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOption)
	// Close closes the resolver.
	Close()
}
```
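To make the contract between these three interfaces concrete, below is a minimal sketch of a custom resolver that serves a fixed address table. Every name in it (the `example` scheme, `staticBuilder`, `staticResolver`, the addresses) is hypothetical. Also note that the option types were renamed across grpc-go releases (`BuildOption`/`ResolveNowOption` in the version quoted above, `BuildOptions`/`ResolveNowOptions` later), and `resolver.Target`'s `Endpoint` field later became a method; the sketch uses the post-rename names:

```go
package staticresolver

import "google.golang.org/grpc/resolver"

const exampleScheme = "example" // hypothetical scheme, dialed as "example:///my-service"

// staticBuilder implements resolver.Builder over a fixed address table.
type staticBuilder struct {
	addrs map[string][]string // endpoint -> backend addresses
}

// Build is called by gRPC during Dial; cc is the ccResolverWrapper that
// forwards updates into the real *grpc.ClientConn.
func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &staticResolver{cc: cc, addrs: b.addrs[target.Endpoint]}
	r.push() // hand the initial address list to gRPC
	return r, nil
}

func (b *staticBuilder) Scheme() string { return exampleScheme }

// staticResolver implements resolver.Resolver; with a static table there is
// nothing to watch, so ResolveNow simply re-pushes the same list.
type staticResolver struct {
	cc    resolver.ClientConn
	addrs []string
}

func (r *staticResolver) push() {
	resolved := make([]resolver.Address, 0, len(r.addrs))
	for _, a := range r.addrs {
		resolved = append(resolved, resolver.Address{Addr: a})
	}
	r.cc.UpdateState(resolver.State{Addresses: resolved})
}

func (r *staticResolver) ResolveNow(resolver.ResolveNowOptions) { r.push() }
func (r *staticResolver) Close()                                {}

func init() {
	// Register the builder so Dial can find it by scheme.
	resolver.Register(&staticBuilder{
		addrs: map[string][]string{
			"my-service": {"127.0.0.1:9000", "127.0.0.1:9001"},
		},
	})
}
```

Note that `UpdateState` always receives the complete address list: gRPC replaces the previous set rather than merging diffs, which is why the official comment quoted above stresses that "the address list should be the complete list of resolved addresses".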
1.2 Tracing the Resolver Flow
With these three interfaces in hand, let's retrace gRPC's service-discovery logic:
- `Builder.Build()` creates the `Resolver`. During `Build()`, the address information produced by service discovery is handed to `ClientConn` (via `ClientConn.UpdateState()`) for internal connection creation and related logic.
- When the client calls `Dial`, the `scheme` parsed from `target` selects the corresponding `Builder` (a dial example follows this list); the code:

```go
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	...
	// Determine the resolver to use.
	cc.parsedTarget = parseTarget(cc.target)
	grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) // look up the resolver registered under this scheme (name)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}
	...
	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) // gRPC's wrapper applies our resolver implementation
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	...
}
```
- When `Dial` succeeds, an object of the `ClientConn` struct is created (official code location; note this is not the `ClientConn` interface above). The struct's member `resolverWrapper` in turn implements the methods of the `ClientConn` interface (official code location):

```go
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// initialize the ClientConn struct
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	...
	...
	return cc, nil
}
```
- When `resolverWrapper` is initialized, the `Build` method is invoked (official code location); the argument supplied for the `ClientConn` interface parameter is the `ccResolverWrapper`:

```go
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
	ccr := &ccResolverWrapper{
		cc:   cc,
		done: grpcsync.NewEvent(),
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	rbo := resolver.BuildOptions{
		DisableServiceConfig: cc.dopts.disableServiceConfig,
		DialCreds:            credsClone,
		CredsBundle:          cc.dopts.copts.CredsBundle,
		Dialer:               cc.dopts.copts.Dialer,
	}

	var err error
	// We need to hold the lock here while we assign to the ccr.resolver field
	// to guard against a data race caused by the following code path,
	// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
	// accessing ccr.resolver which is being assigned here.
	ccr.resolverMu.Lock()
	defer ccr.resolverMu.Unlock()
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
	if err != nil {
		return nil, err
	}
	return ccr, nil
}
```
- When the user's `Builder` implementation calls `UpdateState`, it triggers the `ClientConn` struct's `updateResolverState` method (official code location); `updateResolverState` then validates and applies the incoming `Address` list (official code location). A sketch of the watch loop that typically drives these calls follows this list.

```go
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
```
- That completes the service-discovery flow. The three interfaces gRPC exposes are quite flexible, but that same flexibility makes them somewhat tedious to implement, and `Address` (official code location) is too bare-bones to be used directly as a business-level description of a service node.
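To tie this back to step 2 of the list above: a dial against the hypothetical `example` scheme from the sketch in §1.1 would look like the following. `grpc.WithInsecure()` matches the era of the code quoted above; later grpc-go versions replace it with `grpc.WithTransportCredentials(insecure.NewCredentials())`:

```go
package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// The "example" scheme routes DialContext to staticBuilder (assumed to
	// have been registered via resolver.Register in the earlier sketch);
	// "my-service" arrives in Build as target.Endpoint.
	conn, err := grpc.Dial("example:///my-service", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	// conn now picks connections from the addresses the resolver reported
	// (pick_first by default, round_robin if configured).
}
```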
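And for step 5, here is a hedged sketch of the watch loop a registry-backed resolver typically runs. The registry side (`registryEvent` and the `events` channel) is invented for illustration; the only real API used is `resolver.ClientConn.UpdateState`, which is exactly the call that lands in `updateResolverState` above:

```go
package watchresolver

import "google.golang.org/grpc/resolver"

// registryEvent is a hypothetical change notification from a registry.
type registryEvent struct {
	Addrs []string
}

// watchResolver sketches a registry-backed resolver.Resolver: a Builder's
// Build would construct it, push the initial snapshot, and start
// `go r.watch()`.
type watchResolver struct {
	cc     resolver.ClientConn
	events <-chan registryEvent // hypothetical registry watch channel
	quit   chan struct{}
}

func (r *watchResolver) watch() {
	for {
		select {
		case <-r.quit:
			return
		case ev := <-r.events:
			addrs := make([]resolver.Address, 0, len(ev.Addrs))
			for _, a := range ev.Addrs {
				addrs = append(addrs, resolver.Address{Addr: a})
			}
			// Hand the complete, current address list to gRPC; each update
			// replaces the previous one, partial diffs are not supported.
			r.cc.UpdateState(resolver.State{Addresses: addrs})
		}
	}
}

// ResolveNowOptions was named ResolveNowOption in older grpc-go releases.
func (r *watchResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *watchResolver) Close()                                { close(r.quit) }
```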
This is why warden wraps gRPC's entire service-discovery flow. The code lives in `pkg/naming/naming.go` and `warden/resolver/resolver.go`, where:
- `naming.go` defines the `Instance` struct used to describe a business instance, the `Registry` interface used for service registration, and the `Resolver` interface used for service discovery
- `resolver.go` implements gRPC's official `resolver.Builder` and `resolver.Resolver` interfaces, while also exposing the `naming.Builder` and `naming.Resolver` interfaces from `naming.go` (a rough sketch of these definitions follows)
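For orientation, here is an abridged sketch of the shape these definitions take in the Kratos v1 tree. Field and method sets are approximate and reproduced from memory; consult `pkg/naming/naming.go` for the authoritative version:

```go
package naming

import "context"

// Instance describes one running instance of a business application
// (fields abridged; see pkg/naming/naming.go).
type Instance struct {
	Region   string            `json:"region"`
	Zone     string            `json:"zone"`
	Env      string            `json:"env"`
	AppID    string            `json:"appid"`
	Hostname string            `json:"hostname"`
	Addrs    []string          `json:"addrs"` // e.g. "grpc://127.0.0.1:9000"
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
}

// InstancesInfo is the result of a Fetch: instances grouped by zone.
type InstancesInfo struct {
	Instances map[string][]*Instance `json:"instances"`
}

// Registry is implemented by registry clients to register an instance
// and keep its lease renewed until cancel is called.
type Registry interface {
	Register(ctx context.Context, ins *Instance) (cancel context.CancelFunc, err error)
	Close() error
}

// Resolver fetches instance lists and signals changes; warden's
// resolver.go adapts it to gRPC's resolver.Resolver.
type Resolver interface {
	Fetch(ctx context.Context) (*InstancesInfo, bool)
	Watch() <-chan struct{}
	Close() error
}

// Builder builds a Resolver for a given application id; warden's
// resolver.go adapts it to gRPC's resolver.Builder.
type Builder interface {
	Build(id string) Resolver
	Scheme() string
}
```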
0x01 Embedding the Register Implementation in gRPC
(To be continued)
0x02 Kratos's Further Wrapping of the Resolver
(To be continued)
0x03 References
Please credit the source when reposting; this article is licensed under CC 4.0.