Skip to content

队列

简介

在构建 Web 应用时,你可能有一些任务,例如解析和存储上传的 CSV 文件,在典型的 Web 请求中执行时间太长。幸运的是,Laravel 允许你轻松创建可以在后台处理的队列任务。通过将耗时任务移动到队列,你的应用可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。

Laravel 队列在多种不同的队列后端之间提供统一的队列 API,例如 Amazon SQSRedis 甚至关系型数据库。

Laravel 的队列配置选项存储在应用的 config/queue.php 配置文件中。在此文件中,你将找到框架中包含的每个队列驱动的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动,以及一个会立即执行任务的同步驱动(用于开发或测试期间)。还包含一个 null 队列驱动,用于丢弃队列任务。

NOTE

Laravel Horizon 是一个美观的仪表盘和配置系统,用于你的 Redis 驱动的队列。查看完整的 Horizon 文档 了解更多信息。

连接 vs. 队列

在开始使用 Laravel 队列之前,了解"连接"和"队列"之间的区别非常重要。在 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了到后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。但是,任何给定的队列连接可能有多个"队列",可以将其视为不同的堆栈或队列任务堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是任务被发送到给定连接时将分发到的默认队列。换句话说,如果你分发任务时没有明确定义应将其分发到哪个队列,则该任务将被放置在连接配置的 queue 属性中定义的队列上:

php
use App\Jobs\ProcessPodcast;

// 此任务被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 此任务被发送到默认连接的 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用可能不需要将任务推送到多个队列,而更喜欢只使用一个简单的队列。但是,将任务推送到多个队列对于希望优先处理或分段处理任务的应用特别有用,因为 Laravel 队列工作者允许你按优先级指定应处理哪些队列。例如,如果你将任务推送到 high 队列,你可以运行一个赋予它们更高处理优先级的工作者:

shell
php artisan queue:work --queue=high,default

驱动说明和先决条件

数据库

要使用 database 队列驱动,你需要一个数据库表来保存任务。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移 中;但是,如果你的应用不包含此迁移,你可以使用 make:queue-table Artisan 命令来创建它:

shell
php artisan make:queue-table

php artisan migrate

Redis

要使用 redis 队列驱动,你应该在 config/database.php 配置文件中配置 Redis 数据库连接。

WARNING

redis 队列驱动不支持 serializercompression Redis 选项。

Redis 集群

如果你的 Redis 队列连接使用 Redis 集群,你的队列名称必须包含键哈希标签。这是为了确保给定队列的所有 Redis 键都被放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],
阻塞

使用 Redis 队列时,你可以使用 block_for 配置选项来指定驱动在遍历工作者循环并重新轮询 Redis 数据库之前应等待任务可用的时间。

根据你的队列负载调整此值可能比持续轮询 Redis 数据库寻找新任务更高效。例如,你可以将值设置为 5,表示驱动在等待任务可用时应阻塞五秒:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

WARNING

block_for 设置为 0 将导致队列工作者无限期阻塞,直到有任务可用。这也将阻止诸如 SIGTERM 之类的信号被处理,直到下一个任务被处理。

其他驱动先决条件

列出的队列驱动需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展
  • MongoDB: mongodb/laravel-mongodb

创建任务

生成任务类

默认情况下,应用的所有可排队任务都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,它将在你运行 make:job Artisan 命令时创建:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 表明该任务应被推送到队列中异步运行。

NOTE

任务存根可以使用存根发布进行自定义。

类结构

任务类非常简单,通常只包含一个 handle 方法,该方法在队列处理任务时被调用。让我们来看一个示例任务类。在此示例中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到队列任务的构造函数中。由于任务使用了 Queueable trait,Eloquent 模型及其加载的关联关系将在任务处理时被优雅地序列化和反序列化。

如果你的队列任务在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当任务实际被处理时,队列系统将自动从数据库中重新获取完整的模型实例及其加载的关联关系。这种模型序列化方法允许将更小的任务有效负载发送到你的队列驱动。

handle 方法依赖注入

handle 方法在任务被队列处理时调用。请注意,我们可以在任务的 handle 方法上进行依赖类型提示。Laravel 服务容器 会自动注入这些依赖。

如果你想完全控制容器如何将依赖注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收任务和容器。在回调中,你可以自由地以任何方式调用 handle 方法。通常,你应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

WARNING

二进制数据,如原始图片内容,应通过 base64_encode 函数处理后再传递给队列任务。否则,任务在放入队列时可能无法正确序列化为 JSON。

队列关联关系

因为所有已加载的 Eloquent 模型关联关系在任务入队时也会被序列化,序列化后的任务字符串有时会变得很大。此外,当任务被反序列化并从数据库中重新获取模型关联关系时,它们将被完整获取。在任务入队过程中模型被序列化之前应用的任何先前关联约束,在任务反序列化时都不会被应用。因此,如果你希望使用给定关联关系的子集,你应该在队列任务中重新约束该关联关系。

或者,为了防止关联关系被序列化,你可以在设置属性值时在模型上调用 withoutRelations 方法。此方法将返回一个不包含已加载关联关系的模型实例:

