队列

队列插件为 php-queue 项目提供了一个易于使用的接口,该项目抽象了数十个队列后端,以便在应用程序中使用。队列可用于通过将长时间运行的进程(如电子邮件或通知发送)推迟到以后的时间来提高应用程序的性能。

安装

您可以使用 composer 将此插件安装到您的 CakePHP 应用程序中。

推荐安装 composer 包的方式是

composer require cakephp/queue

安装您希望使用的传输。有关可用传输的列表,请参阅 此页面。以下示例适用于纯 PHP Redis

composer require enqueue/redis predis/predis:^1

确保在 src/Application.php 文件中的 Application::bootstrap() 函数中加载插件

$this->addPlugin('Cake/Queue');

配置

以下配置应存在于 config/app.php 的配置数组中

// ...
'Queue' => [
    'default' => [
        // A DSN for your configured backend. default: null
        // Can contain protocol/port/username/password or be null if the backend defaults to localhost
        'url' => 'redis://myusername:[email protected]:1000',

        // The queue that will be used for sending messages. default: default
        // This can be overridden when queuing or processing messages
        'queue' => 'default',

        // The name of a configured logger, default: null
        'logger' => 'stdout',

        // The name of an event listener class to associate with the worker
        'listener' => \App\Listener\WorkerListener::class,

        // The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
        'receiveTimeout' => 10000,

        // Whether to store failed jobs in the queue_failed_jobs table. default: false
        'storeFailedJobs' => true,

        // (optional) The cache configuration for storing unique job ids. `duration`
        // should be greater than the maximum length of time any job can be expected
        // to remain on the queue. Otherwise, duplicate jobs may be
        // possible. Defaults to +24 hours. Note that `File` engine is only suitable
        // for local development.
        // See https://book.cakephp.com.cn/4/en/core-libraries/caching.html#configuring-cache-engines.
        'uniqueCache' => [
            'engine' => 'File',
        ],
    ]
],
// ...

Queue 配置键可以包含一个或多个队列配置。每个配置用于与不同的队列后端进行交互。

如果 storeFailedJobs 设置为 true,请确保运行插件迁移以创建 queue_failed_jobs 表。

安装迁移插件

composer require cakephp/migrations:"^3.1"

运行迁移

bin/cake migrations migrate --plugin Cake/Queue

用法

定义作业

工作负载被定义为“作业”。作业类可以在构造函数中从应用程序的依赖项注入容器中接收依赖项,就像控制器或命令一样。作业负责处理队列消息。一个简单的作业,它记录接收到的消息,如下所示

<?php
// src/Job/ExampleJob.php
declare(strict_types=1);

namespace App\Job;

use Cake\Log\LogTrait;
use Cake\Queue\Job\Message;
use Cake\Queue\Job\JobInterface;
use Interop\Queue\Processor;

class ExampleJob implements JobInterface
{
    use LogTrait;

    /**
     * The maximum number of times the job may be attempted. (optional property)
     *
     * @var int|null
     */
    public static $maxAttempts = 3;

    /**
     * Whether there should be only one instance of a job on the queue at a time. (optional property)
     *
     * @var bool
     */
    public static $shouldBeUnique = false;

    public function execute(Message $message): ?string
    {
        $id = $message->getArgument('id');
        $data = $message->getArgument('data');

        $this->log(sprintf('%d %s', $id, $data));

        return Processor::ACK;
    }
}

传递的 Message 对象具有以下方法

  • getArgument($key = null, $default = null): 可以返回传递的整个数据集或基于 Hash::get() 符号键的值。

  • getContext(): 返回原始上下文对象。

  • getOriginalMessage(): 返回原始队列消息对象。

  • getParsedBody(): 返回解析的队列消息正文。

作业可以返回以下任何值

  • Processor::ACK: 当消息成功处理时使用此常量。消息将从队列中删除。

  • Processor::REJECT: 当消息无法处理时使用此常量。消息将从队列中删除。

  • Processor::REQUEUE: 当消息无效或当前无法处理但我们可以稍后重试时使用此常量。原始消息将从队列中删除,但副本将再次发布到队列中。

