使用Redis实现消息队列功能

<?php
declare(strict_types=1);

//========== Redis消息队列·消息生产者 ==========//

// Redis消息队列服务器连接参数
$host = 'localhost'; // 主机地址
$password = '*****'; // 认证密码
$key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致

$redis = new Redis();
$redis->connect($host);
$redis->auth($password);

$opr = ['+', '-', '*', '/']; // 四则运算符

// 构造消息内容
$message = [
    'mid' => mt_rand(100000, 999999), // 消息ID
    'opr' => $opr[mt_rand(0, 3)], // 运算符
    'num1' => mt_rand(10, 99), // 操作数1
    'num2' => mt_rand(10, 99), // 操作数2
];

// 消息内容必须以字符串形式发送,故使用json_encode()将消息内容数组转为字符串
try {
    $message = json_encode($message, JSON_THROW_ON_ERROR);
} catch (JsonException) {
    $message = '';
}

if ($message !== '') {
    $ok = $redis->lPush($key, $message); // 向Redis消息队列服务器发送消息
    if ($ok !== false) {
        echo '消息发送成功';
    } else {
        echo '消息发送失败';
    }
}



<?php
declare(strict_types=1);

//========== Redis消息队列·消息消费者 ==========//

// Redis消息队列服务器连接参数
$host = 'localhost'; // 主机地址
$password = '*****'; // 认证密码
$key = 'message_queue_key'; // 消息队列KEY,消息生产者和消息消费者两端需保证一致

$redis = new Redis();
$redis->connect($host);
$redis->auth($password);

while (true) {
    // 接收消息
    try {
        $message = $redis->brPop($key, 0)[1]; // 重要提醒:这里会发生阻塞,直到收到消息。
    } catch (RedisException) {
        continue;
    }

    // 创建线程来处理消息(避免处理消息耗时过长,阻塞接收消息)
    parallel\run(function () use ($message) {
        // 解码消息
        try {
            $message = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
        } catch (JsonException) {
            $message = [];
        }

        if (isset($message['mid'], $message['opr'], $message['num1'], $message['num2'])) {
            $mid = (int)$message['mid']; // 消息ID
            $opr = (string)$message['opr']; // 运算符
            $num1 = (int)$message['num1']; // 操作数1
            $num2 = (int)$message['num2']; // 操作数2

            if ($opr === '+') {
                $result = $num1 + $num2;
            } elseif ($opr === '-') {
                $result = $num1 - $num2;
            } elseif ($opr === '*') {
                $result = $num1 * $num2;
            } elseif ($opr === '/') {
                $result = round($num1 / $num2, 2);
            }

            if (isset($result)) {
                echo "处理消息[$mid]成功:$num1 $opr $num2 = $result" . PHP_EOL;
            }
        }
    });
}


// 处理消息[848314]成功:22 + 49 = 71
// 处理消息[640448]成功:84 / 16 = 5.25
// 处理消息[643893]成功:11 - 75 = -64
// 处理消息[133635]成功:84 * 97 = 8148
// 处理消息[164103]成功:23 + 90 = 113
// 处理消息[148602]成功:82 / 44 = 1.86
// 处理消息[460240]成功:17 + 64 = 81
// 处理消息[887516]成功:39 + 86 = 125
// 处理消息[994511]成功:83 / 71 = 1.17
// 处理消息[127045]成功:20 / 57 = 0.35


//========== 总结 ==========//
// 1、消息消费者PHP脚本必须以CLI模式执行,命令如下:
//    [root@localhost ~]# /program/php/bin/php /inetpub/wwwroot/demo/redis.consumer.php
// 2、Redis不是专业的消息队列,所以缺少一些专业消息队列常见的高级特性,例如没有确认消息(ACK)机制,所以上面的代码只有消息处理操作,
//    而没有ACK操作。
// 3、Redis消息队列不支持多个消费者,例如把当前脚本复制一份,然后启动两个消费者,这时生产者如果不停发消息就可以发现两个消费者是交替
//    处理消息,而不是同时处理同一个消息。
// 4、Redis主要功能还是内存存储,它作为消息队列在可靠性上肯定是不如RabbitMQ等专业消息队列的,但是它胜在简单,并且很多系统都会使用它
//    作为缓存服务器,这样就不用再额外安装消息队列服务器。总而言之还是要根据实际需求来决定,如果对消息的可靠性要求非常高,或者需要
//    支持多消费者模式,那么还是老老实实使用RabbitMQ等消息队列吧。

Copyright © 2024 码农人生. All Rights Reserved