php
/**
 * Create a new job instance.
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果你使用 PHP 构造函数属性提升 并且想要指示 Eloquent 模型不应序列化其关联关系,你可以使用 WithoutRelations 属性:

php
use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * Create a new job instance.
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

为了方便,如果你希望序列化所有模型时都不包含关联关系,你可以将 WithoutRelations 属性应用到整个类,而不是应用到每个模型上:

php
<?php

namespace App\Jobs;

use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;

#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
        public DistributionPlatform $platform,
    ) {}
}

如果任务接收的是 Eloquent 模型的集合或数组而不是单个模型,则该集合中的模型在任务反序列化和执行时不会恢复其关联关系。这是为了防止处理大量模型的任务消耗过多资源。

唯一任务

WARNING

唯一任务需要支持的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动支持原子锁。

WARNING

唯一任务约束不适用于批次中的任务。

有时,你可能希望确保在任何时候队列中只有一个特定任务的实例。你可以通过在任务类上实现 ShouldBeUnique 接口来实现。此接口不要求你在类上定义任何额外的方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...
}

在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果该任务的另一个实例已经在队列中且尚未完成处理,则不会分发该任务。

在某些情况下,你可能想要定义一个使任务唯一的特定"键",或者你可能想要指定一个超时时间,超过该时间后任务不再保持唯一。要实现这一点,你可以使用 UniqueFor 属性并在任务类上定义一个 uniqueId 方法:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Queue\Attributes\UniqueFor;

#[UniqueFor(3600)]
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * The product instance.
     *
     * @var \App\Models\Product
     */
    public $product;

    /**
     * Get the unique ID for the job.
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 任务通过产品 ID 保持唯一。因此,具有相同产品 ID 的任务的任何新分发都将被忽略,直到现有任务完成处理。此外,如果现有任务在一小时内未被处理,唯一锁将被释放,具有相同唯一键的另一个任务可以被分发到队列。

WARNING

如果你的应用从多个 Web 服务器或容器分发任务,你应确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定任务是否唯一。

在处理开始前保持任务唯一

默认情况下,唯一任务在任务完成处理或所有重试尝试失败后被"解锁"。但是,在某些情况下,你可能希望任务在被处理之前立即解锁。要实现这一点,你的任务应实现 ShouldBeUniqueUntilProcessing 契约,而不是 ShouldBeUnique 契约:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一任务锁

在幕后,当分发 ShouldBeUnique 任务时,Laravel 会尝试使用 uniqueId 键获取。如果锁已被持有,则不会分发该任务。当任务完成处理或所有重试尝试失败时,锁将被释放。默认情况下,Laravel 将使用默认缓存驱动来获取此锁。但是,如果你希望使用其他驱动来获取锁,可以定义一个 uniqueVia 方法返回应使用的缓存驱动:

php
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...

    /**
     * Get the cache driver for the unique job lock.
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

NOTE

如果你只需要限制任务的并发处理,请改用 WithoutOverlapping 任务中间件。

加密任务

Laravel 允许你通过加密确保任务数据的隐私性和完整性。要开始使用,只需将 ShouldBeEncrypted 接口添加到任务类。一旦将此接口添加到类中,Laravel 将在将任务推送到队列之前自动加密你的任务:

php
<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

任务中间件

任务中间件允许你将自定义逻辑包裹在队列任务的执行周围,减少任务本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,每五秒只允许一个任务处理:

php
use Illuminate\Support\Facades\Redis;

/**
 * Execute the job.
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // Handle job...
    }, function () {
        // Could not obtain lock...

        return $this->release(5);
    });
}

虽然这段代码是有效的,但 handle 方法的实现变得嘈杂,因为它充斥着 Redis 速率限制逻辑。此外,对于我们想要进行速率限制的任何其他任务,都必须重复此速率限制逻辑。与其在 handle 方法中进行速率限制,不如定义一个处理速率限制的任务中间件:

php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
            ->block(0)->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // Lock obtained...

                $next($job);
            }, function () use ($job) {
                // Could not obtain lock...

                $job->release(5);
            });
    }
}

如你所见,与路由中间件类似,任务中间件接收正在处理的任务和一个应被调用以继续处理任务的回调。

你可以使用 make:job-middleware Artisan 命令生成新的任务中间件类。创建任务中间件后,可以通过从任务的 middleware 方法返回它们来将它们附加到任务。此方法在由 make:job Artisan 命令搭建的任务上不存在,因此你需要手动将其添加到任务类中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

NOTE

任务中间件也可以分配给可排队事件监听器可邮寄对象通知

速率限制

虽然我们刚刚演示了如何编写自己的速率限制任务中间件,但 Laravel 实际上包含了一个速率限制中间件,你可以利用它来对任务进行速率限制。与路由速率限制器类似,任务速率限制器使用 RateLimiter facade 的 for 方法定义。

例如,你可能希望允许用户每小时备份一次数据,同时不对高级客户施加此类限制。要实现这一点,你可以在 AppServiceProviderboot 方法中定义一个 RateLimiter

php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
            ? Limit::none()
            : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个每小时的速率限制;但是,你可以使用 perMinute 方法轻松定义基于分钟的速率限制。此外,你可以将任何值传递给速率限制的 by 方法;但是,此值最常用于按客户分段速率限制:

php
return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,你可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到你的任务。每次任务超过速率限制时,此中间件将根据速率限制持续时间以适当的延迟将任务释放回队列:

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将受速率限制的任务释放回队列仍会增加任务的总 attempts 次数。你可能希望相应地调整任务类上的 TriesMaxExceptions 属性。或者,你可能希望使用 retryUntil 方法 来定义任务不再尝试的时间。

使用 releaseAfter 方法,你还可以指定释放的任务再次尝试之前必须经过的秒数:

php
/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->releaseAfter(60)];
}

如果你不希望在任务受到速率限制时重试,可以使用 dontRelease 方法:

php
/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

使用 Redis 进行速率限制

如果你使用的是 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,它针对 Redis 进行了微调,比基本的速率限制中间件更高效:

php
use Illuminate\Queue\Middleware\RateLimitedWithRedis;

public function middleware(): array
{
    return [new RateLimitedWithRedis('backups')];
}

The connection method may be used to specify which Redis connection the middleware should use:

php
return [(new RateLimitedWithRedis('backups'))->connection('limiter')];

防止任务重叠

Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许你基于任意键防止任务重叠。当队列任务正在修改一个一次只应由一个任务修改的资源时,这会很有帮助。

例如,假设你有一个更新用户信用评分的队列任务,并且你想防止同一用户 ID 的信用评分更新任务重叠。要实现这一点,你可以从任务的 middleware 方法返回 WithoutOverlapping 中间件:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

将重叠的任务释放回队列仍会增加任务的总尝试次数。你可能希望相应地调整任务类上的 TriesMaxExceptions 属性。例如,将 Tries 保持为默认值 1 将阻止任何重叠任务稍后被重试。

同一类型的任何重叠任务都将被释放回队列。你还可以指定释放的任务再次尝试之前必须经过的秒数:

php
/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果你希望立即删除任何重叠的任务以便它们不会被重试,可以使用 dontRelease 方法:

php
/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能驱动。有时,你的任务可能会意外失败或超时,导致锁未被释放。因此,你可以使用 expireAfter 方法显式定义锁的过期时间。例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping 锁:

php
/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

WARNING

WithoutOverlapping 中间件需要支持的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动支持原子锁。

跨任务类共享锁键

默认情况下,WithoutOverlapping 中间件只会防止同一类的任务重叠。因此,虽然两个不同的任务类可能使用相同的锁键,但它们不会被阻止重叠。但是,你可以使用 shared 方法指示 Laravel 跨任务类应用该键:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

异常节流

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许你对异常进行节流。一旦任务抛出给定数量的异常,所有进一步尝试执行该任务都将被延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的任务特别有用。

例如,假设一个与第三方 API 交互的队列任务开始抛出异常。要节流异常,你可以从任务的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现基于时间的尝试的任务配对:

php
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5 * 60)];
}

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->plus(minutes: 30);
}

中间件接受的第一个构造函数参数是任务在被节流之前可以抛出的异常数量,第二个构造函数参数是任务被节流后再次尝试之前应经过的秒数。在上面的代码示例中,如果任务连续抛出 10 个异常,我们将等待 5 分钟后再次尝试该任务,受 30 分钟时间限制的约束。

当任务抛出异常但异常阈值尚未达到时,任务通常会立即重试。但是,你可以通过在将中间件附加到任务时调用 backoff 方法来指定此类任务应延迟的分钟数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,任务的类名被用作缓存"键"。你可以在将中间件附加到任务时通过调用 by 方法来覆盖此键。如果你有多个任务与同一第三方服务交互,并且你希望它们共享一个公共的节流"桶"以确保它们遵守单个共享限制,这可能很有用:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,此中间件将节流每个异常。你可以在将中间件附加到任务时通过调用 when 方法来修改此行为。然后,只有当提供给 when 方法的闭包返回 true 时,异常才会被节流:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

when 方法将任务释放回队列或抛出异常不同,deleteWhen 方法允许你在给定异常发生时完全删除该任务:

php
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}

如果你希望将被节流的异常报告给应用的异常处理程序,可以在将中间件附加到任务时调用 report 方法。可选地,你可以向 report 方法提供一个闭包,只有当给定的闭包返回 true 时才会报告异常:

php
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

使用 Redis 节流异常

如果你使用的是 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,它针对 Redis 进行了微调,比基本的异常节流中间件更高效:

php
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;

public function middleware(): array
{
    return [new ThrottlesExceptionsWithRedis(10, 10 * 60)];
}

connection 方法可用于指定中间件应使用哪个 Redis 连接:

php
return [(new ThrottlesExceptionsWithRedis(10, 10 * 60))->connection('limiter')];

跳过任务

Skip 中间件允许你指定任务应被跳过/删除,而无需修改任务的逻辑。Skip::when 方法将在给定条件评估为 true 时删除任务,而 Skip::unless 方法将在条件评估为 false 时删除任务:

php
use Illuminate\Queue\Middleware\Skip;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [
        Skip::when($condition),
    ];
}

你还可以将 Closure 传递给 whenunless 方法以进行更复杂的条件评估:

php
use Illuminate\Queue\Middleware\Skip;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [
        Skip::when(function (): bool {
            return $this->shouldSkip();
        }),
    ];
}

分发任务

编写完任务类后,你可以使用任务本身的 dispatch 方法来分发它。传递给 dispatch 方法的参数将被传递给任务的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果你希望有条件地分发任务,可以使用 dispatchIfdispatchUnless 方法:

php
ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用中,database 连接被定义为默认队列。你可以通过更改应用的 .env 文件中的 QUEUE_CONNECTION 环境变量来指定不同的默认队列连接。

延迟分发

如果你希望指定任务不应立即可用于队列工作者处理,可以在分发任务时使用 delay 方法。例如,让我们指定任务在分发后 10 分钟内不可用于处理:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->plus(minutes: 10));

        return redirect('/podcasts');
    }
}

在某些情况下,任务可能配置了默认延迟。如果你需要绕过此延迟并分发任务以立即处理,可以使用 withoutDelay 方法:

php
ProcessPodcast::dispatch($podcast)->withoutDelay();

WARNING

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

同步分发

如果你希望立即(同步)分发任务,可以使用 dispatchSync 方法。使用此方法时,任务不会被排队,而是在当前进程中立即执行:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

延迟同步分发

使用延迟同步分发,你可以分发一个在当前进程中处理的任务,但是在 HTTP 响应发送给用户之后。这允许你同步处理"队列"任务而不会减慢用户的应用体验。要延迟同步任务的执行,请将任务分发到 deferred 连接:

php
RecordDelivery::dispatch($order)->onConnection('deferred');

deferred 连接也作为默认的故障转移队列

类似地,background 连接在 HTTP 响应发送给用户之后处理任务;但是,任务在单独生成的 PHP 进程中处理,允许 PHP-FPM / 应用工作者可用于处理另一个传入的 HTTP 请求:

php
RecordDelivery::dispatch($order)->onConnection('background');

任务与数据库事务

虽然在数据库事务中分发任务是完全可以的,但你应该特别注意确保你的任务实际上能够成功执行。当在事务中分发任务时,任务可能会在父事务提交之前被工作者处理。当这种情况发生时,你在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能不存在于数据库中。

幸运的是,Laravel 提供了几种解决此问题的方法。首先,你可以在队列连接的配置数组中设置 after_commit 连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项为 true 时,你可以在数据库事务中分发任务;但是,Laravel 会等待打开的父数据库事务提交后才实际分发任务。当然,如果当前没有打开的数据库事务,任务将立即被分发。

如果事务由于事务期间发生的异常而被回滚,则在该事务期间分发的任务将被丢弃。

NOTE

after_commit 配置选项设置为 true 也将导致任何队列事件监听器、可邮寄对象、通知和广播事件在所有打开的数据库事务提交后才被分发。

内联指定提交分发行为

如果你没有将 after_commit 队列连接配置选项设置为 true,你仍然可以指示特定任务应在所有打开的数据库事务提交后分发。要实现这一点,你可以在分发操作上链式调用 afterCommit 方法:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,你可以指示特定任务应立即分发,而无需等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

任务链

任务链允许你指定一组队列任务,这些任务应在主任务成功执行后按顺序运行。如果序列中的一个任务失败,其余任务将不会运行。要执行队列任务链,你可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是队列任务分发构建在其之上的更低级别组件:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链式连接任务类实例外,你还可以链式连接闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

WARNING

在任务中使用 $this->delete() 方法删除任务不会阻止链式任务被处理。链只会在链中的任务失败时停止执行。

链连接和队列

如果你希望指定链式任务应使用的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定应使用的队列连接和队列名称,除非队列任务被显式分配了不同的连接/队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链中添加任务

有时,你可能需要从链中的另一个任务内部向现有任务链前置或追加任务。你可以使用 prependToChainappendToChain 方法来实现:

php
/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    // Prepend to the current chain, run job immediately after current job...
    $this->prependToChain(new TranscribePodcast);

    // Append to the current chain, run job at end of chain...
    $this->appendToChain(new TranscribePodcast);
}

链失败

在链式连接任务时,你可以使用 catch 方法指定一个闭包,如果链中的任务失败,该闭包将被调用。给定的回调将接收导致任务失败的 Throwable 实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // A job within the chain has failed...
})->dispatch();

WARNING

由于链回调被序列化并由 Laravel 队列在稍后执行,你不应在链回调中使用 $this 变量。

自定义队列和连接

分发到特定队列

通过将任务推送到不同的队列,你可以"分类"你的队列任务,甚至可以优先分配多少工作者给各个队列。请记住,这不是将任务推送到队列配置文件中定义的不同队列"连接",而只是推送到单个连接中的特定队列。要指定队列,请在分发任务时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,你可以通过在任务的构造函数中调用 onQueue 方法来指定任务的队列:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

分发到特定连接

如果你的应用与多个队列连接交互,你可以使用 onConnection 方法指定将任务推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

你可以将 onConnectiononQueue 方法链式调用在一起,以指定任务的连接和队列:

php
ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');

或者,你可以通过在任务的构造函数中调用 onConnection 方法来指定任务的连接:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

队列路由

你可以使用 Queue facade 的 route 方法为特定任务类定义默认连接和队列。当你希望确保某些任务始终使用特定队列而无需在任务上指定连接或队列时,这很有用。

除了路由特定的任务类外,你还可以将接口、trait 或父类传递给 route 方法。当你这样做时,任何实现该接口、使用该 trait 或扩展该父类的任务都将自动使用配置的连接和队列。

通常,你应该从服务提供者的 boot 方法中调用 route 方法:

php
use App\Concerns\RequiresVideo;
use App\Jobs\ProcessPodcast;
use App\Jobs\ProcessVideo;
use Illuminate\Support\Facades\Queue;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Queue::route(ProcessPodcast::class, connection: 'redis', queue: 'podcasts');
    Queue::route(RequiresVideo::class, queue: 'video');
}

当指定连接而未指定队列时,任务将被发送到默认队列:

php
Queue::route(ProcessPodcast::class, connection: 'redis');

你还可以通过将数组传递给 route 方法来一次路由多个任务类:

php
Queue::route([
    ProcessPodcast::class => ['podcasts', 'redis'], // Queue and connection
    ProcessVideo::class => 'videos', // Queue only (uses default connection)
]);

NOTE

队列路由仍然可以被任务在每个任务的基础上覆盖。

指定最大任务尝试次数 / 超时值

最大尝试次数

任务尝试是 Laravel 队列系统的核心概念,并驱动许多高级功能。虽然一开始可能看起来很困惑,但在修改默认配置之前了解它们的工作方式非常重要。

当任务被分发时,它被推送到队列上。然后工作者拾取它并尝试执行它。这就是一次任务尝试。

但是,尝试并不一定意味着任务的 handle 方法被执行了。尝试还可以通过以下几种方式被"消耗":

  • 任务在执行期间遇到未处理的异常。
  • 任务使用 $this->release() 手动释放回队列。
  • WithoutOverlappingRateLimited 等中间件未能获取锁并释放了任务。
  • 任务超时。
  • 任务的 handle 方法运行并完成而未抛出异常。

你可能不希望无限期地继续尝试任务。因此,Laravel 提供了多种方式来指定任务可以尝试多少次或多长时间。

NOTE

默认情况下,Laravel 只会尝试一次任务。如果你的任务使用了 WithoutOverlappingRateLimited 等中间件,或者你正在手动释放任务,你可能需要通过 tries 选项增加允许的尝试次数。

指定任务可尝试最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将应用于工作者处理的所有任务,除非正在处理的任务指定了它可以尝试的次数:

shell
php artisan queue:work --tries=3

如果任务超过其最大尝试次数,它将被视为"失败"任务。有关处理失败任务的更多信息,请参阅失败任务文档。如果向 queue:work 命令提供 --tries=0,任务将无限期重试。

你可以通过使用 Tries 属性在任务类本身上定义任务可尝试的最大次数来采取更精细的方法。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Tries;

#[Tries(5)]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

如果你需要对特定任务的最大尝试次数进行动态控制,可以在任务上定义一个 tries 方法:

php
/**
 * Determine number of times the job may be attempted.
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义任务失败前可尝试多少次的替代方案,你可以定义任务不再尝试的时间。这允许任务在给定时间范围内尝试任意次数。要定义任务不再尝试的时间,请向任务类添加 retryUntil 方法。此方法应返回一个 DateTime 实例:

php
use DateTime;

/**
 * Determine the time at which the job should timeout.
 */