作业也可以返回一个空值,这被解释为 Processor::ACK。未能以有效类型响应将导致解释为消息失败并重新排队消息。

作业属性

  • maxAttempts: 作业由于异常或显式返回 Processor::REQUEUE 而被重新排队的最大次数。如果提供,此值将覆盖 worker 命令行选项 --max-attempts 中提供的值。如果作业或命令行选项未提供值,则作业可以无限次重新排队。

  • shouldBeUnique: 如果为 true,则队列中一次只允许存在一个由其类、方法和数据标识的作业实例。随后的推送将被静默丢弃。这对于幂等操作很有用,在幂等操作中,连续的作业执行没有任何好处。例如,刷新计算数据。如果为 true,则必须设置 uniqueCache 配置。

排队作业

您可以使用 Cake\Queue\QueueManager 排队作业

use App\Job\ExampleJob;
use Cake\Queue\QueueManager;

$data = ['id' => 7, 'is_premium' => true];
$options = ['config' => 'default'];

QueueManager::push(ExampleJob::class, $data, $options);

参数

  • $className: 将在处理作业时调用其执行方法的类。

  • $data (可选): 将作为消息传递给作业的可 JSON 序列化的数据数组。它应该是键值对。

  • $options (可选): 用于消息排队的可选数据数组。

以下键可在 options 数组中使用

  • config:

    • default: default

    • description: 队列配置名称

    • type: string

  • delay:

    • default: null

    • description: 延迟消息的时间(以整数秒为单位),在此之后将处理消息。并非所有消息代理都接受此选项。

    • type: integer

  • expires:

    • default: null

    • description: 消息到期的时间(以整数秒为单位)。如果超过此时间并且消息尚未被使用,则该消息将从队列中删除。

    • type: integer

  • priority:

    • default: null

    • type: constant

    • 有效值

      • \Enqueue\Client\MessagePriority::VERY_LOW

      • \Enqueue\Client\MessagePriority::LOW

      • \Enqueue\Client\MessagePriority::NORMAL

      • \Enqueue\Client\MessagePriority::HIGH

      • \Enqueue\Client\MessagePriority::VERY_HIGH

  • queue:

    • default: 来自队列 config 数组或字符串 default(如果为空)

    • description: 要使用的队列的名称

    • type: string

排队邮件程序操作

可以通过将 Queue\Mailer\QueueTrait 添加到邮件程序类来排队邮件程序操作。以下示例显示了如何在邮件程序类中设置该特性

<?php
declare(strict_types=1);

namespace App\Mailer;

use Cake\Mailer\Mailer;
use Cake\Queue\Mailer\QueueTrait;

class UserMailer extends Mailer
{
    use QueueTrait;

    public function welcome(string $emailAddress, string $username): void
    {
        $this
            ->setTo($emailAddress)
            ->setSubject(sprintf('Welcome %s', $username));
    }

    // ... other actions here ...
}

现在可以使用 UserMailer 以延迟方式从应用程序中的任何位置发送与用户相关的电子邮件。要排队邮件程序操作,请使用邮件程序实例上的 push() 方法

$this->getMailer('User')->push('welcome', ['[email protected]', 'josegonzalez']);

QueueTrait::push() 调用将生成一个中间 MailerJob,该作业负责处理电子邮件消息。如果 MailerJob 无法实例化 Email 或 Mailer 实例,则将其解释为 Processor::REJECT。无效的 action 也被解释为 Processor::REJECT,如果操作抛出 BadMethodCallException,也会被解释为 Processor::REJECT。任何非异常结果都将被视为 Processor:ACK

公开的 QueueTrait::push() 方法与 Mailer::send() 具有类似的签名,还支持 $options 数组参数。此数组中包含的选项与 QueueManager::push() 可用的选项相同。

通过队列作业发送电子邮件

如果您的应用程序没有使用邮件程序,但您仍然希望通过队列作业发送电子邮件,则可以使用 QueueTransport。在应用程序的 EmailTransport 配置中添加一个传输

