Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊promtail的Client

聊聊promtail的Client

原创
作者头像
code4it
修改于 2021-01-21 03:19:42
修改于 2021-01-21 03:19:42
83200
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下promtail的Client

Client

loki/pkg/promtail/client/client.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Client pushes entries to Loki and can be stopped
type Client interface {
    api.EntryHandler
    // Stop goroutine sending batch of entries.
    Stop()
}

Client接口内嵌了api.EntryHandler接口,定义了Stop方法

EntryHandler

loki/pkg/promtail/api/types.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
    Handle(labels model.LabelSet, time time.Time, entry string) error
}

EntryHandler接口定义了Handle方法

client

loki/pkg/promtail/client/client.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
    logger  log.Logger
    cfg     Config
    client  *http.Client
    quit    chan struct{}
    once    sync.Once
    entries chan entry
    wg      sync.WaitGroup

    externalLabels model.LabelSet
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
    if len(c.externalLabels) > 0 {
        ls = c.externalLabels.Merge(ls)
    }

    // Get the tenant  ID in case it has been overridden while processing
    // the pipeline stages, then remove the special label
    tenantID := c.getTenantID(ls)
    if _, ok := ls[ReservedLabelTenantID]; ok {
        // Clone the label set to not manipulate the input one
        ls = ls.Clone()
        delete(ls, ReservedLabelTenantID)
    }

    c.entries <- entry{tenantID, ls, logproto.Entry{
        Timestamp: t,
        Line:      s,
    }}
    return nil
}

// Stop the client.
func (c *client) Stop() {
    c.once.Do(func() { close(c.quit) })
    c.wg.Wait()
}

client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法判断LabelSet是否包含ReservedLabelTenantID,如果包含则会执行ls.Clone()及然后移除,之后构造entry发送到c.entries这个channel;Stop方法执行close(c.quit)

run

loki/pkg/promtail/client/client.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *client) run() {
    batches := map[string]*batch{}

    // Given the client handles multiple batches (1 per tenant) and each batch
    // can be created at a different point in time, we look for batches whose
    // max wait time has been reached every 10 times per BatchWait, so that the
    // maximum delay we have sending batches is 10% of the max waiting time.
    // We apply a cap of 10ms to the ticker, to avoid too frequent checks in
    // case the BatchWait is very low.
    minWaitCheckFrequency := 10 * time.Millisecond
    maxWaitCheckFrequency := c.cfg.BatchWait / 10
    if maxWaitCheckFrequency < minWaitCheckFrequency {
        maxWaitCheckFrequency = minWaitCheckFrequency
    }

    maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

    defer func() {
        // Send all pending batches
        for tenantID, batch := range batches {
            c.sendBatch(tenantID, batch)
        }

        c.wg.Done()
    }()

    for {
        select {
        case <-c.quit:
            return

        case e := <-c.entries:
            batch, ok := batches[e.tenantID]

            // If the batch doesn't exist yet, we create a new one with the entry
            if !ok {
                batches[e.tenantID] = newBatch(e)
                break
            }

            // If adding the entry to the batch will increase the size over the max
            // size allowed, we do send the current batch and then create a new one
            if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
                c.sendBatch(e.tenantID, batch)

                batches[e.tenantID] = newBatch(e)
                break
            }

            // The max size of the batch isn't reached, so we can add the entry
            batch.add(e)

        case <-maxWaitCheck.C:
            // Send all batches whose max wait time has been reached
            for tenantID, batch := range batches {
                if batch.age() < c.cfg.BatchWait {
                    continue
                }

                c.sendBatch(tenantID, batch)
                delete(batches, tenantID)
            }
        }
    }
}

client的run方法创建time.NewTicker(maxWaitCheckFrequency),然后for循环,如果是c.entries读取到了数据就执行batch.add(e),如果是maxWaitCheck触发了则遍历batches,执行c.sendBatch(tenantID, batch)及delete;最后quit的时候,还有defer方法遍历batches执行c.sendBatch(tenantID, batch)

sendBatch

loki/pkg/promtail/client/client.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *client) sendBatch(tenantID string, batch *batch) {
    buf, entriesCount, err := batch.encode()
    if err != nil {
        level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
        return
    }
    bufBytes := float64(len(buf))
    encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

    ctx := context.Background()
    backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
    var status int
    for backoff.Ongoing() {
        start := time.Now()
        status, err = c.send(ctx, tenantID, buf)
        requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

        if err == nil {
            sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
            sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
            for _, s := range batch.streams {
                lbls, err := parser.ParseMetric(s.Labels)
                if err != nil {
                    // is this possible?
                    level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
                    return
                }
                var lblSet model.LabelSet
                for i := range lbls {
                    if lbls[i].Name == LatencyLabel {
                        lblSet = model.LabelSet{
                            model.LabelName(HostLabel):    model.LabelValue(c.cfg.URL.Host),
                            model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
                        }
                    }
                }
                if lblSet != nil {
                    streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
                }
            }
            return
        }

        // Only retry 429s, 500s and connection-level errors.
        if status > 0 && status != 429 && status/100 != 5 {
            break
        }

        level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
        batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
        backoff.Wait()
    }

    if err != nil {
        level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
        droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
        droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
    }
}