public function retryUntil(): DateTime
{
    return now()->plus(minutes: 10);
}

如果同时定义了 retryUntiltries,Laravel 优先使用 retryUntil 方法。

NOTE

你还可以在队列事件监听器队列通知上定义 Tries 属性或 retryUntil 方法。

最大异常数

有时你可能希望指定任务可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接由 release 方法释放的),则应该失败。要实现这一点,你可以在任务类上使用 TriesMaxExceptions 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\MaxExceptions;
use Illuminate\Queue\Attributes\Tries;
use Illuminate\Support\Facades\Redis;

#[Tries(25)]
#[MaxExceptions(3)]
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // Lock obtained, process the podcast...
        }, function () {
            // Unable to obtain lock...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用无法获取 Redis 锁,任务将被释放十秒,并将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,任务将失败。

超时

通常,你大致知道你的队列任务需要多长时间。因此,Laravel 允许你指定"超时"值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,处理该任务的工作者将以错误退出。通常,工作者将由在服务器上配置的进程管理器自动重新启动。

可以使用 Artisan 命令行上的 --timeout 开关指定任务可运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果任务通过持续超时超过其最大尝试次数,它将被标记为失败。

你还可以使用任务类上的 Timeout 属性定义任务应允许运行的最大秒数。如果在任务上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Timeout;

#[Timeout(120)]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

有时,IO 阻塞进程(如套接字或传出的 HTTP 连接)可能不会遵守你指定的超时。因此,在使用这些功能时,你应该始终尝试使用它们的 API 指定超时。例如,使用 Guzzle 时,你应该始终指定连接和请求超时值。

WARNING

必须安装 PCNTL PHP 扩展才能指定任务超时。此外,任务的"超时"值应始终小于其"重试后"值。否则,任务可能在实际完成执行或超时之前被重新尝试。

超时时失败

如果你希望指示任务在超时时应被标记为失败,可以在任务类上使用 FailOnTimeout 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\FailOnTimeout;

#[FailOnTimeout]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

NOTE

默认情况下,当任务超时时,它会消耗一次尝试并被释放回队列(如果允许重试)。但是,如果你配置任务在超时时失败,它将不会被重试,无论 tries 设置的值如何。

SQS FIFO 和公平队列

Laravel 支持 Amazon SQS FIFO(先进先出) 队列,允许你按照发送的确切顺序处理任务,同时通过消息去重确保一次性处理。

FIFO 队列需要消息组 ID 来确定哪些任务可以并行处理。具有相同组 ID 的任务按顺序处理,而具有不同组 ID 的消息可以并发处理。

Laravel 提供了一个流畅的 onGroup 方法来在分发任务时指定消息组 ID:

php
ProcessOrder::dispatch($order)
    ->onGroup("customer-{$order->customer_id}");

SQS FIFO 队列支持消息去重以确保一次性处理。在你的任务类中实现 deduplicationId 方法以提供自定义的去重 ID:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessSubscriptionRenewal implements ShouldQueue
{
    use Queueable;

    // ...

    /**
     * Get the job's deduplication ID.
     */
    public function deduplicationId(): string
    {
        return "renewal-{$this->subscription->id}";
    }
}

FIFO 监听器、邮件和通知

使用 FIFO 队列时,你还需要在监听器、邮件和通知上定义消息组。或者,你可以将这些对象的队列实例分发到非 FIFO 队列。

要为队列事件监听器定义消息组,请在监听器上定义 messageGroup 方法。你还可以选择性地定义 deduplicationId 方法:

php
<?php

namespace App\Listeners;

class SendShipmentNotification
{
    // ...

    /**
     * Get the job's message group.
     */
    public function messageGroup(): string
    {
        return 'shipments';
    }

    /**
     * Get the job's deduplication ID.
     */
    public function deduplicationId(): string
    {
        return "shipment-notification-{$this->shipment->id}";
    }
}

当发送将在 FIFO 队列上排队的邮件消息时,你应该在发送时调用 onGroup 方法,并可选地调用 withDeduplicator 方法:

php
use App\Mail\InvoicePaid;
use Illuminate\Support\Facades\Mail;

$invoicePaid = (new InvoicePaid($invoice))
    ->onGroup('invoices')
    ->withDeduplicator(fn () => 'invoices-'.$invoice->id);

Mail::to($request->user())->send($invoicePaid);

当发送将在 FIFO 队列上排队的通知时,你应该在发送时调用 onGroup 方法,并可选地调用 withDeduplicator 方法:

php
use App\Notifications\InvoicePaid;

$invoicePaid = (new InvoicePaid($invoice))
    ->onGroup('invoices')
    ->withDeduplicator(fn () => 'invoices-'.$invoice->id);

$user->notify($invoicePaid);

队列故障转移

failover 队列驱动在将任务推送到队列时提供自动故障转移功能。如果 failover 配置的主队列连接因任何原因失败,Laravel 将自动尝试将任务推送到列表中的下一个配置连接。这对于确保队列可靠性至关重要的生产环境中的高可用性特别有用。

要配置故障转移队列连接,请指定 failover 驱动并提供一个按顺序尝试的连接名称数组。默认情况下,Laravel 在应用的 config/queue.php 配置文件中包含一个示例故障转移配置:

php
'failover' => [
    'driver' => 'failover',
    'connections' => [
        'redis',
        'database',
        'sync',
    ],
],

配置了使用 failover 驱动的连接后,你需要在应用的 .env 文件中将故障转移连接设置为默认队列连接以使用故障转移功能:

ini
QUEUE_CONNECTION=failover

接下来,为故障转移连接列表中的每个连接启动至少一个工作者:

bash
php artisan queue:work redis
php artisan queue:work database

NOTE

你不需要为使用 syncbackgrounddeferred 队列驱动的连接运行工作者,因为这些驱动在当前 PHP 进程中处理任务。

当队列连接操作失败并激活故障转移时,Laravel 将分发 Illuminate\Queue\Events\QueueFailedOver 事件,允许你报告或记录队列连接失败。

NOTE

如果你使用 Laravel Horizon,请记住 Horizon 仅管理 Redis 队列。如果你的故障转移列表包含 database,你应该在 Horizon 旁边运行一个常规的 php artisan queue:work database 进程。

错误处理

如果在处理任务时抛出异常,任务将自动被释放回队列以便再次尝试。任务将继续被释放,直到它已尝试了应用允许的最大次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作者的更多信息可以在下面找到

手动释放任务

有时你可能希望手动将任务释放回队列,以便稍后再次尝试。你可以通过调用 release 方法来实现:

php
/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release 方法将任务释放回队列以立即处理。但是,你可以通过向 release 方法传递整数或日期实例来指示队列在给定秒数过去之前不要使任务可用于处理:

php
$this->release(10);

$this->release(now()->plus(seconds: 10));

手动失败任务

有时你可能需要手动将任务标记为"失败"。为此,你可以调用 fail 方法:

php
/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果你希望因为捕获到的异常而将任务标记为失败,可以将异常传递给 fail 方法。或者,为了方便,你可以传递一个字符串错误消息,它将被转换为异常:

php
$this->fail($exception);

$this->fail('Something went wrong.');

NOTE

有关失败任务的更多信息,请查看处理任务失败的文档

在特定异常时失败任务

FailOnException 任务中间件允许你在抛出特定异常时短路重试。这允许对瞬态异常(如外部 API 错误)进行重试,但对持久异常(如用户的权限被撤销)将任务永久失败:

php
<?php

namespace App\Jobs;

use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\Tries;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;

#[Tries(3)]
class SyncChatHistory implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public User $user,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        $this->user->authorize('sync-chat-history');

        $response = Http::throw()->get(
            "https://chat.laravel.test/?user={$this->user->uuid}"
        );

        // ...
    }

    /**
     * Get the middleware the job should pass through.
     */
    public function middleware(): array
    {
        return [
            new FailOnException([AuthorizationException::class])
        ];
    }
}

