前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 EMQX 和 eKuiper 进行 MQTT 流处理:快速教程

使用 EMQX 和 eKuiper 进行 MQTT 流处理:快速教程

原创
作者头像
EMQ映云科技
发布2023-07-14 14:57:59
5080
发布2023-07-14 14:57:59
举报
文章被收录于专栏:EMQ 物联网

引言

MQTT 协议是一种专为物联网应用而设计的轻量级消息传输协议。它具有简单、开放、易于实现的特点,是物联网应用的理想选择。MQTT 数据以连续实时的方式进行传输,非常适合由流处理引擎进行处理。

EMQX 是一款大规模分布式物联网 MQTT Broker,能够高效、可靠地连接海量的物联网设备,并实时处理和分发消息和事件流数据。eKuiper 是一个开源的流处理引擎,可以对流数据进行过滤、转换和聚合等操作。

本文将向您展示如何使用 eKuiper 实时流处理引擎来处理来自 EMQX 的 MQTT 数据。

MQTT Stream Processing with EMQX and eKuiper
MQTT Stream Processing with EMQX and eKuiper

场景描述

假设我们有个 MQTT 主题 demo/sensor,用于在 EMQX 中接收温度和湿度数据。我们希望使用 eKuiper 订阅该主题,并用流处理技术对数据进行处理和分析。然后,我们可以根据分析结果,触发用户的 HTTP 服务,或者将结果保存到外部存储中。

EMQX

由于 EMQX 支持标准的 MQTT 协议,所以 eKuiper 可以连接到任何版本的 EMQX。在这里,我们使用 EMQX Cloud 提供的免费公共 MQTT Broker 进行测试:

集群

集群地址

监听端口

emqx1

broker.emqx.io

1883

eKuiper

eKuiper 可以部署在边缘或云端。我们可以使用 Docker 进行快速安装。

代码语言:txt
复制
docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER=tcp://broker.emqx.io:1883 lfedge/ekuiper:1.10.0

我们可以用这个命令拉取并运行 eKuiper 1.10.0 版本的 docker 镜像。我们将 REST API 端口设置为 9081,在本教程中,我们将使用 REST API 来管理 eKuiper。我们还通过环境变量把默认的 MQTT Broker 地址指向了 EMQX Cloud 集群。

如果您想使用其他方法安装 eKuiper,请查看安装指南

EMQX ECP (EMQX Edge-to-Cloud Platform) 是专为云边协同而打造的高级 MQTT 平台。它提供了专业的 Web UI 让您可以方便地管理 eKuiper。在本教程中,您也可以使用 ECP 来管理 eKuiper。更多细节,请参考 ECP 文档

配置 eKuiper 订阅 MQTT 数据流

MQTT 数据是一种无界的、连续的流式数据。在 eKuiper 中,我们使用流的概念来映射这种类型的数据。要处理 MQTT 数据,我们首先要创建一个流来描述数据。

我们用 eKuiper REST API 来创建一个流:

代码语言:txt
复制
POST http://127.0.0.1:9081/streams
Content-Type: application/json

{
  "sql": "CREATE STREAM demoMqttStream (temperature FLOAT, humidity FLOAT) WITH (TYPE=\"mqtt\", DATASOURCE=\"demo/sensor\", FORMAT=\"json\", SHARED=\"true\")"
}

用 Postman 等 HTTP 客户端发送上面的请求,将创建一个名为 demoMqttStream 的流,它是 MQTT 类型的数据源。datasource 属性的值是 demo/sensor,表示订阅 MQTT 的 demo/sensor 主题。数据格式是 JSON。SHARED 选项表示这个流可以被所有规则共享。

注意:我们运行 eKuiper docker 容器时,MQTT Broker 地址默认是 tcp://broker.emqx.io:1883。如果您用的是别的 MQTT Broker,请在安装时换成您的 Broker 地址。如果您想改变 MQTT Broker 地址或其他 MQTT 连接参数,如认证相关配置,可以修改 data/mqtt_souce.yaml 文件里的设置。您可以用 +# 通配符订阅多个主题,在 datasource 属性里使用这些通配符。比如,demo/+ 是订阅所有以 demo/ 开头的主题。demo/# 是订阅所有以 demo/ 开头的主题和 demo/ 下的所有子主题。

流处理 MQTT 数据

在 eKuiper 中,我们用规则来定义流处理的工作流程。规则是 SQL 语句,它规定了数据处理的方式和处理后执行的动作。除了连续的数据处理,像 eKuiper 这样的流处理引擎还支持有状态处理。我们将演示两个流处理和有状态处理的例子。

有状态的报警规则

第一个流处理例子是监测温度和湿度数据,温度上升超过 0.5 或湿度上升超过 1 就触发报警。这要求处理引擎能够记住前一条数据的状态,并和当前数据比较。

假设我们有个 URL 为 http://yourhost/alert 的 HTTP webhook,用来接收报警数据。我们首先用下面的 HTTP 请求创建一个规则。

