配置 SQL 过滤

最近更新时间:2025-10-31 10:40:53

我的收藏
说明:
当前功能灰度中,如控制台提示当前集群暂未开启该功能, 请提交工单联系我们处理。

背景

MQTT 标准规范定义了 Topic Filter 的概念,允许订阅者基于 MQTT Topic Name 的层级结构和通配符来选择需要接收的消息。虽然 Topic 和通配符提供了较强的过滤能力,但在灰度发布、A/B 测试、系统升级等场景中,仅依赖 Topic 过滤仍无法满足更灵活的业务需求。

实现原理

MQTT 5.0协议引入了 Subscribe User Property 机制,本产品基于此机制扩展支持了 Subscribe User Property 的过滤语义,从而支持更细粒度的消息过滤能力。当订阅消息时,如果 Subscribe User Properties 中包含 Key 为 $where,Value 为合法的 WHERE 子句,MQTT Server 将在推送消息时依据该 WHERE 子句对消息进行过滤,仅将满足条件的消息投递给订阅者




基本工作流程

1. ​​订阅与声明​​:订阅者发起 Subscribe 请求,在 User Property 中声明过滤条件($where)。
2. ​​条件解析​​:服务端解析并校验 WHERE 子句的有效性。
3. ​​消息匹配​​:当有消息发布时,服务端对匹配主题的消息应用所有订阅者的过滤条件。
4. ​​精准投递​​:仅将满足条件的消息投递给订阅者。

SQL 过滤语法

WHERE子 句支持丰富的操作符和函数,用于构建灵活的过滤条件。

支持的操作符

类型
操作符
示例
说明
​​比较运算符​​
=, !=, >, >=, <, <=
payload.temp > 30
比较数值或字符串
​​逻辑运算符​​
AND, OR, NOT
temp > 25 AND hum < 70
组合多个条件
​​范围判断​​
IN
clientid IN ('client1', 'client2')
判断字段值是否在列表中
​​空值判断​​
IS NULL
payload.location IS NULL
判断字段是否为NULL
​​模式匹配​​
LIKE
topic LIKE 'sensor/%/temp'
简单的通配符匹配
​​条件表达式​​
CASE WHEN...THEN...ELSE...END
CASE WHEN qos > 0 THEN 'important' ELSE 'normal' END
实现条件逻辑

支持的函数

类型
函数示例
说明
​​字符串函数​​
UPPER(), LOWER(), LENGTH()
处理文本数据
​​数学函数​​
ABS()
计算绝对值
​​条件函数​​
COALESCE()
返回参数中第一个非NULL值

注意事项

1. 每个 Subscribe 请求的 User Properties 中,只能有一个 Key 为 $where 的属性。若存在多个 $where→ WHERE 子句的属性对,仅第一个生效
2. ​​对于消息中的用户属性(User Property),若存在多个同名的 Key-Value 对,仅取最后一个出现的 Value 参与过滤表达式的计算。
3. ​​若过滤表达式引用的字段在消息属性中不存在,则该字段的值将被视为NULL
4. 字符串字面量请使用单引号表示,例如:WHERE type = 'string-literal'

示例

package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class BasicQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";
String clientId = "deviceBasic";

String topic = "home/room/1";
String[] topicFilters = new String[] {"home/#"};
int[] qos = new int[] {1};

MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName("YOUR-USERNAME");
options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String s, MqttMessage message) throws Exception {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",
topic, message.getQos(), content, message.getProperties());
}

@Override
public void deliveryComplete(IMqttToken token) {

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
}
});
client.connect(options).waitForCompletion();
try {
// Subscribe
MqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];
for (int i = 0; i < topicFilters.length; i++) {
subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);
}
MqttProperties subscribeProperties = new MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");
userProperties.add(userProperty);
subscribeProperties.setUserProperties(userProperties);
client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}


int total = 128;
for (int i = 0; i < total; i++) {
byte[] payload = new byte[4];
ByteBuffer buffer = ByteBuffer.wrap(payload);
buffer.putInt(i);
MqttMessage message = new MqttMessage(payload);
message.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setContentType("application/json");
properties.setResponseTopic("response/topic");
message.setProperties(properties);
System.out.printf("Prepare to publish message %d%n", i);
// P2P topic format: {first-topic}/p2p/{target-client-id}
client.publish(topic, message);
System.out.printf("Published message %d%n", i);
TimeUnit.MILLISECONDS.sleep(100);
}
TimeUnit.MINUTES.sleep(3);
client.disconnect();
}
}