任务批处理

Laravel 的任务批处理功能允许你轻松执行一批任务,然后在这批任务完成执行后执行某些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含关于任务批次的元信息,例如它们的完成百分比。可以使用 make:queue-batches-table Artisan 命令生成此迁移:

shell
php artisan make:queue-batches-table

php artisan migrate

定义可批处理任务

要定义可批处理任务,你应该像平常一样创建可排队任务;但是,你应该向任务类添加 Illuminate\Bus\Batchable trait。此 trait 提供对 batch 方法的访问,可用于检索任务正在其中执行的当前批次:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ImportCsv implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // Determine if the batch has been cancelled...

            return;
        }

        // Import a portion of the CSV file...
    }
}

分发批次

要分发一批任务,你应该使用 Bus facade 的 batch 方法。当然,批处理主要在与完成回调结合使用时有用。因此,你可以使用 thencatchfinally 方法为批次定义完成回调。这些回调中的每一个在被调用时都将接收一个 Illuminate\Bus\Batch 实例。

当运行多个队列工作者时,批次中的任务将被并行处理。因此,任务完成的顺序可能与它们被添加到批次中的顺序不同。请参阅我们关于任务链和批次的文档,了解如何按顺序运行一系列任务。

在此示例中,我们将假设我们正在排队一批任务,每个任务处理 CSV 文件中给定数量的行:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
    // A single job has completed successfully...
})->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // Batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->dispatch();

