0x00 前言
本文分析下 gobetween 的 metrics 采集方式。
0x01 ReadWriteCount 采集
以 ReadWriteCount
采集为例,在 proxy.go
中,封装了类型 io.Copy
方法,将入流量 readN
、出流量 writeN
通过 channel 向上层发送(需要考虑下性能问题?)
func Copy(to io.Writer, from io.Reader, ch chan<- core.ReadWriteCount) error {
buf := make([]byte, BUFFER_SIZE)
var err error = nil
for {
readN, readErr := from.Read(buf)
if readN > 0 {
writeN, writeErr := to.Write(buf[0:readN])
if writeN > 0 {
// 入流量,出流量
ch <- core.ReadWriteCount{CountRead: uint(readN), CountWrite: uint(writeN)}
}
if writeErr != nil {
err = writeErr
break
}
if readN != writeN {
err = io.ErrShortWrite
break
}
}
if readErr == io.EOF {
break
}
if readErr != nil {
err = readErr
break
}
}
return err
}
上述的 channel 接收方在 proxy
方法 中,该方法返回outStats := make(chan core.ReadWriteCount)
给调用方,用于接受出入流量的 metrics 数据:
func proxy(to net.Conn, from net.Conn, timeout time.Duration) <-chan core.ReadWriteCount {
log := logging.For("proxy")
stats := make(chan core.ReadWriteCount)
outStats := make(chan core.ReadWriteCount)
rwcBuffer := core.ReadWriteCount{}
ticker := time.NewTicker(PROXY_STATS_PUSH_INTERVAL)
flushed := false
// Stats collecting goroutine
go func() {
if timeout > 0 {
from.SetReadDeadline(time.Now().Add(timeout))
}
for {
select {
case <-ticker.C:
// 每隔一段时间,上报一次 rwcBuffer 的数据,并且重置 flushed 的状态
if !rwcBuffer.IsZero() {
outStats <- rwcBuffer
}
flushed = true
// 接收方
case rwc, ok := <-stats:
if !ok {
//channel 被关闭
ticker.Stop()
if !flushed && !rwcBuffer.IsZero() {
outStats <- rwcBuffer
}
close(outStats)
return
}
if timeout > 0 && rwc.CountRead > 0 {
// 保活计时器更新
from.SetReadDeadline(time.Now().Add(timeout))
}
// Remove non blocking
// 视 flushed 进行累加或者初始化(实现按照时间间隔上报读数)
if flushed {
rwcBuffer = rwc
} else {
rwcBuffer.CountWrite += rwc.CountWrite
rwcBuffer.CountRead += rwc.CountRead
}
flushed = false
}
}
}()
// Run proxy copier
go func() {
err := Copy(to, from, stats)
// hack to determine normal close. TODO: fix when it will be exposed in golang
e, ok := err.(*net.OpError)
if err != nil && (!ok || e.Err.Error() != "use of closed network connection") {
log.Warn(err)
}
to.Close()
from.Close()
// Stop stats collecting goroutine
close(stats)
}()
return outStats
}
调用proxy
方法位于server.go
代理实现的核心逻辑中:
func (this *Server) handle(ctx *core.TcpContext) {
//......
/* ----- Stat proxying ----- */
log.Debug("Begin ", clientConn.RemoteAddr(), " -> ", this.listener.Addr(), " -> ", backendConn.RemoteAddr())
cs := proxy(clientConn, backendConn, utils.ParseDurationOrDefault(*this.cfg.BackendIdleTimeout, 0))
bs := proxy(backendConn, clientConn, utils.ParseDurationOrDefault(*this.cfg.ClientIdleTimeout, 0))
isTx, isRx := true, true
for isTx || isRx {
select {
case s, ok := <-cs:
isRx = ok
if !ok {
cs = nil
continue
}
this.scheduler.IncrementRx(*backend, s.CountWrite)
case s, ok := <-bs:
isTx = ok
if !ok {
bs = nil
continue
}
this.scheduler.IncrementTx(*backend, s.CountWrite)
}
}
log.Debug("End ", clientConn.RemoteAddr(), " -> ", this.listener.Addr(), " -> ", backendConn.RemoteAddr())
}
0x0 参考
FEATURED TAGS
Latex
gRPC
负载均衡
OpenSSH
Authentication
Consul
Etcd
Kubernetes
性能优化
Python
分布式锁
WebConsole
后台开发
Golang
OpenSource
Nginx
Vault
网络安全
Perl
分布式理论
Raft
正则表达式
Redis
分布式
限流
go-redis
微服务
反向代理
ReverseProxy
Cache
缓存
连接池
OpenTracing
GOMAXPROCS
GoMicro
微服务框架
日志
zap
Pool
Kratos
Hystrix
熔断
并发
Pipeline
证书
Prometheus
Metrics
PromQL
Breaker
定时器
Timer
Timeout
Kafka
Xorm
MySQL
Fasthttp
bytebufferpool
任务队列
队列
异步队列
GOIM
Pprof
errgroup
consistent-hash
Zinx
网络框架
设计模式
HTTP
Gateway
Queue
Docker
网关
Statefulset
NFS
Machinery
Teleport
Zero Trust
Oxy
存储
Confd
热更新
OAuth
SAML
OpenID
Openssl
AES
微服务网关
IM
KMS
安全
数据结构
hashtable
Sort
Asynq
基数树
Radix
Crontab
热重启
系统编程
sarama
Go-Zero
RDP
VNC
协程池
UDP
hashmap
网络编程
自适应技术
环形队列
Ring Buffer
Circular Buffer
InnoDB
timewheel
GroupCache
Jaeger
GOSSIP
CAP
Bash
websocket
事务
GC
TLS
singleflight
闭包
Helm
network
iptables
MITM
HTTPS
Tap
Tun
路由
wireguard
gvisor
Git
NAT
协议栈
Envoy
FRP
DPI
gopacket
Cgroup
Namespace
DNS
eBPF
GoZero
Gost
Clash
gopsutil
HIDS
ELKEID
XDP
TC
Linux
Systemd
netlink