<?php // +--------------------------------------------------------------+ // // | 消息消费者 - rabbitmq.consumer.php | // // +--------------------------------------------------------------+ // declare(strict_types=1); PHP_SAPI !== 'cli' && exit('脚本只能在命令行执行'); ini_set('display_errors', 'On'); error_reporting(-1); set_time_limit(0); ini_set('memory_limit', '-1'); const CONSUMER_ID = '消费者1'; // 如果需要启动多个消费者,可复制本脚本,然后修改消费者ID即可 // RabbitMQ服务器连接参数 $credentials = [ 'host' => 'localhost', 'port' => '5672', 'vhost' => '/', 'login' => 'RabbitMQ管理员账号', 'password' => 'RabbitMQ管理员密码', ]; $exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称) $routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键) $queueName = 'manong.queue'; // 队列名称(若只启动一个消费者则该变量可以不设置) /** * 在控制台输出一段DEBUG信息并终止程序 * * @param string $text 参数说明 * @return void echo&exit */ function shutdown(string $text): void { debug($text); exit; } /** * 在控制台输出一段信息 * * @param string $text 参数说明 * @return void echo */ function debug(string $text): void { echo '[' . CONSUMER_ID . "][DEBUG] $text" . PHP_EOL; } /** * 队列消息被消费后的回调函数 * * @param AMQPEnvelope $envelope AMQPEnvelope对象 * @param AMQPQueue $queue AMQPQueue对象 * @return bool 消息消费结果 */ function queue_consume_callback(AMQPEnvelope $envelope, AMQPQueue $queue): bool { $ok = false; // 获取消息并JSON解码 try { $body = json_decode($envelope->getBody(), true, 512, JSON_THROW_ON_ERROR); } catch (JsonException $e) { unset($e); } if (isset($body['time'], $body['content'])) { $datetime = date('Y-m-d H:i:s', $body['time']); $content = $body['content']; debug("[$datetime] $content"); // 发送ACK应答(通知RabbitMQ服务器消费者已正常消费掉消息,可以将该消息从队列中删除) $deliveryTag = $envelope->getDeliveryTag(); try { $ok = $queue->ack($deliveryTag); } catch (AMQPChannelException|AMQPConnectionException $e) { debug('发送ACK应答异常:' . $e->getMessage()); } } return $ok; } // 与RabbitMQ服务器建立连接 $connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接 try { $connection->connect(); // 与RabbitMQ服务器建立连接 } catch (AMQPConnectionException $e) { shutdown('与RabbitMQ服务器建立连接异常:' . $e->getMessage()); } $connection->isConnected() || shutdown('与RabbitMQ服务器建立连接失败'); // 建立网络信道 try { $channel = new AMQPChannel($connection); } catch (AMQPConnectionException $e) { shutdown('建立网络信道异常:' . $e->getMessage()); } // 创建交换机(在指定网络信道) try { $exchange = new AMQPExchange($channel); } catch (AMQPConnectionException|AMQPExchangeException $e) { shutdown('创建交换机异常:' . $e->getMessage()); } // 创建队列,此时在RabbitMQ网页管理插件的Queues选项卡就可以看到该队列 try { $queue = new AMQPQueue($channel); } catch (AMQPConnectionException|AMQPQueueException $e) { shutdown('创建队列异常:' . $e->getMessage()); } $exchange->setName($exchangeName); // 设置交换机名称 $exchange->setType('x-delayed-message'); // 设置交换机类型 $exchange->setArgument('x-delayed-type', 'direct'); // 重要提醒:这是延迟消息必须设置的参数 $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化 try { $exchange->declareExchange(); } catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) { shutdown('配置交换机异常:' . $e->getMessage()); } $queue->setName($queueName); // 重要提醒:若只启动一个消费者则可以不调用AMQPQueue::setName()方法 try { $queue->declareQueue(); } catch (AMQPChannelException|AMQPConnectionException $e) { shutdown('配置队列异常:' . $e->getMessage()); } try { $queue->bind($exchangeName, $routingKey); } catch (AMQPChannelException|AMQPConnectionException $e) { shutdown('队列绑定到交换机异常:' . $e->getMessage()); } // 启动消费者 try { $queue->consume('queue_consume_callback'); // 重要提醒:这里会一直阻塞 } catch (AMQPChannelException|AMQPConnectionException|AMQPEnvelopeException $e) { $connection->disconnect(); // 断开连接 shutdown('启动消费者异常:' . $e->getMessage()); } // 重要提醒:这里是执行不到的,因为$queue->consume()会一直阻塞,直到收到消息,并且处理完消息后又会进入阻塞状态 debug('启动消费者成功');
<?php // +--------------------------------------------------------------+ // // | 消息生产者 - rabbitmq.producer.php | // // +--------------------------------------------------------------+ // declare(strict_types=1); ini_set('display_errors', 'On'); error_reporting(-1); // RabbitMQ服务器连接参数 $credentials = [ 'host' => 'localhost', 'port' => '5672', 'vhost' => '/', 'login' => 'RabbitMQ管理员账号', 'password' => 'RabbitMQ管理员密码', ]; $exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称) $routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键) // 与RabbitMQ服务器建立连接 $connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接 try { $connection->connect(); // 与RabbitMQ服务器建立连接 } catch (AMQPConnectionException $e) { exit('与RabbitMQ服务器建立连接异常:' . $e->getMessage()); } $connection->isConnected() || exit('与RabbitMQ服务器建立连接失败'); // 建立网络信道 try { $channel = new AMQPChannel($connection); } catch (AMQPConnectionException $e) { exit('建立网络信道异常:' . $e->getMessage()); } // 创建交换机(在指定网络信道) try { $exchange = new AMQPExchange($channel); } catch (AMQPConnectionException|AMQPExchangeException $e) { exit('创建交换机异常:' . $e->getMessage()); } $exchange->setName($exchangeName); // 设置交换机名称 $exchange->setType('x-delayed-message'); // 设置交换机类型 $exchange->setArgument('x-delayed-type', 'direct'); // 重要提醒:这是延迟消息必须设置的参数 $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化 try { $exchange->declareExchange(); } catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) { exit('配置交换机异常:' . $e->getMessage()); } // 设置消息内容 $content = [ '平天大圣·牛魔王', '覆海大圣·蛟魔王', '混天大圣·鹏魔王', '移山大圣·狮驼王', '通风大圣·猕猴王', '驱神大圣·禺狨王', '齐天大圣·美猴王', ]; $message = ['time' => time(), 'content' => $content[mt_rand(0, count($content) - 1)]]; try { $message = json_encode($message, JSON_THROW_ON_ERROR); } catch (JsonException $e) { exit('JSON编码异常:' . $e->getMessage()); } // 发布消息 $delay = 1000 * 10; // 延迟时间(单位为毫秒),若不需要延迟可设为0 try { $ok = $exchange->publish($message, $routingKey, AMQP_NOPARAM, ['headers' => ['x-delay' => $delay]]); } catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException) { $ok = false; } $ok = $ok ?? false; echo '发布消息' . ($ok ? '成功' : '失败') . PHP_EOL; $connection->disconnect(); // 断开连接
Copyright © 2024 码农人生. All Rights Reserved