Kratos 源码分析:Tracing (一)

分析 Kratos 的 opentracing 实现:概念与数据结构抽象

Posted by pandaychen on October 10, 2020

0x00 前言

前一篇文章 微服务基础之 链路追踪(OpenTracing) 大致介绍了 Opentracing 的概念,这篇文章分析下 Kratos 库提供的 OpenTracing 实现,源码目录。Kratos 内置的组件大都接入了 Tracing,其特性如下:

  • Kratos 内部的 trace 基于 opentracing 语义
  • 使用 protobuf 协议描述 trace 结构
  • 集成了全链路支持(gRPC/HTTP/MySQL/Redis/Memcached 等)

0x01 Opentracing 回顾

简单回顾下 OpenTracing 的数据模型: 一条 Trace(调用链)可认为是一个由多个 Span 组成的有向无环图(DAG 图),Span 与 Span 的关系被命名为 References,常用基于时间轴的时序图来展现 Trace 的调用路径。

一般 Span 代表一次独立的调用过程,Span 包含以下的状态:

  • An operation name,操作名称
  • A start timestamp,起始时间
  • A finish timestamp,结束时间
  • Span Tag,一组键值对构成的 Span 标签集合。键值对中,键必须为 string,值可以是字符串,布尔,或者数字类型
  • Span Log,一组 span 的日志集合。每次 log 操作包含一个键值对,以及一个时间戳

此外,还有很重要的概念:SpanContext,常用来表示跨进程传递 Span 的场景(比如:RPC/HTTP 调用的客户端到服务端),通常在 HTTP-Header 中以 Key-Value 形式传递

0x01 Tracing 的通用封装及方法

sample

sample 给出了一个采样率的实现:

// sampler decides whether a new trace should be sampled or not.
type sampler interface {
	IsSampled(traceID uint64, operationName string) (bool, float32)
	Close() error
}

type probabilitySampling struct {
	probability float32		// 初始化传入,采样率
	slot        [slotLength]int64
}

核心函数为 sample.IsSampled()

func (p *probabilitySampling) IsSampled(traceID uint64, operationName string) (bool, float32) {
	for _, ignored := range ignoreds {
		if operationName == ignored {
			return false, 0
		}
	}
	now := time.Now().Unix()
	idx := oneAtTimeHash(operationName) % slotLength
	old := atomic.LoadInt64(&p.slot[idx])
	if old != now {
		atomic.SwapInt64(&p.slot[idx], now)
		return true, 1
	}
	return rand.Float32() < float32(p.probability), float32(p.probability)
}

Tracing 的格式

Kratos 中提供了两种 Trace 数据格式化 的类型,分别为 HTTPFormatGRPCFormat 两种。被设计为两个接口:Carrierpropagator,前者作为读写的封装,后者为跨进程 Span 传递的封装

// Carrier propagator must convert generic interface{} to something this
// implement Carrier interface, Trace can use Carrier to represents itself.
type Carrier interface {
	Set(key, val string)
	Get(key string) string
}

// propagator is responsible for injecting and extracting `Trace` instances
// from a format-specific "carrier"
type propagator interface {
	Inject(carrier interface{}) (Carrier, error)
	Extract(carrier interface{}) (Carrier, error)
}

举例来说,HTTP 协议的 Carrierpropagator 实现如下,从代码看极易理解:

  1. httpCarrier 主要通过 http 包进行 Header 的读写操作
  2. httpPropagator 在执行 HTTP 调用时,将 HTTP 的 Header 抽象为 httpCarrier,而后就可以调用 httpCarrierSet 或者 Get 方法了
func (h httpCarrier) Set(key, val string) {
	http.Header(h).Set(key, val)
}

func (h httpCarrier) Get(key string) string {
	return http.Header(h).Get(key)
}

func (httpPropagator) Inject(carrier interface{}) (Carrier, error) {
	header, ok := carrier.(http.Header)
	if !ok {
		return nil, ErrInvalidCarrier
	}
	if header == nil {
		return nil, ErrInvalidTrace
	}
	return httpCarrier(header), nil
}