return $batch->id;

批次的 ID(可通过 $batch->id 属性访问)可用于在批次分发后查询 Laravel 命令总线以获取有关批次的信息。

WARNING

由于批次回调被序列化并由 Laravel 队列在稍后执行,你不应在回调中使用 $this 变量。此外,由于批处理任务被包裹在数据库事务中,触发隐式提交的数据库语句不应在任务中执行。

命名批次

某些工具(如 Laravel HorizonLaravel Telescope)如果批次被命名,可能会为批次提供更友好的调试信息。要为批次分配任意名称,你可以在定义批次时调用 name 方法:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import CSV')->dispatch();

批次连接和队列

如果你希望指定批处理任务应使用的连接和队列,可以使用 onConnectiononQueue 方法。所有批处理任务必须在相同的连接和队列中执行:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

链和批次

你可以通过将链式任务放在数组中来在批次中定义一组链式任务。例如,我们可以并行执行两个任务链,并在两个任务链都完成处理后执行回调:

php
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->dispatch();

相反,你可以通过在链中定义批次来在中运行任务批次。例如,你可以先运行一批任务来发布多个播客,然后运行一批任务来发送发布通知:

php
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

向批次添加任务

有时从批处理任务内部向批次添加额外的任务可能很有用。当你需要批处理数千个任务时,在 Web 请求期间分发可能需要太长时间,这种模式很有用。因此,你可能希望分发一个初始的"加载器"任务批次,用更多的任务填充该批次:

php
$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 任务用额外的任务填充批次。要实现这一点,我们可以使用通过任务的 batch 方法访问的批次实例上的 add 方法:

php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

WARNING

你只能从属于同一批次的任务内部向批次添加任务。

检查批次

提供给批次完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,可帮助你与给定的任务批次进行交互和检查:

php
// The UUID of the batch...
$batch->id;

// The name of the batch (if applicable)...
$batch->name;

// The number of jobs assigned to the batch...
$batch->totalJobs;

// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;

// The number of jobs that have failed...
$batch->failedJobs;

// The number of jobs that have been processed thus far...
$batch->processedJobs();

// The completion percentage of the batch (0-100)...
$batch->progress();

// Indicates if the batch has finished executing...
$batch->finished();

// Cancel the execution of the batch...
$batch->cancel();

// Indicates if the batch has been cancelled...
$batch->cancelled();

从路由返回批次

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着你可以直接从应用的路由返回它们,以检索包含批次信息(包括其完成进度)的 JSON 负载。这使得在应用的 UI 中显示批次完成进度信息变得方便。

要通过其 ID 检索批次,你可以使用 Bus facade 的 findBatch 方法:

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批次

有时你可能需要取消给定批次的执行。这可以通过在 Illuminate\Bus\Batch 实例上调用 cancel 方法来实现:

php
/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        $this->batch()->cancel();

        return;
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

你可能已经在前面的示例中注意到,批处理任务通常应在继续执行之前确定其对应的批次是否已被取消。但是,为了方便,你可以将 SkipIfBatchCancelled 中间件分配给任务。如其名称所示,如果任务对应的批次已被取消,此中间件将指示 Laravel 不处理该任务:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批次失败

当批处理任务失败时,catch 回调(如果已分配)将被调用。此回调仅为批次中失败的第一个任务调用。

允许失败

当批次中的任务失败时,Laravel 将自动将批次标记为"已取消"。如果你愿意,你可以禁用此行为,以便任务失败不会自动将批次标记为已取消。这可以通过在分发批次时调用 allowFailures 方法来实现:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->allowFailures()->dispatch();

你可以选择性地向 allowFailures 方法提供一个闭包,该闭包将在每次任务失败时执行:

php
$batch = Bus::batch([
    // ...
])->allowFailures(function (Batch $batch, $exception) {
    // Handle individual job failures...
})->dispatch();

重试失败的批次任务

为了方便,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许你轻松重试给定批次的所有失败任务。此命令接受应重试其失败任务的批次的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批次

如果不进行修剪,job_batches 表可能会很快积累记录。为了缓解这个问题,你应该调度 queue:prune-batches Artisan 命令每天运行:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批次将被修剪。你可以在调用命令时使用 hours 选项来确定保留批次数据的时间。例如,以下命令将删除所有超过 48 小时前完成的批次:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

