<?php declare(strict_types=1); //========== Redis消息队列·消息生产者 ==========// // Redis消息队列服务器连接参数 $host = 'localhost'; // 主机地址 $password = '*****'; // 认证密码 $key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致 $redis = new Redis(); $redis->connect($host); $redis->auth($password); // 模拟不确定的操作(要么发短信,要么发邮件) if (mt_rand(0, 1) === 0) { $message = [ 'class' => 'SMS', // 类名 'params' => [ 'mobile' => '188' . str_pad((string)mt_rand(0, 99999999), 8, '0', STR_PAD_LEFT), // 手机号码 'code' => (string)mt_rand(100000, 999999), // 验证码 ], ]; } else { $address = [ 'LiuYi@163.com', 'ChenEr@189.cn', 'ZhangSan@tom.com', 'LiSi@sina.com.cn', 'WangWu@foxmail.com', 'ZhaoLiu@outlook.com', 'SunQi@qq.com', 'ZhouBa@10086.cn', 'WuJiu@sohu.com', 'ZhengShi@126.com', ]; $subject = [ 'PHP是世界上最好の语言', '从入门到放弃', '从删库到跑路', ]; $message = [ 'class' => 'Email', // 类名 'params' => [ 'address' => $address[mt_rand(0, count($address) - 1)], // 收件人邮箱地址 'subject' => $subject[mt_rand(0, count($subject) - 1)], // 邮件主题 ], ]; } /** * 消息内容编码(依次进行json_encode和base64_encode操作) * * @param array $message 消息内容数组 * @return string 消息内容字符串,若编码失败则返回空字符串 */ function message_encode(array $message): string { // 先对消息内容数组进行json_encode()转成字符串格式 try { $msg = json_encode($message, JSON_THROW_ON_ERROR); } catch (JsonException) { $msg = ''; } // 再对消息内容字符串进行base64_encode()避免中文编码问题 if (is_string($msg) && $msg !== '') { $msg = base64_encode($msg); } else { $msg = ''; } return $msg; } $message = message_encode($message); // 消息内容编码 if ($message !== '') { $ok = $redis->lPush($key, $message); // 向Redis消息队列服务器发送消息 if ($ok !== false) { echo '消息发送成功'; } else { echo '消息发送失败'; } }
<?php declare(strict_types=1); //========== Redis消息队列·消息消费者 ==========// // Redis消息队列服务器连接参数 $host = 'localhost'; // 主机地址 $password = '*****'; // 认证密码 $key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致 $redis = new Redis(); $redis->connect($host); $redis->auth($password); /** * 消息内容解码(依次进行base64_decode和json_decode操作) * * @param string $message 消息内容字符串 * @return array 消息内容数组,若解码失败则返回空数组 */ function message_decode(string $message): array { // 先对消息内容字符串进行base64_decode()还原回JSON字符串 $json = base64_decode($message); // 再对JSON字符串解码还原回数组 if (is_string($json) && $json !== '') { try { $msg = json_decode($json, true, 512, JSON_THROW_ON_ERROR); } catch (JsonException) { $msg = []; } } return isset($msg) && is_array($msg) ? $msg : []; } while (true) { // 接收消息 try { $message = $redis->brPop($key, 0)[1]; // 重要提醒:brPop()会发生阻塞,直到收到消息。 } catch (RedisException) { continue; } // 创建线程来处理消息(避免处理消息耗时过长,阻塞接收消息) parallel\run(function () use ($message) { $msg = message_decode($message); // 消息内容解码 $class = isset($msg['class']) ? (string)$msg['class'] : ''; // 类名 $params = isset($msg['params']) ? (array)$msg['params'] : []; // 参数 $file = __DIR__ . "/$class.class.php"; // 处理消息的类文件 if (file_exists($file)) { require_once $file; // 引入处理消息的类文件 if (class_exists($class)) { (new $class($params))->do(); // 实例化类,开始处理消息 } } }); } // 向[18852594247]发送短信验证码[496131]成功 // 向[18805746317]发送短信验证码[975764]成功 // 向[WuJiu@sohu.com]发送[PHP是世界上最好の语言]主题邮件成功 // 向[ChenEr@189.cn]发送[从入门到放弃]主题邮件成功 // 向[18844793577]发送短信验证码[563572]成功 // 向[WuJiu@sohu.com]发送[PHP是世界上最好の语言]主题邮件成功 // 向[ZhengShi@126.com]发送[从入门到放弃]主题邮件成功 // 向[18817138608]发送短信验证码[211007]成功 // 向[18889004289]发送短信验证码[413143]成功 // 向[18807148864]发送短信验证码[225506]成功 //========== 总结 ==========// // 1、消息生产者只需要把类名和参数发送给消息消费者,这样在处理消息时就会变得非常灵活,后续加入新的消息类型也只要实现对应的类文件即可, // 而不需要重新运行消息消费者脚本(但是修改已引入的处理消息的类文件需要重新运行)。
<?php declare(strict_types=1); /** * SMS类 */ class SMS { private string $mobile; private string $code; /** * 构造方法 * * @param array $params 参数 */ public function __construct(array $params) { $this->mobile = (string)$params['mobile']; $this->code = (string)$params['code']; } /** * 发送短信 * * @return bool 发送成功=true|发送失败=false */ public function do(): bool { echo "向[$this->mobile]发送短信验证码[$this->code]成功" . PHP_EOL; return true; } }
<?php declare(strict_types=1); /** * Email类 */ class Email { private string $address; private string $subject; /** * 构造方法 * * @param array $params */ public function __construct(array $params) { $this->address = (string)$params['address']; $this->subject = (string)$params['subject']; } /** * 发送邮件 * * @return bool 发送成功=true|发送失败=false */ public function do(): bool { echo "向[$this->address]发送[$this->subject]主题邮件成功" . PHP_EOL; return true; } }
Copyright © 2024 码农人生. All Rights Reserved