func (httpPropagator) Extract(carrier interface{}) (Carrier, error) {
	header, ok := carrier.(http.Header)
	if !ok {
		return nil, ErrInvalidCarrier
	}
	if header == nil {
		return nil, ErrTraceNotFound
	}
	return httpCarrier(header), nil
}

而 gRPC 的 Carrierpropagator 是通过 grpc.metadata 实现的,大同小异,这里不再详述:

func (grpcPropagator) Inject(carrier interface{}) (Carrier, error) {
	md, ok := carrier.(metadata.MD)
	if !ok {
		return nil, ErrInvalidCarrier
	}
	if md == nil {
		return nil, ErrInvalidTrace
	}
	return grpcCarrier(md), nil
}

func (grpcPropagator) Extract(carrier interface{}) (Carrier, error) {
	md, ok := carrier.(metadata.MD)
	if !ok {
		return nil, ErrInvalidCarrier
	}
	if md == nil {
		return nil, ErrTraceNotFound
	}
	return grpcCarrier(md), nil
}

0x03 Trace 实现总览 && 数据结构

Kratos Tracing 的实现 目录在此,主要完成了如下工作:

  1. 实现 Trace 的接口规范
  2. 提供 Trace 对 Tracer 接口的实现,供业务及 Kratos 其他模块接入使用
  3. 本身不提供整套 Trace 数据存储,通过 Repoter 接口第三方 Tracing 系统(ZipkinJaeger 等等),前提是实现各自的上报协议

整体流程如下所示: kratos-tracing-reporter

几个要点 && 问题:

  1. Trace 生成的数据何时存储?
  2. 提供了 DAPPER 作为 Tracer 的实例化
  3. 如何提升采样的性能压力
  4. 如何 Mock?

代码组织

概念

  • Tag:标签,本质是 KV 字符串
  • Span:(局部)调用过程
  • Tree
  • Annotation

Tracing-basic

从上图可以看出,一个跟踪树结构是由多个 Span 构成的,每个 Span 代表了一次方法的运行周期

kratos-tracing-struct

Tag 定义

TagLogField 结构,表示最基础的 kv 结构,实现 在此,包含了系统使用到的 Tag 的定义及结构体:

// Tag interface
type Tag struct {
	Key   string
	Value interface{}
}
// LogField LogField
type LogField struct {
	Key   string
	Value string
}

//tag 字符串定义
const (
	// The software package, framework, library, or module that generated the associated Span.
	// E.g., "grpc", "django", "JDBI".
	// type string
	TagComponent = "component"

	// Database instance name.
	// E.g., In java, if the jdbc.url="jdbc:mysql://127.0.0.1:3306/customers", the instance name is "customers".
	// type string
	TagDBInstance = "db.instance"

	// A database statement for the given database type.
	// E.g., for db.type="sql", "SELECT * FROM wuser_table"; for db.type="redis", "SET mykey'WuValue'".
	TagDBStatement = "db.statement"

	// Database type. For any SQL database, "sql". For others, the lower-case database category,
	// e.g. "cassandra", "hbase", or "redis".
	// type string
	TagDBType = "db.type"

	// Username for accessing database. E.g., "readonly_user" or "reporting_user"
	// type string
  TagDBUser = "db.user"
}

此外,还提供了不同类型的创建的 Tag 结构的方法,如最常用的 string 类型对应的 TagString

// TagString new string tag.
func TagString(key string, val string) Tag {
	return Tag{Key: key, Value: val}
}

SpanContext 结构

在介绍 Span 之前,先引入 SpanContext概念,SpanContext 保存了分布式追踪的上下文信息,包括 Trace id,Span id 以及其它需要传递到下游服务的内容。 一个 OpenTracing 的实现需要将 SpanContext 通过某种序列化协议 (Wire Protocol) 在进程边界上进行传递,以将不同进程中的 Span 关联到同一个 Trace 上。 对于 HTTP 请求来说,SpanContext 一般是采用 HTTP header 进行传递的。

// 此外,一般和 spanContext 关联的 Kratos 的 spanContext

