首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

go mysql源码渗透

我们先看下DB结构:

type DB struct {

//数据库驱动,在go-sql-driver/driver.go的MysqlSQLDriver空结构体

driver driver.Driver

dsn string //数据库链接字符串

numClosed uint64 //关闭的链接总数量

mu sync.Mutex //并发安全的结构体

freeConn []*driverConn //空闲链接

connRequests map[uint64]chan connRequest // 等待的请求

nextRequest uint64 // connRequests的key 用于获取下一个链接

numOpen int //打开和挂起的链接

openerCh chan struct{}//connectionOpener() 读取并通过驱动建立数据库

链接,maybeOpenNewConnections用于通知建立链接,

closed bool //是否调用DB.Colse()

dep map[finalCloser]depSet

lastPut map[*driverConn]string //链接对应的stack信息,只是用于调试

maxIdle int // 0则使用defaultMaxIdleConns 负数意味着不保留空闲链接

maxOpen int //

maxLifetime time.Duration// 链接可以被重用的最大时间,及链接的生命周期

//独立的协程用于清理过期链接,默认最小周期一秒,如果maxLifetime已经设置则最小周期为maxLifetime

cleanerCh chan struct{}

}

sql.Open:

获取db句柄我们需要调用sql.Open(driverName,dataSourceName string),

首先校验驱动是否合法,有一个全局驱动map:

drivers = make(map[string]driver.Driver)

然后初始化一个sq.DB结构体对象,同时设置DB.openerCh队列大小为1百万,

然后开个goroutine不断从openerCh队列中获取maybeOpenNewConnections建立链接的请求,调用DB.openNewConnection获取mysqlConn数据库链接。

我们来看下DB.openNewConnection做了什么:

ci, err := db.driver.Open(db.dsn)

调用driver的Open方法建立数据库链接

a,如果链接建立失败,则调用db.maybeOpenNewConnections()发起重新建立链接的请求,DB.maxOpen-DB.numOpen用于计算此次需要建立链接的数量

b,如果链接建立成功,构造driverConn对象,如果connRequests有等待的请求随机从connRequests 删除一个等待的请求,否则如果freeConn没有达到maxIdelConn则向freeConn空闲链接切片中追加driverConn对象。

驱动建立链接过程:

//go-sql-driver/mysql/driver.go

func (d MySQLDriver) Open(dsn string):

1,构造mysqlConn对象,设置请求最大包的大小为16M-2字节

2,解析数据库连接字符串用来设置mysqlConn相应字段

3,dials map中根据解析dsn得到的net协议获取相应注册的DialDialFunc,如果没有注册对应net协议的DialFunc则使用net.Dialer的Dial方法建立连接,如果建立的是TCP连接则通过net.TCPConn.SetKeepAlive(true)建立持久连接。

4,会为每个连接分配4k大小的读,写共用的同步bytes buffer,buf是同步的 不会存在同时读写一个buf情况。

5,发送数据包,建立mysql握手信息

6,发送客户端鉴权包

7,如果dsn没有设置最大客户端数据包,则发送请求到mysql获取mysql的max_allowed_packet变量,如果小于16M则 使用mysql的包大小设置,否则用客户端默认的大小。

8,最后处理dsn的查询参数,用于设置mysql环境信息

如:?charset=utf8, ==> "SET NAMES utf8"

其他k=v ===>"SET k=v"

驱动什么时候在哪里注册

//go-sql-driver/mysql/driver.go

func init() {

sql.Register("mysql", &MySQLDriver{})

}

可以知道在_ "github.com/go-sql-driver/mysql"时候自动注册mysql驱动

DB.Ping方法:

如果freeeConn有连接,先从freeConn中获取连接,然后执行ping请求,如果获取的链接过期了或者链接已经关闭了,会报driver.ErrBadConn错误,会重试一次,如果仍然报这个错误或者freeConn没有空闲链接可用, 则直接调用driver.Open方法建立链接。

Prepare:

从缓存中或者使用驱动的Open方法 获取一个driverConn对象:

typedriverConnstruct{

db *DB

createdAt time.Time //链接创建时间

sync.Mutex

ci driver.Conn

closed bool

finalClosed bool// 链接是否关闭

//prepared statement

openStmtmap[*driverStmt]bool

inUse bool

onPut []func()

dbmuClosed bool //与closed一同被设置

}

//driverConn对象是对driver.Conn的包装,

//通过调用驱动mysqlConn.Prepare 获取

//prepared statement,构造driverStmt对象,

//并将该对象放入driverConn的

//openStmt(driverStmt对象池)中。

//inUse表示链接是否被使用,

//onPut当链接被放回free poll,链接池中是否做

资源清理的

ifc.inUse {//c是*driverConn

c.onPut = append(c.onPut,func() {

ds.Close()//ds是*driverStmt

})

}

typedriverStmtstruct{

sync.Locker//*driverConn对象

//绑定到一个链接上,不能被多个协程并发使用

si driver.Stmt

closed bool

closeErr error//调用colse的报错信息

}

Prepare实现:

