MQTT客户端的使用

先使用composer下载MQTT客户端:/program/php/bin/php /usr/local/bin/composer require php-mqtt/client



MQTT客户端的使用比较简单,下面直接给出相关代码:



<?php
// 文件:MqttClient.class.php
declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php'; // 引入php-mqtt/client库(MQTT客户端)

use PhpMqtt\Client\MqttClient as PhpMqttClient;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;

/**
 * MQTT客户端类
 */
class MqttClient
{
    private string $server = 'localhost'; // MQTT服务器IP地址
    private int $port = 1883; // MQTT服务端口号
    private string $username = '********'; // 账号
    private string $password = '********'; // 密码
    private string $topic = 'php-mqtt/client/test'; // 主题

    private PhpMqttClient $phpMqttClient;

    /**
     * 构造方法
     *
     * @param string $clientId MQTT客户端ID号
     */
    public function __construct(string $clientId)
    {
        try {
            $this->phpMqttClient = new PhpMqttClient($this->server, $this->port, $clientId);
        } catch (ProtocolNotSupportedException $e) {
            exit('创建MQTT客户端异常(ProtocolNotSupportedException):' . $e->getMessage());
        }

        $connectionSettings = (new ConnectionSettings)->setUsername($this->username)->setPassword($this->password);

        try {
            $this->phpMqttClient->connect($connectionSettings, true);
        } catch (ConfigurationInvalidException $e) {
            exit('连接MQTT服务器异常(ConfigurationInvalidException):' . $e->getMessage());
        } catch (ConnectingToBrokerFailedException $e) {
            exit('连接MQTT服务器异常(ConnectingToBrokerFailedException):' . $e->getMessage());
        }
    }

    /**
     * 订阅方法
     *
     * @param callable $callback 回调函数
     * @return void
     */
    public function subscribe(callable $callback): void
    {
        try {
            $this->phpMqttClient->subscribe($this->topic, $callback);
        } catch (MqttClientException $e) {
            exit('订阅异常(MqttClientException):' . $e->getMessage());
        }

        try {
            $this->phpMqttClient->loop(); // 重要提醒:这里会发生阻塞
        } catch (ProtocolViolationException $e) {
            exit('运行事件循环异常(ProtocolViolationException):' . $e->getMessage());
        } catch (InvalidMessageException $e) {
            exit('运行事件循环异常(InvalidMessageException):' . $e->getMessage());
        } catch (MqttClientException $e) {
            exit('运行事件循环异常(MqttClientException):' . $e->getMessage());
        }

        $this->disconnect();
    }

    /**
     * 发布方法
     *
     * @param string $message 消息
     * @return void
     */
    public function publish(string $message): void
    {
        try {
            $this->phpMqttClient->publish($this->topic, $message);
        } catch (RepositoryException $e) {
            exit('发布异常(RepositoryException):' . $e->getMessage());
        } catch (DataTransferException $e) {
            exit('发布异常(DataTransferException):' . $e->getMessage());
        }
    }

    /**
     * 客户端与服务器断开连接
     *
     * @return void
     */
    public function disconnect(): void
    {
        if ($this->phpMqttClient->isConnected()) {
            try {
                $this->phpMqttClient->disconnect();
            } catch (DataTransferException $e) {
                exit('客户端与服务器断开连接异常(DataTransferException):' . $e->getMessage());
            }
        }
    }
}



<?php
// 文件:subscriber.mqtt.php
// [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/mqtt/subscriber.mqtt.php
declare(strict_types=1);
PHP_SAPI !== 'cli' && exit('脚本只能在命令行执行');
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

function json_encode_array(array $array): string
{
    try {
        $json = json_encode($array, JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
    } catch (JsonException) {
        $json = false;
    }

    return is_string($json) ? $json : '';
}

require_once __DIR__ . '/MqttClient.class.php';

$clientId = 'test-subscriber'; // MQTT客户端ID号

$mqttClient = new MqttClient($clientId);

$mqttClient->subscribe(function (string $topic, string $message, bool $retained, array $matchedWildcards): void {
    $datetime = date('Y-m-d H:i:s');
    echo "---------- [SUBSCRIBER][$datetime] ----------" . PHP_EOL;
    echo '$topic            => ' . $topic . PHP_EOL;
    echo '$message          => ' . $message . PHP_EOL;
    echo '$retained         => ' . ($retained ? 'true' : 'false') . PHP_EOL;
    echo '$matchedWildcards => ' . json_encode_array($matchedWildcards) . PHP_EOL;
});

//========== 总结 ==========//
// 1、可以有多个subscriber,但是这些subscriber的clientId不能有重复,正确来说是不管subscriber还是publisher,
//    对于MQTT服务器来说clientId都不能有重复。
// 2、当publisher在某个topic发布消息时,如果有多个subscriber订阅了这个topic,那么这些subscriber都会收到消息。



<?php
// 文件:publisher.mqtt.php
// [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/mqtt/publisher.mqtt.php
declare(strict_types=1);
PHP_SAPI !== 'cli' && exit('脚本只能在命令行执行');
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

require_once __DIR__ . '/MqttClient.class.php';

$clientId = 'test-publisher'; // MQTT客户端ID号

$mqttClient = new MqttClient($clientId);

$messages = ['孩儿立志出乡关', '学不成名誓不还', '埋骨何须桑梓地', '人生无处不青山'];

foreach ($messages as $message) {
    sleep(2);
    $mqttClient->publish($message);
}

$mqttClient->disconnect();

Copyright © 2025 码农人生. All Rights Reserved