前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >零基础教你自建MQTT服务器并实现双向通讯

零基础教你自建MQTT服务器并实现双向通讯

作者头像
Tinywan
发布2024-06-04 19:01:05
1980
发布2024-06-04 19:01:05
举报
文章被收录于专栏:开源技术小栈开源技术小栈

概述

随着物联网技术的快速发展,MQTT(Message Queuing Telemetry Transport)消息队列遥测传输协议,作为一种轻量级的通讯协议,被广泛应用于物联网设备之间的通讯。

MQTT 是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

本次教程中,将探讨如何基于EMQX平台自建MQTT服务器,并实现设备之间的高效通讯。无论是在工业控制、智能家居还是智能城市等领域,搭建自己的MQTT服务器都能为我们带来更大的灵活性和可扩展性。让我们一起深入了解这个过程,为物联网应用打下坚实的基础。

MQTT通信的架构

上面架构图来自EMQX官网,其中中间绿色部分即我们要搭建的MQTT BrokerMQTT服务器搭建完成后,我们可以通过各种编程语言类库对服务器发起连接请求,以及主题发布和订阅。而编程语言类库中我们就可以使用workerman的mqtt扩展库与服务器进行通信。

MQTT概念

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称 Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:
    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

EMQX

EMQX平台作为一款开源的MQTT消息服务器,提供了稳定可靠的消息传输服务。本次教程中,我们将探讨如何基于EMQX平台自建MQTT服务器,并实现设备之间的高效通讯。无论是在工业控制、智能家居还是智能城市等领域,搭建自己的MQTT服务器都能为我们带来更大的灵活性和可扩展性。

EMQX 官网:https://www.emqx.io

安装 MQTTX 服务端

本次教程中,我们将使用 Docker 部署,使用 Docker 指令直接部署,可以使用以下命令

获取 Docker 镜像

代码语言:javascript
复制
docker pull emqx/emqx

启动 Docker 容器

代码语言:javascript
复制
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest

访问仪表板

安装完成后,打开浏览器,并在地址栏中输入 http://localhost:18083以访问 EMQX 仪表板,您可以从那里连接到客户端或检查运行状态。

代码语言:javascript
复制
默认用户名: admin
默认密码:public

进入 EMQX 管理页面

安装 MQTTX 客户端

MQTTX 客户端我们选用workermanmqtt扩展库与服务器进行通信。workerman/mqtt 是一个基于workerman的异步mqtt 客户端库,可用于接收或者发送mqtt协议的消息。支持QoS 0QoS 1QoS 2。支持MQTT3.13.1.15版本。

安装

代码语言:javascript
复制
composer require workerman/mqtt

订阅客户端

subscribe.php 代码

代码语言:javascript
复制
<?php
/**
 * @desc Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
 * @author Tinywan(ShaoBo Wan)
 * @date 2024/5/30 20:35
 */
declare(strict_types=1);

require_once __DIR__ . '/../vendor/autoload.php';

use Workerman\Worker;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $options = [
        'username' => 'Tinywan',
        'password' => '123456',
    ];
    $mqtt = new Workerman\Mqtt\Client('mqtt://192.168.13.168:1883', $options);
    $mqtt->onConnect = function ($mqtt) {
        // 主题 Topic 可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
        $topic = 'resty';
        $mqtt->subscribe($topic);
    };
    $mqtt->onMessage = function ($topic, $content) {
        echo '[订阅者][收到主题]:' . $topic . PHP_EOL;
        echo '[订阅者][收到内容]:' . $content . PHP_EOL;
    };
    $mqtt->connect();
};
Worker::runAll();

命令行运行 php subscribe.php start 启动

启动成功后,即可看到设备已经成功连接到服务器。我们在 EMQX 服务器中的客户端页面中可以查看到设备的连接状态。

客户端发布

接下来,我们测试发布和订阅主题是否正常

publish.php 代码

代码语言:javascript
复制
<?php
/**
 * @desc Publisher(发布者):消息的发出者,负责发送消息。
 * @author Tinywan(ShaoBo Wan)
 * @date 2024/5/30 20:46
 */