//go-sql-driver/mysql/connection.go

func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) {

if mc.closed.IsSet() {//链接是否关闭

errLog.Print(ErrInvalidConn)

return nil, driver.ErrBadConn

}

// 按照mysql协议发送prepare命令

err := mc.writeCommandPacketStr(comStmtPrepare, query)

if err != nil {

return nil, mc.markBadConn(err)

}

stmt := &mysqlStmt{//构造driver.Stmt对象

mc: mc,

}

// 获取并解析请求的响应,得到4个字节小端字节序的statement id

// 和16位长度表示的statement参数个数

columnCount, err := stmt.readPrepareResultPacket()

if err == nil {

if stmt.paramCount > 0 {

if err = mc.readUntilEOF(); err != nil {

return nil, err

}

}

if columnCount > 0 {

err = mc.readUntilEOF()

}

}

return stmt, err

}

Stmt.Exec

1,先尝试两次从缓存或者打开新的链接,如果失败就直接创建一个新链接

2,成功获取链接之后,将arguments转换成[]driver.NamedValue

3,使用获取到的链接+转换成的参数 按照mysql协议发送到mysql服务器执行,并获取返回结果,构造mysqlResult对象。

Exec实现:

//go-sql-driver/mysql/statement.go

func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result,error)

DB.Exec

实现类似Stmt.Exec,省去了prepare部分

DB.Begin

//go-sql-driver/mysql/connection.go

1,获取链接

2,发送START TRANSACTION请求 构造mysqlTx对象

3,构造sql.Tx对象,同时开启一个协程 通过context的cancel 监听事务Commit or Rollback事件或者Tx关联的context被关闭。

tx = &Tx{

//sql.DB

db: db,

//driver.Conn

dc: dc,

releaseConn: release,

//*sql.Tx

txi: txi,

//from context

cancel: cancel,

ctx: ctx,

}

gotx.awaitDone()

returntx, nil

// awaitDone会一直阻塞直到 Tx中的context已经canceld

// 如果事务没有Commit或者Rollback 则rolls back

func (tx *Tx) awaitDone() {

//等待事务Commit or Rollback,或者等待事务关联的context关闭

//丢弃并关闭用于确保事务关闭并释放资源的链接

//如果事务已经Commit或者Rollback则此Rollback不会做任何操作

tx.rollback(true)

}

mysqlTx对象:

typemysqlTxstruct{

mc *mysqlConn

}

func(tx *mysqlTx) Commit() (err error) {

iftx.mc == nil tx.mc.closed.IsSet() {

returnErrInvalidConn

}

//通过链接按照mysql协议发送commit请求

err = tx.mc.exec("COMMIT")

tx.mc = nil

return

}

func(tx *mysqlTx) Rollback() (err error) {

iftx.mc == nil tx.mc.closed.IsSet() {

returnErrInvalidConn

}

//通过链接按照mysql协议发送commit请求

err = tx.mc.exec("ROLLBACK")

tx.mc = nil

return

}

Tx:

typeTxstruct{

db *DB

//事务中的链接在活跃的时候 防止关闭事务

closemu sync.RWMutex

//事务关联的链接

dc *driverConn

txi driver.Tx

//关闭事务的时候调用

releaseConnfunc(error)

//标记事务是否commit或者rollback

//只能标记一次

done int32

//跟事务关联的prepared statement,

//事务Commit或者Rollback的时候

//调用tx.closePrepared关闭。

stmtsstruct{

sync.Mutex

v []*Stmt

}

//关闭事务的时候调用的

//用于带有context的事务

cancelfunc()

//带有context的事务

//缺省是background context

ctx context.Context

}

tx是一个正在进行中的数据库事务,事务必须以Commit或者Rollback结束。

tx一旦调用commit或者callback之后,继续调用则会报:ErrTxDone 错误。

事务关联的stmts(通过tx或者db创建的)在调用Commit或者Rollback之后会被关闭。

//提交事务,并将链接放回链接池

func (tx *Tx) Commit() error {

//原子执行比较并交换值

if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {

return ErrTxDone//事务已经Commit或者Rollback

}

select {

default:

case

return tx.ctx.Err()

}

var err error

withLock(tx.dc, func() {

err = tx.txi.Commit()//并发安全的

})

if err != driver.ErrBadConn {

tx.closePrepared()

}

tx.close(err)//将链接放回链接池

return err

}

// 中止事务,并将链接放回链接池

func (tx *Tx) rollback(discardConn bool) error {

if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {

return ErrTxDone

}

var err error

withLock(tx.dc, func() {

err = tx.txi.Rollback()

})

if err != driver.ErrBadConn {

tx.closePrepared()

}

if discardConn {

err = driver.ErrBadConn

} tx.close(err)//将链接放回链接池

return err

}

func (tx *Tx) closePrepared() {

tx.stmts.Lock()

defer tx.stmts.Unlock()

for _, stmt := range tx.stmts.v {

//发送关闭statement命令,携带statement id

stmt.Close()

}

}

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180128G0I8CI00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券