HTTP(s) 代理中间件库:Oxy

分析基于 Golang 的 HTTP(s) 代理中间件库 Oxy:Part One

Posted by pandaychen on December 16, 2020

0x00 前言

Oxy:Go middlewares for HTTP servers & proxies 是一个 HTTP(s) 代理开发(中间件)库,另一个负载均衡网关项目 vulcand 也基于此库来实现反向代理(Reverse Proxy Engine)功能。它的 主要组件如下,框架还是围绕着原生 net/http 库实现的:

  • Buffer retries and buffers requests and responses
  • Stream passes-through requests, supports chunked encoding with configurable flush interval
  • Forward forwards requests to remote location and rewrites headers
  • Roundrobin is a round-robin load balancer
  • Circuit Breaker Hystrix-style circuit breaker
  • Connlimit Simultaneous connections limiter
  • Ratelimit Rate limiter (based on tokenbucket algo)
  • Trace Structured request and response logger

同类型的其他 package 还有:

无论是 gin Web 框架,还是 HTTP(s) 代理,都离不开对 http.Handler 的封装、对 HTTP 路由、Header、参数及其他 HTTP 关键参数的重写及改造,而标准库 net/http 只是提供了最基础的 CS 架构下的 Request-Response 模型:

Every handler is http.Handler, so writing and plugging in a middleware is easy

使用介绍

从官方提供的使用例子,先了解下 Oxy 的基础功能:

1、Simple reverse proxy,简单的反向代理,将 8080 的 HTTP 请求发送给 http://localhost:63450

import (
  "net/http"
  "github.com/vulcand/oxy/forward"
  "github.com/vulcand/oxy/testutils"
)

// Forwards incoming requests to whatever location URL points to, adds proper forwarding headers
fwd, _ := forward.New()

redirect := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    // let us forward this request to another server
    req.URL = testutils.ParseURI("http://localhost:63450")
    // 将上层的 Request 和 ResponseWriter 传递给 fwd
    fwd.ServeHTTP(w, req)
})

func main(){
        // ...
        // that's it! our reverse proxy is ready!
        s := &http.Server{
                Addr:           ":8080",
                Handler:        redirect,
        }
        s.ListenAndServe()
}

2、给反向代理的多后端增加负载均衡策略

import (
        "github.com/vulcand/oxy/forward"
        "github.com/vulcand/oxy/roundrobin"
        "github.com/vulcand/oxy/testutils"
        "net/http"
)

func main() {
        // Forwards incoming requests to whatever location URL points to, adds proper forwarding headers
        fwd, _ := forward.New()
        lb, _ := roundrobin.New(fwd)

        url1 := testutils.ParseURI("http://localhost:63450")
        url2 := testutils.ParseURI("http://localhost:63451")

        lb.UpsertServer(url1)
        lb.UpsertServer(url2)

        s := &http.Server{
                Addr:    ":8080",
                Handler: lb,
        }
        s.ListenAndServe()
}

3、额外处理重试及错误

import (
  "net/http"
  "github.com/vulcand/oxy/forward"
  "github.com/vulcand/oxy/buffer"
  "github.com/vulcand/oxy/roundrobin"
)

// Forwards incoming requests to whatever location URL points to, adds proper forwarding headers

fwd, _ := forward.New()
lb, _ := roundrobin.New(fwd)

// buffer will read the request body and will replay the request again in case if forward returned status
// corresponding to nework error (e.g. Gateway Timeout)
buffer, _ := buffer.New(lb, buffer.Retry(`IsNetworkError() && Attempts() < 2`))

lb.UpsertServer(url1)
lb.UpsertServer(url2)

// that's it! our reverse proxy is ready!
s := &http.Server{
	Addr:           ":8080",
	Handler:        buffer,
}
s.ListenAndServe()

0x01 Forward 子组件:反向代理模块分析

本文代码基于v1.4.1,Forward 转发组件是HTTP转发功能的基础组件,其他模块如lb、buffer等都基于此基础进行构建。用 Forward 组件实现反向代理的一种调用方式如下:

FwdObj *forward.Forwarder

func API_handler(w http.ResponseWriter, req *http.Request) {
        req.URL = testutils.ParseURI("xxxx.backend.com")
        // 创建forward.RoundTripper及设置属性
        r := forward.RoundTripper(
                &http.Transport{
                        TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
                },
        )
        //使用forward包设置HTTP转发器
        FwdObj, _ = forward.New(forward.PassHostHeader(true), forward.Logger(x_logger), r)
        //执行反向代理的功能
        FwdObj.ServeHTTP(w, req)
}

Forward 组件封装了 HTTP 和 websocket 两种转发的实现,核心结构是httpForwarder,对http请求进行处理后,最终还是调用httputil.ReverseProxy进行反向代理的功能:

revproxy := httputil.ReverseProxy{
        Director: func(req *http.Request) {
                f.modifyRequest(req, inReq.URL)
        },
        Transport:      f.roundTripper,
        FlushInterval:  f.flushInterval,
        ModifyResponse: f.modifyResponse,
        BufferPool:     f.bufferPool,
        ErrorHandler:   ctx.errHandler.ServeHTTP,
}

核心结构

oxy在最终调用http.ReverseProxy构建转发器之前做了大量封装,主要体现在Forwarder结构中