// in app/config.php
use Cake\Queue\Mailer\Transport\QueueTransport;

return [
    // ... other configuration
    'EmailTransport' => [
        'default' => [
            'className' => MailTransport::class,
            // Configuration for MailTransport.
        ]
        'queue' => [
            'className' => QueueTransport::class,
            // The transport name to use inside the queue job.
            'transport' => 'default',
        ]
    ],
    'Email' => [
        'default' => [
            // Connect the default email profile to deliver
            // by queue jobs.
            'transport' => 'queue',
        ]
    ]
];

使用此配置,只要您使用 default 电子邮件配置文件发送电子邮件,CakePHP 就会生成一个队列消息。一旦该队列消息被处理,默认的 MailTransport 将用于发送电子邮件消息。

运行 worker

排队消息后,您可以通过包含的 queue worker shell 运行一个 worker

bin/cake queue worker

此 shell 可以接受一些不同的选项

  • --config (default: default): 要使用的队列配置的名称

  • --queue (default: default): 要绑定到的队列的名称

  • --processor (默认: null): 要绑定的处理器名称

  • --logger (默认: stdout): 已配置的日志记录器的名称

  • --max-jobs (默认: null): 要处理的最大作业数。达到限制后,工作者将退出。

  • --max-runtime (默认: null): 最大运行时间(秒)。达到限制后,工作者将退出。

  • --max-attempts (默认: null): 每个作业尝试的最大次数。作业上定义的最大尝试次数将覆盖此值。

  • --verbose-v (默认: null): 提供详细输出,显示以下值的当前值:

    • 最大迭代次数

    • 最大运行时间

    • 运行时间:工作者启动以来的时间,工作者将在运行时间超过最大运行时间值时完成

失败的作业

默认情况下,抛出异常的作业将无限期地重新入队。但是,如果在作业类或通过命令行参数配置了maxAttempts,那么在处理后收到Processor::REQUEUE响应(通常是由于抛出异常)并且没有剩余尝试次数时,作业将被视为失败。然后,作业将被拒绝并添加到queue_failed_jobs表中,可以手动重新入队。

您选择的传输可能提供死信队列功能。虽然失败的作业有类似的目的,但它专门捕获返回Processor::REQUEUE响应的作业,而不处理其他故障情况。它与传输无关,仅支持数据库持久性。

最初入队作业时传递的以下选项将被保留:configqueuepriority

重新入队失败的作业

将作业推回队列并将其从queue_failed_jobs表中删除。如果作业无法重新入队,则不能保证该作业没有运行。

bin/cake queue requeue

可选过滤器

  • --id:通过FailedJob的 ID 重新入队作业

  • --class:通过作业类重新入队作业

  • --queue:通过作业接收的队列重新入队作业

  • --config:通过用于入队作业的配置重新入队作业

如果没有提供任何过滤器,则所有失败的作业将被重新入队。

清除失败的作业

queue_failed_jobs表中删除作业。

bin/cake queue purge_failed

可选过滤器

  • --id:通过FailedJob的 ID 清除作业

  • --class:通过作业类清除作业

  • --queue:通过作业接收的队列清除作业

  • --config:通过用于入队作业的配置清除作业

如果没有提供任何过滤器,则所有失败的作业将被清除。

工作者事件

工作者 shell 可以在正常执行期间调用事件。队列配置中的关联listener可以监听这些事件。

  • Processor.message.exception:

    • 描述:当消息抛出异常时调度。

    • 参数:messageexception

  • Processor.message.invalid:

    • 描述:当消息具有无效的可调用对象时调度。

    • 参数:message

  • Processor.message.reject:

    • 描述:当消息完成并且要被拒绝时调度。

    • 参数:message

  • Processor.message.success:

    • 描述:当消息完成并且要被确认时调度。

    • 参数:message

  • Processor.message.failure:

    • 描述:当消息完成并且要被重新入队时调度。

    • 参数:message

  • Processor.message.seen:

    • 描述:当看到消息时调度。

    • 参数:message

  • Processor.message.start:

    • 描述:在消息开始之前调度。

    • 参数:message