<?php
// File: MqttClient.class.php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php'; // Loads the php-mqtt/client library (MQTT client).

use PhpMqtt\Client\MqttClient as PhpMqttClient;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\Exceptions\MqttClientException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\Exceptions\InvalidMessageException;
use PhpMqtt\Client\Exceptions\ProtocolViolationException;
use PhpMqtt\Client\Exceptions\ProtocolNotSupportedException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;

/**
 * MQTT client wrapper.
 *
 * Wraps a php-mqtt/client instance bound to a single topic. The connection is
 * opened in the constructor. Library exceptions caught by this class are not
 * rethrown: the message is printed and the process is terminated with a
 * non-zero exit status (see fail()).
 */
class MqttClient
{
    private string $server = 'localhost';           // MQTT broker host name or IP address
    private int $port = 1883;                       // MQTT broker port
    private string $username = '********';          // Account name
    private string $password = '********';          // Account password
    private string $topic = 'php-mqtt/client/test'; // Topic used by subscribe() and publish()

    private PhpMqttClient $phpMqttClient;

    /**
     * Creates the underlying client and connects it to the broker.
     *
     * Every argument after $clientId is optional; passing null (or omitting
     * it) keeps the default declared on the corresponding property, so
     * existing `new MqttClient($clientId)` calls behave as before.
     *
     * @param string      $clientId MQTT client ID
     * @param string|null $server   Broker host name or IP address
     * @param int|null    $port     Broker port
     * @param string|null $username Account name
     * @param string|null $password Account password
     * @param string|null $topic    Topic used by subscribe() and publish()
     */
    public function __construct(
        string $clientId,
        ?string $server = null,
        ?int $port = null,
        ?string $username = null,
        ?string $password = null,
        ?string $topic = null
    ) {
        $this->server = $server ?? $this->server;
        $this->port = $port ?? $this->port;
        $this->username = $username ?? $this->username;
        $this->password = $password ?? $this->password;
        $this->topic = $topic ?? $this->topic;

        try {
            $this->phpMqttClient = new PhpMqttClient($this->server, $this->port, $clientId);
        } catch (ProtocolNotSupportedException $e) {
            $this->fail('创建MQTT客户端异常(ProtocolNotSupportedException):' . $e->getMessage());
        }

        $connectionSettings = (new ConnectionSettings)
            ->setUsername($this->username)
            ->setPassword($this->password);

        try {
            // The second argument (true) asks php-mqtt/client for a clean session.
            $this->phpMqttClient->connect($connectionSettings, true);
        } catch (ConfigurationInvalidException $e) {
            $this->fail('连接MQTT服务器异常(ConfigurationInvalidException):' . $e->getMessage());
        } catch (ConnectingToBrokerFailedException $e) {
            $this->fail('连接MQTT服务器异常(ConnectingToBrokerFailedException):' . $e->getMessage());
        }
    }

    /**
     * Subscribes to the configured topic and runs the client's event loop.
     *
     * The loop() call blocks; disconnect() is only reached once loop()
     * returns without throwing.
     *
     * @param callable $callback Callback handed to the library's subscribe()
     * @return void
     */
    public function subscribe(callable $callback): void
    {
        try {
            $this->phpMqttClient->subscribe($this->topic, $callback);
        } catch (MqttClientException $e) {
            $this->fail('订阅异常(MqttClientException):' . $e->getMessage());
        }

        try {
            $this->phpMqttClient->loop(); // Important: this call blocks.
        } catch (ProtocolViolationException $e) {
            $this->fail('运行事件循环异常(ProtocolViolationException):' . $e->getMessage());
        } catch (InvalidMessageException $e) {
            $this->fail('运行事件循环异常(InvalidMessageException):' . $e->getMessage());
        } catch (MqttClientException $e) {
            // Generic fallback; must stay after the more specific catches above.
            $this->fail('运行事件循环异常(MqttClientException):' . $e->getMessage());
        }

        $this->disconnect();
    }

    /**
     * Publishes a message to the configured topic.
     *
     * Only the topic and the payload are passed, so the library's defaults
     * apply to every other publish option.
     *
     * @param string $message Message payload
     * @return void
     */
    public function publish(string $message): void
    {
        try {
            $this->phpMqttClient->publish($this->topic, $message);
        } catch (RepositoryException $e) {
            $this->fail('发布异常(RepositoryException):' . $e->getMessage());
        } catch (DataTransferException $e) {
            $this->fail('发布异常(DataTransferException):' . $e->getMessage());
        }
    }

    /**
     * Disconnects the client from the broker if it is currently connected.
     *
     * @return void
     */
    public function disconnect(): void
    {
        if (!$this->phpMqttClient->isConnected()) {
            return;
        }

        try {
            $this->phpMqttClient->disconnect();
        } catch (DataTransferException $e) {
            $this->fail('客户端与服务器断开连接异常(DataTransferException):' . $e->getMessage());
        }
    }

    /**
     * Prints an error message and terminates the script with exit status 1.
     *
     * exit() with a string argument prints the string but ends the process
     * with status 0, which makes a failed run look successful to shells,
     * cron and process supervisors. The text is therefore echoed unchanged
     * and an integer status is passed to exit() instead.
     *
     * @param string $message Text to print before terminating
     * @return void This method never returns; the script ends here.
     */
    private function fail(string $message): void
    {
        echo $message;
        exit(1);
    }
}
<?php
// File: subscriber.mqtt.php
// [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/mqtt/subscriber.mqtt.php

declare(strict_types=1);

// This script is meant to be run from the command line only.
if (PHP_SAPI !== 'cli') {
    exit('脚本只能在命令行执行');
}

// Show every error, and lift the time and memory limits for this long-running process.
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

/**
 * Encodes an array as JSON, keeping slashes and Unicode characters unescaped.
 *
 * @param array $array Data to encode
 * @return string The JSON text, or an empty string when encoding fails
 */
function json_encode_array(array $array): string
{
    $flags = JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE;

    try {
        $encoded = json_encode($array, $flags);
    } catch (JsonException) {
        return '';
    }

    return $encoded === false ? '' : $encoded;
}

require_once __DIR__ . '/MqttClient.class.php';

// Prints one received message: a timestamped header followed by each callback argument.
$onMessage = function (string $topic, string $message, bool $retained, array $matchedWildcards): void {
    $receivedAt = date('Y-m-d H:i:s');
    $retainedLabel = $retained ? 'true' : 'false';

    $lines = [
        "---------- [SUBSCRIBER][$receivedAt] ----------",
        '$topic => ' . $topic,
        '$message => ' . $message,
        '$retained => ' . $retainedLabel,
        '$matchedWildcards => ' . json_encode_array($matchedWildcards),
    ];

    echo implode(PHP_EOL, $lines) . PHP_EOL;
};

$clientId = 'test-subscriber'; // MQTT client ID
$subscriber = new MqttClient($clientId);
$subscriber->subscribe($onMessage);

//========== Summary ==========//
// 1. There can be several subscribers, but their client IDs must not repeat. More precisely,
//    subscriber or publisher alike, client IDs must be unique from the MQTT server's point of view.
// 2. When a publisher publishes a message on a topic that several subscribers have subscribed to,
//    every one of those subscribers receives the message.
<?php
// File: publisher.mqtt.php
// [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/mqtt/publisher.mqtt.php

declare(strict_types=1);

// This script is meant to be run from the command line only.
if (PHP_SAPI !== 'cli') {
    exit('脚本只能在命令行执行');
}

// Show every error, and lift the time and memory limits.
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

require_once __DIR__ . '/MqttClient.class.php';

$clientId = 'test-publisher'; // MQTT client ID
$publisher = new MqttClient($clientId);

// Sample payloads, published one at a time.
$verses = [
    '孩儿立志出乡关',
    '学不成名誓不还',
    '埋骨何须桑梓地',
    '人生无处不青山',
];

$total = count($verses);
for ($index = 0; $index < $total; $index++) {
    sleep(2); // Wait two seconds before each publish.
    $publisher->publish($verses[$index]);
}

$publisher->disconnect();
Copyright © 2025 码农人生. All Rights Reserved