使用amqp扩展操作RabbitMQ

操作系统:CentOS 7
PHP版本:8.1
amqp扩展版本:1.11.0
 
PHP操作RabbitMQ必须使用AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议),而PHP要使用AMQP协议必须安装amqp扩展。
 
安装amqp扩展
[root@localhost src]# tar -xf amqp-1.11.0.tgz
[root@localhost src]# cd amqp-1.11.0
[root@localhost amqp-1.11.0]# /program/php/bin/phpize
[root@localhost amqp-1.11.0]# ./configure --with-php-config=/program/php/bin/php-config --with-amqp
[root@localhost amqp-1.11.0]# make
[root@localhost amqp-1.11.0]# make install
[root@localhost amqp-1.11.0]# vim /program/php/php.ini
extension=amqp.so
[root@localhost amqp-1.11.0]# service php-fpm restart
Gracefully shutting down php-fpm . done
Starting php-fpm  done
[root@localhost amqp-1.11.0]#



消息消费者代码(rabbitmq.consumer.php)
 
<?php
// +--------------------------------------------------------------+ //
// | 消息消费者 - rabbitmq.consumer.php                           | //
// +--------------------------------------------------------------+ //
declare(strict_types=1);
PHP_SAPI !== 'cli' && exit('脚本只能在命令行执行');
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

const CONSUMER_ID = '消费者1'; // 如果需要启动多个消费者,可复制本脚本,然后修改消费者ID即可

// RabbitMQ服务器连接参数
$credentials = [
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'RabbitMQ管理员账号',
    'password' => 'RabbitMQ管理员密码',
];

$exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)
$queueName = 'manong.queue'; // 队列名称(若只启动一个消费者则该变量可以不设置)

/**
 * 在控制台输出一段DEBUG信息并终止程序
 *
 * @param string $text 参数说明
 * @return void echo&exit
 */
function shutdown(string $text): void
{
    debug($text);
    exit;
}

/**
 * 在控制台输出一段信息
 *
 * @param string $text 参数说明
 * @return void echo
 */
function debug(string $text): void
{
    echo '[' . CONSUMER_ID . "][DEBUG] $text" . PHP_EOL;
}

/**
 * 队列消息被消费后的回调函数
 *
 * @param AMQPEnvelope $envelope AMQPEnvelope对象
 * @param AMQPQueue $queue AMQPQueue对象
 * @return bool 消息消费结果
 */
