首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >如何使用摄入管道和自定义日志基准测试Elasticsearch性能

如何使用摄入管道和自定义日志基准测试Elasticsearch性能

原创
作者头像
点火三周
修改2025-05-18 08:27:16
修改2025-05-18 08:27:16
1300
举报
文章被收录于专栏:Elastic Stack专栏Elastic Stack专栏

在设置Elasticsearch集群时,最常见的用例之一是摄入和搜索日志。本篇博客旨在为您提供一个基准测试环境,以便了解集群在处理工作负载时的表现。通过这个环境,您可以测试不同的设置,比如更改映射、删除字段、调整摄入管道,甚至测试数据集的极限以识别每秒可处理的文档数量。此外,您还可以考虑在此基础上运行警报,并了解其对整体集群的影响。

每个工作负载和日志消息都各不相同。有人可能收集防火墙日志,其允许与拒绝规则的比例不同,VPN连接日志几乎没有;而另一些人可能有大量的VPN连接日志。总体而言,每个日志源都是独特的。

Elastic不建议在生产集群上运行Rally,因为Rally轨道涉及破坏性操作,可能导致数据丢失。此外,对接收其他负载的集群进行基准测试也没有意义,因为Rally指标无法正确解读。

使用自定义日志轨道

今天我们重点介绍自定义日志轨道。如果您不想使用我们提供的预制解决方案,如安全或日志轨道,那么这就很适合您。

我们需要完成以下任务,并将一步步引导您完成:

  • 重新索引所需字段的数据子集
  • 从索引/数据流中提取数据
  • 将其存储到磁盘上
  • 根据需要提取元数据信息(例如摄入管道)

在涉及到修改数据的摄入管道时,确保拥有包含所有原始数据的字段是必要的。最常用的是event.original字段。

开始吧!在这个例子中,我们将使用Kibana的Web日志示例数据。本篇博客适用于至少使用2.12版本的Rally(之前的版本文件结构不同)。

数据设置

在我们的例子中,数据格式如下:

代码语言:json
复制
"_source": {
    "agent": "Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1",
    "bytes": 5166,
    "clientip": "33.16.170.252",
    "extension": "zip",
    "geo": {
      "srcdest": "US:PH",
      "src": "US",
      "dest": "PH",
      "coordinates": {
        "lat": 33.6324825,
        "lon": -83.84955806
      }
    },
    "host": "artifacts.elastic.co",
    "index": "kibana_sample_data_logs",
    "ip": "33.16.170.252",
    "machine": {
      "ram": 2147483648,
      "os": "win xp"
    },
    "memory": null,
    "message": "33.16.170.252 - - [2018-08-03T09:27:38.140Z] \"GET /kibana/kibana-6.3.2-windows-x86_64.zip HTTP/1.1\" 200 5166 \"-\" \"Mozilla/5.0 (X11; Linux x86_64; rv:6.0a1) Gecko/20110421 Firefox/6.0a1\"",
    "phpmemory": null,
    "referer": "http://nytimes.com/success/sunita-suni-williams",
    "request": "/kibana/kibana-6.3.2-windows-x86_64.zip",
    "response": 200,
    "tags": [
      "success",
      "security"
    ],
    "@timestamp": "2024-12-27T09:27:38.140Z",
    "url": "https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-windows-x86_64.zip",
    "utc_time": "2024-12-27T09:27:38.140Z",
    "event": {
      "dataset": "sample_web_logs"
    },
    "bytes_gauge": 5166,
    "bytes_counter": 17071806
  }

这意味着我们只需在Rally轨道中保留message字段。我们将创建一个带有remove处理器的摄入管道来执行保留操作。由于我们希望复制数据,因此会删除设置的\_id。原始数据集中只有大约14,000个文档,为了基准测试摄入管道和各种处理器的影响,我们需要足够的数据。然而,复制数据意味着无法准确得出磁盘使用情况,因为信息相似导致的压缩率可能很高。