有时,你的 job_batches 表可能会积累从未成功完成的批次记录,例如任务失败且该任务从未成功重试的批次。你可以使用 unfinished 选项指示 queue:prune-batches 命令修剪这些未完成的批次记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,你的 job_batches 表也可能会积累已取消批次的记录。你可以使用 cancelled 选项指示 queue:prune-batches 命令修剪这些已取消的批次记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批次

Laravel 还支持将批次元信息存储在 DynamoDB 中,而不是关系型数据库中。但是,你需要手动创建一个 DynamoDB 表来存储所有批次记录。

通常,此表应命名为 job_batches,但你应根据应用的 queue 配置文件中 queue.batching.table 配置值来命名该表。

DynamoDB 批次表配置

job_batches 表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含你的应用名称,如应用的 app 配置文件中的 name 配置值所定义的。由于应用名称是 DynamoDB 表键的一部分,你可以使用同一张表来存储多个 Laravel 应用的任务批次。

此外,如果你希望利用自动批次修剪,可以为表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便你的 Laravel 应用可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,你应该在 batching 配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动时,queue.batching.database 配置选项是不必要的:

php
'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批次

使用 DynamoDB 存储任务批次信息时,用于修剪存储在关系型数据库中的批次的典型修剪命令将不起作用。相反,你可以利用 DynamoDB 的原生 TTL 功能自动删除旧批次的记录。

如果你使用 ttl 属性定义了 DynamoDB 表,你可以定义配置参数来指示 Laravel 如何修剪批次记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性名称,而 queue.batching.ttl 配置值定义了批次记录可以从 DynamoDB 表中删除的秒数,相对于记录最后一次更新的时间:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

队列闭包

除了将任务类分发到队列外,你还可以分发闭包。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。当将闭包分发到队列时,闭包的代码内容会被加密签名,以防止在传输过程中被修改:

php
use App\Models\Podcast;

$podcast = Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

要为队列闭包分配一个名称(可由队列报告仪表盘使用,也可由 queue:work 命令显示),你可以使用 name 方法:

php
dispatch(function () {
    // ...
})->name('Publish Podcast');

使用 catch 方法,你可以提供一个闭包,如果队列闭包在耗尽队列的所有配置重试次数后未能成功完成,则应执行该闭包:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // This job has failed...
});

WARNING

由于 catch 回调被序列化并由 Laravel 队列在稍后执行,你不应在 catch 回调中使用 $this 变量。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,它将启动队列工作者并在任务被推送到队列时处理新任务。你可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work

NOTE

要保持 queue:work 进程在后台永久运行,你应该使用进程监控器(如 Supervisor)来确保队列工作者不会停止运行。

如果你希望在命令输出中包含已处理的任务 ID、连接名称和队列名称,可以在调用 queue:work 命令时包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作者是长寿命进程,并将启动的应用状态存储在内存中。因此,它们在启动后不会注意到代码库的更改。因此,在部署过程中,请确保重新启动队列工作者。此外,请记住,应用创建或修改的任何静态状态不会在任务之间自动重置。

或者,你可以运行 queue:listen 命令。使用 queue:listen 命令时,当你想重新加载更新的代码或重置应用状态时不必手动重新启动工作者;但是,此命令的效率明显低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作者

要将多个工作者分配给队列并并发处理任务,你只需启动多个 queue:work 进程。这可以通过终端中的多个选项卡在本地完成,或在生产环境中使用进程管理器的配置设置。使用 Supervisor 时,你可以使用 numprocs 配置值。

指定连接和队列

你还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令只处理给定连接上的默认队列的任务。但是,你可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果你的所有电子邮件都在 redis 队列连接的 emails 队列中处理,你可以发出以下命令来启动一个只处理该队列的工作者:

shell
php artisan queue:work redis --queue=emails

处理指定数量的任务

--once 选项可用于指示工作者仅从队列中处理单个任务:

shell
php artisan queue:work --once

--max-jobs 选项可用于指示工作者处理给定数量的任务然后退出。当与 Supervisor 结合使用时,此选项可能很有用,以便你的工作者在处理给定数量的任务后自动重新启动,释放它们可能累积的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有队列任务然后退出

--stop-when-empty 选项可用于指示工作者处理所有任务然后优雅退出。如果你希望在队列为空后关闭容器,则在 Docker 容器中处理 Laravel 队列时此选项可能很有用:

shell
php artisan queue:work --stop-when-empty

处理给定秒数的任务

--max-time 选项可用于指示工作者处理给定秒数的任务然后退出。当与 Supervisor 结合使用时,此选项可能很有用,以便你的工作者在处理给定时间的任务后自动重新启动,释放它们可能累积的任何内存:

shell
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

工作者睡眠时长

当队列上有任务可用时,工作者将继续处理任务,任务之间没有延迟。但是,sleep 选项确定如果没有可用的任务,工作者将"睡眠"多少秒。当然,在睡眠期间,工作者不会处理任何新任务:

shell
php artisan queue:work --sleep=3

维护模式和队列

当你的应用处于维护模式时,不会处理队列任务。一旦应用退出维护模式,任务将继续正常处理。

要强制队列工作者即使在维护模式启用时也处理任务,你可以使用 --force 选项:

shell
php artisan queue:work --force

资源注意事项

守护进程队列工作者在处理每个任务之前不会"重启"框架。因此,你应该在每个任务完成后释放任何重型资源。例如,如果你使用 GD 库进行图像处理,你应该在完成图像处理后使用 imagedestroy 释放内存。

队列优先级

有时你可能希望优先处理队列的方式。例如,在 config/queue.php 配置文件中,你可以将 redis 连接的默认 queue 设置为 low。但是,有时你可能希望将任务推送到 high 优先级队列,像这样:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,验证所有 high 队列任务都在继续处理 low 队列上的任何任务之前被处理,请将以逗号分隔的队列名称列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长寿命进程,它们不会在不重新启动的情况下注意到代码的更改。因此,使用队列工作者部署应用的最简单方法是在部署过程中重新启动工作者。你可以通过发出 queue:restart 命令来优雅地重新启动所有工作者:

shell
php artisan queue:restart

此命令将指示所有队列工作者在完成处理当前任务后优雅退出,以便不会丢失现有任务。由于队列工作者将在执行 queue:restart 命令时退出,你应该运行进程管理器(如 Supervisor)来自动重新启动队列工作者。

NOTE

队列使用缓存来存储重启信号,因此你应该在使用此功能之前验证应用的缓存驱动是否已正确配置。

任务过期和超时

任务过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的任务之前应等待多少秒。例如,如果 retry_after 的值设置为 90,则如果任务已处理 90 秒而未被释放或删除,它将被释放回队列。通常,你应该将 retry_after 值设置为你的任务合理完成处理所需的最大秒数。

WARNING

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据在 AWS 控制台中管理的默认可见性超时重试任务。

工作者超时

queue:work Artisan 命令暴露了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果任务的处理时间超过超时值指定的秒数,处理该任务的工作者将以错误退出。通常,工作者将由在服务器上配置的进程管理器自动重新启动:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们共同工作以确保任务不会丢失且任务只被成功处理一次。

WARNING

--timeout 值应始终比 retry_after 配置值短至少几秒。这将确保处理冻结任务的工作者始终在任务被重试之前被终止。如果你的 --timeout 选项比 retry_after 配置值更长,你的任务可能会被处理两次。

暂停和恢复队列工作者

有时你可能需要临时阻止队列工作者处理新任务,而不完全停止工作者。例如,你可能希望在系统维护期间暂停任务处理。Laravel 提供了 queue:pausequeue:continue Artisan 命令来暂停和恢复队列工作者。

