0x00 再看 RR-Picker 实现
前文中分析了官方提供的轮询 Picker
代码,我们可以使用 gRPC 提供的 balancer
包中的接口实现自定义的选择器 Picker
,也就是自定义的负载均衡逻辑,只需要三步即可。这篇文章,讨论下,我们自己实现的 Picker
逻辑是如何 gRPC 中生效的。
一个 RR-Picker 实现步骤
一个简单的实现如下所示:
- 第一步:设定全局
balancer
的名字和创建全局变量(package 级别)。
var _ base.PickerBuilder = &roundRobinPickerBuilder{} // 创建全局变量 (package 级别)
var _ balancer.Picker = &roundRobinPicker{} // 创建全局变量 (package 级别)
const RoundRobin = "round_robin" // 注册到 resolver 中的 lb 全局名字
// newRoundRobinBuilder creates a new roundrobin balancer builder.
func newRoundRobinBuilder() balancer.Builder {
return base.NewBalancerBuilderWithConfig(RoundRobin, &roundRobinPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {
balancer.Register(newRoundRobinBuilder())
}
- 第二步:定义
PickerBuilder
的实例化结构,并实现接口中定义的Build
方法,该方法返回一个balancer.Picker
。
type roundRobinPickerBuilder struct{}
func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
if len(readySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for addr, sc := range readySCs {
weight := 1
if addr.Metadata != nil {
if m, ok := addr.Metadata.(*map[string]string); ok {
w, ok := (*m)["weight"]
if ok {
n, err := strconv.Atoi(w)
if err == nil && n > 0 {
weight = n
}
}
}
}
for i := 0; i < weight; i++ {
scs = append(scs, sc)
}
}
return &roundRobinPicker{
subConns: scs,
next: rand.Intn(len(scs)),
}
}
- 第三步: 构建
roundRobinPicker
,该结构需要实现balancer.Picker
定义的Pick
方法。
type roundRobinPicker struct {
subConns []balancer.SubConn
mu sync.Mutex
next int
}
func (p *roundRobinPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return sc, nil, nil
}
至此,一个基础的 RR-Picker 接口就基本实现了,下面我们看看,gRPC 运行中,在何时调用我们实现的 Balancer 逻辑的。
引入的问题
和之前分析代码的文章一样,这里先引入三个问题:
- 自定义实现的
PickerBuilder
如何被调用? - 自定义实现的
Picker
如何被调用? - 何时返回
Picker.Pick
的结果给上层 gRPC 客户端或 RPC 方法?
0x01 balancer.Picker 与 base.PickerBuilder
上面实现(封装)的两个结构,roundRobinPickerBuilder
对应了 base.PickerBuilder
,roundRobinPicker
对应了 balancer.Picker
,下面就分析这两个结构的作用。
balancer.Picker
balancer.Picker
定义如下,从描述看其即将会被 V2Picker
取代,这里我们先只看 Picker
。Picker
也是一个 interface{}
,封装它必须要实现 Pick
方法,该方法是从给定的连接池中,选取一个可用连接并返回。
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
//
// Deprecated: use V2Picker instead
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
//
// This functions is expected to return:
// - a SubConn that is known to be READY;
// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
// made (for example, some SubConn is in CONNECTING mode);
// - other errors if no active connecting is happening (for example, all SubConn
// are in TRANSIENT_FAILURE mode).
//
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will
// block until UpdateBalancerState() is called and will call pick on the
// new picker. The done function returned from Pick(), if not nil, will be
// called with nil error, no bytes sent and no bytes received.
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
// - If the error is ErrTransientFailure or implements IsTransientFailure()
// bool, returning true:
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
// is called to pick again;
// - Otherwise, RPC will fail with unavailable error.
// - Else (error is other non-nil error):
// - The RPC will fail with the error's status code, or Unknown if it is
// not a status error.
//
// The returned done() function will be called once the rpc has finished,
// with the final status of that RPC. If the SubConn returned is not a
// valid SubConn type, done may not be called. done may be nil if balancer
// doesn't care about the RPC status.
Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
}
base.PickerBuilder
base.PickerBuilder
定义如下,其定义了一个 Build
方法,该方法需要返回 balancer.Picker
。
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build takes a slice of ready SubConns, and returns a picker that will be
// used by gRPC to pick a SubConn.
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
// V2PickerBuilder creates balancer.V2Picker.
type V2PickerBuilder interface {
// Build returns a picker that will be used by gRPC to pick a SubConn.
Build(info PickerBuildInfo) balancer.V2Picker
}
当我们创建了自定义的 PickerBuilder
后,通过 base.NewBalancerBuilderWithConfig
方法,注册我们实现的 Picker
逻辑
// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb, // 这里保存了我们自定义了 roundRobinPickerBuilder
config: config,
}
}
在包的 init
方法中,使用 balancer.Register
方法 注册我们的 Picker
var (
// m is a map from name to balancer builder.
m = make(map[string]Builder) // 全局 balancer-Map
)
// Register registers the balancer builder to the balancer map. b.Name
// (lowercased) will be used as the name registered with this builder. If the
// Builder implements ConfigParser, ParseConfig will be called when new service
// configs are received by the resolver, and the result will be provided to the
// Balancer in UpdateClientConnState.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[strings.ToLower(b.Name())] = b
}
0x02 应用自定义 Picker
这节,我们从 NewBalancerBuilderWithConfig
方法开始,看下自定义的 Picker 是如何生效的。
// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{ // 返回 baseBuilder
name: name,
pickerBuilder: pb, // 这里保存了我们自定义了 roundRobinPickerBuilder
config: config,
}
}
在 baseBuilder
的 Build
方法 中,初始化了重要的 balancer.Balancer
。
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder, // 这里是我们自定义的 roundRobinPickerBuilder
v2PickerBuilder: bb.v2PickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
}
if bb.pickerBuilder != nil {
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
} else {
bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
return bal
}
在 balancer.Balancer
这个 interface{}
中,UpdateSubConnState
和 HandleSubConnStateChange
功能是一样的,我们继续分析下 UpdateSubConnState
和 UpdateClientConnState
这两个方法在哪里被调用的。
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// Balancer is expected to aggregate all the state of SubConn and report
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state has
// been changed by the new state.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateSubConnState will be called instead.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// Balancer can create new SubConn or remove SubConn with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
UpdateSubConnState 的实现
在 UpdateSubConnState
方法 中,其中有个很重要的方法 regeneratePicker
,在发生下面的情况时,需要重建 Picker,这个很好理解,注意看下 func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
的 readySCs
参数,这个参数表示当前可用的连接(池),如果连接发生问题,当然需要重建连接池。
// 当下面情况发生时,需要重新创建 Picker:
// - 连接由其他状态转变为 Ready 状态
// - 连接由 Ready 状态转变为其他状态
// - balancer 转变为 TransientFailure 状态
// - balancer 由 TransientFailure 转变为 Non-TransientFailure 状态
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
}
oldS, ok := b.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
}
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
// 重新生成 Picker
b.regeneratePicker(state.ConnectionError)
}
if b.picker != nil {
// 将自定义的 picker 传入 UpdateBalancerState,b.picker 是我们自己实现的 lb 逻辑
b.cc.UpdateBalancerState(b.state, b.picker)
} else {
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
}
}
regeneratePicker
的实现如下,注意看 if b.pickerBuilder != nil {...b.picker = b.pickerBuilder.Build(readySCs)...}
这里,就解答了文章的第一个问题,在这里调用了我们自定义的 PickerBuilder
方法。注意:这里的 readySCs
可以理解为当前健康的连接池。
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker(err error) {
if b.state == connectivity.TransientFailure {
if b.pickerBuilder != nil {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
} else {
if err != nil {
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
} else {
// This means the last subchannel transition was not to
// TransientFailure (otherwise err must be set), but the
// aggregate state of the balancer is TransientFailure, meaning
// there are no other addresses.
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
}
}
return
}
if b.pickerBuilder != nil {
readySCs := make(map[resolver.Address]balancer.SubConn)
// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
//readySCs 中保存了 connectivity.Ready(连接就绪)的连接集合
readySCs[addr] = sc
}
}
b.picker = b.pickerBuilder.Build(readySCs) // 重要:执行自定义的 PickerBuilder,文中的 roundRobinPickerBuilder 方法
} else {
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})// 重要:执行自定义的 PickerBuilder,文中的 roundRobinPickerBuilder 方法
}
}
0x03 自定义 Picker 的调用(1)
第二个问题,在哪里应用自定义的 Picker?先看下刚才出现的 UpdateBalancerState
方法,在 UpdateSubConnState
方法中进行调用。
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return
}
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(p) // 传入 pick 的实现
ccb.cc.csMgr.updateState(s)
}
接下来,看下 updatePicker
方法,该方法注册了传入的 balancer.Picker
:
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
}
v2PickerWrapper
的定义如下,v2PickerWrapper wraps a balancer:
// v2PickerWrapper wraps a balancer.Picker while providing the
// balancer.V2Picker API. It requires a pickerWrapper to generate errors
// including the latest connectionError. To be deleted when balancer.Picker is
// updated to the balancer.V2Picker API.
type v2PickerWrapper struct {
picker balancer.Picker
connErr *connErr
}
很明显,在 v2PickerWrapper
的 Picker
方法实现中,解答了这个问题:
func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
sc, done, err := v.picker.Pick(info.Ctx, info)
// 调用我们自定义的 Picker,roundRobinPicker,返回一个可用的连接 sc 给调用方
if err != nil {
if err == balancer.ErrTransientFailure {
return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
}
return balancer.PickResult{}, err
}
return balancer.PickResult{SubConn: sc, Done: done}, nil
}
pickerWrapper 的 pick 实现
// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
var ch chan struct{}
var lastPickErr error
for {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return nil, nil, ErrClientConnClosing
}
if pw.picker == nil {
ch = pw.blockingCh
}
if ch == pw.blockingCh {
// This could happen when either:
// - pw.picker is nil (the previous if condition), or
// - has called pick on the current picker.
pw.mu.Unlock()
select {
case <-ctx.Done():
var errStr string
if lastPickErr != nil {
errStr = "latest balancer error:" + lastPickErr.Error()
} else if connectionErr := pw.connectionError(); connectionErr != nil {
errStr = "latest connection error:" + connectionErr.Error()
} else {
errStr = ctx.Err().Error()
}
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
case context.Canceled:
return nil, nil, status.Error(codes.Canceled, errStr)
}
case <-ch:
}
continue
}
ch = pw.blockingCh
p := pw.picker
pw.mu.Unlock()
pickResult, err := p.Pick(info)
if err != nil {
if err == balancer.ErrNoSubConnAvailable {
continue
}
if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
if !failfast {
lastPickErr = err
continue
}
return nil, nil, status.Error(codes.Unavailable, err.Error())
}
if _, ok := status.FromError(err); ok {
return nil, nil, err
}
// err is some other error.
return nil, nil, status.Error(codes.Unknown, err.Error())
}
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
if channelz.IsOn() {
return t, doneChannelzWrapper(acw, pickResult.Done), nil
}
return t, pickResult.Done, nil
}
if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.
// DoneInfo with default value works.
pickResult.Done(balancer.DoneInfo{})
}
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
// A valid picker always returns READY subConn. This means the state of ac
// just changed, and picker will be updated shortly.
// continue back to the beginning of the for loop to repick.
}
}
0x04 自定义 Picker 的调用(2)
最后一个问题,在哪里返回 Picker 的结果给上层 gRPC 客户端呢?在 ClientConn
的 getTransport
方法,看到了对上述 pickerWrapper
的 pick
实现的调用:
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ // 注意这里调用的是 pick(小写)方法,自定义的 Picker 在 picker 中
Ctx: ctx,
FullMethodName: method,
})
if err != nil {
return nil, nil, toRPCErr(err)
}
return t, done, nil
}
在 newClientMethod
的 gRPC 客户端方法中,我们通过 getTransport
方法获取了 Transport 层中抽象出来的 ClientTransport 和 ServerTransport,实际上就是获取一个连接给后续 RPC 调用传输使用。到此,gRPC 的客户端就获取了由自定义 LoadBalancer 算法得到的最终的 TCP
连接
0x05 总结
本文分析了 gRPC 是如何将自定义的 Picker
实现应用在最终的负载均衡流程,理解 Picker
的实现原理有助于我们实现更健壮的 Loadbalancer 逻辑。
0x06 参考
转载请注明出处,本文采用 CC4.0 协议授权