0x00 前言
本文分析两个典型的 tcp-framework 实现:
xtcp 是一个轻量级的 tcp 框架,支持用户自定义如下属性:
- how to define the protocol format
- how to create server and client
- how to custom the logger
- how to handle event
- how to stop the server and client
getty:a netty like asynchronous network I/O library based on tcp/udp/websocket; a bidirectional RPC framework based on JSON/Protobuf; a microservice framework based on zookeeper/etcd
0x01 核心结构封装
Server
// Server used for running a tcp server.
type Server struct {
Opts *Options
stopped chan struct{}
wg sync.WaitGroup
mu sync.Mutex
once sync.Once
lis net.Listener
conns map[*Conn]bool
}
conns map[*Conn]bool
Conn:单个连接
// A Conn represents the server side of an tcp connection.
type Conn struct {
sync.Mutex
Opts *Options
RawConn net.Conn
UserData interface{}
sendBufList chan []byte
closed chan struct{}
state int32
wg sync.WaitGroup
once sync.Once
SendDropped uint32
sendBytes uint64
recvBytes uint64
dropped uint32
}
0x0 getty 库:介绍
getty一个类似 Netty 的异步网络 I/O 库,是一个很典型的tcpframe,Getty 基于分层设计,主要分为数据交互层、业务控制层、网络层,同时还提供非常易于扩展的监控接口,对外暴露的网络库使用接口。初步分析可以从其echo-server例子入手,先窥探下getty构建一个简单的服务端,(至少)需要用户实现哪些功能?
协议
echo协议,这个似乎没啥好说的,需要用户自行设置TLV的协议
type EchoPkgHeader struct {
Magic uint32
LogID uint32 // log id
Sequence uint32 // request/response sequence
ServiceID uint32 // service id
Command uint32 // operation command code
Code int32 // error code
Len uint16 // body length
_ uint16
_ int32 // reserved, maybe used as package md5 checksum
}
type EchoPackage struct {
H EchoPkgHeader //Unmarshal 和 Marshal方法
B string
}
echo-server
从initServer
切入,最核心的设置部分为server.RunEventLoop(newSession)
,其中最重要的两个方法:
session.SetPkgHandler(echoPkgHandler)
session.SetEventListener(echoMsgHandler)
func newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)
if conf.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic("bad connection type")
}
tcpConn.SetNoDelay(conf.GettySessionParam.TcpNoDelay)
tcpConn.SetKeepAlive(conf.GettySessionParam.TcpKeepAlive)
if conf.GettySessionParam.TcpKeepAlive {
tcpConn.SetKeepAlivePeriod(conf.GettySessionParam.keepAlivePeriod)
}
tcpConn.SetReadBuffer(conf.GettySessionParam.TcpRBufSize)
tcpConn.SetWriteBuffer(conf.GettySessionParam.TcpWBufSize)
session.SetName(conf.GettySessionParam.SessionName)
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(echoPkgHandler) //设置pkg处理方法
session.SetEventListener(echoMsgHandler) //设置事件钩子
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
// session.SetTaskPool(taskPool)
return nil
}
func initServer() {
var (
addr string
portList []string
server getty.Server
)
portList = conf.Ports
if len(portList) == 0 {
panic("portList is nil")
}
for _, port := range portList {
addr = gxnet.HostAddress2(conf.Host, port)
serverOpts := []getty.ServerOption{getty.WithLocalAddress(addr)}
serverOpts = append(serverOpts, getty.WithServerTaskPool(taskPool))
server = getty.NewTCPServer(serverOpts...)
// run server
server.RunEventLoop(newSession) //核心入口
log.Debug("server bind addr ok")
serverList = append(serverList, server)
}
}
1、session.SetPkgHandler(echoPkgHandler)
echoPkgHandler
是EchoPackageHandler
,实现了Read
/Write
方法,从例子看,需要用户自行实现序列化Read
以及反序列化Write
这两个方法
func (h *EchoPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var (
err error
len int
pkg EchoPackage //用户需要自定义的协议
buf *bytes.Buffer
)
buf = bytes.NewBuffer(data)
len, err = pkg.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
return nil, 0, nil
}
return nil, 0, err
}
return &pkg, len, nil
}
func (h *EchoPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
var (
ok bool
err error
startTime time.Time
echoPkg *EchoPackage
buf *bytes.Buffer
)
startTime = time.Now()
if echoPkg, ok = pkg.(*EchoPackage); !ok {
//bad package
return nil, errors.New("invalid echo package!")
}
buf, err = echoPkg.Marshal()
if err != nil {
return nil, err
}
log.Debug("WriteEchoPkgTimeMs = %s", time.Since(startTime).String())
return buf.Bytes(), nil
}
2、session.SetEventListener(echoMsgHandler)
感觉这个是比较核心的设置,主要是设置一次会话Session的
// EventListener is used to process pkg that received from remote session
type EventListener interface {
// OnOpen invoked when session opened
// If the return error is not nil, @Session will be closed.
OnOpen(Session) error
// OnClose invoked when session closed.
OnClose(Session)
// OnError invoked when got error.
OnError(Session, error)
// OnCron invoked periodically, its period can be set by (Session)SetCronPeriod
OnCron(Session)
// OnMessage invoked when getty received a package. Pls attention that do not handle long time
// logic processing in this func. You'd better set the package's maximum length.
// If the message's length is greater than it, u should should return err in
// Reader{Read} and getty will close this connection soon.
//
// If ur logic processing in this func will take a long time, u should start a goroutine
// pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
// can do the logic processing in other asynchronous way.
// !!!In short, ur OnMessage callback func should return asap.
//
// If this is a udp event listener, the second parameter type is UDPContext.
OnMessage(Session, interface{})
}
现在看echo-server
的实现,首先是EchoMessageHandler
的定义:
type EchoMessageHandler struct {
handlers map[uint32]PackageHandler //定义自定义协议命令字的处理方法
rwlock sync.RWMutex
sessionMap map[getty.Session]*clientEchoSession
}
func newEchoMessageHandler() *EchoMessageHandler {
handlers := make(map[uint32]PackageHandler)
handlers[heartbeatCmd] = hbHandler //处理方法1
handlers[echoCmd] = msgHandler //处理方法2
return &EchoMessageHandler{sessionMap: make(map[getty.Session]*clientEchoSession), handlers: handlers}
}
再看各个方法的实现:
func (h *EchoMessageHandler) OnOpen(session getty.Session) error {
var err error
h.rwlock.RLock()
if conf.SessionNumber <= len(h.sessionMap) {
err = errTooManySessions
}
h.rwlock.RUnlock()
if err != nil {
return err
}
log.Info("got session:%s", session.Stat())
h.rwlock.Lock()
h.sessionMap[session] = &clientEchoSession{session: session}
h.rwlock.Unlock()
return nil
}
func (h *EchoMessageHandler) OnError(session getty.Session, err error) {
log.Info("session %s got error %v, will be closed.", session.Stat(), err)
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *EchoMessageHandler) OnClose(session getty.Session) {
log.Info("session %s is closing......", session.Stat())
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
}
func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) {
p, ok := pkg.(*EchoPackage)
if !ok {
//illegal package
return
}
handler, ok := h.handlers[p.H.Command]
if !ok {
log.Error("illegal command %d", p.H.Command)
return
}
err := handler.Handle(session, p)
if err != nil {
h.rwlock.Lock()
if _, ok := h.sessionMap[session]; ok {
h.sessionMap[session].reqNum++
}
h.rwlock.Unlock()
}
}
func (h *EchoMessageHandler) OnCron(session getty.Session) {
var (
flag bool
active time.Time
)
h.rwlock.RLock()
if _, ok := h.sessionMap[session]; ok {
active = session.GetActive()
if conf.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true
log.Warn("session %s timeout %s, reqNum %d",
session.Stat(), time.Since(active).String(), h.sessionMap[session].reqNum)
}
}
h.rwlock.RUnlock()
if flag {
h.rwlock.Lock()
delete(h.sessionMap, session)
h.rwlock.Unlock()
session.Close()
}
}