代码语言:txt
复制
###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule1",
  "sql": "SELECT temperature, humidity FROM demoMqttStream WHERE temperature - LAG(temperature) > 0.5 OR humidity - LAG(humidity) > 1",
  "actions": [{
    "rest": {
      "url": "http://yourhost/alert",
      "method": "post",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule1 的规则,该规则对应的 SQL 语句如下:

代码语言:txt
复制
SELECT temperature, humidity 
FROM demoMqttStream 
WHERE 
  temperature - LAG(temperature) > 0.5 
  OR humidity - LAG(humidity) > 1

这个 SQL 从 demoMqttStream 里选出变化达到我们条件的温度和湿度数据。LAG 函数用来获取前一条数据。

actions 属性规定了规则触发后的动作。这里,我们用 rest 动作把数据发送到 http://yourhost/alert 。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送的数据是这样的:

代码语言:txt
复制
{
  "temperature": 25.5,
  "humidity": 60.5
}

测试规则

我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们发送以下数据到主题:

代码语言:txt
复制
{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

代码语言:txt
复制
{"temperature": 26.1, "humidity": 62}
{"temperature": 26.5, "humidity": 62.3}

这是因为只有第二和第四条消息,温度上升超 0.5 或湿度上升超 1。

第二个例子是计算每分钟的平均温度和湿度,并把它发送回 EMQX。这涉及到一个经典的流处理概念,叫做时间窗口。我们可以用以下 HTTP 请求来创建一个规则。

代码语言:txt
复制
###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule2",
  "sql": "SELECT 
  trunc(avg(temperature), 2) as avg_temperature, trunc(avg(humidity), 2) as avg_humidity, window_end() as ts FROM demoMqttStream GROUP BY TumblingWindow(mi, 1)",
  "actions": [{
    "mqtt": {
      "server": "tcp://broker.emqx.io:1883",
      "topic": "result/aggregation",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule2 的规则,该规则对应的 SQL 语句如下:

代码语言:txt
复制
SELECT 
  trunc(avg(temperature), 2) as avg_temperature, 
  trunc(avg(humidity), 2) as avg_humidity,
  window_end() as ts
FROM demoMqttStream
GROUP BY TumblingWindow(mi, 1)

这个 SQL 会选出每分钟的温度和湿度平均值。时间窗口在 GROUP BY 子句中用 TumblingWindow 定义。这种窗口类型把 MQTT 数据分成固定长度的窗口。在 SELECT 子句中,我们用聚合函数 avg 来计算时间窗口内温度和湿度的平均值。window_end() 函数用来获取时间窗口的结束时间,这样我们就能知道这些平均值对应的时间段。trunc 函数用来把平均值四舍五入到两位小数。

actions 属性规定了规则触发后的动作。这里,我们用 mqtt 动作发送数据到 EMQX 的 result/aggregation 主题。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送到主题的数据是这样的:

代码语言:txt
复制
{
  "avg_temperature": 25.5,
  "avg_humidity": 60.5,
  "ts": 1621419600000
}

测试规则

同样,我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们每 30 秒发送一条数据到主题,两分钟的数据如下所示:

代码语言:txt
复制
{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

代码语言:txt
复制
{"avg_temperature": 25.8, "avg_humidity": 61.25, "ts": 1621419600000}
{"avg_temperature": 26.2, "avg_humidity": 62.2, "ts": 1621419660000}

我们发送了两分钟的数据,所以得到了两个每分钟的平均值。

结语

在本教程中,我们学习了如何使用 eKuiper 处理 MQTT 数据。通过本教程,您能够:

  • 通过订阅 EMQX MQTT Broker 主题接收 MQTT 数据
  • 制定规则来处理 MQTT 数据
  • 将处理后的数据反馈给 EMQX Broker

我们用两个示例展示了 eKuiper 对 MQTT 数据的流处理能力。eKuiper 强大的流处理能力可以应用于多种流式数据源。欢迎您探索 eKuiper 的各种功能,构建实时高效的 MQTT 数据处理通道。

版权声明: 本文为 EMQ 原创,转载请注明出处。 原文链接:https://www.emqx.com/zh/blog/mqtt-stream-processing-with-emqx-and-ekuiper

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 场景描述
    • EMQX
      • eKuiper
      • 配置 eKuiper 订阅 MQTT 数据流
      • 流处理 MQTT 数据
        • 有状态的报警规则
          • 测试规则
      • 结语
      相关产品与服务
      物联网
      腾讯连连是腾讯云物联网全新商业品牌,它涵盖一站式物联网平台 IoT Explorer,连连官方微信小程序和配套的小程序 SDK、插件和开源 App,并整合腾讯云内优势产品能力,如大数据、音视频、AI等。同时,它打通腾讯系 C 端内容资源,如QQ音乐、微信支付、微保、微众银行、医疗健康等生态应用入口。提供覆盖“云-管-边-端”的物联网基础设施,面向“消费物联”和 “产业物联”两大赛道提供全方位的物联网产品和解决方案,助力企业高效实现数字化转型。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档