0x00 开篇
网上基于 websocket 打通 kubernetes pod 的实现非常多,但是受限于 webconsole 的不便利性,这边文章来分析下如何使用 SSH 方式打通 kubernetes 的登录
Kubectl exec 的原理简介
通过 kubectl exec 进行容器的数据流如下:

0x01 实现思路
1、remotecommand 暴露的 Executor 方法 
实现的核心是打通 IO(即 stdin 与 stdout)之间的桥接,如下图:

kubernetes 的 client-go 包提供了集群中的容器建立长连接的方法,并设置容器的 stdin,stdout 等。此 package 提供了基于 SPDY 协议的 Executor,其本质是一个 interface,用于和 pod 终端的流的传输及通信。Executor 接口定义了 Stream 方法,该方法会建立一个流传输的连接,直到服务端和调用端一端关闭连接,才会停止传输(原 package 中给出的实现是基于 HTTP-SPDY 的,不适用于笔者的场景,所有需要重新实现 Stream 方法)
此外,考虑到 StreamOptions 是 Stream 方法的参数,在实现 Stream 方法时需要填充该结构。
// Executor is an interface for transporting shell-style streams.
type Executor interface {
	// Stream initiates the transport of the standard shell streams. It will transport any
	// non-nil stream to a remote system, and return an error if a problem occurs. If tty
	// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
	// stdout stream).
	Stream(options StreamOptions) error
}
func (e *streamExecutor) Stream(options StreamOptions) error{}
// StreamOptions holds information pertaining to the current streaming session:
// input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
	Stdin             io.Reader
	Stdout            io.Writer
	Stderr            io.Writer
	Tty               bool
	TerminalSizeQueue TerminalSizeQueue
}
关于 SteamOption 中各个成员的作用,可以参见 V1 的实现;简言之,就是外部对接容器内部暴露的输入 / 输出。
TerminalSizeQueue 结构
TerminalSizeQueue is capable of returning terminal resize events as they occur:TerminalSizeQueue 结构的作用是向容器进行时传递客户端侧的窗口变化的通知事件(事件包含当前的终端的 Weight 与 Height 值):即当客户终端 TTY 窗口的 size 发生了改变时,需要通知容器进行时调整显示的 size,否则会出现内外窗口显示不一致的问题
type TerminalSizeQueue interface {
	// Next returns the new terminal size after the terminal has been resized. It returns nil when
	// monitoring has been stopped.
	Next() *TerminalSize
}
TerminalSize 结构如下:
// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
// and were moved in order to decouple client from other term dependencies
// TerminalSize represents the width and height of a terminal.
type TerminalSize struct {
	Width  uint16
	Height uint16
}
2、定义核心结构体,构造 StreamOptions 所需的成员 
// TtyHandler
type PyStreamOption interface {
	io.Reader	//stdin
	io.Writer	//stdout && stderr
	//io.Writer
	remotecommand.TerminalSizeQueue
}
定义如上结构 PyStreamOption,然后需要实例化 PyStreamOption;注意实例化之后,必须实现该 interface 的 3 个方法:
Read(p []byte) (int, error):从前置 SSH 会话(ssh.Channel)中读取数据,转发到用户侧Write(p []byte) (int, error):从前置 SSH 会话获取用户输入,写入 podNext() *TerminalSize:即时调整终端的 size
设置的目的是:
- 对接 terminal 的输入输出
 - 实时监听窗口大小的改变并调整
 