这里 spanContext 也可以直观理解为上面 Tracing Tree 的一个 Tree-Node:SpanContext implements opentracing.SpanContext

// SpanContext implements opentracing.SpanContext
type spanContext struct {
	// TraceID represents globally unique ID of the trace.
	// Usually generated as a random number.
	TraceID uint64

	// SpanID represents span ID that must be unique within its trace,
	// but does not have to be globally unique.
	SpanID uint64

	// ParentID refers to the ID of the parent span.
	// Should be 0 if the current span is a root span.
	ParentID uint64		// 属于哪个父 SPAN

	// Flags is a bitmap containing such bits as 'sampled' and 'debug'.
	Flags byte

	// Probability
	Probability float32

	// Level current level
	Level int
}

SpanContext 的字符串格式如下(方便日志记录):

func (c spanContext) String() string {
	base := make([]string, 4)
	base[0] = strconv.FormatUint(uint64(c.TraceID), 16)
	base[1] = strconv.FormatUint(uint64(c.SpanID), 16)
	base[2] = strconv.FormatUint(uint64(c.ParentID), 16)
	base[3] = strconv.FormatUint(uint64(c.Flags), 16)
	return strings.Join(base, ":")
}

Span 结构

这里再回顾下 Span:一个具有名称和时间长度的操作,例如一个 REST 调用或者数据库操作等。Span 是分布式追踪的最小跟踪单位,一个 Trace 由多段 Span 组成。追踪信息包含时间戳、 事件、 方法名(Family+Title) 、 注释(TAG/Comment)

// 客户端和服务器上的时间戳来自不同的主机, 我们必须考虑到时间偏差,RPC 客户端发送一个请求之后, 服务器端才能接收到, 对于响应也是一样的(服务器先响应, 然后客户端才能接收到这个响应) 。这样一来,服务器端的 RPC 就有一个时间戳的一个上限和下限

先看下 Span 结构定义

// Span is a trace span.
type Span struct {
	dapper        *dapper		// 属于哪个 dapper
	context       spanContext   // 本 span 对应的上下文(节点信息)
	operationName string		// 本 span 的名字:通过 SetTitle 方法设置
	startTime     time.Time		//span 开始时间
	duration      time.Duration		//span 持续时间
	tags          []Tag     // 包含了若干个 Tag
	logs          []*protogen.Log
	childs        int		// 本 span 的 Fork-child 数量
}

Span 结构体定义看,Span 是属于某个 dapper 的,在微服务中,就是一个完整的(子)调用过程,有调用开始时间 SetTitle 和耗时 duration,有标记自己唯一属性的上下文结构 spanContext 以及 KV 标记 tags

Span 实现的几个核心方法如下:
1、Span.Fork() 方法

func (s *Span) Fork(serviceName, operationName string) Trace {
	// 是否超过最大 child 数目
	if s.childs > _maxChilds {
		// if child span more than max childs set return noopspan
		return noopspan{}
	}
	s.childs++
	// 为了兼容临时为 New 的 Span 设置 span.kind
	return s.dapper.newSpanWithContext(operationName, s.context).SetTag(TagString(TagSpanKind, "client"))
}

2、Span.Finish() 方法

func (s *Span) Finish(perr *error) {
	s.duration = time.Since(s.startTime)
	if perr != nil && *perr != nil {
		err := *perr
		s.SetTag(TagBool(TagError, true))
		s.SetLog(Log(LogMessage, err.Error()))
		if err, ok := err.(stackTracer); ok {
			s.SetLog(Log(LogStack, fmt.Sprintf("%+v", err.StackTrace())))
		}
	}
	s.dapper.report(s)
}

3、Span.String() 方法:返回 spanContext 的序列化字符串

func (s *Span) String() string {
	return s.context.String()
}

4、Span.SetTag()方法:

func (s *Span) SetTag(tags ...Tag) Trace {
	if !s.context.isSampled() && !s.context.isDebug() {
		return s
	}
	if len(s.tags) < _maxTags {
		s.tags = append(s.tags, tags...)
	}
	if len(s.tags) == _maxTags {
		s.tags = append(s.tags, Tag{Key: "trace.error", Value: "too many tags"})
	}
	return s
}

