灵活地实现消息队列消费者

<?php
declare(strict_types=1);

//========== Redis消息队列·消息生产者 ==========//

// Redis消息队列服务器连接参数
$host = 'localhost'; // 主机地址
$password = '*****'; // 认证密码
$key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致

$redis = new Redis();
$redis->connect($host);
$redis->auth($password);

// 模拟不确定的操作(要么发短信,要么发邮件)
if (mt_rand(0, 1) === 0) {
    $message = [
        'class' => 'SMS', // 类名
        'params' => [
            'mobile' => '188' . str_pad((string)mt_rand(0, 99999999), 8, '0', STR_PAD_LEFT), // 手机号码
            'code' => (string)mt_rand(100000, 999999), // 验证码
        ],
    ];
} else {
    $address = [
        'LiuYi@163.com',
        'ChenEr@189.cn',
        'ZhangSan@tom.com',
        'LiSi@sina.com.cn',
        'WangWu@foxmail.com',
        'ZhaoLiu@outlook.com',
        'SunQi@qq.com',
        'ZhouBa@10086.cn',
        'WuJiu@sohu.com',
        'ZhengShi@126.com',
    ];

    $subject = [
        'PHP是世界上最好の语言',
        '从入门到放弃',
        '从删库到跑路',
    ];

    $message = [
        'class' => 'Email', // 类名
        'params' => [
            'address' => $address[mt_rand(0, count($address) - 1)], // 收件人邮箱地址
            'subject' => $subject[mt_rand(0, count($subject) - 1)], // 邮件主题
        ],
    ];
}

/**
 * 消息内容编码(依次进行json_encode和base64_encode操作)
 *
 * @param array $message 消息内容数组
 * @return string 消息内容字符串,若编码失败则返回空字符串
 */
function message_encode(array $message): string
{
    // 先对消息内容数组进行json_encode()转成字符串格式
    try {
        $msg = json_encode($message, JSON_THROW_ON_ERROR);
    } catch (JsonException) {
        $msg = '';
    }

    // 再对消息内容字符串进行base64_encode()避免中文编码问题
    if (is_string($msg) && $msg !== '') {
        $msg = base64_encode($msg);
    } else {
        $msg = '';
    }

    return $msg;
}

$message = message_encode($message); // 消息内容编码
if ($message !== '') {
    $ok = $redis->lPush($key, $message); // 向Redis消息队列服务器发送消息
    if ($ok !== false) {
        echo '消息发送成功';
    } else {
        echo '消息发送失败';
    }
}



<?php
declare(strict_types=1);

//========== Redis消息队列·消息消费者 ==========//

// Redis消息队列服务器连接参数
$host = 'localhost'; // 主机地址
$password = '*****'; // 认证密码
$key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致

$redis = new Redis();
$redis->connect($host);
$redis->auth($password);

/**
 * 消息内容解码(依次进行base64_decode和json_decode操作)
 *
 * @param string $message 消息内容字符串
 * @return array 消息内容数组,若解码失败则返回空数组
 */
function message_decode(string $message): array
{
    // 先对消息内容字符串进行base64_decode()还原回JSON字符串
    $json = base64_decode($message);

    // 再对JSON字符串解码还原回数组
    if (is_string($json) && $json !== '') {
        try {
            $msg = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        } catch (JsonException) {
            $msg = [];
        }
    }

    return isset($msg) && is_array($msg) ? $msg : [];
}

while (true) {
    // 接收消息
    try {
        $message = $redis->brPop($key, 0)[1]; // 重要提醒:brPop()会发生阻塞,直到收到消息。
    } catch (RedisException) {
        continue;
    }

    // 创建线程来处理消息(避免处理消息耗时过长,阻塞接收消息)
    parallel\run(function () use ($message) {
        $msg = message_decode($message); // 消息内容解码

        $class = isset($msg['class']) ? (string)$msg['class'] : ''; // 类名
        $params = isset($msg['params']) ? (array)$msg['params'] : []; // 参数

        $file = __DIR__ . "/$class.class.php"; // 处理消息的类文件
        if (file_exists($file)) {
            require_once $file; // 引入处理消息的类文件

            if (class_exists($class)) {
                (new $class($params))->do(); // 实例化类,开始处理消息
            }
        }
    });
}


// 向[18852594247]发送短信验证码[496131]成功
// 向[18805746317]发送短信验证码[975764]成功
// 向[WuJiu@sohu.com]发送[PHP是世界上最好の语言]主题邮件成功
// 向[ChenEr@189.cn]发送[从入门到放弃]主题邮件成功
// 向[18844793577]发送短信验证码[563572]成功
// 向[WuJiu@sohu.com]发送[PHP是世界上最好の语言]主题邮件成功
// 向[ZhengShi@126.com]发送[从入门到放弃]主题邮件成功
// 向[18817138608]发送短信验证码[211007]成功
// 向[18889004289]发送短信验证码[413143]成功
// 向[18807148864]发送短信验证码[225506]成功


//========== 总结 ==========//
// 1、消息生产者只需要把类名和参数发送给消息消费者,这样在处理消息时就会变得非常灵活,后续加入新的消息类型也只要实现对应的类文件即可,
//    而不需要重新运行消息消费者脚本(但是修改已引入的处理消息的类文件需要重新运行)。



<?php
declare(strict_types=1);

/**
 * SMS类
 */
class SMS
{
    private string $mobile;
    private string $code;

    /**
     * 构造方法
     *
     * @param array $params 参数
     */
    public function __construct(array $params)
    {
        $this->mobile = (string)$params['mobile'];
        $this->code = (string)$params['code'];
    }

    /**
     * 发送短信
     *
     * @return bool 发送成功=true|发送失败=false
     */
    public function do(): bool
    {
        echo "向[$this->mobile]发送短信验证码[$this->code]成功" . PHP_EOL;

        return true;
    }
}



<?php
declare(strict_types=1);

/**
 * Email类
 */
class Email
{
    private string $address;
    private string $subject;

    /**
     * 构造方法
     *
     * @param array $params
     */
    public function __construct(array $params)
    {
        $this->address = (string)$params['address'];
        $this->subject = (string)$params['subject'];
    }

    /**
     * 发送邮件
     *
     * @return bool 发送成功=true|发送失败=false
     */
    public function do(): bool
    {
        echo "向[$this->address]发送[$this->subject]主题邮件成功" . PHP_EOL;

        return true;
    }
}

Copyright © 2024 码农人生. All Rights Reserved