declare(strict_types=1);

require_once __DIR__ . '/../vendor/autoload.php';

use Workerman\Worker;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $mqtt = new Workerman\Mqtt\Client('mqtt://192.168.13.168:1883');
    $mqtt->onConnect = function ($mqtt) {
        // 主题 Topic
        $topic = 'resty';
        // 负载 Payload 可以理解为发送消息的内容
        $payload = 'Hello Tinywan mqtt';
        $mqtt->publish($topic, $payload);
    };
    $mqtt->connect();
};
Worker::runAll();

命令行运行 php publish.php start 发布消息。

以下是 订阅客户端 subscribe.php收到的消息

至此,我们已经成功实现了 客户端 与 EMQX 服务器之间的通讯。

主题订阅

主题监控

Websocket 工具

订阅消息

发布消息

通过websocket发布消息

MQTT客户端订阅消息

使用 MQTT.js 库

MQTT.js 是一个开源的 MQTT 协议的客户端库,使用 JavaScript 编写,主要用于 Node.js 和 浏览器环境中。是JavaScript 环境下的 MQTT 客户端库。可以用于微信小程序、支付宝小程序等定制浏览器环境。

可以直接在HTML文件中进行调用:

代码语言:javascript
复制
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>

实现简单功能,EMQX websocket 连接服务地址

代码语言:javascript
复制
ws://localhost:8083/mqtt

MQTT.html 案例代码

代码语言:javascript
复制
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>开源技术小栈</title>
</head>
<body>
<h3>零基础教你自建MQTT服务器并实现通讯</h3>
<div style="height: 800px;">
    <label>【客户端】【目标Topic】:<input id="targetTopicInput" type="text"></label><br>
    <label>【客户端】【发送的消息】:<input id="messageInput" type="text"></label><br>
    <button onclick="sendMessage()">发送</button>
    <button onclick="clearMessage()">清空</button>
    <div id="messageDiv"></div>
</div>
</body>

<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    /**
     * EMQX websocket 连接地址
     * @type {string}
     */
    const url = 'ws://localhost:8083/mqtt';
    /**
     * 订阅的topic
     * @type {string}
     */
    const topic = 'resty';
    /**
     * 连接到消息队列
     */
    let client = mqtt.connect(url);
    client.on('connect', function () {
        /**
         * 连接成功后订阅topic
         */
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("[订阅者][Topic主题]:" + topic + "成功!");
            }
        });
    });

    /**
     * 获取订阅topic中的消息
     */
    client.on('message', function (topic, message) {
        showMessage("[订阅者][收到消息]:" + message.toString());
    });

    /**
     * 发送消息
     */
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        // 向目标topic中发送消息
        client.publish(targetTopic, message);
        showMessage("[发送消息给]" + targetTopic + "的消息:" + message);
    }

    /**
     * 从URL中获取参数
     * @param name
     * @returns {null|string}
     */
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    /**
     * 在消息列表中展示消息
     * @param message
     */
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    /**
     * 清空消息列表
     */
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>

发送消息

接受消息

小结

根据官方文档我们知道publishsubscribe流程如何实现。对于subscribe由于需要实时获取来自硬件方面的数据或其他客户端的数据,因此subscribe需要以cli模式守护运行在系统后台。但是publish消息一般跟系统内的业务逻辑相关。可以通过MQTT客户端Websocket客户端发送消息。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
    • MQTT通信的架构
      • MQTT概念
        • EMQX
        • 安装 MQTTX 服务端
          • 获取 Docker 镜像
            • 启动 Docker 容器
              • 访问仪表板
              • 安装 MQTTX 客户端
                • 安装
                  • 订阅客户端
                    • 客户端发布
                      • 主题订阅
                        • 主题监控
                        • Websocket 工具
                          • 订阅消息
                            • 发布消息
                            • 使用 MQTT.js 库
                            • 小结
                            相关产品与服务
                            消息队列 CMQ
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档