5、Span.setLog()方法:

func (s *Span) setLog(logs ...LogField) Trace {
	protoLog := &protogen.Log{
		Timestamp: time.Now().UnixNano(),
		Fields:    make([]*protogen.Field, len(logs)),
	}
	for i := range logs {
		protoLog.Fields[i] = &protogen.Field{Key: logs[i].Key, Value: []byte(logs[i].Value)}
	}
	s.logs = append(s.logs, protoLog)
	return s
}

0x04 Tracer 接口及抽象

Kratos 的 Tracer 接口定义在 这里TracerTrace,此外,对外部提供了 trace.SetGlobalTracer() 方法,用于设置自定义的 tracer 对象,如 Kratos 提供了 zipkin 的接入。

dapper.go 中,完整的实现了 Trace 这个接口。

// Tracer is a simple, thin interface for Trace creation and propagation.
type Tracer interface {
	// New trace instance with given title.
	New(operationName string, opts ...Option) Trace
	// Inject takes the Trace instance and injects it for
	// propagation within `carrier`. The actual type of `carrier` depends on
	// the value of `format`.
	Inject(t Trace, format interface{}, carrier interface{}) error
	// Extract returns a Trace instance given `format` and `carrier`.
	// return `ErrTraceNotFound` if trace not found.
	Extract(format interface{}, carrier interface{}) (Trace, error)
}

type Trace interface {
	// return current trace id.
	TraceID() string
	// Fork fork a trace with client trace.
	Fork(serviceName, operationName string) Trace

	// Follow
	Follow(serviceName, operationName string) Trace

	// Finish when trace finish call it.
	Finish(err *error)

	// Scan scan trace into info.
	// Deprecated: method Scan is deprecated, use Inject instead of Scan
	// Scan(ti *Info)

	// Adds a tag to the trace.
	//
	// If there is a pre-existing tag set for `key`, it is overwritten.
	//
	// Tag values can be numeric types, strings, or bools. The behavior of
	// other tag value types is undefined at the OpenTracing level. If a
	// tracing system does not know how to handle a particular value type, it
	// may ignore the tag, but shall not panic.
	// NOTE current only support legacy tag: TagAnnotation TagAddress TagComment
	// other will be ignore
	SetTag(tags ...Tag) Trace

	// LogFields is an efficient and type-checked way to record key:value
	// NOTE current unsupport
	SetLog(logs ...LogField) Trace

	// Visit visits the k-v pair in trace, calling fn for each.
	Visit(fn func(k, v string))

	// SetTitle reset trace title
	SetTitle(title string)
}

看到这里,产生了一个疑问:`Tracer` 和 `Trace` 这两个接口的使用场景分别为如何?

0x05 Dapper 实现

Dapper 是 Kratos 提供的 trace.Tracer 接口的实例化实现,是一个全局结构。先看看它的 定义

type dapper struct {
	serviceName   string		// 一个 dapper 对应一个服务名
	disableSample bool
	tags          []Tag			//dapper 的 KV 属性
	reporter      reporter		//dapper 数据存储的实现
	propagators   map[interface{}]propagator	//dapper 的格式
	pool          *sync.Pool	// 复用 context,为代码优化所用
	stdlog        *log.Logger
	sampler       sampler		// 采样逻辑
}

通过 NewTracer 方法创建一个 Dapper,注意这里需要传入 reporter(Kratos 提供了基于 Zipkinreporter 实现),初始化 Dapper 的方法如下:

// NewTracer new a tracer.
func NewTracer(serviceName string, report reporter, disableSample bool) Tracer {
	// 初始化采样策略
	sampler := newSampler(_probability)

	// default internal tags
	tags := extendTag()
	stdlog := log.New(os.Stderr, "trace", log.LstdFlags)
	return &dapper{
		serviceName:   serviceName,		// 服务名(dapper)
		disableSample: disableSample,
		propagators: map[interface{}]propagator{
			HTTPFormat: httpPropagator{},
			GRPCFormat: grpcPropagator{},
		},
		reporter: report,
		sampler:  sampler,
		tags:     tags,
		pool:     &sync.Pool{New: func() interface{} { return new(Span) }},	// 初始化 sync.Pool,复用 Span
		stdlog:   stdlog,
	}
}

