0x00 Preface
A previous post analyzed a simple reverse proxy implementation: "Implementation Analysis of an HTTP(S) Gateway". This article examines a production-grade open-source LB project: gobetween, a layer-4 proxy gateway implemented in pure Go (documentation here). This post explores its design through an analysis of the core source code.
As the project README puts it: for a long time all of us have been using "traditional" load balancers / proxies like nginx, haproxy, and others. But in the modern world balancing has become more and more flexible because environment changes happen more often. Nodes behind a load balancer spawn and disappear according to load and/or other requirements. Auto scaling and containerization have become almost a "silver bullet" in modern IT infrastructure architectures. In the IP-telephony world, DNS SRV records are the main mechanism to find the nearest and least loaded call router. The same situation holds in the modern microservices world, but unfortunately there is almost no lb / proxy with a flexible and complete automatic discovery feature; there are lots of tricks and workarounds instead. gobetween aims to fill this gap and provide a fast, flexible and full-featured load balancing solution for modern microservice architectures.
gobetween's architecture diagram can be found in the project documentation.
0x01 Features
The officially documented features are as follows:
- Fast L4 Load Balancing – the supported proxy modes
- TCP - with optional PROXY protocol support
- TLS - TLS Termination + ACME & TLS Proxy
- UDP - with optional virtual sessions and transparent mode
- Clear & Flexible Configuration with TOML or JSON – local or remote configuration sources
- File - read configuration from the file
- URL - query URL by HTTP and get configuration from the response body
- Consul - query Consul key-value storage API for configuration
- Management REST API – management-side API for settings, basic monitoring, backend management, etc.
- System Information - general server info
- Configuration - dump current config
- Servers - list, create & delete
- Stats & Metrics - for servers and backends, including rx/tx, status, active connections, etc.
- Discovery – backend service discovery mechanisms
- Static - hardcode backends list in the config file
- Docker - query backends from Docker / Swarm API filtered by label
- Exec - execute an arbitrary program and get backends from its stdout
- JSON - query an arbitrary HTTP URL and pick backends from the response JSON (of any structure)
- Plaintext - query an arbitrary HTTP URL and parse backends from the response text with a custom regexp
- SRV - query DNS server and get backends from SRV records
- Consul - query Consul Services API for backends
- LXD - query backends from LXD
- Healthchecks – supported health-check methods
- Ping - simple TCP ping healthcheck
- Exec - execute arbitrary program passing host & port as options, and read healthcheck status from the stdout
- Probe - send specific bytes to backend (udp, tcp or tls) and expect a correct answer (bytes or regexp)
- Balancing Strategies (with SNI support) – backend load-balancing strategies
- Weight - select a backend from the pool based on the relative weights of backends
- Roundrobin - elect backends from the pool in simple circular order
- Iphash - route a client to the same backend based on the client IP hash
- Iphash1 - same as iphash, but consistent under backend removal (clients keep connecting to the same backend even if other backends go down)
- Leastconn - select backend with least active connections
- Leastbandwidth - backends with least bandwidth
The feature list above also shows the basic building blocks a proxy gateway needs, which were outlined in the earlier post "Implementation Analysis of an HTTP(S) Gateway":
- Management API: RESTful interfaces to manage and query real-time backend information, configuration, statistics (metrics), and so on
- A service discovery mechanism for the proxy gateway
- Health checks for backend nodes
- Load-balancing strategies
- The proxy implementation itself (TCP/UDP/HTTP, etc.)
- Easy-to-use load balancer configuration
0x02 Analysis Roadmap
The points I find most interesting are:
- The model behind the gateway implementation, and how its sub-modules coordinate and communicate
- Integration with Consul for service discovery (Service Discovery)
- Metrics: which indicators are exposed and how they are collected
- Hot configuration reload
- The gateway's extensibility and high-availability design
0x03 Code Analysis: Overview
In gobetween's implementation, most asynchronous communication is done through channels, and the `for...select` loop in the scheduler drives most of the core event dispatching.
The core logic breaks down as follows:
- scheduler: dispatches and handles the gateway's various events
- discovery: service discovery for backend nodes
- server-proxy: the proxy implementation itself
Below, we analyze gobetween's implementation along the basic building blocks listed above.
Configuration: Config
gobetween's main configuration structures are shown below. `Config` is the global configuration, and the meaning of each field is easy to infer. Note the `Servers map[string]Server` field: each `Server` key names one LB (load balancer) instance:
type Config struct {
Logging LoggingConfig `toml:"logging" json:"logging"`
Api ApiConfig `toml:"api" json:"api"`
Metrics MetricsConfig `toml:"metrics" json:"metrics"`
Defaults ConnectionOptions `toml:"defaults" json:"defaults"`
Acme *AcmeConfig `toml:"acme" json:"acme"`
Profiler *ProfilerConfig `toml:"profiler" json:"profiler"`
Servers map[string]Server `toml:"servers" json:"servers"`
}
type Server struct {
ConnectionOptions
// hostname:port
Bind string `toml:"bind" json:"bind"`
// tcp | udp | tls
Protocol string `toml:"protocol" json:"protocol"`
// weight | leastconn | roundrobin
Balance string `toml:"balance" json:"balance"`
// Optional configuration for server name indication
Sni *Sni `toml:"sni" json:"sni"`
// Optional configuration for protocol = tls
Tls *Tls `toml:"tls" json:"tls"`
// Optional configuration for backend_tls_enabled = true
BackendsTls *BackendsTls `toml:"backends_tls" json:"backends_tls"`
// Optional configuration for protocol = udp
Udp *Udp `toml:"udp" json:"udp"`
// Access configuration
Access *AccessConfig `toml:"access" json:"access"`
// ProxyProtocol configuration
ProxyProtocol *ProxyProtocol `toml:"proxy_protocol" json:"proxy_protocol"`
// Discovery configuration
Discovery *DiscoveryConfig `toml:"discovery" json:"discovery"`
// Healthcheck configuration
Healthcheck *HealthcheckConfig `toml:"healthcheck" json:"healthcheck"`
}
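To make the mapping concrete, here is a minimal illustrative TOML config sketch (the values are made up; the field names follow the struct tags above and the project's documented configuration format). Each `[servers.<name>]` section becomes one entry in the `Servers` map:

```toml
[servers.sample]
bind = "0.0.0.0:3000"
protocol = "tcp"
balance = "roundrobin"

  [servers.sample.discovery]
  kind = "static"
  static_list = ["10.0.0.1:8000", "10.0.0.2:8000"]

  [servers.sample.healthcheck]
  kind = "ping"
  interval = "2s"
  timeout = "1s"
```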
0x04 Core Data Structures
The abstractions behind gobetween's core structures are defined under `src/core`; they are listed here:
1. The `Balancer` interface: the abstraction over load-balancing algorithms; an implementation must provide the `Elect` method:
/**
* Balancer interface
*/
type Balancer interface {
/**
* Elect backend based on Balancer implementation
*/
Elect(Context, []*Backend) (*Backend, error)
}
2. The `Server` interface: the common interface of an LB (load balancer) instance, defined here:
type Server interface {
/**
* Start server
*/
Start() error
/**
* Stop server and wait until it stop
*/
Stop()
/**
* Get server configuration
*/
Cfg() config.Server
}
3. The `ReadWriteCount` struct:
type ReadWriteCount struct {
/* Read bytes count */
CountRead uint
/* Write bytes count */
CountWrite uint
Target Target
}
4. `Context` and `TcpContext`: abstractions over the properties of a TCP connection:
type Context interface {
String() string
Ip() net.IP
Port() int
Sni() string
}
/**
* Proxy tcp context
*/
type TcpContext struct {
Hostname string
/**
* Current client connection
*/
Conn net.Conn
}
5. The `Service` interface:
/**
 * Service is a global facility that could be Enabled or Disabled for a number
 * of core.Server instances, depending on their configuration. See services/registry
 * for exact examples.
 */
type Service interface {
/**
* Enable service for Server
*/
Enable(Server) error
/**
* Disable service for Server
*/
Disable(Server) error
}
6. The `Backend` struct: the common structure describing a backend node and its statistics:
/**
* Backend means upstream server
* with all needed associate information
*/
type Backend struct {
Target
Priority int `json:"priority"`
Weight int `json:"weight"`
Sni string `json:"sni,omitempty"`
Stats BackendStats `json:"stats"`
}
/**
* Backend status
*/
type BackendStats struct {
Live bool `json:"live"`
Discovered bool `json:"discovered"`
TotalConnections int64 `json:"total_connections"`
ActiveConnections uint `json:"active_connections"`
RefusedConnections uint64 `json:"refused_connections"`
RxBytes uint64 `json:"rx"`
TxBytes uint64 `json:"tx"`
RxSecond uint `json:"rx_second"`
TxSecond uint `json:"tx_second"`
}
0x05 Core Module Analysis: Main Flow
Starting from main.go: the `main` function independently starts three sub-components, each receiving the `cfg` configuration:
- `manager`: the core logic
- `metrics`: starts the metrics service
- `api`: the management API service built on the `gin` framework
These run as goroutines in a single process, which keeps inter-module communication simple; implemented as standalone services, each module would instead have to expose interfaces for the others to call.
func main(){
...
// Process flags and start
cmd.Execute(func(cfg *config.Config) {
// Configure logging
logging.Configure(cfg.Logging.Output, cfg.Logging.Level, cfg.Logging.Format)
// Start manager
manager.Initialize(*cfg)
/* setup metrics */
metrics.Start((*cfg).Metrics)
// Start API
api.Start((*cfg).Api)
// block forever
<-(chan string)(nil)
})
}
The following sections analyze these three sub-components in turn.
0x06 Manager: Core Management Logic
Creating and Starting a Proxy Server
The core method of the manager module is `manager.Create()`. Inside the `prepareConfig` method, the `config.Server` structure is initialized from the configuration: the TLS settings, the proxy protocol type, the service discovery kind, and so on. A proxy gateway structure is then created via `server.New()`, and finally the proxy is started with `server.Start()`:
Note: `Create` starts a single proxy instance; `Initialize` iterates over all servers defined in the config and creates one proxy per entry.
func Initialize(cfg config.Config) {
......
// Go through config and start servers for each server
for name, serverCfg := range cfg.Servers {
err := Create(name, serverCfg)
if err != nil {
log.Fatal(err)
}
}
......
}
// Create and start a single proxy
func Create(name string, cfg config.Server) error {
servers.Lock()
defer servers.Unlock()
if _, ok := servers.m[name]; ok {
return errors.New("Server with this name already exists:" + name)
}
// Initialize the Server structure from the configuration
c, err := prepareConfig(name, cfg, defaults)
if err != nil {
return err
}
// Create the server instance
server, err := server.New(name, c)
if err != nil {
return err
}
for _, srv := range services {
err = srv.Enable(server)
if err != nil {
return err
}
}
// Start the proxy
if err = server.Start(); err != nil {
return err
}
servers.m[name] = server
return nil
}
Next, let's look at what `server.New()` and `server.Start()` do.
Server Initialization and Startup
This section takes the TCP proxy's initialization and startup code as an example. We start with the `tcp.Server` struct definition and use it to pick apart the elements a proxy implementation needs. The more important members of the `tcp.Server` struct are:
- `scheduler`: the scheduler, responsible for this proxy's load-balancing algorithm (backend election), backend service discovery, health probing, and the proxy's own metrics
- `clients`: holds this proxy's active inbound TCP connections
- `connect`/`disconnect`: channels used for asynchronous notification when connections arrive or drop
- `access`: checks whether a client is allowed to connect
The struct and its constructor are shown below:
type Server struct {
/* Server friendly name */
name string
/* Listener */
listener net.Listener
/* Configuration */
cfg config.Server
/* Scheduler deals with discovery, balancing and healthchecks */
scheduler scheduler.Scheduler // every proxy embeds its own scheduler
/* Current clients connection */
clients map[string]net.Conn // map holding all of this TCP proxy's (frontend) client connections
/* Stats handler */
statsHandler *stats.Handler // stats callback for this proxy
/* ----- channels ----- */
/* Channel for new connections */
connect chan (*core.TcpContext)
/* Channel for dropping connections or connectons to drop */
disconnect chan (net.Conn)
/* Stop channel */
stop chan bool
/* Tls config used to connect to backends */
backendsTlsConfg *tls.Config
/* Tls config used for incoming connections */
tlsConfig *tls.Config
/* Get certificate filled by external service */
GetCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)
/* ----- modules ----- */
/* Access module checks if client is allowed to connect */
access *access.Access
}
// Constructor for Server:
func New(name string, cfg config.Server) (*Server, error) {
......
log := logging.For("server")
var err error = nil
statsHandler := stats.NewHandler(name)
// Create server
server := &Server{
name: name,
cfg: cfg,
stop: make(chan bool),
disconnect: make(chan net.Conn),
connect: make(chan *core.TcpContext),
clients: make(map[string]net.Conn),
statsHandler: statsHandler,
scheduler: scheduler.Scheduler{ // initialize the scheduler
Balancer: balance.New(cfg.Sni, cfg.Balance), // load balancer
Discovery: discovery.New(cfg.Discovery.Kind, *cfg.Discovery), // service discovery
Healthcheck: healthcheck.New(cfg.Healthcheck.Kind, *cfg.Healthcheck), // health checks
StatsHandler: statsHandler, // stats reporting & monitoring
},
}
......
}
Once the `Server` proxy structure has been initialized, the next step is to start the proxy via `server.Start()`. The main logic is:
1. `server.Start()` spawns a child goroutine that dispatches connection events (the `server.HandleClientDisconnect` and `server.HandleClientConnect` methods) and reclaims resources on shutdown
2. It starts the proxy's stats logic, `this.statsHandler.Start()`
3. It starts the proxy's scheduler, `scheduler.Start()`
4. In the current goroutine it runs the TCP proxy's `Listen` method to accept connections; when a new connection arrives, it is sent over `Server.connect` (a channel of type `chan *core.TcpContext`) to the loop from step 1, which triggers the proxying logic for that connection
Here is the code of `server.Start()`:
func (this *Server) Start() error {
var err error
this.tlsConfig, err = tlsutil.MakeTlsConfig(this.cfg.Tls, this.GetCertificate)
if err != nil {
return err
}
// the proxy's core event loop
go func() {
for {
select {
// handle a client disconnect event
case client := <-this.disconnect:
this.HandleClientDisconnect(client)
// handle a new connection event
case ctx := <-this.connect:
this.HandleClientConnect(ctx)
// handle proxy shutdown
case <-this.stop:
this.scheduler.Stop()
this.statsHandler.Stop()
if this.listener != nil {
this.listener.Close()
for _, conn := range this.clients {
conn.Close()
}
}
this.clients = make(map[string]net.Conn)
return
}
}
}()
// Start stats handler
this.statsHandler.Start()
// Start scheduler
this.scheduler.Start()
// Start listening
if err := this.Listen(); err != nil {
this.Stop()
return err
}
return nil
}
Let's first look at the `server.Listen()` method:
func (this *Server) Listen() (err error) {
log := logging.For("server.Listen")
// create tcp listener
this.listener, err = net.Listen("tcp", this.cfg.Bind)
if err != nil {
log.Error("Error starting", this.cfg.Protocol+"server:", err)
return err
}
sniEnabled := this.cfg.Sni != nil
// accept connections in a child goroutine; not especially elegant (compare fasthttp's accept loop)
go func() {
for {
conn, err := this.listener.Accept()
if err != nil {
log.Error(err)
return
}
// handle the new connection
go this.wrap(conn, sniEnabled)
}
}()
return nil
}
// the method that handles a new connection
func (this *Server) wrap(conn net.Conn, sniEnabled bool) {
log := logging.For("server.Listen.wrap")
var hostname string
var err error
if sniEnabled {
var sniConn net.Conn
sniConn, hostname, err = sni.Sniff(conn, utils.ParseDurationOrDefault(this.cfg.Sni.ReadTimeout, time.Second*2))
if err != nil {
log.Error("Failed to get / parse ClientHello for sni:", err)
conn.Close()
return
}
conn = sniConn
}
if this.tlsConfig != nil {
conn = tls.Server(conn, this.tlsConfig)
}
// wrap the new connection in a core.TcpContext and push it into the channel
this.connect <- &core.TcpContext{
hostname,
conn,
}
}
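On the consuming side, `HandleClientConnect` conceptually asks the scheduler to elect a backend for the new context, dials it, and then pipes bytes in both directions while counting traffic. Below is a minimal sketch of that piping step, assuming a plain TCP backend; this is illustrative only, not gobetween's actual code, which additionally tracks rx/tx stats and reports the drop back over the `disconnect` channel:

```go
package sketch

import (
	"io"
	"net"
	"time"
)

// proxy pipes bytes between an accepted client connection and an elected
// backend address until either side closes.
func proxy(client net.Conn, backendAddr string) {
	backend, err := net.DialTimeout("tcp", backendAddr, 5*time.Second)
	if err != nil {
		client.Close()
		return
	}
	done := make(chan struct{}, 2)
	go func() { io.Copy(backend, client); done <- struct{}{} }() // client -> backend
	go func() { io.Copy(client, backend); done <- struct{}{} }() // backend -> client
	<-done // the first EOF or error tears down both sides
	client.Close()
	backend.Close()
}
```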
A follow-up article will go into further details of gobetween's proxy implementation. In the next section we look at the structure and implementation of `server.scheduler`.
0x07 The Scheduler Structure and Implementation in the Server Proxy Module
The previous section introduced the important `Scheduler` member of `Server`. This structure is the core of the whole load-balancing machinery and contains the following components:
- the balancing selector `core.Balancer`
- the backend service discovery component `discovery.Discovery`
- the health check component `healthcheck.Healthcheck`
- the metrics collection component `stats.Handler`
type Scheduler struct {
/* Balancer impl */
Balancer core.Balancer
/* Discovery impl */
Discovery *discovery.Discovery
/* Healthcheck impl */
Healthcheck *healthcheck.Healthcheck
/* ----- backends ------*/
/* Current cached backends map */
backends map[core.Target]*core.Backend
/* Stats */
StatsHandler *stats.Handler
/* ----- channels ----- */
/* Backend operation channel */
ops chan Op
/* Stop channel */
stop chan bool
/* Elect backend channel */
elect chan ElectRequest
}
The Scheduler's Core Method
`Scheduler.Start()` is the scheduler's core startup method; it spawns the single goroutine whose `for...select` loop ties discovery, health checks, stats, and backend election together:
func (this *Scheduler) Start() {
log := logging.For("scheduler")
log.Info("Starting scheduler", this.StatsHandler.Name)
this.ops = make(chan Op)
this.elect = make(chan ElectRequest)
this.stop = make(chan bool)
this.backends = make(map[core.Target]*core.Backend)
this.Discovery.Start()
this.Healthcheck.Start()
// backends stats pusher ticker
backendsPushTicker := time.NewTicker(2 * time.Second)
/**
* Goroutine updates and manages backends
*/
go func() {
for {
select {
/* ----- discovery ----- */
// handle newly discovered backends
case backends := <-this.Discovery.Discover():
this.HandleBackendsUpdate(backends)
this.Healthcheck.In <- this.Targets()
this.StatsHandler.BackendsCounter.In <- this.Targets()
/* ------ healthcheck ----- */
// handle backend healthcheck result
case checkResult := <-this.Healthcheck.Out:
this.HandleBackendLiveChange(checkResult.Target, checkResult.Status == healthcheck.Healthy)
/* ----- stats ----- */
// push current backends to stats handler
case <-backendsPushTicker.C:
this.StatsHandler.Backends <- this.Backends()
// handle new bandwidth stats of a backend
case bs := <-this.StatsHandler.BackendsCounter.Out:
this.HandleBackendStatsChange(bs.Target, &bs)
/* ----- operations ----- */
// handle backend operation
case op := <-this.ops:
this.HandleOp(op)
// elect backend
case electReq := <-this.elect:
this.HandleBackendElect(electReq)
/* ----- stop ----- */
// handle scheduler stop
case <-this.stop:
log.Info("Stopping scheduler", this.StatsHandler.Name)
backendsPushTicker.Stop()
this.Discovery.Stop()
this.Healthcheck.Stop()
metrics.RemoveServer(fmt.Sprintf("%s", this.StatsHandler.Name), this.backends)
return
}
}
}()
}
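Because everything funnels through this one goroutine, the `backends` map needs no locking. The `elect` channel is how a proxy connection asks for a backend: the caller sends an `ElectRequest` and waits for a reply on per-request channels. A sketch of that request/response pattern is below; the field and method names are assumptions for illustration, modeled on the `ElectRequest` type referenced above:

```go
// ElectRequest carries the connection context plus reply channels
// (a hypothetical but representative shape).
type ElectRequest struct {
	Context  core.Context
	Response chan core.Backend
	Err      chan error
}

// TakeBackend would be called from the proxy's connection handler; the
// scheduler's for...select loop answers it in HandleBackendElect.
func (this *Scheduler) TakeBackend(ctx core.Context) (*core.Backend, error) {
	r := ElectRequest{ctx, make(chan core.Backend), make(chan error)}
	this.elect <- r // picked up by the "case electReq := <-this.elect" branch
	select {
	case err := <-r.Err:
		return nil, err
	case backend := <-r.Response:
		return &backend, nil
	}
}
```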
The Backend Service Discovery Component: Discovery
Discovery talks to the service registry to obtain the real-time list of live backends behind a service name.
Inside the `Discovery` component, the different registries are stored in a `map[string]func(config.DiscoveryConfig) interface{}`, where each value is the constructor of the corresponding discovery method. Taking consul as the example:
func init() {
registry["consul"] = NewConsulDiscovery
}
The `Discovery` struct is shown below. The `fetch FetchFunc` member is assigned the consul implementation, `consulFetch`, at initialization time; the remaining members are explained in the comments:
type Discovery struct {
/**
* Cached backends
*/
backends *[]core.Backend // caches the backend nodes fetched from the consul API
/**
* Function to fetch / discovery backends
*/
fetch FetchFunc // the function that pulls backend nodes (here: via the consul API)
/**
* Options for fetch
*/
opts DiscoveryOpts // configuration options for discovery
/**
* Discovery configuration
*/
cfg config.DiscoveryConfig
/**
* Channel where to push newly discovered backends
*/
out chan ([]core.Backend) // channel to the scheduler module, carrying the freshest set of backends
/**
* Channel for stopping discovery
*/
stop chan bool
}
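Discovery runs in pull mode: it periodically invokes `fetch` and pushes the fresh list into `out`, which the scheduler consumes in its `case backends := <-this.Discovery.Discover():` branch. A simplified sketch of that pull loop is below, assuming a fixed interval and omitting the retry/backoff handling the real implementation adds (the method name `run` is assumed):

```go
func (this *Discovery) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		backends, err := this.fetch(this.cfg)
		if err == nil {
			this.backends = backends // cache the last good result
			this.out <- *backends    // hand the fresh list to the scheduler
		}
		select {
		case <-ticker.C: // next fetch round
		case <-this.stop:
			return
		}
	}
}
```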
The `discovery.consulFetch` method uses the consul client's `client.Health().Service()` call to fetch the list of backend nodes, which is stored in `backends *[]core.Backend`:
func consulFetch(cfg config.DiscoveryConfig) (*[]core.Backend, error) {
......
scheme := "http"
transport := &http.Transport{
DisableKeepAlives: true,
}
// Enable tls if needed
if cfg.ConsulTlsEnabled {
tlsConfig := &consul.TLSConfig{
Address: cfg.ConsulHost,
CertFile: cfg.ConsulTlsCertPath,
KeyFile: cfg.ConsulTlsKeyPath,
CAFile: cfg.ConsulTlsCacertPath,
}
tlsClientConfig, err := consul.SetupTLSConfig(tlsConfig)
if err != nil {
return nil, err
}
transport.TLSClientConfig = tlsClientConfig
scheme = "https"
}
// Parse http timeout
timeout := utils.ParseDurationOrDefault(cfg.Timeout, consulTimeout)
// Create consul client
client, _ := consul.NewClient(&consul.Config{
Token: cfg.ConsulAclToken,
Scheme: scheme,
Address: cfg.ConsulHost,
Datacenter: cfg.ConsulDatacenter,
HttpAuth: &consul.HttpBasicAuth{
Username: cfg.ConsulAuthUsername,
Password: cfg.ConsulAuthPassword,
},
HttpClient: &http.Client{Timeout: timeout, Transport: transport},
})
// Query service
service, _, err := client.Health().Service(cfg.ConsulServiceName, cfg.ConsulServiceTag, cfg.ConsulServicePassingOnly, nil)
if err != nil {
return nil, err
}
// Gather backends
backends := []core.Backend{}
for _, entry := range service {
s := entry.Service
sni := ""
for _, tag := range s.Tags {
split := strings.SplitN(tag, "=", 2)
if len(split) != 2 {
continue
}
if split[0] != "sni" {
continue
}
sni = split[1]
}
var host string
if s.Address != "" {
host = s.Address
} else {
host = entry.Node.Address
}
backends = append(backends, core.Backend{
Target: core.Target{
Host: host,
Port: fmt.Sprintf("%v", s.Port),
},
Priority: 1,
Weight: 1,
Stats: core.BackendStats{
Live: true,
},
Sni: sni,
})
}
return &backends, nil
}
The Balancing Component: Balancer
gobetween's balancing algorithms are likewise stored in a map, here a `map[string]reflect.Type`, registered by name at init time. Let's briefly walk through the `RoundrobinBalancer` implementation:
var typeRegistry = make(map[string]reflect.Type)
func init() {
typeRegistry["roundrobin"] = reflect.TypeOf(RoundrobinBalancer{})
}
func New(sniConf *config.Sni, balance string) core.Balancer {
balancer := reflect.New(typeRegistry[balance]).Elem().Addr().Interface().(core.Balancer)
if sniConf == nil {
return balancer
}
return &middleware.SniBalancer{
SniConf: sniConf,
Delegate: balancer,
}
}
The core of implementing a `Balancer` is the `Elect` method, which is invoked from the `Scheduler.HandleBackendElect()` method to pick a backend according to the balancing algorithm:
/**
* Roundrobin balancer
*/
type RoundrobinBalancer struct {
/* Current backend position */
current int
}
/**
* Elect backend using roundrobin strategy
*/
func (b *RoundrobinBalancer) Elect(context core.Context, backends []*core.Backend) (*core.Backend, error) {
if len(backends) == 0 {
return nil, errors.New("Can't elect backend, Backends empty")
}
sort.SliceStable(backends, func(i, j int) bool {
return backends[i].Target.String() < backends[j].Target.String()
})
if b.current >= len(backends) {
b.current = 0
}
backend := backends[b.current]
b.current += 1
return backend, nil
}
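Note that `Elect` keeps per-balancer state (`current`) without any locking; that is safe because it is only ever called from the scheduler's single event-loop goroutine. For comparison, here is how a least-connections strategy can implement the same interface by scanning `Backend.Stats`; this is an illustrative re-implementation, not the project's exact `LeastconnBalancer` code:

```go
/**
 * Leastconn balancer (illustrative sketch)
 */
type LeastconnBalancer struct{}

/**
 * Elect the backend with the fewest active connections
 */
func (b *LeastconnBalancer) Elect(context core.Context, backends []*core.Backend) (*core.Backend, error) {
	if len(backends) == 0 {
		return nil, errors.New("can't elect backend, backends empty")
	}
	least := backends[0]
	for _, backend := range backends[1:] {
		if backend.Stats.ActiveConnections < least.Stats.ActiveConnections {
			least = backend
		}
	}
	return least, nil
}
```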
The Health Check Component: Healthcheck
The `Healthcheck` struct is shown below. Worth noting is the `workers []*Worker` member, which implements a worker pool. The component's core flow is as follows:
type Healthcheck struct {
/* Healthcheck function */
check CheckFunc
/* Healthcheck configuration */
cfg config.HealthcheckConfig
/* Input channel to accept targets */
In chan []core.Target // channel from the scheduler module, receiving the backend nodes to probe
/* Output channel to send check results for individual target */
Out chan CheckResult // channel to the scheduler module, reporting probe results
/* Current check workers */
workers []*Worker
/* Channel to handle stop */
stop chan bool
}
As the official documentation describes, healthcheck offers three probing methods. They are registered in `init()`, each value being the entry point of the corresponding method, which `healthcheck.New()` then assigns to the `check` member of the `Healthcheck` struct:
func init() {
registry["ping"] = ping
registry["probe"] = probe
registry["exec"] = exec
registry["none"] = nil
}
func New(strategy string, cfg config.HealthcheckConfig) *Healthcheck {
// look up the probe function by strategy name
check := registry[strategy]
h := Healthcheck{
check: check, // the selected probe function
cfg: cfg,
In: make(chan []core.Target),
Out: make(chan CheckResult),
workers: []*Worker{}, // starts out empty
stop: make(chan bool),
}
return &h
}
The healthcheck core loop, `Healthcheck.Start()`, does two main things:
- listens on the `this.In` channel to receive the latest backend list from the `Scheduler` module
- listens on `this.stop` for the shutdown command from the layer above, then cleans up resources and exits
func (this *Healthcheck) Start() {
go func() {
for {
select {
/* got new targets */
case targets := <-this.In:
this.UpdateWorkers(targets)
/* got stop requst */
case <-this.stop:
// Stop all workers
for i := range this.workers {
this.workers[i].Stop()
}
// And free it's memory
this.workers = []*Worker{}
return
}
}
}()
}
Next, look at the `this.UpdateWorkers()` method: given the incoming backend `targets`, it keeps or creates one worker per target and stops the workers whose targets have disappeared:
func (this *Healthcheck) UpdateWorkers(targets []core.Target) {
// result is the worker list after this update
result := []*Worker{}
// Keep or add needed workers
for _, t := range targets {
var keep *Worker
// scan the old workers list
for i := range this.workers {
c := this.workers[i]
if t.EqualTo(c.target) {
// this worker is still in use
keep = c
break
}
}
// no existing worker for this target: create a new one
if keep == nil {
keep = &Worker{
target: t,
stop: make(chan bool),
out: this.Out, // this.Out is the healthcheck's output channel
cfg: this.cfg,
check: this.check,
LastResult: CheckResult{
Status: Initial,
},
}
// start the worker
keep.Start()
}
// keep the worker in the active list
result = append(result, keep)
}
// Stop workers that are no longer needed
for i := range this.workers {
c := this.workers[i]
remove := true
for _, t := range targets {
if c.target.EqualTo(t) {
remove = false
break
}
}
if remove {
// signal the worker goroutine to exit via its stop channel
c.Stop()
}
}
// swap in the updated worker list
this.workers = result
}
func (this *Healthcheck) Stop() {
this.stop <- true
}
Finally, a quick look at the `Worker` struct definition and its core workflow, the `Worker.Start()` method, which does the following:
- starts a ticker, `time.NewTicker(interval)`, for periodic checks; the `interval` comes from the configuration
- runs the `this.check` method in a separate goroutine, where the actual health-check logic executes; the result is reported back to the worker asynchronously through `result chan<- CheckResult`
- in the `this.process` method, forwards confirmed check results to the `Scheduler` module through the healthcheck's `Out` channel
- waits for the layer above to tell this worker to exit
type Worker struct {
/* Target to monitor and check */
target core.Target
/* Function that does actual check */
check CheckFunc
/* Channel to write changed check results */
out chan<- CheckResult
/* Healthcheck configuration */
cfg config.HealthcheckConfig
/* Stop channel to worker to stop */
stop chan bool
/* Last confirmed check result */
LastResult CheckResult
/* Current passes count, if LastResult.Live = true */
passes int
/* Current fails count, if LastResult.Live = false */
fails int
}
/**
* Start worker
*/
func (this *Worker) Start() {
// Special case for no healthcheck, don't actually start worker
if this.cfg.Kind == "none" {
return
}
interval, _ := time.ParseDuration(this.cfg.Interval)
ticker := time.NewTicker(interval)
c := make(chan CheckResult, 1)
go func() {
/* Check health before any delay*/
log.Debug("Initial check", this.cfg.Kind, "for", this.target)
go this.check(this.target, this.cfg, c)
for {
select {
/* new check interval has reached */
case <-ticker.C:
log.Debug("Next check", this.cfg.Kind, "for", this.target)
go this.check(this.target, this.cfg, c)
/* new check result is ready */
case checkResult := <-c:
log.Debug("Got check result", this.cfg.Kind, ":", checkResult)
this.process(checkResult)
/* request to stop worker */
case <-this.stop:
ticker.Stop()
//close(c) // TODO: Check!
return
}
}
}()
}
func (this *Worker) process(checkResult CheckResult) {
//......
if checkResult.Status == this.LastResult.Status {
// check status not changed
return
}
if checkResult.Status == Unhealthy {
this.passes = 0
this.fails++
} else if checkResult.Status == Healthy {
this.fails = 0
this.passes++
}
if this.passes == 0 && this.fails >= this.cfg.Fails ||
this.fails == 0 && this.passes >= this.cfg.Passes {
this.LastResult = checkResult
log.Info("Sending to scheduler:", this.LastResult)
// notify the Scheduler of the result via out (which is the Healthcheck's Out channel)
this.out <- checkResult
}
}
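For completeness, here is what the simplest check function, `ping`, conceptually does: a plain TCP dial with a timeout, reporting `Healthy` or `Unhealthy` over the result channel. This sketch follows the `CheckFunc` shape used above; the config field name (`cfg.Timeout`) and the default value are assumptions:

```go
func ping(target core.Target, cfg config.HealthcheckConfig, result chan<- CheckResult) {
	checkResult := CheckResult{Target: target}
	// parse the configured timeout, falling back to a default
	timeout := utils.ParseDurationOrDefault(cfg.Timeout, time.Second)
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(target.Host, target.Port), timeout)
	if err != nil {
		checkResult.Status = Unhealthy
	} else {
		checkResult.Status = Healthy
		conn.Close()
	}
	result <- checkResult
}
```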
The metrics collection component, `stats.Handler`, works hand in hand with the metrics module, so both are covered in the next section.
0x08 The Metrics Module
The metrics module's code is located here; it mainly does the following (a fairly typical implementation):
- exposes the scrape endpoint for the Prometheus client
- provides the methods the manager module calls to report into the Prometheus vectors: `ReportHandleStatsChange`, `ReportHandleBackendStatsChange`, and `ReportHandleOp`
The metrics implementation is concise and typical. The main indicators are listed below; personally, I think backend access latency, success rate, and similar metrics would also be worth adding:
serverCount *prometheus.GaugeVec
serverActiveConnections *prometheus.GaugeVec
serverRxTotal *prometheus.GaugeVec
serverTxTotal *prometheus.GaugeVec
serverRxSecond *prometheus.GaugeVec
serverTxSecond *prometheus.GaugeVec
backendActiveConnections *prometheus.GaugeVec
backendRefusedConnections *prometheus.GaugeVec
backendTotalConnections *prometheus.GaugeVec
backendRxBytes *prometheus.GaugeVec
backendTxBytes *prometheus.GaugeVec
backendRxSecond *prometheus.GaugeVec
backendTxSecond *prometheus.GaugeVec
backendLive *prometheus.GaugeVec
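Each of these is a labeled `prometheus.GaugeVec`. Below is a minimal sketch of how one such vector is typically declared, registered, and updated with the Prometheus Go client; the namespace, metric name, and label set are assumptions for illustration:

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

var serverActiveConnections = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "gobetween",
		Name:      "server_active_connections",
		Help:      "Current active connections of the server",
	},
	[]string{"server"},
)

func init() {
	prometheus.MustRegister(serverActiveConnections)
}

// called when fresh stats arrive from a server's stats handler
func ReportActiveConnections(server string, count uint) {
	serverActiveConnections.WithLabelValues(server).Set(float64(count))
}
```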
0x09 The API Module
The API module's code is located here; it is a management API built with `gin`. It mainly provides:
- Dump current config as TOML: export the running configuration
- A RESTful servers API: operates on backend resources and their attributes by calling the interfaces exposed by the manager module
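A minimal sketch of how such routes can be wired up with `gin` is below; the paths and handler bodies are illustrative, not gobetween's exact ones (the real handlers delegate to the manager module):

```go
package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

func main() {
	r := gin.Default()
	// dump the current running config
	r.GET("/dump", func(c *gin.Context) {
		c.String(http.StatusOK, "# TOML dump of the running config would go here")
	})
	// list servers, create a named server, delete a named server
	r.GET("/servers", func(c *gin.Context) {
		c.JSON(http.StatusOK, []string{"sample"})
	})
	r.POST("/servers/:name", func(c *gin.Context) {
		// would parse the body and call manager.Create(c.Param("name"), cfg)
		c.Status(http.StatusNoContent)
	})
	r.DELETE("/servers/:name", func(c *gin.Context) {
		// would call the manager module to stop and remove the server
		c.Status(http.StatusNoContent)
	})
	r.Run(":8888")
}
```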