要暂停特定队列,请提供队列连接名称和队列名称:

shell
php artisan queue:pause database:default

在此示例中,database 是队列连接名称,default 是队列名称。一旦队列被暂停,任何从该队列处理任务的工作者将继续完成当前任务,但在队列恢复之前不会拾取任何新任务。

要在已暂停的队列上恢复处理任务,请使用 queue:continue 命令:

shell
php artisan queue:continue database:default

恢复队列后,工作者将立即开始从该队列处理新任务。请注意,暂停队列不会停止工作者进程本身 - 它只是阻止工作者从指定队列处理新任务。

工作者重启和暂停信号

默认情况下,队列工作者在每次任务迭代时轮询缓存驱动以获取重启和暂停信号。虽然这种轮询对于响应 queue:restartqueue:pause 命令是必要的,但它确实引入了少量性能开销。

如果你需要优化性能且不需要这些中断功能,你可以通过在 Queue facade 上调用 withoutInterruptionPolling 方法来全局禁用此轮询。这通常应在 AppServiceProviderboot 方法中完成:

php
use Illuminate\Support\Facades\Queue;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Queue::withoutInterruptionPolling();
}

或者,你可以通过设置 Illuminate\Queue\Worker 类的静态 $restartable$pausable 属性来单独禁用重启或暂停轮询:

php
use Illuminate\Queue\Worker;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Worker::$restartable = false;
    Worker::$pausable = false;
}

WARNING

当中断轮询被禁用时,工作者将不会响应 queue:restartqueue:pause 命令(取决于禁用了哪些功能)。

Supervisor 配置

在生产环境中,你需要一种方法来保持 queue:work 进程运行。queue:work 进程可能因多种原因停止运行,例如工作者超时或执行 queue:restart 命令。

因此,你需要配置一个进程监控器,它可以检测 queue:work 进程何时退出并自动重新启动它们。此外,进程监控器可以允许你指定希望同时运行多少个 queue:work 进程。Supervisor 是 Linux 环境中常用的进程监控器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监控器,如果你的 queue:work 进程失败,它将自动重新启动它们。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:

shell
sudo apt-get install supervisor

NOTE

如果自己配置和管理 Supervisor 听起来很复杂,请考虑使用 Laravel Cloud,它提供了一个完全托管的平台来运行 Laravel 队列工作者。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,你可以创建任意数量的配置文件,指示 Supervisor 如何监控你的进程。例如,让我们创建一个 laravel-worker.conf 文件来启动和监控 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监控所有这些进程,如果它们失败则自动重新启动它们。你应该更改配置的 command 指令以反映你所需的队列连接和工作者选项。

WARNING

你应该确保 stopwaitsecs 的值大于你运行时间最长的任务所消耗的秒数。否则,Supervisor 可能会在任务完成处理之前终止它。

启动 Supervisor

创建配置文件后,你可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请参阅 Supervisor 文档

处理失败的任务

有时你的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种便捷的方式来指定任务应尝试的最大次数。当异步任务超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步分发的任务失败时不会存储在此表中,它们的异常会立即由应用处理。

创建 failed_jobs 表的迁移通常已存在于新的 Laravel 应用中。但是,如果你的应用不包含此表的迁移,你可以使用 make:queue-failed-table 命令创建迁移:

shell
php artisan make:queue-failed-table

php artisan migrate

运行队列工作者进程时,你可以使用 queue:work 命令上的 --tries 开关指定任务应尝试的最大次数。如果你未指定 --tries 选项的值,任务将只尝试一次或按任务类的 Tries 属性指定的次数尝试:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,你可以指定 Laravel 在重试遇到异常的任务之前应等待多少秒。默认情况下,任务会立即被释放回队列以便再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果你想按每个任务配置 Laravel 在重试遇到异常的任务之前应等待多少秒,你可以在任务类上使用 Backoff 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Backoff;

#[Backoff(3)]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

如果你需要更复杂的逻辑来确定任务的退避时间,你可以在任务类上定义 backoff 方法:

php
/**
 * Calculate the number of seconds to wait before retrying the job.
 */
public function backoff(): int
{
    return 3;
}

你可以通过定义退避值数组轻松配置"指数"退避。在此示例中,第一次重试的延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,如果还有更多剩余尝试,则每次后续重试为 10 秒:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Backoff;

#[Backoff([1, 5, 10])]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

失败任务后的清理

当特定任务失败时,你可能希望向用户发送警报或回滚任务部分完成的任何操作。要实现这一点,你可以在任务类上定义 failed 方法。导致任务失败的 Throwable 实例将被传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }

    /**
     * Handle a job failure.
     */
    public function failed(?Throwable $exception): void
    {
        // Send user notification of failure, etc...
    }
}

WARNING

在调用 failed 方法之前会实例化一个新的任务实例;因此,handle 方法中可能发生的任何类属性修改都将丢失。

失败的任务不一定是遇到了未处理异常的任务。当任务耗尽了所有允许的尝试时,也可能被视为失败。这些尝试可以通过以下几种方式消耗:

  • 任务超时。
  • 任务在执行期间遇到未处理的异常。
  • 任务被手动或由中间件释放回队列。

如果最终尝试因任务执行期间抛出的异常而失败,该异常将被传递给任务的 failed 方法。但是,如果任务因达到最大允许尝试次数而失败,$exception 将是 Illuminate\Queue\MaxAttemptsExceededException 的实例。类似地,如果任务因超过配置的超时而失败,$exception 将是 Illuminate\Queue\TimeoutExceededException 的实例。

重试失败的任务

要查看已插入到 failed_jobs 数据库表中的所有失败任务,你可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出任务 ID、连接、队列、失败时间以及有关任务的其他信息。任务 ID 可用于重试失败的任务。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败任务,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,你可以向命令传递多个 ID:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

你还可以重试特定队列的所有失败任务:

shell
php artisan queue:retry --queue=name

要重试所有失败的任务,请执行 queue:retry 命令并传递 all 作为 ID:

shell
php artisan queue:retry all

如果你想删除失败的任务,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

NOTE

使用 Horizon 时,你应该使用 horizon:forget 命令而不是 queue:forget 命令来删除失败的任务。

要从 failed_jobs 表中删除所有失败的任务,你可以使用 queue:flush 命令:

shell
php artisan queue:flush

queue:flush 命令从你的队列中删除所有失败的任务记录,无论失败的任务有多旧。你可以使用 --hours 选项仅删除在一定小时数之前或更早失败的任务:

shell
php artisan queue:flush --hours=48

忽略缺失的模型

当将 Eloquent 模型注入任务时,模型会在放入队列之前自动序列化,并在处理任务时从数据库重新检索。但是,如果模型在任务等待工作者处理时已被删除,你的任务可能会因 ModelNotFoundException 而失败。

为了方便,你可以通过在任务类上使用 DeleteWhenMissingModels 属性来选择自动删除缺失模型的任务。当此属性存在时,Laravel 将静默地丢弃该任务而不引发异常:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\DeleteWhenMissingModels;

#[DeleteWhenMissingModels]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

修剪失败的任务

你可以通过调用 queue:prune-failed Artisan 命令来修剪应用的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败任务记录将被修剪。如果你向命令提供 --hours 选项,则只会保留在最近 N 小时内插入的失败任务记录。例如,以下命令将删除所有超过 48 小时前插入的失败任务记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的任务

