2、Work queues
http://previous.rabbitmq.com/v3_5_7/tutorials/tutorial-two-php.html
设置 确保消息永不丢失
设置第四个参数来打开它们basic_consume 为 false (true表示没有ACK),
$ channel-> basic_consume('task_queue','',false,false,false,false,$ callback);
设置 可持久化
我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给queue_declare为true:
$ channel-> queue_declare('task_queue',false,true,false,false);
确信即使RabbitMQ重新启动,task_queue队列也不会丢失
将消息标记为持久性
通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);
new_task.php file:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第三个参数为true时,为可持久化
$channel->queue_declare('task_queue', false, true, false, false);
//以允许从命令行发送任意消息
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
//要将消息标记为持久性 - 通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
?>
worker.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第三个参数为true为可持久化
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
//它需要为消息体中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//在处理并确认前一个消息之前,不要向工作人员发送新消息。
$channel->basic_qos(null, 1, null);
/**
* 为了确保消息永不丢失,RabbitMQ支持消息确认。从消费者发回ack(nowledgement)以告知RabbitMQ已收到,处理了特定消息,并且RabbitMQ可以自由删除它。
* 默认情况下,消息确认已关闭。现在是时候通过设置第四个参数来打开它们basic_consume到假 (true表示没有ACK),并从工作人员发送适当的确认,一旦我们有一个任务来完成。
**/
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>