代码语言:json
复制
PUT _ingest/pipeline/rally-drop-fields
{
  "processors": [
    {
        "remove": {
          "field": "_id"
        }
    },
    {
      "remove": {
        "keep": [
          "message", "@timestamp"
        ]
      }
    }
  ]
}

这样我们就只保留了message@timestamp字段,因为它们包含了最多的信息。

下一步是创建一个定义custom-track为数据流的索引模板。这将确保模板和正确的映射被应用。我们将利用内置的ecs@mappings模板来确保映射所有ECS字段。如果您使用的不是ECS,建议具体映射出字段及其映射方式。Rally将复制所有这些并成为轨道的一部分。

代码语言:json
复制
PUT _index_template/custom-track
{
  "data_stream": {
    "allow_custom_routing": false,
    "hidden": false
  },
  "index_patterns": [
    "custom-track"
  ],
  "composed_of": [
    "ecs@mappings"
  ]
}

接下来的步骤是重新索引数据,我们将多次执行此命令。

代码语言:json
复制
POST _reindex
{
    "source": {
        "index": "kibana_sample_data_logs*"
    },
    "dest": {
        "index": "custom-track",
        "pipeline": "rally-drop-fields",
        "op_type": "create"
    }
}

现在我们可以简单地执行以下命令:

代码语言:json
复制
GET custom-track/_count

这将返回该索引中的文档数量以及分片数量。我们大约有1300万份文档。

代码语言:json
复制
{
  "count": 13004376,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  }
}

这些数据足以进行摄入管道影响的有效测试。我们需要注意的是,原始文档被多次复制,这意味着查看分片大小和磁盘使用时,由于数据的相似性,压缩效果可能优于更为多样的数据。因此,分片大小和磁盘使用可能不代表您的真实数据。

Rally

现在我们进入安装并运行Rally的主机。在Rally主机上,我们可以运行create-track命令,它将完成我们需要的一切。阅读更多内容,请查看自定义工作负载定义:Tracks - Rally文档

代码语言:bash
复制
esrally create-track --data-streams "custom-track" --track "webserver" --target-hosts=https://es:port --client-options="verify_certs:false,basic_auth_user:'username',basic_auth_password:'password'"

我们创建了一个名为webserver的轨道,并从custom-track数据流中加载数据。创建的轨道包含一个单一挑战,以下是控制台输出。

