0x00 开篇
网上基于 websocket 打通 kubernetes pod 的实现非常多,但是受限于 webconsole 的不便利性,这边文章来分析下如何使用 SSH 方式打通 kubernetes 的登录
Kubectl exec 的原理简介
通过 kubectl exec
进行容器的数据流如下:
0x01 实现思路
1、remotecommand 暴露的 Executor
方法
实现的核心是打通 IO(即 stdin
与 stdout
)之间的桥接,如下图:
kubernetes 的 client-go 包提供了集群中的容器建立长连接的方法,并设置容器的 stdin
,stdout
等。此 package 提供了基于 SPDY 协议的 Executor
,其本质是一个 interface
,用于和 pod 终端的流的传输及通信。Executor
接口定义了 Stream
方法,该方法会建立一个流传输的连接,直到服务端和调用端一端关闭连接,才会停止传输(原 package 中给出的实现是基于 HTTP-SPDY 的,不适用于笔者的场景,所有需要重新实现 Stream
方法)
此外,考虑到 StreamOptions
是 Stream
方法的参数,在实现 Stream
方法时需要填充该结构。
// Executor is an interface for transporting shell-style streams.
type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(options StreamOptions) error
}
func (e *streamExecutor) Stream(options StreamOptions) error{}
// StreamOptions holds information pertaining to the current streaming session:
// input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue TerminalSizeQueue
}
关于 SteamOption
中各个成员的作用,可以参见 V1 的实现;简言之,就是外部对接容器内部暴露的输入 / 输出。
TerminalSizeQueue 结构
TerminalSizeQueue is capable of returning terminal resize events as they occur:TerminalSizeQueue
结构的作用是向容器进行时传递客户端侧的窗口变化的通知事件(事件包含当前的终端的 Weight
与 Height
值):即当客户终端 TTY 窗口的 size
发生了改变时,需要通知容器进行时调整显示的 size
,否则会出现内外窗口显示不一致的问题
type TerminalSizeQueue interface {
// Next returns the new terminal size after the terminal has been resized. It returns nil when
// monitoring has been stopped.
Next() *TerminalSize
}
TerminalSize 结构如下:
// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
// and were moved in order to decouple client from other term dependencies
// TerminalSize represents the width and height of a terminal.
type TerminalSize struct {
Width uint16
Height uint16
}
2、定义核心结构体,构造 StreamOptions
所需的成员
// TtyHandler
type PyStreamOption interface {
io.Reader //stdin
io.Writer //stdout && stderr
//io.Writer
remotecommand.TerminalSizeQueue
}
定义如上结构 PyStreamOption
,然后需要实例化 PyStreamOption
;注意实例化之后,必须实现该 interface
的 3
个方法:
Read(p []byte) (int, error)
:从前置 SSH 会话(ssh.Channel)中读取数据,转发到用户侧Write(p []byte) (int, error)
:从前置 SSH 会话获取用户输入,写入 podNext() *TerminalSize
:即时调整终端的 size
设置的目的是:
- 对接 terminal 的输入输出
- 实时监听窗口大小的改变并调整
这样,后续调用 Stream
方法时,只要将 StreamOptions
的 Stdin
/Stdout
都设置为 PyStreamOption
,Executor
会通过你定义的 Write
和 Read
方法来传输数据。
3、建立容器(连接)的方法
注意下面的 CreateK8SStream
方法,最后会使用 remotecommand
的 exec.Stream
方法来实现打通容器登录的过程(stdin
/stdout
/stderr
由参数传入),即上一步定义的 TtyHandler
结构的相关成员:
func CreateK8SStream(option PyStreamOption) {
//...
//NewSPDYExecutor
req := cli.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec")
option := &corev1.PodExecOptions{
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return err
}
//Stream
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin, // 来自 option
Stdout: stdout, // 来自 option
Stderr: stderr, // 来自 option
Tty: true, // 来自 option,设置 TTY
TerminalSizeQueue: terminalSizeQueue, // 来自 option
})
//...
}
接下来的思路就是如何生成一个 Executor
,来打通 SSH 到 SPDY(主要还是打通上图的 STDIN/STDOUT)
0x02 文件 COPY 的实现
如前文描述,方法 exec.Stream
执行时会向 APServer 发起一个请求,APServer 会通过代理机制将请求转发给相应节点上的 kubelet 服务,kubelet 会通过 CRI 接口调用 runtime 的接口,最终调用流式接口中的 Exec()
进入到 container 执行。通常针对一般化的命令调用,输入和输出以文本为主,并不会占用带宽和 APServer 的资源,但是当涉及到文件传输的时候则会占用较多带宽,因此这种方式不太适合大容量文件的 copy,如何优化?
- 增加流控
- 在文件 copy 场景下,直接向 kubelet 发起
exec
请求(绕过 APServer),但是需要解决访问 kubelet 地址、认证等相关问题
以文件 copy 场景为例,使用的命令是 kubectl cp
,前提是 Pod 中对应的 container 中安装了 tar
命令,本质上该指令还是调用了 exec
接口,然后执行 tar
命令进行文件的拷贝动作
#宿主机 –> Pod
#将宿主机 /tmp/test_pod.txt copy 到 mycentos-7b59b5b755-8rbgc 对应的第一个容器中的 /root 目录下
kubectl cp /tmp/test_pod.txt default/mycentos-7b59b5b755-8rbgc:/root
#Pod –> 宿主机
#将 mycentos-7b59b5b755-8rbgc 中 /root/from_pod.txt 文件拷贝到宿主机 /tmp 目录下,并命名为 from_pod.new
kubectl cp default/mycentos-7b59b5b755-8rbgc:/root/from_pod.txt /tmp/from_pod.new
#将 mycentos-7b59b5b755-8rbgc 中 /root/pod_dir 目录下的文件拷贝到宿主机 /tmp 目录下
kubectl cp default/mycentos-7b59b5b755-8rbgc:/root/pod_dir /tmp
为什么要使用 tar
呢?tar
支持压缩从标准输入读取文件名列表
find . -type f -name "*.jpg" -print | xargs tar -czvf images.tar.gz
find $HOME -name "*.doc" -print0 | tar -cvf /tmp/file.tar --null -T -
find $HOME -type f -name "*.sh" | xargs tar cfvz /nfs/x230/my-shell-scripts.tgz
# 从容器向外拷贝,通过 tar 命令压缩将输出重定向到标准输出
tar cf - ${srcFile}
# 从外面向容器内拷贝,将标准输入数据通过 tar 解压写入到 container 内的文件目录
tar --no-same-permissions --no-same-owner -xmf -
# OR
tar -xmf -
通过 client-go
向 pod 所在节点实现查询 kubelet 地址,然后向 kubelet 直接发起 exec
请求,代码如下:
func main() {
// 此处直接使用了 / root/.kube/config 文件(有足够的权限)
// 如果使用 service account token 的话,还需要额外创建 role 和 rolebinding
// 当直接访问 kubelet 接口的时候,kubelet 支持使用证书和 token 进行认证,
// 使用 rbac 对请求进行鉴权操作
var kubeConfig *string
if home := homedir.HomeDir(); home != "" {
kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeConfig = flag.String("kubeconfig", "","absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
panic(err)
}
// 构建请求,直接请求 kubelet 需要的各种参数
params := url.Values{}
params.Add("tty", "1")
params.Add("input", "0")
params.Add("output", "1")
params.Add("error", "0")
params.Add("command", "ls")
params.Add("command", "/")
url := &url.URL{
Scheme: "https",
Host: "10.0.0.1:10250",
Path: "/exec/default/busybox/busybox1",
RawQuery: params.Encode(),
}
// 此处配置证书相关配置,由于是测试,这块直接忽略了,在实际生产使用时可从文件读取
// 此处配置的 CAFile 在更新后,client-go 能够定时自动刷新
config.TLSClientConfig.CAFile = ""
config.TLSClientConfig.CAData = nil
config.TLSClientConfig.Insecure = true
executor, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
panic(err)
}
done := make(chan error)
var buf bytes.Buffer
wrap := func() {
err := executor.Stream(remotecommand.StreamOptions{
Stdout: &buf,
Tty: true,
})
done <- err
}
go wrap()
fmt.Println("wait for out")
select {
case err := <-done:
if err != nil {
fmt.Println("Command exit with error", err)
}
fmt.Println(string(buf.Bytes()))
}
}
0x03 问题
笔者项目中遇到了在某些 kubernetes 集群版本下,当网关退出时,容器内的连接(网关到 kubernetes APIserver)、容器内 bash
残留的问题,如下描述:
1、NewSPDYExecutor
创建的 stream 泄漏问题:
关联相关 issue:
- How to cancel a SPDYExecutor stream? #554
- How to cancel a RESTClient exec? Can add context to the request? #884
本项目中涉及到如下库:
go 1.17
require (
// ......
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
k8s.io/client-go v0.24.3
k8s.io/kubectl v0.22.0
)
问题原因是通过 exec.Stream
构建的阻塞方法,开发者无法主动关闭这个 goroutine(网上也有相关 wrapped 改造,但是个人感觉不优雅),解决方案是使用新版本的 client-go
库,它提供了 StreamWithContext
方法
func main(){
// ...
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: true,
TerminalSizeQueue: terminalSizeQueue,
})
// ...
}
2、bash
残留:这个目前没发现解决方法,只能通过在容器内kill
掉残留的bash
进程
3、不当使用 TerminalSizeQueue
导致的内存泄漏问题(该机制用于异步监控窗口尺寸更新)
这里以 v0.30.2 版本分析下窗口动态更新的实现(v3
),先看下用户实现的 Next
方法是在何处调用的,如下:
// streamProtocolV3 implements version 3 of the streaming protocol for attach
// and exec. This version adds support for resizing the container's terminal.
type streamProtocolV3 struct {
*streamProtocolV2
resizeStream io.Writer //resizeStream 是一个 io.Writer
}
func (p *streamProtocolV3) handleResizes() {
if p.resizeStream == nil || p.TerminalSizeQueue == nil {
// 这两个都需要定义才能启用
return
}
go func() {
defer runtime.HandleCrash()
encoder := json.NewEncoder(p.resizeStream)
for {
// 在循环中调用 p.TerminalSizeQueue.Next() 拿到一个 size(当前的窗口尺寸)
size := p.TerminalSizeQueue.Next()
if size == nil {
// 注意:协程退出的条件,需要外界主动触发
return
}
if err := encoder.Encode(&size); err != nil {
runtime.HandleError(err)
}
}
}()
}
// stream:streamProtocolV3创建execstream入口
func (p *streamProtocolV3) stream(conn streamCreator) error {
if err := p.createStreams(conn); err != nil {
return err
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
//默认在创建stream时,会异步启动handleResizes方法
p.handleResizes()
p.copyStdin()
var wg sync.WaitGroup
p.copyStdout(&wg)
p.copyStderr(&wg)
// we're waiting for stdout/stderr to finish copying
wg.Wait()
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}
这段代码可知,在 handleResizes
中异步方式启动并监听由 Next
方法传递的窗口信息,并将新的 size 信息编码为 JSON 格式发送给 p.resizeStream
,特别注意这里 goroutine 退出的条件是 size == nil
,所以外部在调用 exec.StreamWithContext
时一定要注意在合适的时机关闭该 goroutine,不然会造成内存泄漏