前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【prometheus】-02 一张图彻底搞懂Prometheus服务发现机制

【prometheus】-02 一张图彻底搞懂Prometheus服务发现机制

作者头像
Reactor2020
发布2023-03-22 19:10:50
7790
发布2023-03-22 19:10:50
举报
文章被收录于专栏:【云原生 • Prometheus】

概述

Prometheus是基于Pull模式抓取监控数据,首先要能够发现需要监控的目标对象target,特别Prometheus最开始设计是一个面向云原生应用程序的,云原生、容器场景下按需的资源使用方式对于监控系统而言就意味着没有了一个固定的监控目标,所有的监控对象(基础设施、应用、服务)都在动态的变化。而对于Prometheus而言其解决方案就是引入一个中间的代理人(服务注册中心),这个代理人掌握着当前所有监控目标的访问信息,Prometheus只需要向这个代理人询问有哪些监控目标控即可, 这种模式被称为服务发现(service discovery)。

如上图,SD模块专门负责去发现需要监控的target信息,Prometheus去从SD模块订阅该信息,有target信息时会推送给Prometheus,然后Prometheus拿到target信息后通过pull http协议去拉取监控指标数据。

Prometheus支持的服务发现协议是非常丰富的,目前已支持多达二十多种服务发现协议:

代码语言:javascript
复制
<azure_sd_config>
<consul_sd_config>
<digitalocean_sd_config>
<docker_sd_config>
<dockerswarm_sd_config>
<dns_sd_config>
<ec2_sd_config>
<openstack_sd_config>
<file_sd_config>
<gce_sd_config>
<hetzner_sd_config>
<http_sd_config>
<kubernetes_sd_config>
<kuma_sd_config>
<lightsail_sd_config>
<linode_sd_config>
<marathon_sd_config>
<nerve_sd_config>
<serverset_sd_config>
<triton_sd_config>
<eureka_sd_config>
<scaleway_sd_config>
<static_config>

服务发现原理图

上图描述Prometheus服务发现协议比较笼统,Prometheus服务发现实现原理大致如下图:

如上图所述,Prometheus服务发现机制大致涉及到三个部分:

1、配置处理模块解析的prometheus.yml配置中scrape_configs部分,将配置的job生成一个个Discoverer服务,不同的服务发现协议都会有各自的Discoverer实现方式,它们根据实现逻辑去发现target,并将其放入到targets容器中;

2、discoveryManager组件内部有个定时周期触发任务,每5秒检查targets容器,如果有变更则将targets容器中target信息放入到syncCh通道中;

3、scrape组件会监听syncCh通道,这样需要监控的targets信息就传递给scrape组件,然后reloadtarget纳入监控开始抓取监控指标。

配置处理部分会根据scrape_configs部分配置的不同协议类型生成不同Discoverer,然后根据它们内部不同的实现逻辑去发现targetdiscoveryManager组件则相当于一个搬运工,scrape组件则是一个使用者,这两个组件都无感知服务发现协议的差异。

下面分别来分析下配置处理、discoveryManager组件和scrape组件在服务发现方面的具体实现流程。

配置处理

上节分析Prometheus启动流程,有个配置加载组件通过reloadConfig加载解析prometheus配置文件后,在reloader中循环调用各个组件的ApplyConfig(cfg map[string]Configs)方法处理配置,这其中就包括discovery/manager.go:

reloader中定义如下:

代码语言:javascript
复制
{
 name: "scrape_sd",
 //从配置文件中提取Section:scrape_configs
 reloader: func(cfg *config.Config) error {
  c := make(map[string]discovery.Configs)
  for _, v := range cfg.ScrapeConfigs {
   c[v.JobName] = v.ServiceDiscoveryConfigs
  }
  return discoveryManagerScrape.ApplyConfig(c)
 },
}

那下面就从discovery/manager.go中定义的ApplyConfig()方法分析。

1、根据配置注册provider:

代码语言:javascript
复制
for name, scfg := range cfg {
    //根据配置注册provider
 failedCount += m.registerProviders(scfg, name)
 discoveredTargets.WithLabelValues(m.name, name).Set()
}

其中关键的是m.registerProviders(scfg, name),继续跟踪:

代码语言:javascript
复制
d, err := cfg.NewDiscoverer(DiscovererOptions{
 Logger: log.With(m.logger, "discovery", typ),
})

2、然后将所有注册到m.providers数组中的provider进行启动:

代码语言:javascript
复制
for _, prov := range m.providers {
 // 启动服务发现实例
 m.startProvider(m.ctx, prov)
}

跟踪到m.startProvider(m.ctx, prov)方法中:

代码语言:javascript
复制
updates := make(chan []*targetgroup.Group)
// 执行run  每个服务发现都有自己的run方法。
go p.d.Run(ctx, updates)
// 更新发现的服务
go m.updater(ctx, p, updates)

发现这里主要是启动两个协程,它们之间使用updates通道类型变量进行通信。

总结来说(见下图):