sendBatch方法先通过batch.encode()编码为buf,然后通过c.send(ctx, tenantID, buf)进行发送

send

loki/pkg/promtail/client/client.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
    ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
    defer cancel()
    req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
    if err != nil {
        return -1, err
    }
    req = req.WithContext(ctx)
    req.Header.Set("Content-Type", contentType)
    req.Header.Set("User-Agent", UserAgent)

    // If the tenant ID is not empty promtail is running in multi-tenant mode, so
    // we should send it to Loki
    if tenantID != "" {
        req.Header.Set("X-Scope-OrgID", tenantID)
    }

    resp, err := c.client.Do(req)
    if err != nil {
        return -1, err
    }
    defer helpers.LogError("closing response body", resp.Body.Close)

    if resp.StatusCode/100 != 2 {
        scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
        line := ""
        if scanner.Scan() {
            line = scanner.Text()
        }
        err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
    }
    return resp.StatusCode, err
}

send方法执行一个POST的http请求发送到c.cfg.URL.String()

小结

promtail的client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法构造entry发送到c.entries这个channel;Stop方法执行close(c.quit);然后它还有一个run方法将entry添加到batch,然后将batch通过http的POST请求发送到指定的地址。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Grafana Promtail 配置解析
-print-config-stderr 通过 ./promtail 直接运行Promtail时能够快速输出配置 -log-config-reverse-order 配置通过反向输出,这样再Grafana中就能从上到下正确读取
阿提说说
2024/01/11
1.3K0
Promtail 配置文件说明
Promtail 是负责收集日志发送给 loki 的代理程序,Promtail 默认通过一个 config.yaml 文件进行配置,其中包含 Promtail 服务端信息、存储位置以及如何从文件中抓取日志等配置。
我是阳明
2021/05/17
21.7K3
Promtail 配置文件说明
聊聊promtail的positions
promtail的Positions接口定义了GetString、Get、PutString、Put、Remove、SyncPeriod、Stop方法;positions实现了Positions接口;其Get方法从p.positions读取数据;其Put方法写数据到p.positions中;其SyncPeriod方法返回的是p.cfg.SyncPeriod;其Remove方法将path从p.positions中删除。
code4it
2021/01/21
7730
聊聊promtail的positions
日志收集系统loki+promtail+Grafana 部署
一、简 介 Loki是受Prometheus启发由Grafana Labs团队开源的水平可扩展,高度可用的多租户日志聚合系统。 开发语言: Google Go。它的设计具有很高的成本效益,并且易于操作。使用标签来作为索引,而不是对全文进行检索,也就是说,你通过这些标签既可以查询日志的内容也可以查询到监控的数据签,极大地降低了日志索引的存储。系统架构十分简单,由以下3个部分组成 :
熬夜的花斑狗
2022/01/11
6.2K0
聊聊dubbo-go-proxy的Client
dubbo-go-proxy的client.Client接口定义了Init、Close、Call、MapParams方法;其dubbo.Client实现了client.Client接口;其主要是通过mapper进行参数转换,然后通过GenericService.Invoke进行请求。
code4it
2021/02/08
3040
聊聊cortex的kv.Client
github.com/cortexproject/cortex/pkg/ring/kv/client.go
code4it
2021/01/27
5190
聊聊cortex的kv.Client
使用loki+promtail+alertmanager+prometheusAlert实现自定义日志采集查看和监控告警
后面介绍部署方式都是二进制部署,这些应用都可以使用容器进行部署,思路都是一样的,本文就不再介绍了
没有故事的陈师傅
2025/01/07
4490
使用loki+promtail+alertmanager+prometheusAlert实现自定义日志采集查看和监控告警
聊聊loki的Query
loki的Query接口定义了Exec方法,返回Result;Result定义了Data、Statistics属性;query实现了Query接口,其Exec方法执行q.Eval(ctx)及stats.Snapshot。
code4it
2021/01/28
4800
聊聊loki的Query
聊聊cortex的Distributor
cortex的Distributor提供了Push、Query方法;Push方法会通过d.ingestersRing.ShuffleShard确定subRing;之后通过ring.DoBatch提交keys;Query方法通过d.GetIngestersForQuery获取replicationSet,再通过d.queryIngesters获取matrix。
code4it
2021/01/25
3490
聊聊cortex的Distributor
深入剖析Alertmanager:解锁告警管理的核心逻辑
在当今复杂的IT系统架构中,监控体系对于保障系统的稳定运行至关重要。而Alertmanager作为监控体系里关键的一环,在处理告警信息、确保相关人员及时响应等方面发挥着无可替代的作用。它就像是一个信息枢纽,接收来自各个监控源的告警信息,经过一系列智能处理后,精准地将关键信息传递给相关人员。
没有故事的陈师傅
2025/01/22
2330
深入剖析Alertmanager:解锁告警管理的核心逻辑
技术分享 | AlertManager 源码解析
AlertManager 是处理对应用程序的告警的,比如Promethus的服务端。对于输入的告警,会经过分组、抑制、静默、去重等步骤,最终并将告警发送到接受者(邮箱等)。
爱可生开源社区
2022/11/16
1.1K0
Loki被限流了,Limits_Config到底限了个啥?
Loki中拥有这众多的limit策略,有的已经开放到配置文件中,还有的配置代码中已经实现但还没开放出来。大部分情况下开发者给了出一些默认参数足够优秀,不过有的时候我们也不免需要微调。那么小白这次先简单捡几个比较重要的策略来说明下Limits_Config中到底限制了什么。
云原生小白
2021/05/13
3.1K0
Loki被限流了,Limits_Config到底限了个啥?
在 EKS 中实现基于 Promtail + Loki + Grafana 容器日志解决方案
如果今天谈论到要部署一套日志系统,相信用户首先会想到的就是经典的ELK架构,或者现在被称为Elastic Stack。Elastic Stack架构为Elasticsearch + Logstash + Kibana + Beats的组合,其中,Beats负责日志的采集, Logstash负责做日志的聚合和处理,Elasticsearch作为日志的存储和搜索系统,Kibana作为可视化前端展示,整体架构如下图所示:
我是阳明
2021/06/25
2.9K0
在 EKS 中实现基于 Promtail + Loki + Grafana 容器日志解决方案
实践一把Loki,体验掌上起舞的轻盈
对此不太熟悉的同学,可以先看这篇文章。可以看到,他是grafana家族的,界面支持上自然有保证。有了它,就不用在grafana和kibana之间来回切换了。
xjjdog
2020/06/23
1.2K0
grafana loki轻量级日志收集系统
Loki的第一个稳定版本于2019年11月19日发布 是 Grafana Labs 团队最新的开源项目 是一个水平可扩展,高可用性,多租户的日志聚合系统 Loki 特性
章工运维
2023/05/19
1.2K0
聊聊kingbus的command.go
kingbus的command.go提供了Close、handleQuery、writeOK、handleBinlogDumpGtid、handleRegisterSlave等方法
code4it
2020/06/20
3090
聊聊kingbus的command.go
研究Fabric中Etcd的Raft应用
Fabric的共识服务设计成了可插拔的模块,以此满足了根据不同应用场景切换不同共识选项的需求。在Hyperledger Fabric最新版本中,Fabric系统的共识模块中实现了三种共识算法,其中包括Solo,Kafka以及Raft算法。官方推荐的是使用Raft共识算法,但是为了更好地理解Fabric中的共识模块,我们也简单介绍一下Solo和Kafka这两种共识算法。
KunkkaWu
2022/08/03
9901
日志收集系统loki+promtail+Grafana 部署
一、简 介 Loki是受Prometheus启发由Grafana Labs团队开源的水平可扩展,高度可用的多租户日志聚合系统。 开发语言: Google Go。它的设计具有很高的成本效益,并且易于操作。使用标签来作为索引,而不是对全文进行检索,也就是说,你通过这些标签既可以查询日志的内容也可以查询到监控的数据签,极大地降低了日志索引的存储。系统架构十分简单,由以下3个部分组成 :
用户8143208
2022/08/19
2.8K0
Loki 源码分析之日志写入
前面我们介绍了 Loki 的一些基本使用配置,但是对 Loki 还是了解不够深入,官方文档写得较为凌乱,而且没有跟上新版本,为了能够对 Loki 有一个更深入的认识,做到有的放矢,这里面我们尝试对 Loki 的源码进行一些简单的分析,由于有很多模块和实现细节,这里我们主要是对核心功能进行分析,希望对大家有所帮助。本文首先对日志的写入过程进行简单分析。
我是阳明
2021/06/25
1.3K0
Loki 源码分析之日志写入
Promtail Pipeline 日志处理配置
Promtail 是 Loki 官方支持的日志采集端,在需要采集日志的节点上运行采集代理,再统一发送到 Loki 进行处理。除了使用 Promtail,社区还有很多采集日志的组件,比如 fluentd、fluent bit 等,都是比较优秀的。
我是阳明
2021/05/17
13.2K0
Promtail Pipeline 日志处理配置
相关推荐
Grafana Promtail 配置解析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验