本篇文章為大家展示了RabbitMQ中怎么實現延遲功能,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
岑鞏網站建設公司成都創新互聯,岑鞏網站設計制作,有大型網站制作公司豐富經驗。已為岑鞏上千余家提供企業網站建設服務。企業網站搭建\成都外貿網站制作要多少錢,請找那個售后服務好的岑鞏做網站的公司定做!
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
輸出如下:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
通過rabbitmq-plugins list查看已安裝列表,如下:
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
安裝插件后會生成新的Exchange類型x-delayed-message,該類型消息支持延遲投遞機制,接收到消息后并未立即將消息投遞至目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,檢測消息延遲時間,如達到可投遞時間時并將其通過x-delayed-type類型標記的交換機類型投遞至目標隊列。
消費者 delay_consumer2.php:
<?php
//header('Content-Type:text/html;charset=utf8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp'));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
//$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//綁定
$queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收內容:'.$body . PHP_EOL;
//為了防止接收端在處理消息時down掉,只有在消息處理完成后才發送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo 'no message' . PHP_EOL;
}
}
//$queue->consume('callback'); 第一種消費方式,但是會阻塞,程序一直會卡在此處
//第二種消費方式,非阻塞
/*$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo $message->getBody();
$queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費
$end = time();
echo '<br>' . ($end - $start);
exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}*/
//注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用于取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環中使用;后者則是非阻塞的,取消息時有則取,無則返回false。
//就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。
$action = '2';
if($action == '1'){
$queue->consume('callback'); //第一種消費方式,但是會阻塞,程序一直會卡在此處
}else{
//第二種消費方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo '接收時間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
echo '接收內容:'.$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //應答,代表該消息已經消費
$end = time();
echo '運行時間:'.($end - $start).'秒'.PHP_EOL;
//exit();
}
else
{
//echo 'message not found' . PHP_EOL;
}
}
}生產者delay_publisher2.php:
<?php
//header('Content-Type:text/html;charset=utf-8;');
$params = array(
'exchangeName' => 'delayed_exchange_test',
'queueName' => 'delayed_queue_test',
'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
'host' => 'localhost',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die('Conexiune esuata');
//TODO 記錄日志
echo 'rabbit-mq 連接錯誤:', json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die('Connection through channel failed');
//TODO 記錄日志
echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
/*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。*/
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ不容許聲明2個相同名稱、配置不同的Queue,否則報錯
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName']);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//綁定隊列和交換機
$queue->bind($params['exchangeName'], $params['routeKey']);
//$channel->commitTransaction();
} catch(Exception $e) {
}
for($i=5;$i>0;$i--){
//生成消息
echo '發送時間:'.date("Y-m-d H:i:s", time()).PHP_EOL;
echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL;
$message = json_encode(['order_id'=>time(),'i'=>$i]);
$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
sleep(2);
}
$conn->disconnect();對于代碼來講,首先對于消費者核心代碼
$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');生產者核心代碼
$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();使用方法:先運行delay_consumer1.php,再運行delay_publisher1.php
運行效果:

上述內容就是RabbitMQ中怎么實現延遲功能,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創新互聯行業資訊頻道。
文章題目:RabbitMQ中怎么實現延遲功能
本文地址:http://www.yijiale78.com/article26/gihocg.html
成都網站建設公司_創新互聯,為您提供微信公眾號、網站改版、網站內鏈、小程序開發、企業建站、外貿建站
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