分析完raft算法,回来看etcd server的代码就比较清晰了,我们从入口文件server/main.go开始
func main() {
etcdmain.Main(os.Args)
}
server/etcdmain/main.go启动server的同时会启动一个proxy
switch cmd {
case "gateway", "grpc-proxy":
if err := rootCmd.Execute(); err != nil {
startEtcdOrProxyV2(args)
我们首先看下proxy的实现,它是基于cobra生成的一个简单命令server/etcdmain/gateway.go
rootCmd = &cobra.Command{
Use: "etcd",
Short: "etcd server",
SuggestFor: []string{"etcd"},
}
入口函数:
func startGateway(cmd *cobra.Command, args []string) {
srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
for _, ep := range srvs.Endpoints {
h, p, serr := net.SplitHostPort(ep)
if serr != nil {
fmt.Printf("error parsing endpoint %q", ep)
os.Exit(1)
}
var port uint16
fmt.Sscanf(p, "%d", &port)
srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
tp := tcpproxy.TCPProxy{
Logger: lg,
Listener: l,
Endpoints: srvs.SRVs,
MonitorInterval: gatewayRetryDelay,
}
tp.Run()
for {
in, err := tp.Listener.Accept()
go tp.serve(in)
proxy的实现是一个标准的tcp代理,源码位于server/proxy/tcpproxy/userspace.go
func (tp *TCPProxy) serve(in net.Conn) {
for {
out, err = net.Dial("tcp", remote.addr)
go func() {
io.Copy(in, out)
in.Close()
out.Close()
}()
io.Copy(out, in)
out.Close()
in.Close()
server/etcdmain/etcd.go中会进行etcd的初始化:
func startEtcdOrProxyV2(args []string) {
switch which {
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
stopped, errc, err = startEtcd(&cfg.ec)
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
e, err := embed.StartEtcd(cfg)
具体的核心实现位于server/embed/etcd.go的StartEtcd
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
if e.Peers, err = configurePeerListeners(cfg); err != nil {
srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.AdvertiseClientUrls,
PeerURLs: cfg.AdvertisePeerUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
e.Server.Start()
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
最终初始化server对象的代码位于server/etcdserver/server.go
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!