这样,后续调用 Stream 方法时,只要将 StreamOptions 的 Stdin/Stdout 都设置为 PyStreamOption,Executor 会通过你定义的 Write 和 Read 方法来传输数据。
3、建立容器(连接)的方法 
注意下面的 CreateK8SStream 方法,最后会使用 remotecommand 的 exec.Stream 方法来实现打通容器登录的过程(stdin/stdout/stderr 由参数传入),即上一步定义的 TtyHandler 结构的相关成员:
func CreateK8SStream(option PyStreamOption) {
	//...
	//NewSPDYExecutor
	req := cli.CoreV1().RESTClient().Post().
			Resource("pods").
			Name(podName).
			Namespace(namespace).
			SubResource("exec")
	option := &corev1.PodExecOptions{
		Command: cmd,
		Stdin:   true,
		Stdout:  true,
		Stderr:  true,
		TTY:     true,
	}
	req.VersionedParams(
		option,
		scheme.ParameterCodec,
	)
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return err
	}
	//Stream
	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:             stdin,		// 来自 option
		Stdout:            stdout,		// 来自 option
		Stderr:            stderr,		// 来自 option
		Tty:               true,		// 来自 option,设置 TTY
		TerminalSizeQueue: terminalSizeQueue,	// 来自 option
	})
	//...
}
接下来的思路就是如何生成一个 Executor,来打通 SSH 到 SPDY(主要还是打通上图的 STDIN/STDOUT)
0x02 文件 COPY 的实现
如前文描述,方法 exec.Stream 执行时会向 APServer 发起一个请求,APServer 会通过代理机制将请求转发给相应节点上的 kubelet 服务,kubelet 会通过 CRI 接口调用 runtime 的接口,最终调用流式接口中的 Exec() 进入到 container 执行。通常针对一般化的命令调用,输入和输出以文本为主,并不会占用带宽和 APServer 的资源,但是当涉及到文件传输的时候则会占用较多带宽,因此这种方式不太适合大容量文件的 copy,如何优化?
- 增加流控
 - 在文件 copy 场景下,直接向 kubelet 发起 
exec请求(绕过 APServer),但是需要解决访问 kubelet 地址、认证等相关问题 
以文件 copy 场景为例,使用的命令是 kubectl cp,前提是 Pod 中对应的 container 中安装了 tar 命令,本质上该指令还是调用了 exec 接口,然后执行 tar 命令进行文件的拷贝动作
#宿主机 –> Pod
#将宿主机 /tmp/test_pod.txt copy 到 mycentos-7b59b5b755-8rbgc 对应的第一个容器中的 /root 目录下
kubectl cp /tmp/test_pod.txt default/mycentos-7b59b5b755-8rbgc:/root
#Pod –> 宿主机
#将 mycentos-7b59b5b755-8rbgc 中 /root/from_pod.txt 文件拷贝到宿主机 /tmp 目录下,并命名为 from_pod.new
kubectl cp default/mycentos-7b59b5b755-8rbgc:/root/from_pod.txt  /tmp/from_pod.new
#将 mycentos-7b59b5b755-8rbgc 中 /root/pod_dir 目录下的文件拷贝到宿主机 /tmp 目录下
kubectl cp default/mycentos-7b59b5b755-8rbgc:/root/pod_dir /tmp
为什么要使用 tar 呢?tar 支持压缩从标准输入读取文件名列表
find . -type f -name "*.jpg" -print | xargs tar -czvf images.tar.gz
find $HOME -name "*.doc" -print0 | tar -cvf /tmp/file.tar --null -T -
find $HOME -type f -name "*.sh" | xargs tar cfvz /nfs/x230/my-shell-scripts.tgz
# 从容器向外拷贝,通过 tar 命令压缩将输出重定向到标准输出
tar cf - ${srcFile}
# 从外面向容器内拷贝,将标准输入数据通过 tar 解压写入到 container 内的文件目录
tar --no-same-permissions --no-same-owner -xmf -
# OR
tar -xmf -
通过 client-go 向 pod 所在节点实现查询 kubelet 地址,然后向 kubelet 直接发起 exec 请求,代码如下:
func main() {
        // 此处直接使用了 / root/.kube/config 文件(有足够的权限)
        // 如果使用 service account token 的话,还需要额外创建 role 和 rolebinding
        // 当直接访问 kubelet 接口的时候,kubelet 支持使用证书和 token 进行认证,
        // 使用 rbac 对请求进行鉴权操作
        var kubeConfig *string
        if home := homedir.HomeDir(); home != "" {
                kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
                kubeConfig = flag.String("kubeconfig", "","absolute path to the kubeconfig file")
        }
        flag.Parse()
        config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
        if err != nil {
                panic(err)
        }
        // 构建请求,直接请求 kubelet 需要的各种参数
        params := url.Values{}
        params.Add("tty", "1")
        params.Add("input", "0")
        params.Add("output", "1")
        params.Add("error", "0")
        params.Add("command", "ls")
        params.Add("command", "/")
        url := &url.URL{
                Scheme:   "https",
                Host:     "10.0.0.1:10250",
                Path:     "/exec/default/busybox/busybox1",
                RawQuery: params.Encode(),
        }
        // 此处配置证书相关配置,由于是测试,这块直接忽略了,在实际生产使用时可从文件读取
        // 此处配置的 CAFile 在更新后,client-go 能够定时自动刷新
        config.TLSClientConfig.CAFile = ""
        config.TLSClientConfig.CAData = nil
        config.TLSClientConfig.Insecure = true
        executor, err := remotecommand.NewSPDYExecutor(config, "POST", url)
        if err != nil {
                panic(err)
        }
        done := make(chan error)
        var buf bytes.Buffer
        wrap := func() {
                err := executor.Stream(remotecommand.StreamOptions{
                        Stdout: &buf,
                        Tty: true,
                })
                done <- err
        }
        go wrap()
        fmt.Println("wait for out")
        select {
        case err := <-done:
                if err != nil {
                        fmt.Println("Command exit with error", err)
                }
                fmt.Println(string(buf.Bytes()))
        }
}
0x03 问题
笔者项目中遇到了在某些 kubernetes 集群版本下,当网关退出时,容器内的连接(网关到 kubernetes APIserver)、容器内 bash 残留的问题,如下描述:
1、NewSPDYExecutor 创建的 stream 泄漏问题:
关联相关 issue:
- How to cancel a SPDYExecutor stream? #554
 - How to cancel a RESTClient exec? Can add context to the request? #884
 
