0x00 Preface
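The previous article, "Kubernetes Application Refactoring (Part 3): Load Balancing", discussed how load balancing is implemented in Kubernetes and mentioned an open-source project, kuberesolver. This article analyzes how kuberesolver is implemented.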
0x01 Kubernetes Concepts
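Readers familiar with Kubernetes will know that its built-in resources (such as Pod, Service, Deployment, ReplicaSet, StatefulSet, Job and CronJob) can be operated on directly through the REST API exposed by the Kubernetes APIServer, as long as the caller (for example, a container in some application Pod) has been granted permission for the relevant API operations. See the article "A Thorough Guide to How Certificates Work in Kubernetes".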
APIServer
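In the Kubernetes architecture, Etcd storage sits behind the APIServer. Every component in the cluster must go through the APIServer, which enforces security controls such as authentication and authorization (authentication + authorization); only after being admitted can a component access the corresponding resource data (stored in Etcd).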
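- GET (SELECT): read a resource from the server. GET corresponds to the "get information" functionality of the Kubernetes API; use it whenever you need to fetch information.
- POST (CREATE): create a new resource on the server. POST corresponds to the "create" functionality of the Kubernetes API; use it to create Pods, ReplicaSets or Services.
- PUT (UPDATE): update a resource on the server (the client sends the complete, modified resource), e.g. updating the status of Nodes or Pods, or the replica count of a ReplicaSet.
- PATCH (UPDATE): update a resource on the server (the client sends only the changed fields).
- DELETE (DELETE): delete a resource, e.g. delete Pods.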
0x02 The Kubernetes List-Watch Mechanism
The Watch feature provided by Kubernetes is built on top of Etcd's Watch: when a Key-Value in Etcd (the persisted state of a Kubernetes resource) changes, the APIServer is notified.
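For the various resources in a Kubernetes cluster, the Kubernetes controller manager and scheduler need to be aware of their state changes (creation, update, deletion) and carry out their management duties in response to those change events. Kubernetes achieves this by letting the APIServer act as a simple message bus, offering two mechanisms (Pull && Push) to deliver messages: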
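- List mechanism (pull): provides an interface for querying the current, latest state, served over short-lived HTTP connections.
- Watch mechanism (push): every component uses Watch to establish a long-lived HTTP connection and learn, at any moment, about change events for the resources it cares about; after doing its work, the component again calls the APIServer to write back its results (such as a resource's Spec or Status). For example, clients (Kubelet/Scheduler/Controller-manager) use List-Watch to listen for Create/Update/Delete events on resources (Pods/ReplicaSet/Service, etc.) in the APIServer, and call the handler that matches each event type.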
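Taking Pods as an example, the List API is typically GET /api/v1/pods, and the Watch API is typically GET /api/v1/watch/pods (the older path form) or, equivalently, the List path with the watch=true query parameter (GET /api/v1/pods?watch=true). Either form tells the APIServer to keep the HTTP connection open and keep streaming Pod-related events; each time an event occurs, the server sends one WatchEvent.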
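To make the difference between the two calls concrete, here is a minimal Go sketch that builds the List URL and the corresponding Watch URL (same collection path plus watch=true). The helper name podsURLs and the example APIServer address are assumptions for illustration only; authentication is left out.

```go
package main

import (
	"fmt"
	"net/url"
)

// podsURLs (hypothetical helper) returns the List URL and the Watch URL for Pods.
// An empty namespace means "all namespaces", i.e. /api/v1/pods.
func podsURLs(host, namespace string) (list, watch string, err error) {
	u, err := url.Parse(host)
	if err != nil {
		return "", "", err
	}
	if namespace == "" {
		u.Path = "/api/v1/pods"
	} else {
		u.Path = "/api/v1/namespaces/" + namespace + "/pods"
	}
	list = u.String()

	// Watch: the same collection path plus watch=true.
	u.RawQuery = url.Values{"watch": []string{"true"}}.Encode()
	watch = u.String()
	return list, watch, nil
}

func main() {
	list, watch, err := podsURLs("https://127.0.0.1:6443", "default")
	if err != nil {
		panic(err)
	}
	fmt.Println(list)  // https://127.0.0.1:6443/api/v1/namespaces/default/pods
	fmt.Println(watch) // https://127.0.0.1:6443/api/v1/namespaces/default/pods?watch=true
}
```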
How Watch is implemented
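HTTP is mostly used with short-lived connections, so how does Watch receive the resource change events sent by the APIServer over a long-lived HTTP connection? The answer is chunked transfer encoding. In the example below, note the Transfer-Encoding: chunked header in the HTTP response: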
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/pods?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 02 Jan 2020 20:22:59 GMT
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Pod","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Pod","apiVersion":"v1",...}}
Here are two real query examples:
List query
$ curl -X GET -i http://127.0.0.1:8080/api/v1/pods
HTTP/1.1 200 OK
Content-Type: application/json
Date: Tue, 15 Mar 2016 08:18:25 GMT
Transfer-Encoding: chunked
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/pods",
"resourceVersion": "228211"
},
"items": [
{
"metadata": {
"name": "nginx-app-o0kvl",
"generateName": "nginx-app-",
"namespace": "default",
"selfLink": "/api/v1/namespaces/default/pods/nginx-app-o0kvl",
"uid": "090cc0c8-ea84-11e5-8c79-42010af00003",
"resourceVersion": "228094",
"creationTimestamp": "2016-03-15T08:00:51Z",
"labels": {
"run": "nginx-app"
},
"annotations": {
"kubernetes.io/created-by": "{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"
}
},
"spec": {
"volumes": [
{
"name": "default-token-wpfjn",
"secret": {
"secretName": "default-token-wpfjn"
}
}
],
"containers": [
{
"name": "nginx-app",
"image": "nginx",
"ports": [
{
"containerPort": 80,
"protocol": "TCP"
}
],
"resources": {},
"volumeMounts": [
{
"name": "default-token-wpfjn",
"readOnly": true,
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount"
}
],
"terminationMessagePath": "/dev/termination-log",
"imagePullPolicy": "Always"
}
],
"restartPolicy": "Always",
"terminationGracePeriodSeconds": 30,
"dnsPolicy": "ClusterFirst",
"serviceAccountName": "default",
"serviceAccount": "default",
"nodeName": "10.240.0.4",
"securityContext": {}
},
"status": {
"phase": "Running",
"conditions": [
{
"type": "Ready",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2016-03-15T08:00:52Z"
}
],
"hostIP": "10.240.0.4",
"podIP": "172.16.49.2",
"startTime": "2016-03-15T08:00:51Z",
"containerStatuses": [
{
"name": "nginx-app",
"state": {
"running": {
"startedAt": "2016-03-15T08:00:52Z"
}
},
"lastState": {},
"ready": true,
"restartCount": 0,
"image": "nginx",
"imageID": "docker://sha256:af4b3d7d5401624ed3a747dc20f88e2b5e92e0ee9954aab8f1b5724d7edeca5e",
"containerID": "docker://b97168314ad58404dbce7cb94291db7a976d2cb824b39e5864bf4bdaf27af255"
}
]
}
}
]
}
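A List response is a single JSON document, so it can be decoded in one go. As a hedged sketch, the following Go code keeps only a small subset of the PodList fields shown above (name, namespace, phase, podIP) and extracts the IPs of running Pods; the podList type and runningPodIPs helper are illustrative names, not part of kuberesolver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// podList is a minimal subset of the PodList schema shown above.
type podList struct {
	Kind  string `json:"kind"`
	Items []struct {
		Metadata struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"metadata"`
		Status struct {
			Phase string `json:"phase"`
			PodIP string `json:"podIP"`
		} `json:"status"`
	} `json:"items"`
}

// runningPodIPs returns name -> podIP for Pods in the Running phase.
func runningPodIPs(data []byte) (map[string]string, error) {
	var pl podList
	if err := json.Unmarshal(data, &pl); err != nil {
		return nil, err
	}
	out := make(map[string]string)
	for _, p := range pl.Items {
		if p.Status.Phase == "Running" && p.Status.PodIP != "" {
			out[p.Metadata.Name] = p.Status.PodIP
		}
	}
	return out, nil
}

func main() {
	data := []byte(`{"kind":"PodList","items":[{"metadata":{"name":"nginx-app-o0kvl","namespace":"default"},"status":{"phase":"Running","podIP":"172.16.49.2"}}]}`)
	ips, err := runningPodIPs(data)
	fmt.Println(ips, err) // map[nginx-app-o0kvl:172.16.49.2] <nil>
}
```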
Watch query
$ curl -X GET -i http://127.0.0.1:8080/api/v1/pods?watch=true
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Date: Tue, 15 Mar 2016 08:00:09 GMT
Content-Type: text/plain; charset=utf-8
{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228082","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{}},"status":{"phase":"Pending"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228084","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Pending"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228088","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Pending","conditions":[{"type":"Ready","status":"False","lastProbeTime":null,"lastTransitionTime":"2016-03-15T08:00:51Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-app]"}],"hostIP":"10.240.0.4","startTime":"2016-03-15T08:00:51Z","containerStatuses":[{"name":"nginx-app","state":{"waiting":{"reason":"ContainerCreating","message":"Image: nginx is ready, container is creating"}},"lastState":{},"ready":false,"restartCount":0,"image":"nginx","imageID":""}]}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228094","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Running","conditions":[{"type":"Ready","status":"True","lastProbeTime":null,"lastTransitionTime":"2016-03-15T08:00:52Z"}],"hostIP":"10.240.0.4","podIP":"172.16.49.2","startTime":"2016-03-15T08:00:51Z","containerStatuses":[{"name":"nginx-app","state":{"running":{"startedAt":"2016-03-15T08:00:52Z"}},"lastState":{},"ready":true,"restartCount":0,"image":"nginx","imageID":"docker://sha256:af4b3d7d5401624ed3a747dc20f88e2b5e92e0ee9954aab8f1b5724d7edeca5e","containerID":"docker://b97168314ad58404dbce7cb94291db7a976d2cb824b39e5864bf4bdaf27af255"}]}}}
These examples make it easier to follow the kuberesolver code that parses the responses returned by these HTTP APIs.
0x03 A Look at kuberesolver
First, a brief look at how kuberesolver is used. On the gRPC client side, make calls like the following:
import "github.com/sercand/kuberesolver/v3"
// Register kuberesolver to grpc before calling grpc.Dial
kuberesolver.RegisterInCluster()
// it is same as
resolver.Register(kuberesolver.NewBuilder(nil /*custom kubernetes client*/ , "kubernetes"))
// if schema is 'kubernetes' then grpc will use kuberesolver to resolve addresses
cc, err := grpc.Dial("kubernetes:///service.namespace:portname", opts...)
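// or, selecting the round_robin balancer explicitly
// (grpc.WithBalancerName and grpc.WithInsecure are deprecated in newer grpc-go releases;
// grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`) and
// grpc.WithTransportCredentials(insecure.NewCredentials()) are the current equivalents)
grpc.DialContext(ctx, "kubernetes:///service:grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())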
kuberesolver supports the following addressing formats; the service-discovery target is parsed in the parseResolverTarget method:
kubernetes:///service-name:8080
kubernetes:///service-name:portname
kubernetes:///service-name.namespace:8080
kubernetes://namespace/service-name:8080
kubernetes://service-name:8080/
kubernetes://service-name.namespace:8080/
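To see how these six forms can be told apart, here is a simplified, hypothetical Go parser built on net/url. It is not kuberesolver's parseResolverTarget (which works on gRPC's resolver.Target); it only illustrates the idea: an empty host means the service sits in the path, a host without a port is the namespace, and otherwise the host itself is service[.namespace]:port. Treating a non-numeric port as a port name is also an assumption of this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// targetInfo is a simplified copy of the struct described in this article.
type targetInfo struct {
	serviceName       string
	serviceNamespace  string
	port              string
	resolveByPortName bool
}

// splitServicePortNamespace splits "service[.namespace][:port]".
func splitServicePortNamespace(s string) (service, port, namespace string) {
	service = strings.Trim(s, "/")
	if i := strings.LastIndex(service, ":"); i >= 0 {
		service, port = service[:i], service[i+1:]
	}
	if i := strings.Index(service, "."); i >= 0 {
		service, namespace = service[:i], service[i+1:]
	}
	return service, port, namespace
}

// parseTarget is a hypothetical, simplified target parser.
func parseTarget(raw string) (targetInfo, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return targetInfo{}, err
	}
	if u.Scheme != "kubernetes" {
		return targetInfo{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	var ti targetInfo
	switch {
	case u.Host == "": // kubernetes:///service[.namespace]:port
		ti.serviceName, ti.port, ti.serviceNamespace = splitServicePortNamespace(u.Path)
	case u.Port() == "" && strings.Trim(u.Path, "/") != "": // kubernetes://namespace/service:port
		ti.serviceName, ti.port, _ = splitServicePortNamespace(u.Path)
		ti.serviceNamespace = u.Host
	default: // kubernetes://service[.namespace]:port/
		ti.serviceName, ti.port, ti.serviceNamespace = splitServicePortNamespace(u.Host)
	}
	if ti.serviceName == "" {
		return targetInfo{}, fmt.Errorf("missing service name in %q", raw)
	}
	if _, err := strconv.Atoi(ti.port); err != nil && ti.port != "" {
		ti.resolveByPortName = true // e.g. "portname" instead of "8080"
	}
	return ti, nil
}

func main() {
	for _, s := range []string{
		"kubernetes:///service-name:8080",
		"kubernetes://namespace/service-name:8080",
		"kubernetes://service-name.namespace:8080/",
	} {
		ti, err := parseTarget(s)
		fmt.Printf("%+v %v\n", ti, err)
	}
}
```

An empty namespace is left empty here; in Build it is later filled in by getCurrentNamespaceOrDefault().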
kuberesolver overall architecture
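Based on earlier experience implementing gRPC load balancers on top of Etcd/Consul, the approach is roughly the same:
- Implement the Resolver: define the globally registered Resolver identifier (the scheme) and implement the gRPC resolver interfaces (resolver.Builder / resolver.Resolver)
- Implement the Watcher (usually started as a separate goroutine of the Resolver): receive change notifications from the Kubernetes API and call gRPC's interfaces to notify gRPC internally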
The overall project architecture of kuberesolver is shown below:
The following sections analyze the source code module by module, following the architecture diagram.
0x04 Structure Definitions && Functionality
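models.go defines the Go structs for the JSON returned by the Kubernetes API. The events delivered by the HTTP Watch interface come in three kinds (ADDED, MODIFIED and DELETED), plus an ERROR type that reports errors on the watch stream: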
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Error    EventType = "ERROR"
)
0x05 Resolver Implementation
First, the Resolver implementation. The Resolver's name (scheme) is kubernetes; this value is used as the scheme of the target passed to grpc.Dial().
const (
	kubernetesSchema = "kubernetes"
	defaultFreq      = time.Minute * 30
)
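The scheme is the key that links a dial target to a resolver. The following Go sketch is a hypothetical, heavily simplified stand-in for gRPC's resolver registry (it does not use grpc-go; Builder, register and builderFor are illustrative names): builders are stored by Scheme(), and the scheme of the target string selects one.

```go
package main

import (
	"fmt"
	"net/url"
)

// Builder is a hypothetical stand-in for grpc-go's resolver.Builder.
type Builder interface {
	Scheme() string
}

type kubeBuilder struct{ schema string }

func (b *kubeBuilder) Scheme() string { return b.schema }

// registry maps a scheme to its builder, similar in spirit to resolver.Register.
var registry = map[string]Builder{}

func register(b Builder) { registry[b.Scheme()] = b }

// builderFor picks the builder by the scheme of the dial target.
func builderFor(target string) (Builder, bool) {
	u, err := url.Parse(target)
	if err != nil {
		return nil, false
	}
	b, ok := registry[u.Scheme]
	return b, ok
}

func main() {
	register(&kubeBuilder{schema: "kubernetes"})
	_, ok := builderFor("kubernetes:///service.namespace:portname")
	fmt.Println(ok) // true
}
```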
kubeBuilder and the kubeBuilder.Build method
The kubeBuilder struct and its NewBuilder constructor. Note the k8sClient field, which is essentially a wrapper around an http.Client:
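// NewBuilder creates a kubeBuilder which is used by grpc resolver.
func NewBuilder(client K8sClient, schema string) resolver.Builder {
	return &kubeBuilder{
		k8sClient: client,
		schema:    schema,
	}
}

type kubeBuilder struct {
	k8sClient K8sClient
	schema    string
}

// Scheme returns the scheme handled by this builder (required by resolver.Builder).
func (b *kubeBuilder) Scheme() string {
	return b.schema
}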
Next is the core function Build, which constructs the resolver.Resolver for the given target:
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if b.k8sClient == nil {
		if cl, err := NewInClusterK8sClient(); err == nil {
			b.k8sClient = cl
		} else {
			return nil, err
		}
	}
	// Parse the resolver target passed in by the user
	ti, err := parseResolverTarget(target)
	if err != nil {
		return nil, err
	}
	if ti.serviceNamespace == "" {
		ti.serviceNamespace = getCurrentNamespaceOrDefault()
	}
	// Initialize the kResolver struct
	ctx, cancel := context.WithCancel(context.Background())
	r := &kResolver{
		target:    ti,
		ctx:       ctx,
		cancel:    cancel,
		cc:        cc,
		rn:        make(chan struct{}, 1),
		k8sClient: b.k8sClient,
		t:         time.NewTimer(defaultFreq),
		freq:      defaultFreq,
		endpoints: endpointsForTarget.WithLabelValues(ti.String()),
		addresses: addressesForTarget.WithLabelValues(ti.String()),
	}
	// Start the watcher in its own goroutine
	go until(func() {
		r.wg.Add(1)
		err := r.watch()
		if err != nil && err != io.EOF {
			grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err)
		}
	}, time.Second, ctx.Done())
	return r, nil
}
The core steps of this method (a common pattern) are:
1. First, parse the incoming address with parseResolverTarget. This project supports the following formats:
kubernetes:///service-name:8080
kubernetes:///service-name:portname
kubernetes:///service-name.namespace:8080
kubernetes://namespace/service-name:8080
kubernetes://service-name:8080/
kubernetes://service-name.namespace:8080/
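The addresses above are parsed into the targetInfo structure below. In kuberesolver's model, the address of the target Service must carry the following key information:
- serviceName: the Service name
- serviceNamespace: the namespace
- port: the Service port (a port number or a port name)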
type targetInfo struct {
	serviceName       string
	serviceNamespace  string
	port              string
	resolveByPortName bool
	useFirstPort      bool
}

func (ti targetInfo) String() string {
	return fmt.Sprintf("kubernetes://%s/%s:%s", ti.serviceNamespace, ti.serviceName, ti.port)
}
2. Second, based on the parse result, initialize the kResolver struct, which holds everything the separate goroutine needs to run:
type kResolver struct {
	target targetInfo         // parsed target Service (name, namespace, port)
	ctx    context.Context    // used to make the goroutine exit
	cancel context.CancelFunc
	cc     resolver.ClientConn // used to push address updates into gRPC
	// rn channel is used by ResolveNow() to force an immediate resolution of the target.
	rn        chan struct{} // signal that triggers the ResolveNow() logic
	k8sClient K8sClient     // HTTP client used to call the APIServer
	// wg is used to enforce Close() to return after the watcher() goroutine has finished.
	wg        sync.WaitGroup
	t         *time.Timer // timer for the periodic full LIST resync (fires every freq)
	freq      time.Duration
	endpoints prometheus.Gauge
	addresses prometheus.Gauge
}
3. Finally, start a separate child goroutine that runs the Watcher logic:
...
// Start the watcher in its own goroutine
go until(func() {
	r.wg.Add(1)
	err := r.watch()
	if err != nil && err != io.EOF {
		grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err)
	}
}, time.Second, ctx.Done())
...
The kResolver implementation
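As the concrete resolver.Resolver returned from Build (in its return statement), kResolver must implement the methods defined by resolver.Resolver:
- ResolveNow: trigger an immediate resolve; gRPC treats this only as a hint, so an implementation may ignore it
- Close: shut down the Watcher and release resources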
// ResolveNow will be called by gRPC to try to resolve the target name again.
// It's just a hint, resolver can ignore this if it's not necessary.
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {
	select {
	case k.rn <- struct{}{}:
	default:
	}
}

// Close closes the resolver.
func (k *kResolver) Close() {
	k.cancel()
	k.wg.Wait()
}
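ResolveNow uses a non-blocking send into rn, which has a buffer of 1. The effect is that any number of ResolveNow calls made while the watch loop is busy collapse into a single pending resolve, and ResolveNow never blocks gRPC. A small Go sketch of just that channel behaviour (resolveNow here is a standalone illustrative function, not the kResolver method):

```go
package main

import "fmt"

// resolveNow mirrors the body of kResolver.ResolveNow: a non-blocking send
// into a channel with capacity 1, so repeated calls coalesce into one signal.
func resolveNow(rn chan struct{}) {
	select {
	case rn <- struct{}{}:
	default: // a signal is already pending; drop this one
	}
}

func main() {
	rn := make(chan struct{}, 1)
	for i := 0; i < 5; i++ {
		resolveNow(rn) // never blocks, even though nobody is reading yet
	}
	fmt.Println(len(rn)) // 1
	<-rn                 // the watch loop consumes the signal and calls resolve()
	fmt.Println(len(rn)) // 0
}
```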
kResolver.watch && kResolver.resolve
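kResolver.watch is the core method of the Watcher. It is a classic for-select loop that reacts to the following channel events:
- k.ctx.Done(): the upper layer asks the loop to exit
- k.t.C: the timer fires, triggering a periodic full resolve via k.resolve()
- k.rn: a resolve triggered by ResolveNow
- up, hasMore := <-sw.ResultChan(): the streamWatcher delivers a new message (event); the loop calls k.handle(up.Object) to process it, meaning the watched resource has changed (scale-out / scale-in / instances going offline, etc.)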
func (k *kResolver) handle(e Endpoints) {
	// makeAddresses builds the full address list from the Endpoints object
	result, _ := k.makeAddresses(e)
	// k.cc.NewServiceConfig(sc)
	if len(result) > 0 {
		k.cc.NewAddress(result)
	}
	k.endpoints.Set(float64(len(e.Subsets)))
	k.addresses.Set(float64(len(result)))
}

func (k *kResolver) resolve() {
	e, err := getEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
	if err == nil {
		k.handle(e)
	} else {
		grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err)
	}
	// Next lookup should happen after an interval defined by k.freq.
	k.t.Reset(k.freq)
}

func (k *kResolver) watch() error {
	defer k.wg.Done()
	// watch endpoints lists existing endpoints at start
	sw, err := watchEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
	if err != nil {
		return err
	}
	for {
		select {
		case <-k.ctx.Done():
			return nil
		case <-k.t.C:
			k.resolve()
		case <-k.rn:
			k.resolve()
		case up, hasMore := <-sw.ResultChan():
			if hasMore {
				k.handle(up.Object)
			} else {
				return nil
			}
		}
	}
}
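handle depends on makeAddresses, which is not shown in this article. The following Go code is a hypothetical simplification of what it has to do: turn each Endpoints subset into "ip:port" strings, using the target port directly when it is numeric, or looking up the port number by name when the target used a named port. Plain strings stand in for gRPC's resolver.Address values; the types are cut down to the fields needed here.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

type Address struct{ IP string }

type Port struct {
	Name string
	Port int
}

type Subset struct {
	Addresses []Address
	Ports     []Port
}

type Endpoints struct{ Subsets []Subset }

// makeAddresses (hypothetical) builds "ip:port" strings for every address.
// With byName, port is a port name that must be looked up in each subset.
func makeAddresses(e Endpoints, port string, byName bool) []string {
	var out []string
	for _, s := range e.Subsets {
		p := port
		if byName {
			p = ""
			for _, sp := range s.Ports {
				if sp.Name == port {
					p = strconv.Itoa(sp.Port)
					break
				}
			}
			if p == "" {
				continue // this subset does not expose the requested named port
			}
		}
		for _, a := range s.Addresses {
			out = append(out, net.JoinHostPort(a.IP, p))
		}
	}
	return out
}

func main() {
	e := Endpoints{Subsets: []Subset{{
		Addresses: []Address{{IP: "172.16.49.2"}, {IP: "172.16.49.3"}},
		Ports:     []Port{{Name: "grpc", Port: 8080}},
	}}}
	fmt.Println(makeAddresses(e, "grpc", true))  // [172.16.49.2:8080 172.16.49.3:8080]
	fmt.Println(makeAddresses(e, "9090", false)) // [172.16.49.2:9090 172.16.49.3:9090]
}
```

Note the len(result) > 0 guard in handle: an empty list is never pushed to gRPC, so a transient empty Endpoints object does not wipe out the addresses gRPC already has.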
0x06 K8sClient Analysis
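Before analyzing the Watcher logic, let's look at the k8sClient wrapper. As mentioned at the beginning of this article, it offers both List and Watch style calls:
- watchEndpoints: the incremental interface, using Watch
- getEndpoints: the full-snapshot interface, using List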
k8sClient abstract interface && definition && initialization
// K8sClient is minimal kubernetes client interface
// Exported name: the interface
type K8sClient interface {
	Do(req *http.Request) (*http.Response, error)
	GetRequest(url string) (*http.Request, error)
	Host() string
}

// Unexported name: the concrete implementation
type k8sClient struct {
	host       string
	token      string       // bearer-token (HTTP token) authentication
	httpClient *http.Client // the underlying http.Client
}
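Note the following in the initialization code:
- the paths from which the APIServer CA certificate (as well as the ServiceAccount token and namespace) are loaded
- net/http reuses connections (keep-alive) by default; here the http.Client's Timeout is 0, which means no overall request timeout, so a request can block on a long-lived watch response indefinitely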
const (
	serviceAccountToken     = "/var/run/secrets/kubernetes.io/serviceaccount/token"
	serviceAccountCACert    = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
	kubernetesNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
)

// NewInClusterK8sClient creates K8sClient if it is inside Kubernetes
func NewInClusterK8sClient() (K8sClient, error) {
	host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
	if len(host) == 0 || len(port) == 0 {
		return nil, fmt.Errorf("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
	}
	token, err := ioutil.ReadFile(serviceAccountToken)
	if err != nil {
		return nil, err
	}
	ca, err := ioutil.ReadFile(serviceAccountCACert)
	if err != nil {
		return nil, err
	}
	certPool := x509.NewCertPool()
	certPool.AppendCertsFromPEM(ca)
	transport := &http.Transport{TLSClientConfig: &tls.Config{
		MinVersion: tls.VersionTLS10,
		RootCAs:    certPool,
	}}
	// Timeout 0 means no timeout, which long-lived watch responses require
	httpClient := &http.Client{Transport: transport, Timeout: time.Nanosecond * 0}
	return &k8sClient{
		host:       "https://" + net.JoinHostPort(host, port),
		token:      string(token),
		httpClient: httpClient,
	}, nil
}
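The article does not show the three methods that make k8sClient satisfy K8sClient. Below is a hypothetical Go sketch of what they plausibly look like: GetRequest attaches the ServiceAccount token as a bearer token, Do delegates to the wrapped http.Client, and Host returns the APIServer base URL. Treat the method bodies as assumptions, not kuberesolver's exact code.

```go
package main

import (
	"fmt"
	"net/http"
)

type k8sClient struct {
	host       string
	token      string
	httpClient *http.Client
}

// GetRequest builds a GET request carrying the ServiceAccount bearer token.
func (kc *k8sClient) GetRequest(url string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	if len(kc.token) > 0 {
		req.Header.Set("Authorization", "Bearer "+kc.token)
	}
	return req, nil
}

// Do sends the request with the wrapped http.Client.
func (kc *k8sClient) Do(req *http.Request) (*http.Response, error) {
	return kc.httpClient.Do(req)
}

// Host returns the APIServer base URL, e.g. "https://10.96.0.1:443".
func (kc *k8sClient) Host() string { return kc.host }

func main() {
	kc := &k8sClient{host: "https://127.0.0.1:6443", token: "example-token", httpClient: &http.Client{}}
	req, err := kc.GetRequest(kc.Host() + "/api/v1/namespaces/default/endpoints/my-svc")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("Authorization"))
	// GET /api/v1/namespaces/default/endpoints/my-svc Bearer example-token
}
```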
getEndpoints && watchEndpoints
getEndpoints is an ordinary one-shot GET request (the List style): it reads the whole response and then closes the body:
func getEndpoints(client K8sClient, namespace, targetName string) (Endpoints, error) {
	u, err := url.Parse(fmt.Sprintf("%s/api/v1/namespaces/%s/endpoints/%s",
		client.Host(), namespace, targetName))
	if err != nil {
		return Endpoints{}, err
	}
	req, err := client.GetRequest(u.String())
	if err != nil {
		return Endpoints{}, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return Endpoints{}, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return Endpoints{}, fmt.Errorf("invalid response code %d", resp.StatusCode)
	}
	result := Endpoints{}
	err = json.NewDecoder(resp.Body).Decode(&result)
	return result, err
}
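watchEndpoints, by contrast, works over a long-lived HTTP response (kept open by the server): it calls newStreamWatcher with resp.Body and returns the resulting streamWatcher.
Pay special attention to how the HTTP side is set up here: on success the body is deliberately not closed, and the underlying http.Client was created without a timeout. Both are what make it possible to keep reading a Transfer-Encoding: chunked response: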
func watchEndpoints(client K8sClient, namespace, targetName string) (watchInterface, error) {
	u, err := url.Parse(fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s",
		client.Host(), namespace, targetName))
	if err != nil {
		return nil, err
	}
	req, err := client.GetRequest(u.String())
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		return nil, fmt.Errorf("invalid response code %d", resp.StatusCode)
	}
	return newStreamWatcher(resp.Body), nil
}
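To observe the chunked behaviour locally without a cluster, the following Go sketch starts a fake watch server with net/http/httptest. The handler writes one JSON event at a time and flushes after each, so the response is sent with chunked encoding (the client sees ContentLength == -1), and the client decodes events as they arrive. The server, event type and helper names are assumptions for this demo; unlike a real APIServer, the fake server ends the stream right after the last event.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

type event struct {
	Type string `json:"type"`
}

// newFakeWatchServer streams one event per chunk, flushing after each write.
func newFakeWatchServer(types ...string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		enc := json.NewEncoder(w)
		for _, t := range types {
			_ = enc.Encode(event{Type: t})
			w.(http.Flusher).Flush() // no Content-Length, so chunked encoding is used
		}
	}))
}

// readEvents decodes events from the response body as they arrive, until EOF.
func readEvents(url string) (types []string, contentLength int64, err error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, 0, err
	}
	// Closed here only because this demo reads to the end;
	// kuberesolver hands the still-open body to streamWatcher instead.
	defer resp.Body.Close()
	dec := json.NewDecoder(resp.Body)
	for {
		var ev event
		if derr := dec.Decode(&ev); derr != nil {
			if derr == io.EOF {
				return types, resp.ContentLength, nil
			}
			return types, resp.ContentLength, derr
		}
		types = append(types, ev.Type)
	}
}

func main() {
	srv := newFakeWatchServer("ADDED", "MODIFIED", "DELETED")
	defer srv.Close()
	types, cl, err := readEvents(srv.URL)
	fmt.Println(types, cl, err) // [ADDED MODIFIED DELETED] -1 <nil>
}
```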
0x07 Watcher Implementation
Continuing from the previous section: the Watcher logic lives mainly in streamWatcher. To understand the role streamWatcher plays, first recall the usual HTTP request model:
//...
resp, err := client.Do(req)
if err != nil {
	//...
}
defer resp.Body.Close()
json.NewDecoder(resp.Body).Decode(&result)
//...
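The code above is a very ordinary HTTP request. resp.Body has the type io.ReadCloser. With HTTP Transfer-Encoding: chunked, as long as resp.Body is not closed, data can keep being read from this long-lived stream.
So on the success path of watchEndpoints, never call defer resp.Body.Close(); closing the body is left to streamWatcher's Stop().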
The streamWatcher structure
The structure of streamWatcher is shown below. Its result chan Event field is used to pass the parsed results (returned by the HTTP API) to the outside:
// Interface can be implemented by anything that knows how to watch and report changes.
type watchInterface interface {
	// Stops watching. Will close the channel returned by ResultChan(). Releases
	// any resources used by the watch.
	Stop()

	// Returns a chan which will receive all the events. If an error occurs
	// or Stop() is called, this channel will be closed, in which case the
	// watch should be completely cleaned up.
	ResultChan() <-chan Event
}

// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type streamWatcher struct {
	result  chan Event // passes the parsed response events to the outside
	r       io.ReadCloser
	decoder *json.Decoder
	sync.Mutex
	stopped bool
}
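Note the interface that watchInterface above exposes to the outside:
- ResultChan: returns the result chan Event member of streamWatcher to the caller
- Stop: shuts down the whole streamWatcher listening logic, cleans up resources and exits
The streamWatcher implementation is a classic observer-style design, more precisely a producer-consumer pattern over a channel: a background goroutine decodes events and pushes them into the channel, and the consumer (kResolver.watch) reads from it.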
Initialization: newStreamWatcher && the Watcher loop
In newStreamWatcher, a separate go sw.receive() call starts the independent Watcher logic:
// NewStreamWatcher creates a StreamWatcher from the given io.ReadClosers.
func newStreamWatcher(r io.ReadCloser) watchInterface {
	sw := &streamWatcher{
		r:       r,
		decoder: json.NewDecoder(r),
		result:  make(chan Event),
	}
	go sw.receive()
	return sw
}

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *streamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop() // Stop() closes resp.Body
	// watcher loop
	for {
		// sw.Decode() keeps triggering Reads on the body
		obj, err := sw.Decode()
		if err != nil {
			// error handling
			// Ignore expected error.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				grpclog.Infof("kuberesolver: Unexpected EOF during watch stream event decoding: %v", err)
			default:
				grpclog.Infof("kuberesolver: Unable to decode an event from the watch stream: %v", err)
			}
			return
		}
		sw.result <- obj
	}
}
Finally, the Decode method parses the HTTP API response and returns the JSON-decoded result:
// Decode blocks until it can return the next object in the writer. Returns an error
// if the writer is closed or an object can't be decoded.
func (sw *streamWatcher) Decode() (Event, error) {
	var got Event
	if err := sw.decoder.Decode(&got); err != nil {
		return Event{}, err
	}
	switch got.Type {
	case Added, Modified, Deleted, Error:
		return got, nil
	default:
		return Event{}, fmt.Errorf("got invalid watch event type: %v", got.Type)
	}
}
0x08 Practice
0x09 Summary
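This article analyzed one approach to implementing gRPC load balancing on top of the Kubernetes API; it is also the approach our projects use to load-balance across Kubernetes Services.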