Xorm 使用总结(Mysql)

Xorm-Mysql 用法及避坑总结

Posted by pandaychen on June 29, 2020

0x00 前言

本文介绍 Xorm-MYSQL 的使用(自动分表)、超时封装、Tracing 封装及日常遇到的问题。Xorm 支持多种数据库驱动, 如: Mysql、Mariadb、Tidb、Postgres、Oracle 等等。

0x01 基础

Xorm-Mysql 的基础使用方式汇总如下,其他细节可以参见 Xorm 的 官方文档

1、Engine 相关
在项目中需要注意的是可以使用 goroutine 和 engine.Ping() 的方式来实现 Mysql 长连接保活机制。

// 创建 engine 对象
engine, err := xorm.NewEngine("mysql", "user:pwd@tcp(ip:port)/dbname?charset=utf8")
if err != nil {
    log.Fatalf("init engine fail! err:%+v", err)
}

// 连接池配置
engine.SetMaxOpenConns(30)                  // 最大 db 连接
engine.SetMaxIdleConns(10)                  // 最大 db 连接空闲数
engine.SetConnMaxLifetime(30 * time.Minute) // 超过空闲数连接存活时间

// 日志相关配置
engine.ShowSQL(true)                      // 打印日志
engine.Logger().SetLevel(core.LOG_DEBUG) // 打印日志级别
engine.SetLogger()                       // 设置日志输出 (控制台, 日志文件, 系统日志等)


// 测试连通性
if err = engine.Ping(); err != nil {
    log.Fatalf("ping to db fail! err:%+v", err)
}

0x02 Xorm 事务

使用 Xorm 事务处理时,需要创建 Session,可以混用 ORM 方法和 RAW 方法,如下(注意:Mysql 引擎为 innodb 才支持事务,myisam 是不支持事务):

func main{
    session := engine.NewSession()
    defer session.Close()
    // add Begin() before any action
    // 开启事务
    err := session.Begin()
    user1 := Userinfo{Username: "pandaychen", Created: time.Now()}
    _, err = session.Insert(&user1)
    if err != nil {
        // 发生错误时进行回滚
        session.Rollback()
        return
    }
    user2 := Userinfo{Username: "panda"}
    _, err = session.Where("id = ?", 2).Update(&user2)
    if err != nil {
        session.Rollback()
        return
    }

    _, err = session.Exec("delete from userinfo where username = ?", user2.Username)
    if err != nil {
        session.Rollback()
        return
    }

    // add Commit() after all actions
    // 完成事务
    err = session.Commit()
    if err != nil {
        return
    }
}

0x03 Xorm 的事件钩子

Xorm 支持事件钩子,见 文档。比如 BeforeInsertAfterInsert(前者在进行插入记录之前被调用,后者在完成插入记录之后被调用):

func (a *Account) BeforeInsert() {
	log.Printf("before insert: %s", a.Name)
}

func (a *Account) AfterInsert() {
	log.Printf("after insert: %s", a.Name)
}

0x04 Xorm 常用函数及方法

参考官方文档

在同一会话(Session)执行

可以使用xorm 的会话机制(Session)来批量执行SQL,Session.Exec 用于执行原始 SQL 命令,并返回受影响的行数以及在操作过程中遇到的错误,如下:

session := xorm.NewSession()
defer session.Close()

sqlStmt := "INSERT INTO users (name, age) VALUES ('userA', 25)"
affected, err := session.Exec(sqlStmt)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Inserted %d rows\n", affected)

//more SQLS
sqlStmt = "UPDATE users SET age = ? WHERE name = ?"
affected, err = session.Exec(sqlStmt, 30, "userA")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Updated %d rows\n", affected)

0x05 基于 Xorm 的自动分表

本小节简单介绍下如何利用 Xorm 实现 Mysql 分表

  • 分表的优点:避免单表数据量过大带来的操作性能瓶颈
  • 分表的缺点:数据的关联性可能受到影响,可能会消耗额外的查询(如按表遍历)及运算压力等
  • 分表的方式:按天、按业务等

分表的需求

通常,创建 Mysql Engine 的时候,使用下面的 engine 来操作具体的数据库 Table,那么问题来了,Table 的名字如何自动切换?