本项目中涉及到如下库:
go 1.17
require (
		// ......
        k8s.io/api v0.24.3
        k8s.io/apimachinery v0.24.3
        k8s.io/client-go v0.24.3
        k8s.io/kubectl v0.22.0
)
问题原因是通过 exec.Stream 构建的阻塞方法,开发者无法主动关闭这个 goroutine(网上也有相关 wrapped 改造,但是个人感觉不优雅),解决方案是使用新版本的 client-go 库,它提供了 StreamWithContext方法
func main(){
	// ...
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return err
	}
	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:             stdin,
		Stdout:            stdout,
		Stderr:            stderr,
		Tty:               true,
		TerminalSizeQueue: terminalSizeQueue,
	})
	// ...
}
2、bash 残留:这个目前没发现解决方法,只能通过在容器内kill掉残留的bash进程
3、不当使用 TerminalSizeQueue 导致的内存泄漏问题(该机制用于异步监控窗口尺寸更新)
这里以 v0.30.2 版本分析下窗口动态更新的实现(v3),先看下用户实现的 Next 方法是在何处调用的,如下:
// streamProtocolV3 implements version 3 of the streaming protocol for attach
// and exec. This version adds support for resizing the container's terminal.
type streamProtocolV3 struct {
	*streamProtocolV2
	resizeStream io.Writer		//resizeStream 是一个 io.Writer
}
func (p *streamProtocolV3) handleResizes() {
	if p.resizeStream == nil || p.TerminalSizeQueue == nil {
		// 这两个都需要定义才能启用
		return
	}
	go func() {
		defer runtime.HandleCrash()
		encoder := json.NewEncoder(p.resizeStream)
		for {
			// 在循环中调用 p.TerminalSizeQueue.Next() 拿到一个 size(当前的窗口尺寸)
			size := p.TerminalSizeQueue.Next()
			if size == nil {
				// 注意:协程退出的条件,需要外界主动触发
				return
			}
			if err := encoder.Encode(&size); err != nil {
				runtime.HandleError(err)
			}
		}
	}()
}
// stream:streamProtocolV3创建execstream入口
func (p *streamProtocolV3) stream(conn streamCreator) error {
        if err := p.createStreams(conn); err != nil {
                return err
        }
        // now that all the streams have been created, proceed with reading & copying
        errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
		//默认在创建stream时,会异步启动handleResizes方法
        p.handleResizes()
        p.copyStdin()
        var wg sync.WaitGroup
        p.copyStdout(&wg)
        p.copyStderr(&wg)
        // we're waiting for stdout/stderr to finish copying
        wg.Wait()
        // waits for errorStream to finish reading with an error or nil
        return <-errorChan
}
这段代码可知,在 handleResizes 中异步方式启动并监听由 Next 方法传递的窗口信息,并将新的 size 信息编码为 JSON 格式发送给 p.resizeStream,特别注意这里 goroutine 退出的条件是 size == nil,所以外部在调用 exec.StreamWithContext 时一定要注意在合适的时机关闭该 goroutine,不然会造成内存泄漏