我们先看下DB结构:
type DB struct {
//数据库驱动,在go-sql-driver/driver.go的MysqlSQLDriver空结构体
driver driver.Driver
dsn string //数据库链接字符串
numClosed uint64 //关闭的链接总数量
mu sync.Mutex //并发安全的结构体
freeConn []*driverConn //空闲链接
connRequests map[uint64]chan connRequest // 等待的请求
nextRequest uint64 // connRequests的key 用于获取下一个链接
numOpen int //打开和挂起的链接
openerCh chan struct{}//connectionOpener() 读取并通过驱动建立数据库
链接,maybeOpenNewConnections用于通知建立链接,
closed bool //是否调用DB.Colse()
dep map[finalCloser]depSet
lastPut map[*driverConn]string //链接对应的stack信息,只是用于调试
maxIdle int // 0则使用defaultMaxIdleConns 负数意味着不保留空闲链接
maxOpen int //
maxLifetime time.Duration// 链接可以被重用的最大时间,及链接的生命周期
//独立的协程用于清理过期链接,默认最小周期一秒,如果maxLifetime已经设置则最小周期为maxLifetime
cleanerCh chan struct{}
}
sql.Open:
获取db句柄我们需要调用sql.Open(driverName,dataSourceName string),
首先校验驱动是否合法,有一个全局驱动map:
drivers = make(map[string]driver.Driver)
然后初始化一个sq.DB结构体对象,同时设置DB.openerCh队列大小为1百万,
然后开个goroutine不断从openerCh队列中获取maybeOpenNewConnections建立链接的请求,调用DB.openNewConnection获取mysqlConn数据库链接。
我们来看下DB.openNewConnection做了什么:
ci, err := db.driver.Open(db.dsn)
调用driver的Open方法建立数据库链接
a,如果链接建立失败,则调用db.maybeOpenNewConnections()发起重新建立链接的请求,DB.maxOpen-DB.numOpen用于计算此次需要建立链接的数量
b,如果链接建立成功,构造driverConn对象,如果connRequests有等待的请求随机从connRequests 删除一个等待的请求,否则如果freeConn没有达到maxIdelConn则向freeConn空闲链接切片中追加driverConn对象。
驱动建立链接过程:
//go-sql-driver/mysql/driver.go
func (d MySQLDriver) Open(dsn string):
1,构造mysqlConn对象,设置请求最大包的大小为16M-2字节
2,解析数据库连接字符串用来设置mysqlConn相应字段
3,dials map中根据解析dsn得到的net协议获取相应注册的DialDialFunc,如果没有注册对应net协议的DialFunc则使用net.Dialer的Dial方法建立连接,如果建立的是TCP连接则通过net.TCPConn.SetKeepAlive(true)建立持久连接。
4,会为每个连接分配4k大小的读,写共用的同步bytes buffer,buf是同步的 不会存在同时读写一个buf情况。
5,发送数据包,建立mysql握手信息
6,发送客户端鉴权包
7,如果dsn没有设置最大客户端数据包,则发送请求到mysql获取mysql的max_allowed_packet变量,如果小于16M则 使用mysql的包大小设置,否则用客户端默认的大小。
8,最后处理dsn的查询参数,用于设置mysql环境信息
如:?charset=utf8, ==> "SET NAMES utf8"
其他k=v ===>"SET k=v"
驱动什么时候在哪里注册
//go-sql-driver/mysql/driver.go
func init() {
sql.Register("mysql", &MySQLDriver{})
}
可以知道在_ "github.com/go-sql-driver/mysql"时候自动注册mysql驱动
DB.Ping方法:
如果freeeConn有连接,先从freeConn中获取连接,然后执行ping请求,如果获取的链接过期了或者链接已经关闭了,会报driver.ErrBadConn错误,会重试一次,如果仍然报这个错误或者freeConn没有空闲链接可用, 则直接调用driver.Open方法建立链接。
Prepare:
从缓存中或者使用驱动的Open方法 获取一个driverConn对象:
typedriverConnstruct{
db *DB
createdAt time.Time //链接创建时间
sync.Mutex
ci driver.Conn
closed bool
finalClosed bool// 链接是否关闭
//prepared statement
openStmtmap[*driverStmt]bool
inUse bool
onPut []func()
dbmuClosed bool //与closed一同被设置
}
//driverConn对象是对driver.Conn的包装,
//通过调用驱动mysqlConn.Prepare 获取
//prepared statement,构造driverStmt对象,
//并将该对象放入driverConn的
//openStmt(driverStmt对象池)中。
//inUse表示链接是否被使用,
//onPut当链接被放回free poll,链接池中是否做
资源清理的
ifc.inUse {//c是*driverConn
c.onPut = append(c.onPut,func() {
ds.Close()//ds是*driverStmt
})
}
typedriverStmtstruct{
sync.Locker//*driverConn对象
//绑定到一个链接上,不能被多个协程并发使用
si driver.Stmt
closed bool
closeErr error//调用colse的报错信息
}
Prepare实现:
//go-sql-driver/mysql/connection.go
func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) {
if mc.closed.IsSet() {//链接是否关闭
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
// 按照mysql协议发送prepare命令
err := mc.writeCommandPacketStr(comStmtPrepare, query)
if err != nil {
return nil, mc.markBadConn(err)
}
stmt := &mysqlStmt{//构造driver.Stmt对象
mc: mc,
}
// 获取并解析请求的响应,得到4个字节小端字节序的statement id
// 和16位长度表示的statement参数个数
columnCount, err := stmt.readPrepareResultPacket()
if err == nil {
if stmt.paramCount > 0 {
if err = mc.readUntilEOF(); err != nil {
return nil, err
}
}
if columnCount > 0 {
err = mc.readUntilEOF()
}
}
return stmt, err
}
Stmt.Exec
1,先尝试两次从缓存或者打开新的链接,如果失败就直接创建一个新链接
2,成功获取链接之后,将arguments转换成[]driver.NamedValue
3,使用获取到的链接+转换成的参数 按照mysql协议发送到mysql服务器执行,并获取返回结果,构造mysqlResult对象。
Exec实现:
//go-sql-driver/mysql/statement.go
func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result,error)
DB.Exec
实现类似Stmt.Exec,省去了prepare部分
DB.Begin
//go-sql-driver/mysql/connection.go
1,获取链接
2,发送START TRANSACTION请求 构造mysqlTx对象
3,构造sql.Tx对象,同时开启一个协程 通过context的cancel 监听事务Commit or Rollback事件或者Tx关联的context被关闭。
tx = &Tx{
//sql.DB
db: db,
//driver.Conn
dc: dc,
releaseConn: release,
//*sql.Tx
txi: txi,
//from context
cancel: cancel,
ctx: ctx,
}
gotx.awaitDone()
returntx, nil
// awaitDone会一直阻塞直到 Tx中的context已经canceld
// 如果事务没有Commit或者Rollback 则rolls back
func (tx *Tx) awaitDone() {
//等待事务Commit or Rollback,或者等待事务关联的context关闭
//丢弃并关闭用于确保事务关闭并释放资源的链接
//如果事务已经Commit或者Rollback则此Rollback不会做任何操作
tx.rollback(true)
}
mysqlTx对象:
typemysqlTxstruct{
mc *mysqlConn
}
func(tx *mysqlTx) Commit() (err error) {
iftx.mc == nil tx.mc.closed.IsSet() {
returnErrInvalidConn
}
//通过链接按照mysql协议发送commit请求
err = tx.mc.exec("COMMIT")
tx.mc = nil
return
}
func(tx *mysqlTx) Rollback() (err error) {
iftx.mc == nil tx.mc.closed.IsSet() {
returnErrInvalidConn
}
//通过链接按照mysql协议发送commit请求
err = tx.mc.exec("ROLLBACK")
tx.mc = nil
return
}
Tx:
typeTxstruct{
db *DB
//事务中的链接在活跃的时候 防止关闭事务
closemu sync.RWMutex
//事务关联的链接
dc *driverConn
txi driver.Tx
//关闭事务的时候调用
releaseConnfunc(error)
//标记事务是否commit或者rollback
//只能标记一次
done int32
//跟事务关联的prepared statement,
//事务Commit或者Rollback的时候
//调用tx.closePrepared关闭。
stmtsstruct{
sync.Mutex
v []*Stmt
}
//关闭事务的时候调用的
//用于带有context的事务
cancelfunc()
//带有context的事务
//缺省是background context
ctx context.Context
}
tx是一个正在进行中的数据库事务,事务必须以Commit或者Rollback结束。
tx一旦调用commit或者callback之后,继续调用则会报:ErrTxDone 错误。
事务关联的stmts(通过tx或者db创建的)在调用Commit或者Rollback之后会被关闭。
//提交事务,并将链接放回链接池
func (tx *Tx) Commit() error {
//原子执行比较并交换值
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone//事务已经Commit或者Rollback
}
select {
default:
case
return tx.ctx.Err()
}
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()//并发安全的
})
if err != driver.ErrBadConn {
tx.closePrepared()
}
tx.close(err)//将链接放回链接池
return err
}
// 中止事务,并将链接放回链接池
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback()
})
if err != driver.ErrBadConn {
tx.closePrepared()
}
if discardConn {
err = driver.ErrBadConn
} tx.close(err)//将链接放回链接池
return err
}
func (tx *Tx) closePrepared() {
tx.stmts.Lock()
defer tx.stmts.Unlock()
for _, stmt := range tx.stmts.v {
//发送关闭statement命令,携带statement id
stmt.Close()
}
}
领取专属 10元无门槛券
私享最新 技术干货