// Forwarder wraps two traffic forwarding implementations: HTTP and websockets.
// It decides based on the specified request which implementation to use.
type Forwarder struct {
	*httpForwarder
	*handlerContext
	stateListener URLForwardingStateListener          //
	stream        bool
}
// httpForwarder is a handler that can reverse proxy
// HTTP traffic
type httpForwarder struct {
	roundTripper   http.RoundTripper        //用于转发请求的RoundTripper(必需包含),默认http.DefaultTransport
	rewriter       ReqRewriter              //用于改写转发请求
	passHost       bool                     //
	flushInterval  time.Duration
	modifyResponse func(*http.Response) error       //用于改写响应请求的回调

	tlsClientConfig *tls.Config

	log OxyLogger

	bufferPool                    httputil.BufferPool
	websocketConnectionClosedHook func(req *http.Request, conn net.Conn)
}

// handlerContext defines a handler context for error reporting and logging.
type handlerContext struct {
	errHandler utils.ErrorHandler
}

// UrlForwardingStateListener alias on URLForwardingStateListener.
// Deprecated: use URLForwardingStateListener instead.
type UrlForwardingStateListener = URLForwardingStateListener

// URLForwardingStateListener URL forwarding state listener.
type URLForwardingStateListener func(*url.URL, int)

ServeHTTP实现

Forwarder的ServeHTTP方法实现如下:

// ServeHTTP decides which forwarder to use based on the specified
// request and delegates to the proper implementation.
func (f *Forwarder) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if f.log.GetLevel() >= log.DebugLevel {
		logEntry := f.log.WithField("Request", utils.DumpHTTPRequest(req))
		logEntry.Debug("vulcand/oxy/forward: begin ServeHttp on request")
		defer logEntry.Debug("vulcand/oxy/forward: completed ServeHttp on request")
	}

	if f.stateListener != nil {
		f.stateListener(req.URL, StateConnected)
		defer f.stateListener(req.URL, StateDisconnected)
	}
        //判断是否支持websocket
	if IsWebsocketRequest(req) {
		f.httpForwarder.serveWebSocket(w, req, f.handlerContext)
	} else {
		f.httpForwarder.serveHTTP(w, req, f.handlerContext)
	}
}

IsWebsocketRequest根据http Header中的关键字判断当前是不是websocket请求:

// IsWebsocketRequest determines if the specified HTTP request is a websocket handshake request.
func IsWebsocketRequest(req *http.Request) bool {
	containsHeader := func(name, value string) bool {
		items := strings.Split(req.Header.Get(name), ",")
		for _, item := range items {
			if value == strings.ToLower(strings.TrimSpace(item)) {
				return true
			}
		}
		return false
	}
	return containsHeader(Connection, "upgrade") && containsHeader(Upgrade, "websocket")
}

httpForwarder.serveHTTP方法:

// serveHTTP forwards HTTP traffic using the configured transport.
func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ctx *handlerContext) {
	if f.log.GetLevel() >= log.DebugLevel {
		logEntry := f.log.WithField("Request", utils.DumpHTTPRequest(inReq))
		logEntry.Debug("vulcand/oxy/forward/http: begin ServeHttp on request")
		defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request")
	}

	start := clock.Now().UTC()

        //先复制一份http.Request
	outReq := new(http.Request)
	*outReq = *inReq // includes shallow copies of maps, but we handle this in Director

	revproxy := httputil.ReverseProxy{
		Director: func(req *http.Request) {
			f.modifyRequest(req, inReq.URL)
		},
		Transport:      f.roundTripper,
		FlushInterval:  f.flushInterval,
		ModifyResponse: f.modifyResponse,
		BufferPool:     f.bufferPool,
		ErrorHandler:   ctx.errHandler.ServeHTTP,
	}

	if f.log.GetLevel() >= log.DebugLevel {
		pw := utils.NewProxyWriter(w)
		revproxy.ServeHTTP(pw, outReq)

		if inReq.TLS != nil {
			f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v",
				inReq.URL, pw.StatusCode(), pw.GetLength(), clock.Now().UTC().Sub(start),
				inReq.TLS.Version,
				inReq.TLS.DidResume,
				inReq.TLS.CipherSuite,
				inReq.TLS.ServerName)
		} else {
			f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
				inReq.URL, pw.StatusCode(), pw.GetLength(), clock.Now().UTC().Sub(start))
		}
	} else {
		revproxy.ServeHTTP(w, outReq)
	}

	for key := range w.Header() {
		if strings.HasPrefix(key, http.TrailerPrefix) {
			if fl, ok := w.(http.Flusher); ok {
				fl.Flush()
			}
			break
		}
	}
}

modifyRequest方法

httputil.ReverseProxy中的Director,采用modifyRequest方法:

// Modify the request to handle the target URL.
func (f *httpForwarder) modifyRequest(outReq *http.Request, target *url.URL) {
	outReq.URL = utils.CopyURL(outReq.URL)
	outReq.URL.Scheme = target.Scheme
	outReq.URL.Host = target.Host

	u := f.getURLFromRequest(outReq)

	outReq.URL.Path = u.Path
	outReq.URL.RawPath = u.RawPath
	outReq.URL.RawQuery = u.RawQuery
	outReq.RequestURI = "" // Outgoing request should not have RequestURI

	outReq.Proto = "HTTP/1.1"
	outReq.ProtoMajor = 1
	outReq.ProtoMinor = 1

	if f.rewriter != nil {
		f.rewriter.Rewrite(outReq)
	}

	// Do not pass client Host header unless optsetter PassHostHeader is set.
	if !f.passHost {
		outReq.Host = target.Host
	}
}

0x02 参考