接着看下storage目录外层的几个文件。server/storage/backend.go其实仅仅是对文件server/storage/backend/backend.go里面接口的一个包装,真正的接口定义和具体实现都在内层,同时又加入了hook。
func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
bcfg := backend.DefaultBackendConfig(cfg.Logger)
bcfg.Hooks = hooks
return backend.New(bcfg)
下面几个方法都是对它的一个包装:
func OpenSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks *BackendHooks) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
return OpenBackend(cfg, hooks), nil
func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
go func() {
beOpened <- newBackend(cfg, hooks)
}()
func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
server/storage/hooks.go定义了hook的具体实现,它有个属性cindex.ConsistentIndexer:
// BackendHooks bundles a consistent indexer, a logger, and a pending raft
// ConfState together with its dirty flag. Instances are created with
// NewBackendHooks, which sets only lg and indexer.
type BackendHooks struct {
// indexer is the ConsistentIndexer supplied to NewBackendHooks.
indexer cindex.ConsistentIndexer
// lg is the logger supplied to NewBackendHooks.
lg *zap.Logger
// confState is to be written in the next submitted Backend transaction (if dirty).
confState raftpb.ConfState
// confStateDirty is flipped to 'dirty' by the first write. It is false by
// default, so a confState that was never initialized is meaningless.
confStateDirty bool
// NOTE(review): confStateLock presumably guards confState and
// confStateDirty — confirm against the methods that use it.
confStateLock sync.Mutex
}
// NewBackendHooks returns a BackendHooks that uses lg for logging and indexer
// as its ConsistentIndexer. The confState fields are left at their zero
// values, so the returned hooks start out not dirty.
func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendHooks {
	hooks := new(BackendHooks)
	hooks.lg = lg
	hooks.indexer = indexer
	return hooks
}
hook主要包括提交前hook点和设置初始状态两个方法:
func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
func (bh *BackendHooks) SetConfState(confState *raftpb.ConfState) {
server/storage/metrics.go里面定义了etcd的一系列prometheus的metrics指标,比如:
prometheus.MustRegister(quotaBackendBytes)
server/storage/quota.go定义了quota的接口和两个实现,一个是默认的passthroughQuota,另一个是backendQuota:
// Quota describes a budget that requests are charged against: it can tell
// whether a request fits, what a request costs, and how much charge is left.
type Quota interface {
// Available judges whether the given request fits within the quota.
Available(req interface{}) bool
// Cost computes the charge against the quota for a given request.
Cost(req interface{}) int
// Remaining is the amount of charge left for the quota.
Remaining() int64
}
// passthroughQuota is a stateless quota type whose Available check accepts
// every request.
type passthroughQuota struct{}

// Available reports true for any request, regardless of its value.
func (*passthroughQuota) Available(interface{}) bool {
	return true
}
BackendQuota里面包含了属性 backend.Backend
// BackendQuota pairs a backend with a byte limit. Its Available method
// compares the backend size plus the cost of a request against that limit.
type BackendQuota struct {
// be is the backend whose Size() is checked against the limit.
be backend.Backend
// maxBackendBytes is the limit that backend size plus request cost must
// stay below for a request to be considered available.
maxBackendBytes int64
}
func NewBackendQuota(lg *zap.Logger, quotaBackendBytesCfg int64, be backend.Backend, name string) Quota {
return &BackendQuota{be, quotaBackendBytesCfg}
根据backend的一系列数据和消息,计算quota是否够用
func (b *BackendQuota) Available(v interface{}) bool {
return b.be.Size()+int64(cost) < b.maxBackendBytes
server/storage/storage.go里定义了Storage存储的核心接口:包括保存消息、保存快照、释放快照锁、同步wal日志等
// Storage is the stable-storage interface of this package: it saves hard
// state, entries and snapshots, releases locked WAL files, syncs the WAL,
// and is closed when no longer needed.
type Storage interface {
// Save function saves ents and state to the underlying stable storage.
// Save MUST block until st and ents are on stable storage.
Save(st raftpb.HardState, ents []raftpb.Entry) error
// SaveSnap function saves snapshot to the underlying stable storage.
SaveSnap(snap raftpb.Snapshot) error
// Close closes the Storage and performs finalization.
Close() error
// Release releases the locked wal files older than the provided snapshot.
Release(snap raftpb.Snapshot) error
// Sync syncs the WAL.
Sync() error
// MinimalEtcdVersion returns minimal etcd storage able to interpret WAL log.
MinimalEtcdVersion() *semver.Version
}
storage实现了这些接口,它的核心属性主要有两部分:wal和snapshot
// storage is the concrete type that NewStorage returns as a Storage. It is
// built from a logger, a WAL and a Snapshotter.
type storage struct {
// lg is the logger passed to NewStorage.
lg *zap.Logger
// s is the snapshotter passed to NewStorage.
s *snap.Snapshotter
// Mutex protected variables
mux sync.RWMutex
// w is the write-ahead log passed to NewStorage.
w *wal.WAL
}
// NewStorage assembles a Storage from the given logger, write-ahead log and
// snapshotter. The mutex field is left at its zero value.
func NewStorage(lg *zap.Logger, w *wal.WAL, s *snap.Snapshotter) Storage {
	st := new(storage)
	st.lg = lg
	st.w = w
	st.s = s
	return st
}
保存snapshot的同时,也会在wal中写入一条对应的快照记录(walpb.Snapshot)
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
walsnap := walpb.Snapshot{
err := st.s.SaveSnap(snap)
return st.w.SaveSnapshot(walsnap)
释放锁的流程也一样
func (st *storage) Release(snap raftpb.Snapshot) error {
if err := st.w.ReleaseLockTo(snap.Metadata.Index); err != nil {
return st.s.ReleaseSnapDBs(snap)
但是Save和Sync的时候只涉及到wal日志
func (st *storage) Save(s raftpb.HardState, ents []raftpb.Entry) error {
return st.w.Save(s, ents)
server/storage/util.go里面封装了消息处理的一些方法,比如版本相关的等。
func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
func GetEffectiveNodeIDsFromWalEntries(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!