队列插件为 php-queue 项目提供了一个易于使用的接口,该项目抽象了数十个队列后端,以便在应用程序中使用。队列可用于通过将长时间运行的进程(如电子邮件或通知发送)推迟到以后的时间来提高应用程序的性能。
您可以使用 composer 将此插件安装到您的 CakePHP 应用程序中。
推荐安装 composer 包的方式是
composer require cakephp/queue
安装您希望使用的传输。有关可用传输的列表,请参阅 此页面。以下示例适用于纯 PHP Redis
composer require enqueue/redis predis/predis:^1
确保在 src/Application.php
文件中的 Application::bootstrap()
函数中加载插件
$this->addPlugin('Cake/Queue');
以下配置应存在于 config/app.php 的配置数组中
// ...
'Queue' => [
'default' => [
// A DSN for your configured backend. default: null
// Can contain protocol/port/username/password or be null if the backend defaults to localhost
'url' => 'redis://myusername:[email protected]:1000',
// The queue that will be used for sending messages. default: default
// This can be overridden when queuing or processing messages
'queue' => 'default',
// The name of a configured logger, default: null
'logger' => 'stdout',
// The name of an event listener class to associate with the worker
'listener' => \App\Listener\WorkerListener::class,
// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
'receiveTimeout' => 10000,
// Whether to store failed jobs in the queue_failed_jobs table. default: false
'storeFailedJobs' => true,
// (optional) The cache configuration for storing unique job ids. `duration`
// should be greater than the maximum length of time any job can be expected
// to remain on the queue. Otherwise, duplicate jobs may be
// possible. Defaults to +24 hours. Note that `File` engine is only suitable
// for local development.
// See https://book.cakephp.com.cn/4/en/core-libraries/caching.html#configuring-cache-engines.
'uniqueCache' => [
'engine' => 'File',
],
]
],
// ...
Queue
配置键可以包含一个或多个队列配置。每个配置用于与不同的队列后端进行交互。
如果 storeFailedJobs
设置为 true
,请确保运行插件迁移以创建 queue_failed_jobs
表。
安装迁移插件
composer require cakephp/migrations:"^3.1"
运行迁移
bin/cake migrations migrate --plugin Cake/Queue
工作负载被定义为“作业”。作业类可以在构造函数中从应用程序的依赖项注入容器中接收依赖项,就像控制器或命令一样。作业负责处理队列消息。一个简单的作业,它记录接收到的消息,如下所示
<?php
// src/Job/ExampleJob.php
declare(strict_types=1);
namespace App\Job;
use Cake\Log\LogTrait;
use Cake\Queue\Job\Message;
use Cake\Queue\Job\JobInterface;
use Interop\Queue\Processor;
class ExampleJob implements JobInterface
{
use LogTrait;
/**
* The maximum number of times the job may be attempted. (optional property)
*
* @var int|null
*/
public static $maxAttempts = 3;
/**
* Whether there should be only one instance of a job on the queue at a time. (optional property)
*
* @var bool
*/
public static $shouldBeUnique = false;
public function execute(Message $message): ?string
{
$id = $message->getArgument('id');
$data = $message->getArgument('data');
$this->log(sprintf('%d %s', $id, $data));
return Processor::ACK;
}
}
传递的 Message
对象具有以下方法
getArgument($key = null, $default = null)
: 可以返回传递的整个数据集或基于 Hash::get()
符号键的值。
getContext()
: 返回原始上下文对象。
getOriginalMessage()
: 返回原始队列消息对象。
getParsedBody()
: 返回解析的队列消息正文。
作业可以返回以下任何值
Processor::ACK
: 当消息成功处理时使用此常量。消息将从队列中删除。
Processor::REJECT
: 当消息无法处理时使用此常量。消息将从队列中删除。
Processor::REQUEUE
: 当消息无效或当前无法处理但我们可以稍后重试时使用此常量。原始消息将从队列中删除,但副本将再次发布到队列中。
作业也可以返回一个空值,这被解释为 Processor::ACK
。未能以有效类型响应将导致解释为消息失败并重新排队消息。
作业属性
maxAttempts
: 作业由于异常或显式返回 Processor::REQUEUE
而被重新排队的最大次数。如果提供,此值将覆盖 worker 命令行选项 --max-attempts
中提供的值。如果作业或命令行选项未提供值,则作业可以无限次重新排队。
shouldBeUnique
: 如果为 true
,则队列中一次只允许存在一个由其类、方法和数据标识的作业实例。随后的推送将被静默丢弃。这对于幂等操作很有用,在幂等操作中,连续的作业执行没有任何好处。例如,刷新计算数据。如果为 true
,则必须设置 uniqueCache
配置。
您可以使用 Cake\Queue\QueueManager
排队作业
use App\Job\ExampleJob;
use Cake\Queue\QueueManager;
$data = ['id' => 7, 'is_premium' => true];
$options = ['config' => 'default'];
QueueManager::push(ExampleJob::class, $data, $options);
参数
$className
: 将在处理作业时调用其执行方法的类。
$data
(可选): 将作为消息传递给作业的可 JSON 序列化的数据数组。它应该是键值对。
$options
(可选): 用于消息排队的可选数据数组。
以下键可在 options
数组中使用
config
:
default: default
description: 队列配置名称
type: string
delay
:
default: null
description: 延迟消息的时间(以整数秒为单位),在此之后将处理消息。并非所有消息代理都接受此选项。
type: integer
expires
:
default: null
description: 消息到期的时间(以整数秒为单位)。如果超过此时间并且消息尚未被使用,则该消息将从队列中删除。
type: integer
priority
:
default: null
type: constant
有效值
\Enqueue\Client\MessagePriority::VERY_LOW
\Enqueue\Client\MessagePriority::LOW
\Enqueue\Client\MessagePriority::NORMAL
\Enqueue\Client\MessagePriority::HIGH
\Enqueue\Client\MessagePriority::VERY_HIGH
queue
:
default: 来自队列 config
数组或字符串 default
(如果为空)
description: 要使用的队列的名称
type: string
可以通过将 Queue\Mailer\QueueTrait
添加到邮件程序类来排队邮件程序操作。以下示例显示了如何在邮件程序类中设置该特性
<?php
declare(strict_types=1);
namespace App\Mailer;
use Cake\Mailer\Mailer;
use Cake\Queue\Mailer\QueueTrait;
class UserMailer extends Mailer
{
use QueueTrait;
public function welcome(string $emailAddress, string $username): void
{
$this
->setTo($emailAddress)
->setSubject(sprintf('Welcome %s', $username));
}
// ... other actions here ...
}
现在可以使用 UserMailer
以延迟方式从应用程序中的任何位置发送与用户相关的电子邮件。要排队邮件程序操作,请使用邮件程序实例上的 push()
方法
$this->getMailer('User')->push('welcome', ['[email protected]', 'josegonzalez']);
此 QueueTrait::push()
调用将生成一个中间 MailerJob
,该作业负责处理电子邮件消息。如果 MailerJob 无法实例化 Email 或 Mailer 实例,则将其解释为 Processor::REJECT
。无效的 action
也被解释为 Processor::REJECT
,如果操作抛出 BadMethodCallException
,也会被解释为 Processor::REJECT
。任何非异常结果都将被视为 Processor:ACK
。
公开的 QueueTrait::push()
方法与 Mailer::send()
具有类似的签名,还支持 $options
数组参数。此数组中包含的选项与 QueueManager::push()
可用的选项相同。
如果您的应用程序没有使用邮件程序,但您仍然希望通过队列作业发送电子邮件,则可以使用 QueueTransport
。在应用程序的 EmailTransport
配置中添加一个传输
// in app/config.php
use Cake\Queue\Mailer\Transport\QueueTransport;
return [
// ... other configuration
'EmailTransport' => [
'default' => [
'className' => MailTransport::class,
// Configuration for MailTransport.
]
'queue' => [
'className' => QueueTransport::class,
// The transport name to use inside the queue job.
'transport' => 'default',
]
],
'Email' => [
'default' => [
// Connect the default email profile to deliver
// by queue jobs.
'transport' => 'queue',
]
]
];
使用此配置,只要您使用 default
电子邮件配置文件发送电子邮件,CakePHP 就会生成一个队列消息。一旦该队列消息被处理,默认的 MailTransport
将用于发送电子邮件消息。
排队消息后,您可以通过包含的 queue worker
shell 运行一个 worker
bin/cake queue worker
此 shell 可以接受一些不同的选项
--config
(default: default): 要使用的队列配置的名称
--queue
(default: default): 要绑定到的队列的名称
--processor
(默认: null
): 要绑定的处理器名称
--logger
(默认: stdout
): 已配置的日志记录器的名称
--max-jobs
(默认: null
): 要处理的最大作业数。达到限制后,工作者将退出。
--max-runtime
(默认: null
): 最大运行时间(秒)。达到限制后,工作者将退出。
--max-attempts
(默认: null
): 每个作业尝试的最大次数。作业上定义的最大尝试次数将覆盖此值。
--verbose
或 -v
(默认: null
): 提供详细输出,显示以下值的当前值:
最大迭代次数
最大运行时间
运行时间:工作者启动以来的时间,工作者将在运行时间超过最大运行时间值时完成
默认情况下,抛出异常的作业将无限期地重新入队。但是,如果在作业类或通过命令行参数配置了maxAttempts
,那么在处理后收到Processor::REQUEUE
响应(通常是由于抛出异常)并且没有剩余尝试次数时,作业将被视为失败。然后,作业将被拒绝并添加到queue_failed_jobs
表中,可以手动重新入队。
您选择的传输可能提供死信队列功能。虽然失败的作业有类似的目的,但它专门捕获返回Processor::REQUEUE
响应的作业,而不处理其他故障情况。它与传输无关,仅支持数据库持久性。
最初入队作业时传递的以下选项将被保留:config
、queue
和priority
。
将作业推回队列并将其从queue_failed_jobs
表中删除。如果作业无法重新入队,则不能保证该作业没有运行。
bin/cake queue requeue
可选过滤器
--id
:通过FailedJob
的 ID 重新入队作业
--class
:通过作业类重新入队作业
--queue
:通过作业接收的队列重新入队作业
--config
:通过用于入队作业的配置重新入队作业
如果没有提供任何过滤器,则所有失败的作业将被重新入队。
从queue_failed_jobs
表中删除作业。
bin/cake queue purge_failed
可选过滤器
--id
:通过FailedJob
的 ID 清除作业
--class
:通过作业类清除作业
--queue
:通过作业接收的队列清除作业
--config
:通过用于入队作业的配置清除作业
如果没有提供任何过滤器,则所有失败的作业将被清除。
工作者 shell 可以在正常执行期间调用事件。队列配置中的关联listener
可以监听这些事件。
Processor.message.exception
:
描述:当消息抛出异常时调度。
参数:message
和exception
Processor.message.invalid
:
描述:当消息具有无效的可调用对象时调度。
参数:message
Processor.message.reject
:
描述:当消息完成并且要被拒绝时调度。
参数:message
Processor.message.success
:
描述:当消息完成并且要被确认时调度。
参数:message
Processor.message.failure
:
描述:当消息完成并且要被重新入队时调度。
参数:message
Processor.message.seen
:
描述:当看到消息时调度。
参数:message
Processor.message.start
:
描述:在消息开始之前调度。
参数:message