Prometheus是基于Pull模式抓取监控数据,首先要能够发现需要监控的目标对象target,特别Prometheus
最开始设计是一个面向云原生应用程序的,云原生、容器场景下按需的资源使用方式对于监控系统而言就意味着没有了一个固定的监控目标,所有的监控对象(基础设施、应用、服务)都在动态的变化。而对于Prometheus而言其解决方案就是引入一个中间的代理人(服务注册中心),这个代理人掌握着当前所有监控目标的访问信息,Prometheus只需要向这个代理人询问有哪些监控目标控即可, 这种模式被称为服务发现(service discovery)。
如上图,SD模块专门负责去发现需要监控的target信息,Prometheus去从SD模块订阅该信息,有target信息时会推送给Prometheus,然后Prometheus拿到target信息后通过pull http协议去拉取监控指标数据。
Prometheus支持的服务发现协议是非常丰富的,目前已支持多达二十多种服务发现协议:
<azure_sd_config>
<consul_sd_config>
<digitalocean_sd_config>
<docker_sd_config>
<dockerswarm_sd_config>
<dns_sd_config>
<ec2_sd_config>
<openstack_sd_config>
<file_sd_config>
<gce_sd_config>
<hetzner_sd_config>
<http_sd_config>
<kubernetes_sd_config>
<kuma_sd_config>
<lightsail_sd_config>
<linode_sd_config>
<marathon_sd_config>
<nerve_sd_config>
<serverset_sd_config>
<triton_sd_config>
<eureka_sd_config>
<scaleway_sd_config>
<static_config>
上图描述Prometheus服务发现协议比较笼统,Prometheus服务发现实现原理大致如下图:
如上图所述,Prometheus服务发现机制大致涉及到三个部分:
1、配置处理模块解析的prometheus.yml
配置中scrape_configs
部分,将配置的job
生成一个个Discoverer
服务,不同的服务发现协议都会有各自的Discoverer
实现方式,它们根据实现逻辑去发现target
,并将其放入到targets
容器中;
2、discoveryManager
组件内部有个定时周期触发任务,每5秒检查targets
容器,如果有变更则将targets
容器中target
信息放入到syncCh
通道中;
3、scrape
组件会监听syncCh
通道,这样需要监控的targets
信息就传递给scrape
组件,然后reload
将target
纳入监控开始抓取监控指标。
配置处理部分会根据scrape_configs
部分配置的不同协议类型生成不同Discoverer
,然后根据它们内部不同的实现逻辑去发现target
,discoveryManager
组件则相当于一个搬运工,scrape
组件则是一个使用者,这两个组件都无感知服务发现协议的差异。
下面分别来分析下配置处理、discoveryManager
组件和scrape
组件在服务发现方面的具体实现流程。
上节分析Prometheus
启动流程,有个配置加载
组件通过reloadConfig
加载解析prometheus
配置文件后,在reloader
中循环调用各个组件的ApplyConfig(cfg map[string]Configs)
方法处理配置,这其中就包括discovery/manager.go
:
reloader
中定义如下:
{
name: "scrape_sd",
//从配置文件中提取Section:scrape_configs
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
return discoveryManagerScrape.ApplyConfig(c)
},
}
那下面就从discovery/manager.go
中定义的ApplyConfig()
方法分析。
1、根据配置注册provider:
for name, scfg := range cfg {
//根据配置注册provider
failedCount += m.registerProviders(scfg, name)
discoveredTargets.WithLabelValues(m.name, name).Set()
}
其中关键的是m.registerProviders(scfg, name)
,继续跟踪:
d, err := cfg.NewDiscoverer(DiscovererOptions{
Logger: log.With(m.logger, "discovery", typ),
})
2、然后将所有注册到m.providers
数组中的provider
进行启动:
for _, prov := range m.providers {
// 启动服务发现实例
m.startProvider(m.ctx, prov)
}
跟踪到m.startProvider(m.ctx, prov)
方法中:
updates := make(chan []*targetgroup.Group)
// 执行run 每个服务发现都有自己的run方法。
go p.d.Run(ctx, updates)
// 更新发现的服务
go m.updater(ctx, p, updates)
发现这里主要是启动两个协程,它们之间使用updates通道类型变量进行通信。
总结来说(见下图):
1、每个Config
都会对应创建一个Discoverer
实例,并被封装到provider
存储在m.providers
数组中;
2、然后遍历providers
数组进行启动操作,启动操作启动了两个协程:
a、Discoverer.Run
协程逻辑中主要根据发现协议发现targets
;
b、然后通过通道传递给discovery/Manager.updater
协程中,将其存放到m.targets
集合map中;
配置处理这里还有个比较关键的:Discoverer
会根据不同协议实现发现target
,它是如何实现的呢?
首先,我们来看下Discoverer
实例创建:d, err := cfg.NewDiscoverer()
,它是一个接口定义:
type Config interface {
Name() string
NewDiscoverer(DiscovererOptions) (Discoverer, error)
}
每种服务发现协议都在自己的SDConfig
中实现了各自的NewDiscoverver()
方法,这样就可以将服务发现逻辑封装到Discovererver
实现中:
上节《Prometheus启动流程》一节分析过会启动discoveryManagerScrape
组件通过通道将targets
数据信息传递给scrapeManager
组件(见下图):
1、discoveryManagerScrape
组件启动入口:
g.Add(
func() error {
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancelScrape()
},
)
2、一直跟踪会进入到sender()
方法中,配置处理模块说过,有个协程会将Discoverer
组件发现的targets
信息存储到m.targets
集合map
中,然后给m.triggerSend
发送信号,sender
方法中就是启动定时周期触发器监听m.triggerSend
信号:
func (m *Manager) sender() {
// 周期性定时器定时触发任务,这里是5s触发一次
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc()
select {
case m.syncCh <- m.allGroups():
default:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
default:
}
}
default:
}
}
}
}
监听到m.triggerSend
信号,则执行m.syncCh <- m.allGroups()
,我们来看下m.allGroups()
干了什么?
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
tSets := map[string][]*targetgroup.Group{}
for pkey, tsets := range m.targets {
var n int
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSets[pkey.setName] = append(tSets[pkey.setName], tg)
n += len(tg.Targets)
}
discoveredTargets.WithLabelValues(m.name, pkey.setName).Set(float64(n))
}
return tSets
}
其实就是将m.targets
数据发送到m.syncCh
通道上,所以,discoveryManager
组件比较简单,就是一个搬运工。
scrapeManager
组件启动:scrapeManager.Run(discoveryManagerScrape.SyncCh())
,通道syncCh是被scrapeManager组件持有的,跟踪进入Run方法中:
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
//通过管道获取被监控的服务(targets)
case ts := <-tsets:
m.updateTsets(ts)
select {
// 关闭 Scrape Manager 处理信号
//若从服务发现 (serviceDiscover)有服务(targets)变动,则给管道triggerReload传值,并触发reloader()方法更新服务
case m.triggerReload <- struct{}{}:
default:
}
case <-m.graceShut:
return nil
}
}
}
通过case ts := <-tsets
获取到syncCh通道上传递过来的targets数据,然后调用m.updateTsets(ts)
将targets
数据存储到scrapeManager.targetSets
中,然后给m.triggerReload
发送信号。
这个方法中go m.reloader()
启动了一个协程,进入reloader()
方法中:
func (m *Manager) reloader() {
//定时器5s
ticker := time.NewTicker( * time.Second)
defer ticker.Stop()
for {
select {
case <-m.graceShut:
return
// 若服务发现(serviceDiscovery)有服务(targets)变动,就会向管道triggerReload写入值,定时器每5s判断一次triggerReload管道是否有值,若有值,则触发reload方法
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}
也是通过定时周期触发任务监听m.triggerReload
信号,执行m.reload()
将targets
加载进来。
前面分析了服务发现运行机制,可以看下面图梳理下前面流程逻辑: