<?php declare(strict_types=1); //========== Redis消息队列·消息生产者 ==========// // Redis消息队列服务器连接参数 $host = 'localhost'; // 主机地址 $password = '*****'; // 认证密码 $key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致 $redis = new Redis(); $redis->connect($host); $redis->auth($password); $opr = ['+', '-', '*', '/']; // 四则运算符 // 构造消息内容 $message = [ 'mid' => mt_rand(100000, 999999), // 消息ID 'opr' => $opr[mt_rand(0, 3)], // 运算符 'num1' => mt_rand(10, 99), // 操作数1 'num2' => mt_rand(10, 99), // 操作数2 ]; // 消息内容必须以字符串形式发送,故使用json_encode()将消息内容数组转为字符串 try { $message = json_encode($message, JSON_THROW_ON_ERROR); } catch (JsonException) { $message = ''; } if ($message !== '') { $ok = $redis->lPush($key, $message); // 向Redis消息队列服务器发送消息 if ($ok !== false) { echo '消息发送成功'; } else { echo '消息发送失败'; } }
<?php declare(strict_types=1); //========== Redis消息队列·消息消费者 ==========// // Redis消息队列服务器连接参数 $host = 'localhost'; // 主机地址 $password = '*****'; // 认证密码 $key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致 $redis = new Redis(); $redis->connect($host); $redis->auth($password); while (true) { // 接收消息 try { $message = $redis->brPop($key, 0)[1]; // 重要提醒:这里会发生阻塞,直到收到消息。 } catch (RedisException) { continue; } // 创建线程来处理消息(避免处理消息耗时过长,阻塞接收消息) parallel\run(function () use ($message) { // 解码消息 try { $message = json_decode($message, true, 512, JSON_THROW_ON_ERROR); } catch (JsonException) { $message = []; } if (isset($message['mid'], $message['opr'], $message['num1'], $message['num2'])) { $mid = (int)$message['mid']; // 消息ID $opr = (string)$message['opr']; // 运算符 $num1 = (int)$message['num1']; // 操作数1 $num2 = (int)$message['num2']; // 操作数2 if ($opr === '+') { $result = $num1 + $num2; } elseif ($opr === '-') { $result = $num1 - $num2; } elseif ($opr === '*') { $result = $num1 * $num2; } elseif ($opr === '/') { $result = round($num1 / $num2, 2); } if (isset($result)) { echo "处理消息[$mid]成功:$num1 $opr $num2 = $result" . PHP_EOL; } } }); } // 处理消息[848314]成功:22 + 49 = 71 // 处理消息[640448]成功:84 / 16 = 5.25 // 处理消息[643893]成功:11 - 75 = -64 // 处理消息[133635]成功:84 * 97 = 8148 // 处理消息[164103]成功:23 + 90 = 113 // 处理消息[148602]成功:82 / 44 = 1.86 // 处理消息[460240]成功:17 + 64 = 81 // 处理消息[887516]成功:39 + 86 = 125 // 处理消息[994511]成功:83 / 71 = 1.17 // 处理消息[127045]成功:20 / 57 = 0.35 //========== 总结 ==========// // 1、消息消费者PHP脚本必须以CLI模式执行,命令如下: // [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/demo/redis.consumer.php // 2、Redis不是专业的消息队列,所以缺少一些专业消息队列常见的高级特性,例如没有确认消息(ACK)机制,所以上面的代码只有消息处理操作, // 而没有ACK操作。 // 3、Redis消息队列不支持多个消费者,例如把当前脚本复制一份,然后启动两个消费者,这时生产者如果不停发消息就可以发现两个消费者是交替 // 处理消息,而不是同时处理同一个消息。 // 4、Redis主要功能还是内存存储,它作为消息队列在可靠性上肯定是不如RabbitMQ等专业消息队列的,但是它胜在简单,并且很多系统都会使用它 // 作为缓存服务器,这样就不用再额外安装消息队列服务器。总而言之还是要根据实际需求来决定,如果对消息的可靠性要求非常高,或者需要 // 支持多消费者模式,那么还是老老实实使用RabbitMQ等消息队列吧。
Copyright © 2024 码农人生. All Rights Reserved