1、每个Config都会对应创建一个Discoverer实例,并被封装到provider存储在m.providers数组中;

2、然后遍历providers数组进行启动操作,启动操作启动了两个协程:

a、Discoverer.Run协程逻辑中主要根据发现协议发现targets

b、然后通过通道传递给discovery/Manager.updater协程中,将其存放到m.targets集合map中;

配置处理这里还有个比较关键的:Discoverer会根据不同协议实现发现target,它是如何实现的呢?

首先,我们来看下Discoverer实例创建:d, err := cfg.NewDiscoverer(),它是一个接口定义:

代码语言:javascript
复制
type Config interface {
 Name() string
 NewDiscoverer(DiscovererOptions) (Discoverer, error)
}

每种服务发现协议都在自己的SDConfig中实现了各自的NewDiscoverver()方法,这样就可以将服务发现逻辑封装到Discovererver实现中:

discoveryManager组件

上节《Prometheus启动流程》一节分析过会启动discoveryManagerScrape组件通过通道将targets数据信息传递给scrapeManager组件(见下图):

1、discoveryManagerScrape组件启动入口:

代码语言:javascript
复制
g.Add(
 func() error {
  err := discoveryManagerScrape.Run()
  level.Info(logger).Log("msg", "Scrape discovery manager stopped")
  return err
 },
 func(err error) {
  level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
  cancelScrape()
 },
)

2、一直跟踪会进入到sender()方法中,配置处理模块说过,有个协程会将Discoverer组件发现的targets信息存储到m.targets集合map中,然后给m.triggerSend发送信号,sender方法中就是启动定时周期触发器监听m.triggerSend信号:

代码语言:javascript
复制
func (m *Manager) sender() {
    // 周期性定时器定时触发任务,这里是5s触发一次
 ticker := time.NewTicker(m.updatert)
 defer ticker.Stop()

 for {
  select {
  case <-m.ctx.Done():
   return
  case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
   select {
   case <-m.triggerSend:
    sentUpdates.WithLabelValues(m.name).Inc()
    select {
    case m.syncCh <- m.allGroups():
    default:
     delayedUpdates.WithLabelValues(m.name).Inc()
     level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
     select {
     case m.triggerSend <- struct{}{}:
     default:
     }
    }
   default:
   }
  }
 }
}

监听到m.triggerSend信号,则执行m.syncCh <- m.allGroups(),我们来看下m.allGroups()干了什么?

代码语言:javascript
复制
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
 m.mtx.RLock()
 defer m.mtx.RUnlock()

 tSets := map[string][]*targetgroup.Group{}
 for pkey, tsets := range m.targets {
  var n int
  for _, tg := range tsets {
   // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
   // to signal that it needs to stop all scrape loops for this target set.
   tSets[pkey.setName] = append(tSets[pkey.setName], tg)
   n += len(tg.Targets)
  }
  discoveredTargets.WithLabelValues(m.name, pkey.setName).Set(float64(n))
 }
 return tSets
}

其实就是将m.targets数据发送到m.syncCh通道上,所以,discoveryManager组件比较简单,就是一个搬运工。

scrape组件

scrapeManager组件启动:scrapeManager.Run(discoveryManagerScrape.SyncCh()),通道syncCh是被scrapeManager组件持有的,跟踪进入Run方法中:

代码语言:javascript
复制
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
 go m.reloader()
 for {
  select {
  //通过管道获取被监控的服务(targets)
  case ts := <-tsets:
   m.updateTsets(ts)

   select {
   // 关闭 Scrape Manager 处理信号
   //若从服务发现 (serviceDiscover)有服务(targets)变动,则给管道triggerReload传值,并触发reloader()方法更新服务
   case m.triggerReload <- struct{}{}:
   default:
   }

  case <-m.graceShut:
   return nil
  }
 }
}

通过case ts := <-tsets获取到syncCh通道上传递过来的targets数据,然后调用m.updateTsets(ts)targets数据存储到scrapeManager.targetSets中,然后给m.triggerReload发送信号。

这个方法中go m.reloader()启动了一个协程,进入reloader()方法中:

代码语言:javascript
复制
func (m *Manager) reloader() {
 //定时器5s
 ticker := time.NewTicker( * time.Second)
 defer ticker.Stop()

 for {
  select {
  case <-m.graceShut:
   return
   // 若服务发现(serviceDiscovery)有服务(targets)变动,就会向管道triggerReload写入值,定时器每5s判断一次triggerReload管道是否有值,若有值,则触发reload方法
  case <-ticker.C:
   select {
   case <-m.triggerReload:
    m.reload()
   case <-m.graceShut:
    return
   }
  }
 }
}

也是通过定时周期触发任务监听m.triggerReload信号,执行m.reload()targets加载进来。

总结

前面分析了服务发现运行机制,可以看下面图梳理下前面流程逻辑:

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Reactor2020 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 服务发现原理图
  • 配置处理
  • discoveryManager组件
  • scrape组件
  • 总结
相关产品与服务
Prometheus 监控服务
Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档