由 Dapper 创建 Span

func (d *dapper) New(operationName string, opts ...Option) Trace {
	opt := defaultOption
	for _, fn := range opts {
		fn(&opt)
	}
	traceID := genID()
	var sampled bool
	var probability float32
	if d.disableSample {
		sampled = true
		probability = 1
	} else {
		sampled, probability = d.sampler.IsSampled(traceID, operationName)
	}
	pctx := spanContext{TraceID: traceID}
	if sampled {
		pctx.Flags = flagSampled
		pctx.Probability = probability
	}
	if opt.Debug {
		pctx.Flags |= flagDebug
		return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true))
	}
	// 为了兼容临时为 New 的 Span 设置 span.kind
	return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server"))
}

func (d *dapper) newSpanWithContext(operationName string, pctx spanContext) Trace {
	sp := d.getSpan()
	// is span is not sampled just return a span with this context, no need clear it
	//if !pctx.isSampled() {
	//	sp.context = pctx
	//	return sp
	//}
	if pctx.Level > _maxLevel {
		// if span reach max limit level return noopspan
		return noopspan{}
	}
	level := pctx.Level + 1
	nctx := spanContext{
		TraceID:  pctx.TraceID,
		ParentID: pctx.SpanID,
		Flags:    pctx.Flags,
		Level:    level,
	}
	if pctx.SpanID == 0 {
		nctx.SpanID = pctx.TraceID
	} else {
		nctx.SpanID = genID()
	}
	sp.operationName = operationName
	sp.context = nctx
	sp.startTime = time.Now()
	sp.tags = append(sp.tags, d.tags...)
	return sp
}

Inject

func (d *dapper) Inject(t Trace, format interface{}, carrier interface{}) error {
	// if carrier implement Carrier use direct, ignore format
	carr, ok := carrier.(Carrier)
	if ok {
		t.Visit(carr.Set)
		return nil
	}
	// use Built-in propagators
	pp, ok := d.propagators[format]
	if !ok {
		return ErrUnsupportedFormat
	}
	carr, err := pp.Inject(carrier)
	if err != nil {
		return err
	}
	if t != nil {
		t.Visit(carr.Set)
	}
	return nil
}

func (d *dapper) Extract(format interface{}, carrier interface{}) (Trace, error) {
	sp, err := d.extract(format, carrier)
	if err != nil {
		return sp, err
	}
	// 为了兼容临时为 New 的 Span 设置 span.kind
	return sp.SetTag(TagString(TagSpanKind, "server")), nil
}

func (d *dapper) extract(format interface{}, carrier interface{}) (Trace, error) {
	// if carrier implement Carrier use direct, ignore format
	carr, ok := carrier.(Carrier)
	if !ok {
		// use Built-in propagators
		pp, ok := d.propagators[format]
		if !ok {
			return nil, ErrUnsupportedFormat
		}
		var err error
		if carr, err = pp.Extract(carrier); err != nil {
			return nil, err
		}
	}
	pctx, err := contextFromString(carr.Get(KratosTraceID))
	if err != nil {
		return nil, err
	}
	// NOTE: call SetTitle after extract trace
	return d.newSpanWithContext("", pctx), nil
}

report 上报数据

func (d *dapper) report(sp *Span) {
	if sp.context.isSampled() {
		if err := d.reporter.WriteSpan(sp); err != nil {
			d.stdlog.Printf("marshal trace span error: %s", err)
		}
	}
	d.putSpan(sp)
}

sync.Pool 复用 span

dapper 的内部方法 putSpangetSpan,利用 sync.PoolSpan 结构进行复用,以提高采集效率:

func (d *dapper) putSpan(sp *Span) {
	if len(sp.tags) > 32 {
		sp.tags = nil
	}
	if len(sp.logs) > 32 {
		sp.logs = nil
	}
	d.pool.Put(sp)
}