engine, err := xorm.NewEngine("mysql", "user:pwd@tcp(ip:port)/dbname?charset=utf8")

针对分表的场景,需要满足如下需求:

  1. 如何定时的自动创建一个新表(类似 Crontab)
  2. 如何解决 Golang 结构去创建一个以时间结尾为表名的数据库表(固定 prefix 或者 suffix)
  3. 如何解决 golang 类和表的映射问题,若映射不对,则影响数据插入操作(engine 和表的 mapping 问题)

自动创建新表

可以利用 Cron 解决定时创建新表的问题

func NewDBEngine(dburl string) (*DBEngine, error) {
    fmt.Println(dburl)
    db, err := xorm.NewEngine("mysql", dburl)
    if err != nil {
        return nil, err
    }
    err = db.Ping()
    if err != nil {
        return nil, err
    }
    db.SetMaxOpenConns(10)

    db.ShowSQL(false)
    log.Infof("connect to database(%v) server OK!", dburl)

    engine := DBEngine{
        dbengine: db,
    }
    engine.dbengine.SetMapper(core.GonicMapper{})

    engine.Cron = cron.New()
    engine.Cron.AddFunc(SPEC, engine.InitTable)
    engine.commParam = make(chan CommParam, GET_CACHE_DATA)
    Init(&engine)
    return &engine, nil
}

自定义 Table(固定 fix)

可以利用 xorm mapper 解决,xorm 支持驼峰或自定义的表格命名方式

func (db *DBEngine) InitTable() {
    var date string
    if int(time.Now().Month()) >= 10 {
        date = "_" + strconv.Itoa(int(time.Now().Year())) + strconv.Itoa(int(time.Now().Month()))
    } else {
        date = "_" + strconv.Itoa(int(time.Now().Year())) + "0" + strconv.Itoa(int(time.Now().Month()))
    }
    tbMapper := core.NewSuffixMapper(core.GonicMapper{}, date)
    db.dbengine.SetTableMapper(tbMapper)

    for key, v := range TablePrefix {
        isExist, err := db.dbengine.IsTableExist(key + date)
        if err != nil {
            log.Errorf("[Database] inittable failed for %v", err)
            return
        }
        if isExist {
            log.Infof("[Database] The %v table  is exist!!", key+date)
        } else {
            err := db.dbengine.Sync2(v)
            if err != nil {
                log.Errorf("[Database] Create table error for %v %v", err)
            }
        }
    }
}

自动映射表

利用 mapper 改变表格的命名和类的映射关系

var TablePrefix = map[string]interface{}{
    "gw_rx":     model.Gw_rx{},
    "gw_tx":     model.Gw_tx{},
    "gw_stats":  model.Gw_stats{},
    "mac_tx":    model.Mac_tx{},
    "mac_rx":    model.Mac_rx{},
    "mac_error": model.Mac_error{},
    "app_rx":    model.App_rx{},
    "app_tx":    model.App_tx{},
    "app_join":  model.App_join{},
    "app_ack":   model.App_ack{},
    "app_error": model.App_error{},
    "rxinfo":    model.Rxinfo{},
    "txinfo":    model.Txinfo{},
}

0x06 Xorm Tracing 实现

这里采用 xorm.io/xorm 1.0.3 版本来实现 xorm 的 tracing 功能,支持以 Hook 钩子方式侵入 XORM 执行过程。用户侧仅需要实现 contexts.Hook 的方法,加入 tracing 的机制即可

xorm 的 hooker 实现

xorm 的 Engine 提供了 AddHook 方法,用于注入自定义钩子:

// AddHook adds a context Hook
func (engine *Engine) AddHook(hook contexts.Hook) {
    // 调用 `core.DB` 的 AddHook 方法
	engine.db.AddHook(hook)
}

// AddHook adds hook
func (db *DB) AddHook(h ...contexts.Hook) {
	db.hooks.AddHook(h...)
}

上述 db.hooks.AddHookcontexts.Hooks类型 暴露的注册方法,传入参数为 contexts.Hook 类型(接口类型):

type Hook interface {
	BeforeProcess(c *ContextHook) (context.Context, error)
	AfterProcess(c *ContextHook) error
}

type Hooks struct {
	hooks []Hook
}

