功能概述
规则引擎支持用户配置规则将符合条件的设备上报数据转发到 消息队列 CKafka 版 (以下简称 CKAFKA ),用户的应用服务器再从 CKAFKA 中读取数据内容进行处理。以此利用 CKAFKA 高吞吐量的优势,为用户打造高可用性的消息链路。
前提条件
已有可使用的实例,可参见 实例管理。
已创建规则,具体操作可参见 创建规则。
配置步骤
1. 登录 物联网开发平台控制台,单击目标实例,选择左侧菜单栏数据流转 > 规则引擎。
2. 单击需要配置的规则名称,在规则详情页面,单击添加行为操作。
3. 在弹出的“添加规则”窗口,填写相关信息。
3.1 选择行为类型“数据转发到消息队列(CKAFKA)”。
说明:
第一次使用时会提示用户授权访问 CKAFKA,您需单击立即授权才能继续创建。


3.2 依次选择 CKAFKA 实例和 Topic。
3.3 然后单击保存即可。

4.1 创建 CKAFKA 实例,进入 实例列表。可备注好实例名称方便区分其它实例。

4.2 单击实例 ID 找到 Topic 列表,单击新建,默认创建3个分区数,消息类型会随机转发到分区中。

5. 检查好规则引擎中的筛选数据和行为操作配置,确保对应设备的数据能成功转发到 CKAFKA。

6. 用 MQTTX 工具来模拟设备上报消息,确保设备订阅的 Topic 中的 code 数据为0,其他值需核查消息内容。

7. 根据设备的云端诊断日志来判断设备 Test 是否转发成功。

8. 进入 CKafka 控制台中的 消息查询,筛选对应资源的时间戳来匹配当前设备的消息。

9. 单击操作列中的查看详情,如下图所示,key 对应产品 ID 和设备名称信息,Value 对应数据转发设备信息。

10. 复制 payload 的加密内容,用 Base64 编码工具 进行解码,然后核对 JSON 数据是否正确。

数据格式
消息类型包括设备上报的 Topic 消息和平台监测到设备状态变化的通知消息,两类消息转发成功后,CKAFKA 收到的数据格式不同,如下:
设备上报的 Topic 消息:转发成功后,CKAFKA 收到的数据格式如下:
{"MsgType": "Forward","Event": "","Topic": "7PQ0I75ZWY/D1S742XVM1/Test","Seq": 32569,"PayloadLen": 44,"ProductId": "D1S742XVM1","DeviceName": "Test","Payload": "ewogICAgIm1ldGhvZCI6InJlcG9ydCIsCiAgICAiY2xpZW50VG9rZW4iOiIxMjMiLAogICAgInBhcmFtcyI6ewogICAgICAgICJkaWFubGlhbmciOjExCiAgICAgICB9Cn0=","Time": "2022-08-11 19:17:24.943","TimeMills": 1660216644943,"Reason": ""}
设备状态变化通知:当平台监测到设备状态变化后,会触发此消息转发,转发成功后,收到的数据格式如下:
{"MsgType": "Forward","Event": "","Topic": "$state/report/D1S742XVM1/Test","Seq": 0,"PayloadLen": 178,"ProductId": "D1S742XVM1","DeviceName": "Test","Payload": "eyJkZXZpY2VOYW1lIjoicHNrRGV2aWNlMDAxIiwiZXZlbnQiOiJFVl9PTkxJTkUiLCJwcm9kdWN0SUQiOiJLNzJDUkFJRzk4IiwicmVhc29uIjoiUkVBU09OX0RFVklDRV9DT05ORUNUIiwidGltZXN0YW1wIjoxNjc2OTY1MzUxLCJ0b3BpYyI6IiRzdGF0ZS9yZXBvcnQvSzcyQ1JBSUc5OC9wc2tEZXZpY2UwMDEifQ==","Time": "2023-02-21 15:42:31","TimeMills": 1676965351605,"Reason": ""}
Payload 进行 Base64 解码后:
{"deviceName": "Test","event": "EV_OFFLINE","productID": "D1S742XVM1","reason": "REASON_DEVICE_DISCONNECT","timestamp": 1677068839,"topic": "$state/report/D1S742XVM1/Test"}
字段说明
字段 | 描述 |
MsgType | 取值为“Forward”。 |
payload | Base64 解码后为设备上报的消息内容。 |
PayloadLen | 此设备上报消息的长度。 |
timemills | 转发行为触发时的时间戳,毫秒级别。 |
seq | 序列号。 |
timestamp | 时间戳。 |
topic | 主题信息。 |
devicename | 设备名称。 |
Time | 转发行为触发的时间,例如 "2022-08-11 12:00:00"。 |
TimeMills | 转发行为触发的时间戳,毫秒级别。 |
productid | 产品 ID。 |
event | EV_ONLINE:上线。 EV_OFFLINE:下线。 |
reason | REASON_DEVICE_DISCONNECT:设备断连。 REASON_STATE_KICKED:服务端主动踢下线。 REASON_DEVICE_KICKED:设备端互踢下线。 REASON_KEEPALIVE_TIMEOUT:设备端超时断连。 |
重发机制
重发机制用于在消息转发过程中发生失败的情况下,进行再次重发以达到接收消息的目的,具体说明如下:
若消息转发失败,系统则会进行转发重试,重试按照1s、3s、10s的时间间隔依次进行,若三次重试均失败,则将消息丢弃掉。
若用户配置了“转发错误行为操作”,在三次重试失败后,将按“转发错误行为操作”的配置,再进行一次消息转发,如果仍失败,则将消息丢弃掉。