func (d *dapper) getSpan() *Span {
	sp := d.pool.Get().(*Span)
	sp.dapper = d
	sp.childs = 0
	sp.tags = sp.tags[:0]
	sp.logs = sp.logs[:0]
	return sp
}

Kratos 默认提供了 Zipkin 的接入方法,实现 在此,需要完成如下步骤:

  1. 实现 reporter 接口,其中包含了 WriteSpanClose 两个方法
  2. 通过 trace.NewTracer 构造 dapper 跟踪对象
  3. 通过 trace.SetGlobalTracer 将全局 tracer 初始化为我们设置的对象
// Init init trace report.
func Init(c *Config) {
	if c.BatchSize == 0 {
		c.BatchSize = 100
	}
	if c.Timeout == 0 {
		c.Timeout = xtime.Duration(200 * time.Millisecond)
	}
	trace.SetGlobalTracer(trace.NewTracer(env.AppID, newReport(c), c.DisableSample))
}

pb 的结构定义

对于 TagFieldLogSpan,也实现了 pb 的定义,结构 在此

单一结构 TagField

message Tag {
  enum Kind {
    STRING = 0;
    INT = 1;
    BOOL = 2;
    FLOAT = 3;
  }
  string key = 1;
  Kind kind = 2;
  bytes value = 3;
}

message Field {
  string key = 1;
  bytes value = 2;
}

SpanRef,类似于指针的定义,指向另外一个 Span

// SpanRef describes causal relationship of the current span to another span (e.g. 'child-of')
message SpanRef {
  enum RefType {
    CHILD_OF = 0;
    FOLLOWS_FROM = 1;
  }
  RefType ref_type = 1;
  uint64 trace_id = 2;
  uint64 span_id = 3;
}

复合结构 Log

message Log {
  // Deprecated: Kind no long use
  enum Kind {
    STRING = 0;
    INT = 1;
    BOOL = 2;
    FLOAT = 3;
  }
  string key = 1;
  // Deprecated: Kind no long use
  Kind kind = 2;
  // Deprecated: Value no long use
  bytes value = 3;
  int64 timestamp = 4;
  repeated Field fields = 5;      // 多 fields
}

复合结构 Span,包含了 repeated SpanRefrepeated Tagrepeated Log

// Span represents a named unit of work performed by a service.
message Span {
  int32 version = 99;
  string service_name = 1;
  string operation_name = 2;
  // Deprecated: caller no long required
  string caller = 3;
  uint64 trace_id = 4;
  uint64 span_id = 5;
  uint64 parent_id = 6;
  // Deprecated: level no long required
  int32  level = 7;
  // Deprecated: use start_time instead instead of start_at
  int64 start_at = 8;
  // Deprecated: use duration instead instead of finish_at
  int64 finish_at = 9;
  float sampling_probability = 10;
  string env = 19;
  google.protobuf.Timestamp start_time = 20;
  google.protobuf.Duration duration = 21;
  repeated SpanRef references = 22;
  repeated Tag tags = 11;
  repeated Log logs = 12;
}

0x 核心结构 contex

// SpanContext implements opentracing.SpanContext
type spanContext struct {
	// TraceID represents globally unique ID of the trace.
	// Usually generated as a random number.
	TraceID uint64

	// SpanID represents span ID that must be unique within its trace,
	// but does not have to be globally unique.
	SpanID uint64

	// ParentID refers to the ID of the parent span.
	// Should be 0 if the current span is a root span.
	ParentID uint64

	// Flags is a bitmap containing such bits as 'sampled' and 'debug'.
	Flags byte

	// Probability
	Probability float32

	// Level current level
	Level int
}

0x0 其他

util.go 文件中,定义了全局 context-Key 及生成方法:

var _ctxkey ctxKey = "kratos/pkg/net/trace.trace"

// FromContext returns the trace bound to the context, if any.
func FromContext(ctx context.Context) (t Trace, ok bool) {
	t, ok = ctx.Value(_ctxkey).(Trace)
	return
}