function queue_consume_callback(AMQPEnvelope $envelope, AMQPQueue $queue): bool
{
    $ok = false;

    // 获取消息并JSON解码
    try {
        $body = json_decode($envelope->getBody(), true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        unset($e);
    }

    if (isset($body['time'], $body['content'])) {
        $datetime = date('Y-m-d H:i:s', $body['time']);
        $content = $body['content'];
        debug("[$datetime] $content");

        // 发送ACK应答(通知RabbitMQ服务器消费者已正常消费掉消息,可以将该消息从队列中删除)
        $deliveryTag = $envelope->getDeliveryTag();
        try {
            $ok = $queue->ack($deliveryTag);
        } catch (AMQPChannelException|AMQPConnectionException $e) {
            debug('发送ACK应答异常:' . $e->getMessage());
        }
    }

    return $ok;
}

// 与RabbitMQ服务器建立连接
$connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接
try {
    $connection->connect(); // 与RabbitMQ服务器建立连接
} catch (AMQPConnectionException $e) {
    shutdown('与RabbitMQ服务器建立连接异常:' . $e->getMessage());
}
$connection->isConnected() || shutdown('与RabbitMQ服务器建立连接失败');

// 建立网络信道
try {
    $channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $e) {
    shutdown('建立网络信道异常:' . $e->getMessage());
}

// 创建交换机(在指定网络信道)
try {
    $exchange = new AMQPExchange($channel);
} catch (AMQPConnectionException|AMQPExchangeException $e) {
    shutdown('创建交换机异常:' . $e->getMessage());
}

// 创建队列,此时在RabbitMQ网页管理插件的Queues选项卡就可以看到该队列
try {
    $queue = new AMQPQueue($channel);
} catch (AMQPConnectionException|AMQPQueueException $e) {
    shutdown('创建队列异常:' . $e->getMessage());
}

$exchange->setName($exchangeName); // 设置交换机名称
$exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
$exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
try {
    $exchange->declareExchange();
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) {
    shutdown('配置交换机异常:' . $e->getMessage());
}

$queue->setName($queueName); // 重要提醒:若只启动一个消费者则可以不调用AMQPQueue::setName()方法
try {
    $queue->declareQueue();
} catch (AMQPChannelException|AMQPConnectionException $e) {
    shutdown('配置队列异常:' . $e->getMessage());
}

try {
    $queue->bind($exchangeName, $routingKey);
} catch (AMQPChannelException|AMQPConnectionException $e) {
    shutdown('队列绑定到交换机异常:' . $e->getMessage());
}

// 启动消费者
try {
    $queue->consume('queue_consume_callback'); // 重要提醒:这里会一直阻塞
} catch (AMQPChannelException|AMQPConnectionException|AMQPEnvelopeException $e) {
    $connection->disconnect(); // 断开连接
    shutdown('启动消费者异常:' . $e->getMessage());
}

// 重要提醒:这里是执行不到的,因为$queue->consume()会一直阻塞,直到收到消息,并且处理完消息后又会进入阻塞状态
debug('启动消费者成功');


//========== 总结 ==========//
// 1、生产者通过网络信道(Channel)把消息内容和路由键(RoutingKey)发送给交换机(Exchange),交换机再根据路由键把消息内容放入队列,
//    最后RabbitMQ服务器把消息推送给匹配的消费者。
// 2、在启动多个消费者的情况下,是否调用AMQPQueue::setName()方法会影响消费效果:
//    如果不调用AMQPQueue::setName()方法,那么所有消费者都会收到消息,也就是一个消息会被多次消费;
//    如果调用AMQPQueue::setName()方法,那么只有一个消费者会收到消息,也就是一个消息只被消费一次。


 
消息生产者代码(rabbitmq.producer.php)
 
<?php
// +--------------------------------------------------------------+ //
// | 消息生产者 - rabbitmq.producer.php                           | //
// +--------------------------------------------------------------+ //
declare(strict_types=1);
ini_set('display_errors', 'On');
error_reporting(-1);

// RabbitMQ服务器连接参数
$credentials = [
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'RabbitMQ管理员账号',
    'password' => 'RabbitMQ管理员密码',
];

$exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)

// 与RabbitMQ服务器建立连接
$connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接
try {
    $connection->connect(); // 与RabbitMQ服务器建立连接
} catch (AMQPConnectionException $e) {
    exit('与RabbitMQ服务器建立连接异常:' . $e->getMessage());
}
$connection->isConnected() || exit('与RabbitMQ服务器建立连接失败');

// 建立网络信道
try {
    $channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $e) {
    exit('建立网络信道异常:' . $e->getMessage());
}

// 创建交换机(在指定网络信道)
try {
    $exchange = new AMQPExchange($channel);
} catch (AMQPConnectionException|AMQPExchangeException $e) {
    exit('创建交换机异常:' . $e->getMessage());
}

$exchange->setName($exchangeName); // 设置交换机名称
$exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
$exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
try {
    $exchange->declareExchange();
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) {
    exit('配置交换机异常:' . $e->getMessage());
}

// 设置消息内容
$content = [
    '平天大圣·牛魔王',
    '覆海大圣·蛟魔王',
    '混天大圣·鹏魔王',
    '移山大圣·狮驼王',
    '通风大圣·猕猴王',
    '驱神大圣·禺狨王',
    '齐天大圣·美猴王',
];
$message = ['time' => time(), 'content' => $content[mt_rand(0, count($content) - 1)]];
try {
    $message = json_encode($message, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
    exit('JSON编码异常:' . $e->getMessage());
}

// 发布消息
try {
    $ok = $exchange->publish($message, $routingKey);
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException) {
    $ok = false;
}
$ok = $ok ?? false;
echo '发布消息' . ($ok ? '成功' : '失败') . PHP_EOL;

$connection->disconnect(); // 断开连接

Copyright © 2024 码农人生. All Rights Reserved