代码语言:plaintext
复制
    ____        ____
   / __ \____ _/ / /_  __
  / /_/ / __ `/ / / / / /
 / _, _/ /_/ / / / /_/ /
/_/ |_|\__,_/_/_/\__, /
                /____/

[INFO] Connected to Elasticsearch cluster version [8.17.0] flavor [default] 

Extracting documents for index [.ds-custom-track-2024....     1000/1000 docs [100.0% done]
Extracting documents for index [.ds-custom-tra...     13004376/13004376 docs [100.0% done]
[INFO] Track webserver has been created. Run it with: esrally --track-path=/home/philippkahr/tracks/webserver

----------------------------------
[INFO] SUCCESS (took 1146 seconds)
----------------------------------

我们现在有了一个轨道!这很棒!默认情况下,Rally将在当前用户的主目录中创建一个名为tracks的文件夹,子文件夹名为webserver,因为我们就是这么命名的。

challenges文件夹中有一个default.json文件。文件夹中还有operationschallenges两个文件夹。对于这篇博客,我们忽略operations文件夹。这个文件包含了挑战描述,默认看起来如下所示。

我们需要在track.json中调整几个地方。在indices对象中,我们想将名字改为custom-track-rally,在corpora对象中的target-index中也设置为custom-track-rally。我们现在使用的是普通索引,而不是任何数据流;否则,我们需要使用数据流配置。

这是track.json文件。

代码语言:json
复制
{% import "rally.helpers" as rally with context %}
{
  "version": 2,
  "description": "Tracker-generated track for webserver",
  "indices": [
    {
      "name": "custom-track-rally",
      "body": ".ds-custom-track-2024.12.23-000001.json"
    }
  ],
  "corpora": [
    {
      "name": "custom-track-rally",
      "documents": [
        {
          "target-index": "custom-track-rally",
          "source-file": ".ds-custom-track-2024.12.23-000001-documents.json.bz2",
          "document-count": 13004376,
          "compressed-bytes": 213486562,
          "uncompressed-bytes": 3204126156
        }
      ]
    }
  ],
  
  "operations": [
    {{ rally.collect(parts="operations/*.json") }}
  ],
  "challenges": [
    {{ rally.collect(parts="challenges/*.json") }}
  ]
   
}

这是default.json文件,不需要进行任何更改。

代码语言:json
复制
{
    "name": "my-challenge",
    "description": "My new challenge",
    "default": true,
    "schedule": [
    {
      "operation": "delete-index"
    },
    {
      "operation": {
        "operation-type": "create-index",
        "settings": {{index_settings | default({}) | tojson}}
      }
    },
    {
      "operation": {
        "operation-type": "cluster-health",
        "index": "custom-track-rally",
        "request-params": {
          "wait_for_status": "{{cluster_health | default('green')}}",
          "wait_for_no_relocating_shards": "true"
        },
        "retry-until-success": true
      }
    },
    {
      "operation": {
        "operation-type": "bulk",
        "bulk-size": {{bulk_size | default(5000)}},
        "ingest-percentage": {{ingest_percentage | default(100)}}
      },
      "clients": {{bulk_indexing_clients | default(8)}}
    }
  ]
}

如我们所见,第一步是删除索引,这将始终是第一步。现在我们要创建两个挑战。第一个挑战是尽可能快地索引文档,第二个挑战是在其中编写摄入管道。

如何运行这个轨道?对于Elastic提供的开箱即用的轨道,只需指定--track即可,因为它知道在哪里找到所有数据。这对自定义轨道来说是不适用的——我们可以简单地指定--track-path参数。完整命令如下。在此时,我们应该执行它以确保这个轨道能正常工作并索引数据。--challenge参数仅在您需要指定其他挑战时才需要。当我们创建带有摄入管道的挑战时可以使用它。现在可以去掉这个参数,因为在default.json中有一个名为default: true的标志。

代码语言:bash
复制
esrally race --user-tags='{"benchmark_id":"custom-1"}' --track-path=~/tracks/webserver --kill-running-processes --target-hosts=https://10.164.15.204:9200 --pipeline=benchmark-only --client-options="verify_certs:false,basic_auth_user:'username',basic_auth_password:'password'" --track-params='{"bulk_indexing_clients":20,"number_of_shards":1,"number_of_replicas":1}'

现在我们确认轨道工作正常,可以复制并粘贴整个default.json文件并重命名为index-pipeline.json。默认创建的挑战称为my-challenge,标志为default: true。我们现在需要将其调整为false并设置名称为ingest-pipeline。名称很重要,因为这是--challenge参数的值。

代码语言:json
复制
{
    "name": "ingest-pipeline",
    "description": "My ingest pipeline challenge",
    "default": false,
    "schedule": [
    {
      "operation": "delete-index"
    },....

现在schedule数组包含相同的步骤:删除索引、创建索引和批量请求。我们需要一个额外的步骤,那就是添加摄入管道。

代码语言:json
复制
   {
      "name": "index-pipeline",
      "schedule": [
        {
          "operation": "delete-index"
        },
        {
          "operation": {
            "operation-type": "create-index",
            "settings": {{index_settings | default({}) | tojson}}
          }
        },
        {
          "operation": {
            "operation-type": "put-pipeline",
            "id": "custom-track-pipeline",
            "body": {
              "processors": [
                {
                  "dissect": {
                    "field": "message",
                    "pattern": "%{source.ip} %{} [%{@timestamp}] \"%{http.request.method} %{url.path} %{http.version}\" %{http.request.status_code} %{http.request.bytes} \"-\" \"%{user_agent}"
                  }
                },
                {
                  "user_agent": {
                      "field": "user_agent"
                  }
                },
                {
                  "geoip": {
                      "field": "source.ip",
                      "target_field": "source.geo"
                  }
                }
              ]
            }
          }
        },
{
      "operation": {
        "operation-type": "cluster-health",
        "index": "custom-track-rally",
        "request-params": {
          "wait_for_status": "{{cluster_health | default('green')}}",
          "wait_for_no_relocating_shards": "true"
        },
        "retry-until-success": true
      }
    },
        {
          "operation": {
            "operation-type": "bulk",
            "pipeline": "custom-track-pipeline",
            "bulk-size": {{bulk_size | default(5000)}},
            "ingest-percentage": {{ingest_percentage | default(100)}}
          },
          "clients": {{bulk_indexing_clients | default(8)}}
        }
      ]
    }

我们添加了一个新对象来配置摄入管道,并在底部的批量操作中添加了管道名称。这确保了管道在Rally中总是相同的版本。

我们可以运行上面的相同命令,只需将挑战改为index-pipeline即可。

摄入管道

我们将一起创建一个摄入管道,解析大部分message字段并将其放入正确的字段中。本篇博客不涉及不同处理器(如GROK与Dissect)的影响基准测试。您可以添加一个新挑战,称之为pipeline-with-grok,然后与pipeline-with-dissect进行基准测试。我们通过Kibana开发工具使用摄入管道模拟API,它有很好的自动补全功能,我们可以提供一些示例文档来快速测试并确保我们的操作是正确的。

代码语言:json
复制
POST _ingest/pipeline/_simulate
{
  "docs": [
      {"_source": {
        "message":"66.154.51.14 - - [2018-09-14T10:41:52.659Z] \"GET /styles/app.css HTTP/1.1\" 200 6901 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"","@timestamp":"2025-02-07T10:41:52.659Z"}}
  ],
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": """%{source.ip} %{} [%{@timestamp}] "%{http.request.method} %{url.path} %{http.version}" %{http.request.status_code} %{http.request.bytes} "-" "%{user_agent}"""
        }
      },
      {
        "user_agent": {
            "field": "user_agent"
        }
      },
      {
         "geoip": {
            "field": "source.ip",
            "target_field": "source.geo"
         }
      }
    ]
  }
}

让我们快速过一下。我们希望提取一些信息并将其放入相应的Elastic Common Schema (ECS)字段中。此外,我们想解析出user_agent字符串。我们没有在管道中使用日期处理器,因为日期是以ISO8601格式呈现的,因此映射会自动解析。我们还进行了一次geoip查找,以便通过地理位置信息丰富数据。

查看结果

我们建议进行三次或更多次相同的测试,以确保获得一个合理的性能基线。再次强调,请不要在生产集群上运行此测试,因为这可能会主动删除数据。此外,当您全力运行时,可能会导致集群不稳定。

查看下图,您可以看到添加摄入管道后,处理时间大约翻倍。我们看到没有摄入管道的平均处理时间为2.21秒,而有摄入管道的平均处理时间为4.25秒。不过,我们可以从上方的CPU图表得出结论,测试之间CPU的平均使用率并没有变化。这仅仅表明,由于整个运行时间更长,CPU使用率会保持更高的时间更长。所有顶部带有数字4的品红色线条是Rally测试的开始时间。

总体而言,您可以按照本博客进行管道测试。还有一种可能性是基于集成包进行elastic-package基准测试,但那是另一篇文章的主题。

Dashboard
Dashboard

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用自定义日志轨道
  • 数据设置
  • Rally
  • 摄入管道
  • 查看结果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档