func (h *Hooks) AddHook(hooks ...Hook) {
	h.hooks = append(h.hooks, hooks...)
}

由此了解到,如果要实现 xorm hook,需要传入一个 contexts.Hook,需要实现两个方法(BeforeProcessAfterProcess)就能实现这个接口。

ContextHook

BeforeProcess 方法的参数是 ContextHook,如下,其中的参数会用于 tracing 逻辑:

// ContextHook represents a hook context
type ContextHook struct {
	start       time.Time
	Ctx         context.Context
	SQL         string        // log content or SQL
	Args        []interface{} // if it's a SQL, it's the arguments
	Result      sql.Result
	ExecuteTime time.Duration
	Err         error // SQL executed error
}
  • Ctx:本次 orm 操作的上下文,我们的 span 需要存储在这里
  • SQLArgs:可作为 spanLog
  • Err:错误,可以作为 spanLog

一些细节

db.beforeProcess 的实现为例(代码如下),就是实际 SQL 查询过程中调用日志和 Hook 的过程, Hook 参数传递使用的是指针,即将 contexts.ContextHook 的指针传入钩子函数执行流程,允许我们直接操作其成员 c.Ctx 以达到Hook的过程。

func (db *DB) beforeProcess(c *contexts.ContextHook) (context.Context, error) {
	if db.NeedLogSQL(c.Ctx) {
	    // <-- 重要,这里是将日志上下文转化成值传递
	    // 所以不能修改 context.Context 的内容
		db.Logger.BeforeSQL(log.LogContext(*c))
	}
	// Hook 是指针传递,所以可以修改 context.Context 的内容
	ctx, err := db.hooks.BeforeProcess(c)
	if err != nil {
		return nil, err
	}
	return ctx, nil
}

func (db *DB) afterProcess(c *contexts.ContextHook) error {
    // 和 beforeProcess 同理,日志上下文不能修改 context.Context 的内容
    // 而 hook 可以
	err := db.hooks.AfterProcess(c)
	if db.NeedLogSQL(c.Ctx) {
		db.Logger.AfterSQL(log.LogContext(*c))
	}
	return err
}

Hook 实现

实现代码在 。核心步骤三点:
1、定义 XormHook 结构,注意不要使用该结构来进行 span 传递

type XormHook struct {
	name string
}

2、实现 hook 的公共接口

// 前置钩子实现
func (h *XormHook) BeforeProcess(ctx *contexts.ContextHook) (context.Context, error) {
	span, _ := opentracing.StartSpanFromContext(ctx.Ctx, "xorm-hook")

	// 将 span 注入 c.Ctx 中
	ctx.Ctx = context.WithValue(ctx.Ctx, xormHookSpanCtxKey, span)

	return ctx.Ctx, nil
}

func (h *XormHook) AfterProcess(c *contexts.ContextHook) error {
	sp, ok := c.Ctx.Value(xormHookSpanCtxKey).(opentracing.Span)
	if !ok {
		//no span,logger?
		return nil
	}
	// 结束前上报
	defer sp.Finish()

	//log details
	if c.Err != nil {
		//log error
		sp.LogFields(tlog.Object("err", c.Err))
	}

	// 使用 xorm 的 builder 将查询语句和参数结合
	sql, err := builder.ConvertToBoundSQL(c.SQL, c.Args)
	if err == nil {
		// mark sql
		sp.LogFields(tlog.String(enums.TagDBStatement, sql))
	}
	sp.LogFields(tlog.String(enums.TagDBInstance, h.name))
	sp.LogFields(tlog.Object("args", c.Args))
	sp.SetTag(enums.TagDBExecuteCosts, c.ExecuteTime)

	return nil
}

3、在 xorm 包初始化时 挂载XormHook

func NewXormClient(option *XormOption) (*XormClient, error) {
    //...
	xormCli.Engine, err = xorm.NewEngine(option.Driver, option.Dsn)
	if err != nil {
		return nil, err
	}

	xormCli.SetDefaultContext(context.WithValue(context.Background(), clientInstance, &xormCli))
    // 注入钩子实现
	xormCli.AddHook(NewXormHook(option.Name))
    //...
	return &xormCli, nil
}

4、在调用时,传入上下文 context 至 xorm 的 Engine.Context(ctx),然后运行 SQL 即可

0x07 参考