Laravel 还支持将失败的任务记录存储在 DynamoDB 中,而不是关系型数据库表中。但是,你必须手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,此表应命名为 failed_jobs,但你应根据应用的 queue 配置文件中 queue.failed.table 配置值来命名该表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含你的应用名称,如应用的 app 配置文件中的 name 配置值所定义的。由于应用名称是 DynamoDB 表键的一部分,你可以使用同一张表来存储多个 Laravel 应用的失败任务。

此外,请确保安装了 AWS SDK,以便你的 Laravel 应用可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,你应该在失败任务配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动时,queue.failed.database 配置选项是不必要的:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败任务存储

你可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的任务而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量来实现:

ini
QUEUE_FAILED_DRIVER=null

失败任务事件

如果你希望注册一个在任务失败时被调用的事件监听器,你可以使用 Queue facade 的 failing 方法。例如,我们可以从 Laravel 包含的 AppServiceProviderboot 方法中将闭包附加到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除任务

NOTE

使用 Horizon 时,你应该使用 horizon:clear 命令而不是 queue:clear 命令来从队列中清除任务。

如果你希望从默认连接的默认队列中删除所有任务,你可以使用 queue:clear Artisan 命令:

shell
php artisan queue:clear

你还可以提供 connection 参数和 queue 选项以从特定连接和队列中删除任务:

shell
php artisan queue:clear redis --queue=emails

WARNING

从队列中清除任务仅适用于 SQS、Redis 和 database 队列驱动。此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后最多 60 秒内发送到 SQS 队列的任务也可能被删除。

监控你的队列

如果你的队列突然涌入大量任务,它可能会不堤重负,导致任务完成的等待时间很长。如果你愿意,Laravel 可以在你的队列任务计数超过指定阈值时提醒你。

要开始,你应该调度 queue:monitor 命令每分钟运行。该命令接受你希望监控的队列名称以及你期望的任务计数阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅调度此命令不足以触发通知提醒你队列的超负荷状态。当命令遇到任务计数超过阈值的队列时,将分发 Illuminate\Queue\Events\QueueBusy 事件。你可以在应用的 AppServiceProvider 中监听此事件,以便向你或你的开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
            ->notify(new QueueHasLongWaitTime(
                $event->connectionName,
                $event->queue,
                $event->size
            ));
    });
}

测试

在测试分发任务的代码时,你可能希望指示 Laravel 不实际执行任务本身,因为任务的代码可以直接单独测试,与分发它的代码分开。当然,要测试任务本身,你可以实例化一个任务实例并在测试中直接调用 handle 方法。

你可以使用 Queue facade 的 fake 方法来阻止队列任务实际被推送到队列。调用 Queue facade 的 fake 方法后,你可以断言应用尝试将任务推送到队列:

php
<?php

use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // Perform order shipping...

    // Assert that no jobs were pushed...
    Queue::assertNothingPushed();

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // Assert a job was pushed
    Queue::assertPushed(ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushedTimes(ShipOrder::class, 2);

    // Assert a job was not pushed...
    Queue::assertNotPushed(AnotherJob::class);

    // Assert that a closure was pushed to the queue...
    Queue::assertClosurePushed();

    // Assert that a closure was not pushed...
    Queue::assertClosureNotPushed();

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(3);
});
php
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed
        Queue::assertPushed(ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushedTimes(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a closure was pushed to the queue...
        Queue::assertClosurePushed();

        // Assert that a closure was not pushed...
        Queue::assertClosureNotPushed();

        // Assert the total number of jobs that were pushed...
        Queue::assertCount(3);
    }
}

你可以向 assertPushedassertNotPushedassertClosurePushedassertClosureNotPushed 方法传递闭包,以断言推送了通过给定"真值测试"的任务。如果至少有一个任务被推送并通过了给定的真值测试,则断言将成功:

php
use Illuminate\Queue\CallQueuedClosure;

Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

Queue::assertClosurePushed(function (CallQueuedClosure $job) {
    return $job->name === 'validate-order';
});

伪造任务子集

如果你只需要伪造特定任务,同时允许其他任务正常执行,你可以将应被伪造的任务的类名传递给 fake 方法:

php
test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushedTimes(ShipOrder::class, 2);
});
php
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushedTimes(ShipOrder::class, 2);
}

你可以使用 except 方法伪造除一组指定任务之外的所有任务:

php
Queue::fake()->except([
    ShipOrder::class,
]);

测试任务链

要测试任务链,你需要利用 Bus facade 的伪造功能。Bus facade 的 assertChained 方法可用于断言任务链已被分发。assertChained 方法接受链式任务数组作为其第一个参数:

php
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

如你在上面的示例中所见,链式任务数组可以是任务类名的数组。但是,你也可以提供实际任务实例的数组。这样做时,Laravel 将确保任务实例属于同一个类并且具有与应用分发的链式任务相同的属性值:

php
Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

你可以使用 assertDispatchedWithoutChain 方法断言任务被推送时没有任务链:

php
Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果链式任务向现有链前置或追加任务,你可以使用任务的 assertHasChain 方法断言任务具有预期的剩余任务链:

php
$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain 方法可用于断言任务的剩余链为空:

php
$job->assertDoesntHaveChain();

测试链式批次

如果你的任务链包含一批任务,你可以通过在链断言中插入 Bus::chainedBatch 定义来断言链式批次符合你的预期:

php
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

测试任务批次

Bus facade 的 assertBatched 方法可用于断言一批任务已被分发。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,可用于检查批次中的任务:

php
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'Import CSV' &&
           $batch->jobs->count() === 10;
});

hasJobs 方法可用于待处理批次以验证批次包含预期的任务。该方法接受任务实例、类名或闭包的数组:

php
Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->hasJobs([
        new ProcessCsvRow(row: 1),
        new ProcessCsvRow(row: 2),
        new ProcessCsvRow(row: 3),
    ]);
});

使用闭包时,闭包将接收任务实例。预期的任务类型将从闭包的类型提示中推断:

php
Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->hasJobs([
        fn (ProcessCsvRow $job) => $job->row === 1,
        fn (ProcessCsvRow $job) => $job->row === 2,
        fn (ProcessCsvRow $job) => $job->row === 3,
    ]);
});

你可以使用 assertBatchCount 方法断言分发了给定数量的批次:

php
Bus::assertBatchCount(3);

你可以使用 assertNothingBatched 断言没有分发任何批次:

php
Bus::assertNothingBatched();

测试任务/批次交互

此外,你可能偶尔需要测试单个任务与其底层批次的交互。例如,你可能需要测试任务是否取消了其批次的进一步处理。要实现这一点,你需要通过 withFakeBatch 方法向任务分配一个伪批次。withFakeBatch 方法返回一个包含任务实例和伪批次的元组:

php
[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

测试任务/队列交互

有时,你可能需要测试队列任务将自己释放回队列。或者,你可能需要测试任务是否删除了自己。你可以通过实例化任务并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦任务的队列交互被伪造,你可以在任务上调用 handle 方法。调用任务后,各种断言方法可用于验证任务的队列交互:

php
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();

任务事件

使用 Queue facadebeforeafter 方法,你可以指定在队列任务处理之前或之后执行的回调。这些回调是执行额外日志记录或增加仪表盘统计信息的绝佳机会。通常,你应该从服务提供者boot 方法中调用这些方法。例如,我们可以使用 Laravel 包含的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 Queue facadelooping 方法,你可以指定在工作者尝试从队列中获取任务之前执行的回调。例如,你可以注册一个闭包来回滚之前失败任务留下的任何未关闭事务:

php
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});