// NewContext new a trace context.
// NOTE: This method is not thread safe.
func NewContext(ctx context.Context, t Trace) context.Context {
	return context.WithValue(ctx, _ctxkey, t)
}

0x0 小结

基于上面的分析,我们小结下 Kratos 的 Tracing 实现: tracing-kratos

0x0 Trace 实例化应用

我们以 Warden 模块的 客户端的实现 client.go 为例,看下 Tracing 如何使用。
具体在 client 的拦截器 handle() 方法中,如下:

func (c *Client) handle() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
		var (
			ok     bool
			t      trace.Trace
			gmd    metadata.MD
			conf   *ClientConfig
			cancel context.CancelFunc
			addr   string
			p      peer.Peer
		)
		var ec ecode.Codes = ecode.OK
		// apm tracing
		if t, ok = trace.FromContext(ctx); ok {
			t = t.Fork("", method)  // 实际上这里是调用了 nooptracer 的 Fork 方法
			defer t.Finish(&err)
		}

		// setup metadata
		gmd = baseMetadata()
		trace.Inject(t, trace.GRPCFormat, gmd)
		c.mutex.RLock()
		if conf, ok = c.conf.Method[method]; !ok {
			conf = c.conf
		}
		c.mutex.RUnlock()
		brk := c.breaker.Get(method)
		if err = brk.Allow(); err != nil {
			_metricClientReqCodeTotal.Inc(method, "breaker")
			return
		}
		defer onBreaker(brk, &err)
		var timeOpt *TimeoutCallOption
		for _, opt := range opts {
			var tok bool
			timeOpt, tok = opt.(*TimeoutCallOption)
			if tok {
				break
			}
		}
		if timeOpt != nil && timeOpt.Timeout > 0 {
			ctx, cancel = context.WithTimeout(nmd.WithContext(ctx), timeOpt.Timeout)
		} else {
			_, ctx, cancel = conf.Timeout.Shrink(ctx)
		}

		defer cancel()
		nmd.Range(ctx,
			func(key string, value interface{}) {
				if valstr, ok := value.(string); ok {
					gmd[key] = []string{valstr}
				}
			},
			nmd.IsOutgoingKey)
		// merge with old matadata if exists
		if oldmd, ok := metadata.FromOutgoingContext(ctx); ok {
			gmd = metadata.Join(gmd, oldmd)
		}
		ctx = metadata.NewOutgoingContext(ctx, gmd)

		opts = append(opts, grpc.Peer(&p))
		if err = invoker(ctx, method, req, reply, cc, opts...); err != nil {
			gst, _ := gstatus.FromError(err)
			ec = status.ToEcode(gst)
			err = errors.WithMessage(ec, gst.Message())
		}
		if p.Addr != nil {
			addr = p.Addr.String()
		}
		if t != nil {
			t.SetTag(trace.String(trace.TagAddress, addr), trace.String(trace.TagComment, ""))
		}
		return
	}
}

1、调用 trace.FromContext() 方法

type ctxKey string

var _ctxkey ctxKey = "kratos/pkg/net/trace.trace"

// FromContext returns the trace bound to the context, if any.
// 这里返回值 t Trace 是 interface,所以可以以 Trace 类型直接返回
func FromContext(ctx context.Context) (t Trace, ok bool) {
	t, ok = ctx.Value(_ctxkey).(Trace)
	return
}

0x06 第三方 Tracing 兼容

kratos 本身不提供整套 trace 数据方案,但在 net/trace/report.go 内声明了 repoter 接口,可以简单的集成现有开源系统,比如:zipkinjaeger

zipkin + Kratos

可以看 Zipkin 的协议上报实现,具体使用方式如下:

  1. 搭建可用 Zipkin 集群
  2. 在业务代码的 main 函数内进行初始化,如下:
import "github.com/bilibili/kratos/pkg/net/trace/zipkin"

func main(){
	......
    zipkin.Init(&zipkin.Config{
        Endpoint: "http://localhost:9411/api/v2/spans",
    })
    ......
}

zipkin 效果图

zipkin

0x07 参考