
Golang Source Code Analysis: etcd (16)

golangLeetcode
Published 2023-09-20 08:29:55

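server/storage/backend/backend.go defines the core interface of the backend storage and its concrete implementation. It is essentially a wrapper around the boltdb interfaces.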

type Backend interface {
  // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
  ReadTx() ReadTx
  BatchTx() BatchTx
  // ConcurrentReadTx returns a non-blocking read transaction.
  ConcurrentReadTx() ReadTx


  Snapshot() Snapshot
  Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
  // Size returns the current size of the backend physically allocated.
  // The backend can hold DB space that is not utilized at the moment,
  // since it can conduct pre-allocation or spare unused space for recycling.
  // Use SizeInUse() instead for the actual DB size.
  Size() int64
  // SizeInUse returns the current size of the backend logically in use.
  // Since the backend can manage free space in a non-byte unit such as
  // number of pages, the returned value can be not exactly accurate in bytes.
  SizeInUse() int64
  // OpenReadTxN returns the number of currently open read transactions in the backend.
  OpenReadTxN() int64
  Defrag() error
  ForceCommit()
  Close() error


  // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
  SetTxPostLockInsideApplyHook(func())
}

The Snapshot interface has only three core methods: get the size, write, and close:

type Snapshot interface {
  // Size gets the size of the snapshot.
  Size() int64
  // WriteTo writes the snapshot into the given writer.
  WriteTo(w io.Writer) (n int64, err error)
  // Close closes the snapshot.
  Close() error
}
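
To make the three-method contract concrete, here is a minimal sketch of a type that satisfies it. The names memSnapshot, saveSnapshot and copyToString are hypothetical and exist only for this illustration; the real snapshot wraps a read-only bolt transaction rather than a byte slice.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// Snapshot repeats the three-method interface quoted above.
type Snapshot interface {
	Size() int64
	WriteTo(w io.Writer) (n int64, err error)
	Close() error
}

// memSnapshot is a hypothetical in-memory stand-in for a snapshot.
type memSnapshot struct {
	data   []byte
	closed bool
}

func (s *memSnapshot) Size() int64 { return int64(len(s.data)) }

// WriteTo streams the snapshot content into w and refuses to run after Close.
func (s *memSnapshot) WriteTo(w io.Writer) (int64, error) {
	if s.closed {
		return 0, fmt.Errorf("snapshot is closed")
	}
	n, err := w.Write(s.data)
	return int64(n), err
}

func (s *memSnapshot) Close() error {
	s.closed = true
	return nil
}

// saveSnapshot writes a snapshot into w and always closes it afterwards.
func saveSnapshot(s Snapshot, w io.Writer) (int64, error) {
	defer s.Close()
	return s.WriteTo(w)
}

// copyToString is a small helper that collects the snapshot into a string.
func copyToString(s Snapshot) (string, int64, error) {
	var buf bytes.Buffer
	n, err := saveSnapshot(s, &buf)
	return buf.String(), n, err
}

func main() {
	out, n, err := copyToString(&memSnapshot{data: []byte("k1=v1;k2=v2")})
	fmt.Println(out, n, err)
}
```

The caller only needs the interface, so the same saveSnapshot helper would work for any writer, such as a file or a network connection.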

The read transaction buffer cache must be protected by a lock:

type txReadBufferCache struct {
  mu         sync.Mutex
  buf        *txReadBuffer
  bufVersion uint64
}

backend is the concrete implementation of the Backend interface:

type backend struct {
  // size and commits are used with atomic operations so they must be
  // 64-bit aligned, otherwise 32-bit tests will crash


  // size is the number of bytes allocated in the backend
  size int64
  // sizeInUse is the number of bytes actually used in the backend
  sizeInUse int64
  // commits counts number of commits since start
  commits int64
  // openReadTxN is the number of currently open read transactions in the backend
  openReadTxN int64
  // mlock prevents backend database file to be swapped
  mlock bool


  mu    sync.RWMutex
  bopts *bolt.Options
  db    *bolt.DB


  batchInterval time.Duration
  batchLimit    int
  batchTx       *batchTxBuffered


  readTx *readTx
  // txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
  // When creating "concurrentReadTx":
  // - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
  // - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
  txReadBufferCache txReadBufferCache


  stopc chan struct{}
  donec chan struct{}


  hooks Hooks


  // txPostLockInsideApplyHook is called each time right after locking the tx.
  txPostLockInsideApplyHook func()


  lg *zap.Logger
}
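
The comment on txReadBufferCache says the buffer copy can be skipped when the cache is up to date and is required when the cache is empty or outdated. The following is a minimal sketch of that version check, using hypothetical simplified types (readBuffer, bufferCache, snapshotOf) in place of the real ones; it is an illustration of the idea, not the etcd code.

```go
package main

import (
	"fmt"
	"sync"
)

// readBuffer is a simplified stand-in for txReadBuffer.
type readBuffer struct {
	data    map[string]string
	version uint64 // bumped whenever data changes
}

// clone returns a deep copy so that readers never share the live map.
func (b *readBuffer) clone() *readBuffer {
	c := &readBuffer{data: make(map[string]string, len(b.data)), version: b.version}
	for k, v := range b.data {
		c.data[k] = v
	}
	return c
}

// bufferCache has the same shape as txReadBufferCache: a mutex, the cached
// copy, and the version at which the copy was taken.
type bufferCache struct {
	mu      sync.Mutex
	buf     *readBuffer
	version uint64
	copies  int // number of deep copies made, for illustration only
}

// snapshotOf returns a read-only copy of live. It copies only when the
// cache is empty or its version differs from the live buffer's version.
func (c *bufferCache) snapshotOf(live *readBuffer) *readBuffer {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.buf == nil || c.version != live.version {
		c.buf = live.clone()
		c.version = live.version
		c.copies++
	}
	return c.buf
}

func main() {
	live := &readBuffer{data: map[string]string{"a": "1"}, version: 1}
	cache := &bufferCache{}
	cache.snapshotOf(live) // empty cache: copy
	cache.snapshotOf(live) // up to date: no copy
	live.data["b"] = "2"
	live.version++
	cache.snapshotOf(live) // outdated: copy again
	fmt.Println("deep copies:", cache.copies)
}
```

The point of the design is that many concurrent readers created between two writes can share one copy instead of each paying for its own.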

The corresponding config defines the fields needed to initialize a backend object:

type BackendConfig struct {
  // Path is the file path to the backend file.
  Path string
  // BatchInterval is the maximum time before flushing the BatchTx.
  BatchInterval time.Duration
  // BatchLimit is the maximum puts before flushing the BatchTx.
  BatchLimit int
  // BackendFreelistType is the backend boltdb's freelist type.
  BackendFreelistType bolt.FreelistType
  // MmapSize is the number of bytes to mmap for the backend.
  MmapSize uint64
  // Logger logs backend-side operations.
  Logger *zap.Logger
  // UnsafeNoFsync disables all uses of fsync.
  UnsafeNoFsync bool `json:"unsafe-no-fsync"`
  // Mlock prevents backend database file to be swapped
  Mlock bool


  // Hooks are getting executed during lifecycle of Backend's transactions.
  Hooks Hooks
}
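
BatchLimit is documented above as the maximum number of puts before the BatchTx is flushed. As a rough sketch of that idea, the hypothetical batcher type below counts pending writes and commits once the limit is reached. It is a simplification: it checks the limit inside put, which is an assumption made for brevity and not necessarily where the real code performs the check.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for a batch transaction. It counts
// pending puts and flushes once the count reaches limit.
type batcher struct {
	limit   int
	pending int
	commits int
}

// put records one write and commits when the batch is full.
func (b *batcher) put(key, value string) {
	b.pending++
	if b.pending >= b.limit {
		b.commit()
	}
}

// commit flushes whatever is pending. A periodic timer would call this as
// well, so a partially filled batch does not wait forever.
func (b *batcher) commit() {
	if b.pending == 0 {
		return
	}
	b.commits++
	b.pending = 0
}

func main() {
	b := &batcher{limit: 3}
	for i := 0; i < 7; i++ {
		b.put(fmt.Sprintf("k%d", i), "v")
	}
	fmt.Println("commits:", b.commits, "pending:", b.pending)
	b.commit() // what the interval timer would do
	fmt.Println("commits:", b.commits, "pending:", b.pending)
}
```

BatchInterval covers the other half: it bounds how long a small batch can sit uncommitted.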

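newBackend opens a bolt database, sets up the buffers for the read transaction and the batch transaction, and finally starts a goroutine (run) that serves the storage in the background: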

func newBackend(bcfg BackendConfig) *backend {
  // ...
  db, err := bolt.Open(bcfg.Path, 0600, bopts)
  // ... (error handling elided)
  b := &backend{
    bopts: bopts,
    db:    db,
    // ...
    readTx: &readTx{
      baseReadTx: baseReadTx{
        buf: txReadBuffer{
          // ...
        },
        // ...
      },
    },
    // ...
  }
  b.batchTx = newBatchTxBuffered(b)
  // ...
  go b.run()
  return b
}

func (b *backend) BatchTx() BatchTx {
  return b.batchTx
}

func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
  // ... (body elided in this excerpt)
}

Snapshot returns a Snapshot object that holds a read-only boltdb transaction:

func (b *backend) Snapshot() Snapshot {
  b.batchTx.Commit()
  // ...
  tx, err := b.db.Begin(false)
  // ... (error handling and channel setup elided)
  go func() {
    // ...
    for {
      select {
      // ...
      case <-stopc:
        snapshotTransferSec.Observe(time.Since(start).Seconds())
        return
      }
    }
  }()
  return &snapshot{tx, stopc, donec}
}

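Hash computes a CRC checksum: it iterates over all buckets and feeds each bucket name and the key/value pairs that pass the ignores filter into the hash: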

func (b *backend) Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error) {
  h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
  // ...
  err := b.db.View(func(tx *bolt.Tx) error {
    c := tx.Cursor()
    for next, _ := c.First(); next != nil; next, _ = c.Next() {
      b := tx.Bucket(next)
      // ...
      h.Write(next)
      b.ForEach(func(k, v []byte) error {
        if ignores != nil && !ignores(next, k) {
          h.Write(k)
          h.Write(v)
        }
        return nil
      })
    }
    return nil
  })
  // ... (error handling elided)
  return h.Sum32(), nil
}
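
The same checksum technique can be tried without bolt. The sketch below hashes a hypothetical in-memory store (bucket name to key/value map) with CRC32 Castagnoli from the standard library, visiting buckets and keys in sorted order so the result is deterministic. One deliberate difference from the excerpt: here a nil ignore function means "hash everything".

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
)

// store is a hypothetical in-memory stand-in for bolt: bucket -> key -> value.
type store map[string]map[string]string

// hashStore feeds bucket names and key/value pairs into a CRC32 (Castagnoli)
// hash in sorted order. Keys for which ignore returns true are skipped.
func hashStore(s store, ignore func(bucket, key string) bool) uint32 {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	buckets := make([]string, 0, len(s))
	for name := range s {
		buckets = append(buckets, name)
	}
	sort.Strings(buckets)

	for _, name := range buckets {
		h.Write([]byte(name))
		keys := make([]string, 0, len(s[name]))
		for k := range s[name] {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		for _, k := range keys {
			if ignore != nil && ignore(name, k) {
				continue
			}
			h.Write([]byte(k))
			h.Write([]byte(s[name][k]))
		}
	}
	return h.Sum32()
}

func main() {
	a := store{"key": {"k1": "v1", "k2": "v2"}, "meta": {"term": "3"}}
	b := store{"key": {"k1": "v1", "k2": "v2"}, "meta": {"term": "9"}}
	skipTerm := func(bucket, key string) bool { return bucket == "meta" && key == "term" }

	fmt.Println("equal without filter:", hashStore(a, nil) == hashStore(b, nil))
	fmt.Println("equal with filter:", hashStore(a, skipTerm) == hashStore(b, skipTerm))
}
```

An ignore filter of this kind is useful when some keys are expected to differ between otherwise identical stores.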

run commits the batch transaction periodically and commits one last time when the system shuts down:

func (b *backend) run() {
  // ... (timer t is set up here)
  for {
    select {
    case <-t.C:
    case <-b.stopc:
      b.batchTx.CommitAndStop()
      return
    }
    if b.batchTx.safePending() != 0 {
      b.batchTx.Commit()
    }
    t.Reset(b.batchInterval)
  }
}
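
The loop shape above (wait on a timer or a stop channel, commit, reset the timer) can be reproduced with the standard library alone. In this sketch the committer type and the runLoop and start functions are hypothetical stand-ins; only the control flow is meant to match the excerpt.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// committer is a hypothetical stand-in for the batch transaction.
type committer struct {
	mu      sync.Mutex
	pending int
	commits int
}

func (c *committer) put() {
	c.mu.Lock()
	c.pending++
	c.mu.Unlock()
}

// commit flushes pending writes; it is a no-op when nothing is pending.
func (c *committer) commit() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.pending > 0 {
		c.commits++
		c.pending = 0
	}
}

func (c *committer) stats() (pending, commits int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.pending, c.commits
}

// runLoop commits on every timer tick and once more when stopc is closed.
func runLoop(c *committer, interval time.Duration, stopc <-chan struct{}, donec chan<- struct{}) {
	defer close(donec)
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-stopc:
			c.commit() // final commit before exiting
			return
		}
		c.commit()
		t.Reset(interval)
	}
}

// start launches runLoop and returns a function that stops it and waits
// until the final commit has finished.
func start(c *committer, intervalMillis int) (stop func()) {
	stopc, donec := make(chan struct{}), make(chan struct{})
	go runLoop(c, time.Duration(intervalMillis)*time.Millisecond, stopc, donec)
	return func() {
		close(stopc)
		<-donec
	}
}

func main() {
	c := &committer{}
	stop := start(c, 10)
	c.put()
	time.Sleep(50 * time.Millisecond)
	c.put()
	stop()
	pending, commits := c.stats()
	fmt.Println("pending:", pending, "commits:", commits)
}
```

Waiting on the done channel in stop is what guarantees that no write is left uncommitted when shutdown returns.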

defrag performs defragmentation:

func (b *backend) defrag() error {
  // ...
  tmpdb, err := bolt.Open(tdbp, 0600, &options)
  // ...
  err = defragdb(b.db, tmpdb, defragLimit)
  // ...
}

defragdb walks every bucket in bolt and copies all of its key/value pairs into the temporary database:

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
  // ...
  tx, err := odb.Begin(false)
  // ...
  for next, _ := c.First(); next != nil; next, _ = c.Next() {
    b := tx.Bucket(next)
    // ...
    if err = b.ForEach(func(k, v []byte) error {
      // ...
      tmptx, err = tmpdb.Begin(true)
      // ...
      tmpb = tmptx.Bucket(next)
      // ...
    }); err != nil {
      return err
    }
  }
  // ...
}
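
The excerpt reopens a write transaction on the temporary database inside the ForEach callback, which suggests the copy is committed in batches bounded by limit. Below is a minimal sketch of batched copying under that assumption. memDB and copyInBatches are hypothetical, and the exact point at which the counter resets is a choice made for this sketch rather than a statement about the original code.

```go
package main

import "fmt"

// memDB is a hypothetical in-memory stand-in for a bolt database.
type memDB struct {
	buckets map[string]map[string]string
	commits int
}

func newMemDB() *memDB {
	return &memDB{buckets: map[string]map[string]string{}}
}

// copyInBatches copies every bucket of src into dst and "commits" dst after
// every limit keys, so that no single write batch grows without bound.
func copyInBatches(src, dst *memDB, limit int) {
	count := 0
	for name, kvs := range src.buckets {
		if dst.buckets[name] == nil {
			dst.buckets[name] = map[string]string{}
		}
		for k, v := range kvs {
			dst.buckets[name][k] = v
			count++
			if count == limit {
				dst.commits++ // commit the full batch and start a new one
				count = 0
			}
		}
	}
	if count > 0 {
		dst.commits++ // final commit for the remainder
	}
}

func main() {
	src := newMemDB()
	src.buckets["key"] = map[string]string{"a": "1", "b": "2", "c": "3"}
	src.buckets["meta"] = map[string]string{"x": "9", "y": "8"}

	dst := newMemDB()
	copyInBatches(src, dst, 2)
	fmt.Println("buckets:", len(dst.buckets), "commits:", dst.commits)
}
```

Because the data is rewritten from scratch into a fresh file, free pages left behind by deletions in the old file are not carried over.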

func (b *backend) begin(write bool) *bolt.Tx {
  // ... (tx is started here)
  size := tx.Size()
  db := tx.DB()
  stats := db.Stats()
  atomic.StoreInt64(&b.size, size)
  // ...
  return tx
}

server/storage/backend/batch_tx.go defines the Bucket and transaction interfaces, which correspond to their bolt counterparts.

type Bucket interface {
  // ID returns a unique identifier of a bucket.
  // The id must NOT be persisted and can be used as lightweight identificator
  // in the in-memory maps.
  ID() BucketID
  Name() []byte
  // String implements Stringer (human readable name).
  String() string


  // IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
  // overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
  // is known to never overwrite any key so range is safe.
  IsSafeRangeBucket() bool
}

type BatchTx interface {
  ReadTx
  UnsafeCreateBucket(bucket Bucket)
  UnsafeDeleteBucket(bucket Bucket)
  UnsafePut(bucket Bucket, key []byte, value []byte)
  UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
  UnsafeDelete(bucket Bucket, key []byte)
  // Commit commits a previous tx and begins a new writable one.
  Commit()
  // CommitAndStop commits the previous tx and does not create a new one.
  CommitAndStop()
  LockInsideApply()
  LockOutsideApply()
}

The concrete batchTx implementation wraps a bolt Tx (held in the tx field) and embeds sync.Mutex:

type batchTx struct {
  sync.Mutex
  tx      *bolt.Tx
  backend *backend


  pending int
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
  _, err := t.tx.CreateBucket(bucket.Name())
  // ... (error handling elided)
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
  // ... (body elided in this excerpt)
}

A put is simply the Put operation on a bolt bucket:

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
  bucket := t.tx.Bucket(bucketType.Name())
  // ...
  if err := bucket.Put(key, value); err != nil {
    // ... (error handling elided)
  }
  // ...
}

func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  // ... (body elided in this excerpt)
}

func (t *batchTx) commit(stop bool) {
  // ...
  err := t.tx.Commit()
  // ...
}

batchTxBuffered merely adds a buffer on top of batchTx:

type batchTxBuffered struct {
  batchTx
  buf txWriteBuffer
}

server/storage/backend/config_default.go defines the default configuration:

func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }

server/storage/backend/config_linux.go holds the Linux configuration; the configuration differs somewhat from platform to platform:

var boltOpenOptions = &bolt.Options{
  MmapFlags:      syscall.MAP_POPULATE,
  NoFreelistSync: true,
}

func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }

server/storage/backend/hooks.go defines the Hooks interface, which makes it easy to register a pre-commit hook when using Backend:

type HookFunc func(tx BatchTx)

type Hooks interface {
  // OnPreCommitUnsafe is executed before Commit of transactions.
  // The given transaction is already locked.
  OnPreCommitUnsafe(tx BatchTx)
}

It also provides a concrete implementation:

type hooks struct {
  onPreCommitUnsafe HookFunc
}

func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
  h.onPreCommitUnsafe(tx)
}
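
To show how such a hook could be wired in, here is a self-contained sketch. BatchTx is reduced to a single method with a simplified signature, and newHooks and memTx are hypothetical helpers introduced for this illustration. The transaction calls the hook right before it applies its staged writes, so the hook can still add writes to the same commit.

```go
package main

import "fmt"

// BatchTx is reduced to one method for this sketch; the real interface
// is much larger and takes a Bucket and byte slices.
type BatchTx interface {
	UnsafePut(key, value string)
}

type HookFunc func(tx BatchTx)

type Hooks interface {
	OnPreCommitUnsafe(tx BatchTx)
}

type hooks struct {
	onPreCommitUnsafe HookFunc
}

func (h hooks) OnPreCommitUnsafe(tx BatchTx) {
	h.onPreCommitUnsafe(tx)
}

// newHooks is a hypothetical constructor that wraps a HookFunc.
func newHooks(f HookFunc) Hooks {
	return hooks{onPreCommitUnsafe: f}
}

// memTx is a hypothetical transaction that runs its hook before committing.
type memTx struct {
	hooks     Hooks
	staged    map[string]string
	committed map[string]string
}

func newMemTx(h Hooks) *memTx {
	return &memTx{hooks: h, staged: map[string]string{}, committed: map[string]string{}}
}

func (t *memTx) UnsafePut(key, value string) { t.staged[key] = value }

// commit invokes the pre-commit hook, then applies all staged writes.
func (t *memTx) commit() {
	if t.hooks != nil {
		t.hooks.OnPreCommitUnsafe(t)
	}
	for k, v := range t.staged {
		t.committed[k] = v
	}
	t.staged = map[string]string{}
}

func main() {
	calls := 0
	tx := newMemTx(newHooks(func(tx BatchTx) {
		calls++
		tx.UnsafePut("hook_key", "written-by-hook")
	}))
	tx.UnsafePut("foo", "bar")
	tx.commit()
	fmt.Println("hook calls:", calls, "committed keys:", len(tx.committed))
}
```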

server/storage/backend/metrics.go defines the corresponding monitoring metrics:

prometheus.MustRegister(commitSec)
prometheus.MustRegister(rebalanceSec)

server/storage/backend/read_tx.go defines the read transaction interface and its implementations:

type ReadTx interface {
  Lock()
  Unlock()
  RLock()
  RUnlock()


  UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
  UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

The concrete implementation is again a wrapper around a bolt transaction:

type baseReadTx struct {
  // mu protects accesses to the txReadBuffer
  mu  sync.RWMutex
  buf txReadBuffer


  // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
  // txMu protects accesses to buckets and tx on Range requests.
  txMu    *sync.RWMutex
  tx      *bolt.Tx
  buckets map[BucketID]*bolt.Bucket
  // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
  txWg *sync.WaitGroup
}

func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
  // ... (body elided in this excerpt)
}

func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  // ...
  keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
  // ... (remaining lookup elided)
}

There are two read transaction implementations: a regular read and a concurrent read:

type readTx struct {
  baseReadTx
}

type concurrentReadTx struct {
  baseReadTx
}

server/storage/backend/tx_buffer.go: the transaction buffer is a map that handles each bucket separately:

type txBuffer struct {
  buckets map[BucketID]*bucketBuffer
}

The write buffer follows the same pattern:

type txWriteBuffer struct {
  txBuffer
  // Map from bucket ID into information whether this bucket is edited
  // sequentially (i.e. keys are growing monotonically).
  bucket2seq map[BucketID]bool
}

func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
  // ... (body elided in this excerpt)
}

type txReadBuffer struct {
  txBuffer
  // bufVersion is used to check if the buffer is modified recently
  bufVersion uint64
}

func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  // ... (body elided in this excerpt)
}

func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
  // ... (body elided in this excerpt)
}

The key and val in kv are both []byte, the same as in bolt, which avoids the hassle of encoding and decoding:

type kv struct {
  key []byte
  val []byte
}

type bucketBuffer struct {
  buf []kv
  // used tracks number of elements in use so buf can be reused without reallocation.
  used int
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
  // ... (body elided in this excerpt)
}
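
Since the body of Range is not shown, the following is one possible way to implement a range lookup over a sorted slice of kv entries, using binary search from the standard library. It is a sketch under stated assumptions, not the original body: the buffer is assumed to be sorted by key, an empty endKey is taken to mean an exact match on key, and limit is assumed to be positive. The helpers newBuffer and rangeStrings are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

type kv struct {
	key []byte
	val []byte
}

type bucketBuffer struct {
	buf  []kv // assumed sorted by key in this sketch
	used int
}

// Range returns up to limit pairs whose keys fall in [key, endKey).
// An empty endKey is treated as an exact-match lookup for key.
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
	// Binary search for the first entry whose key is >= key.
	idx := sort.Search(bb.used, func(i int) bool {
		return bytes.Compare(bb.buf[i].key, key) >= 0
	})
	if idx == bb.used {
		return nil, nil
	}
	if len(endKey) == 0 {
		if bytes.Equal(bb.buf[idx].key, key) {
			return [][]byte{bb.buf[idx].key}, [][]byte{bb.buf[idx].val}
		}
		return nil, nil
	}
	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
		if bytes.Compare(bb.buf[i].key, endKey) >= 0 {
			break
		}
		keys = append(keys, bb.buf[i].key)
		vals = append(vals, bb.buf[i].val)
	}
	return keys, vals
}

// newBuffer builds a sorted buffer from alternating key, value strings.
func newBuffer(pairs ...string) *bucketBuffer {
	bb := &bucketBuffer{}
	for i := 0; i+1 < len(pairs); i += 2 {
		bb.buf = append(bb.buf, kv{key: []byte(pairs[i]), val: []byte(pairs[i+1])})
	}
	sort.Slice(bb.buf, func(i, j int) bool {
		return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
	})
	bb.used = len(bb.buf)
	return bb
}

// rangeStrings runs Range and returns the matching keys as strings.
func rangeStrings(bb *bucketBuffer, key, end string, limit int64) []string {
	keys, _ := bb.Range([]byte(key), []byte(end), limit)
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		out = append(out, string(k))
	}
	return out
}

func main() {
	bb := newBuffer("a", "1", "c", "3", "b", "2", "d", "4")
	fmt.Println(rangeStrings(bb, "b", "d", 10))
	fmt.Println(rangeStrings(bb, "c", "", 10))
}
```

Keeping keys and values as raw byte slices means the comparison is a plain bytes.Compare, with no decoding step in the lookup path.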

server/storage/backend/verify.go performs a series of validations driven by environment variables:

func ValidateCalledInsideApply(lg *zap.Logger) {
  // ... (body elided in this excerpt)
}

Originally published: 2023-09-14 00:00

This article was shared from the WeChat official account golang算法架构leetcode技术php.
