佇列 - Queue
簡介
在製作 Web App 時,有些任務若在 Web Request 中進行會花費太多時間,如解析 CSV 檔並上傳。所幸,在 Laravel 中,要建立在背景執行的佇列任務非常輕鬆。只要將需要花費時間的任務移到佇列中執行,就能加速網站對 Request 的回應速度,並提供更好的使用者經驗給客戶。
Laravel 的佇列為各種不同的佇列後端都提供了統一的 API。這些後端包括 Amazon SQS、Redis、甚至是關聯式資料庫。
Laravel 的佇列設定選項保存在專案的 config/queue.php
設定檔中。在這個檔案內,可以看到供各個 Laravel 內建佇列 Driver 使用的連線設定,包含資料庫、Amazon SQS、Redis、Beanstalkd 等 Driver,還包括一個會即時執行任務的同步佇列 (用於本機開發)。還包含一個 null
佇列,用於忽略佇列任務。
Laravel 現在還提供 Horizon。Horizon 是為 Redis 佇列提供的一個一個漂亮面板。更多資訊請參考完整的 Horizon 說明文件。
Connections vs. Queues
在開始使用 Laravel 佇列前,我們需要先瞭解「連線」與「佇列」的差別。在 config/queue.php
設定檔中有個 connections
設定陣列。該選項用於定義連到後端佇列服務的連線,後端佇列服務就是像 Amazon SQS、Beanstalk、Redis 等。不過,一個佇列連線可以有多個「佇列」,我們可以將這些不同的佇列想成是幾個不同堆疊的佇列任務。
可以注意到範例 queue
設定檔中的各個範例連線設定中都包含了一個 queue
屬性。這個 queue
屬性指定的,就是當我們將任務傳給這個連線時預設會被分派的佇列。換句話說,若我們在分派任務時沒有顯式定義要分派到哪個佇列上,這個任務就會被分派到連線設定中 queue
屬性所定義的佇列上:
1use App\Jobs\ProcessPodcast;23// This job is sent to the default connection's default queue...4ProcessPodcast::dispatch();56// This job is sent to the default connection's "emails" queue...7ProcessPodcast::dispatch()->onQueue('emails');
1use App\Jobs\ProcessPodcast;23// This job is sent to the default connection's default queue...4ProcessPodcast::dispatch();56// This job is sent to the default connection's "emails" queue...7ProcessPodcast::dispatch()->onQueue('emails');
有的程式沒有要將任務推送到不同佇列的需求,這些程式只需要有單一佇列就好了。不過,因為 Laravel 的 Queue Worker 可調整各個 Queue 的優先處理等級,因此如果想要調整不同任務的優先處理順序,把任務推送到不同佇列就很有用。就來來說,我們如果把任務推送到 high
佇列,我們就可以執行一個 Worker 來讓這個佇列以更高優先級處理:
1php artisan queue:work --queue=high,default
1php artisan queue:work --queue=high,default
Driver Notes and Prerequisites
Database
In order to use the database
queue driver, you will need a database table to hold the jobs. Typically, this is included in Laravel's default 0001_01_01_000002_create_jobs_table.php
database migration; however, if your application does not contain this migration, you may use the make:queue-table
Artisan command to create it:
1php artisan make:queue-table23php artisan migrate
1php artisan make:queue-table23php artisan migrate
Redis
若要使用 redis
佇列 Driver,請在 config/database.php
設定檔中設定 Redis 資料庫連線。
redis
Queue Driver 不支援 serializer
與 compression
Redis 選項。
Redis Cluster
若 Redis 佇列要使用 Redis Cluster,則設定的佇列名稱必須包含一個 Key Hash Tag。必須加上 Key Hash Tag,這樣才能確保給定佇列中所有的 Redis 索引鍵都有被放在相同的 Hash Slot 中:
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', '{default}'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => null,7 'after_commit' => false,8],
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', '{default}'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => null,7 'after_commit' => false,8],
Blocking
在使用 Redis 佇列時,可使用 block_for
設定選項來指定 Redis Driver 在迭代 Worker 迴圈並重新讀取 Redis 資料庫來等新 Job 進來時要等待多久。
可依據佇列的負載來調整這個值,以避免不斷讀取 Redis 資料庫來尋找新任務,會比較有效率。舉例來說,我們可以將其設為 5
,表示 Redis Driver 在等待新任務出現時應先等待 5 秒再查詢 Redis 資料庫:
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', 'default'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => 5,7 'after_commit' => false,8],
1'redis' => [2 'driver' => 'redis',3 'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),4 'queue' => env('REDIS_QUEUE', 'default'),5 'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),6 'block_for' => 5,7 'after_commit' => false,8],
將 block_for
設為 0
會導致 Queue Worker 在新 Job 出現前一直 Block。也會導致在處理下一個 Job 前都無法處理如 SIGTERM
等訊號 (Signal)。
其他 Driver 的前置要求
下列 Queue Driver 還需要一些相依性套件。可以使用 Composer 套件管理員來安裝這些相依性套件:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
or phpredis PHP extension -
MongoDB:
mongodb/laravel-mongodb
建立 Job
產生 Job 類別
預設情況下,專案中所有可放入佇列的任務都存放在 app/Jobs
目錄內。若 app/Jobs
目錄不存在,則執行 make:jobs
Artisan 指令時會建立該目錄:
1php artisan make:job ProcessPodcast
1php artisan make:job ProcessPodcast
產生的類別會實作 Illuminate\Contracts\Queue\ShouldQueue
介面,這樣 Laravel 就知道該 Job 要被推入佇列並以非同步方式執行。
可以安裝 Stub 來自訂 Job 的 Stub。
類別架構
Job 類別非常簡單,通常只包含了一個 handle
方法,會在佇列處理 Job 時叫用。要開始使用 Job,我們先來看一個範例 Job 類別。在這個範例中,我們先假裝時我們在管理一個 Podcast 上架服務,我們需要在上架前處理上傳的 Podcast 檔案:
1<?php23namespace App\Jobs;45use App\Models\Podcast;6use App\Services\AudioProcessor;7use Illuminate\Contracts\Queue\ShouldQueue;8use Illuminate\Foundation\Queue\Queueable;910class ProcessPodcast implements ShouldQueue11{12 use Queueable;1314 /**15 * Create a new job instance.16 */17 public function __construct(18 public Podcast $podcast,19 ) {}2021 /**22 * Execute the job.23 */24 public function handle(AudioProcessor $processor): void25 {26 // Process uploaded podcast...27 }28}
1<?php23namespace App\Jobs;45use App\Models\Podcast;6use App\Services\AudioProcessor;7use Illuminate\Contracts\Queue\ShouldQueue;8use Illuminate\Foundation\Queue\Queueable;910class ProcessPodcast implements ShouldQueue11{12 use Queueable;1314 /**15 * Create a new job instance.16 */17 public function __construct(18 public Podcast $podcast,19 ) {}2021 /**22 * Execute the job.23 */24 public function handle(AudioProcessor $processor): void25 {26 // Process uploaded podcast...27 }28}
In this example, note that we were able to pass an Eloquent model directly into the queued job's constructor. Because of the Queueable
trait that the job is using, Eloquent models and their loaded relationships will be gracefully serialized and unserialized when the job is processing.
若佇列任務的 Constructor 中接受 Eloquent Model,則只有 Model 的識別元會被序列化進佇列中。實際要處理任務時,佇列系統會自動從資料庫中重新取得完整的 Model 實體以及已載入的關聯。通過這種序列化 Model 的做法,我們就能縮小傳入佇列 Driver 的任務承載。
handle
方法的相依性插入
佇列在處理任務時會叫用該任務的 handle
方法。請注意,我們可以在任務的 handle
方法上型別提示任何相依性項目。Laravel Service Container 會自動插入這些相依性。
若想完整控制 Container 要如何插入這些相依性到 handle
方法,可使用 Container 的 bindMethod
方法。bindMethod
方法接收一個回呼,該回呼則接收該任務與 Container。我們可以在這個回呼中自行叫用 handle
方法。一般來說,我們應該從 App\Providers\AppServiceProvider
Service Provider 的 boot
方法中叫用這個方法:
1use App\Jobs\ProcessPodcast;2use App\Services\AudioProcessor;3use Illuminate\Contracts\Foundation\Application;45$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {6 return $job->handle($app->make(AudioProcessor::class));7});
1use App\Jobs\ProcessPodcast;2use App\Services\AudioProcessor;3use Illuminate\Contracts\Foundation\Application;45$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {6 return $job->handle($app->make(AudioProcessor::class));7});
二進位資料,如圖片等,應在傳入佇列任務前先使用 base64_encode
函式進行編碼。若未進行編碼,則這些資料在放入佇列時可能無法正確被序列化為 JSON。
佇列中的關聯
由於所有已載入的 Eloquent Model 關聯都會在 Job 被放入佇列時序列化,因此,序列化的 Job 字串有時候會比較大。此外,當任務被反序列化,然後 Model 關聯被從資料庫中重新取出時,這些關聯的資料會以完整的關聯取出。這表示,若在 Model 被任務佇列序列化前有對關聯套用任何查詢條件,在反序列化時,這些條件都不會被套用。因此,若只想處理給定關聯中的一部分,應在佇列任務中重新套用這些查詢條件。
或者,若要防止關聯被序列化,可在設定屬性值時在 Model 上呼叫 withoutRelations
方法。該方法會回傳該 Model 不含已載入關聯的實體:
1/**2 * Create a new job instance.3 */4public function __construct(5 Podcast $podcast,6) {7 $this->podcast = $podcast->withoutRelations();8}
1/**2 * Create a new job instance.3 */4public function __construct(5 Podcast $podcast,6) {7 $this->podcast = $podcast->withoutRelations();8}
If you are using PHP constructor property promotion and would like to indicate that an Eloquent model should not have its relations serialized, you may use the WithoutRelations
attribute:
1use Illuminate\Queue\Attributes\WithoutRelations;23/**4 * Create a new job instance.5 */6public function __construct(7 #[WithoutRelations]8 public Podcast $podcast,9) {}
1use Illuminate\Queue\Attributes\WithoutRelations;23/**4 * Create a new job instance.5 */6public function __construct(7 #[WithoutRelations]8 public Podcast $podcast,9) {}
If a job receives a collection or array of Eloquent models instead of a single model, the models within that collection will not have their relationships restored when the job is deserialized and executed. This is to prevent excessive resource usage on jobs that deal with large numbers of models.
不重複 Job
若要使用不重複任務,則需要使用支援 [Atomic Lock] 的快取 Driver。目前,memcached
、redis
、dynamodb
、database
、file
、array
等快取 Driver 有支援 Atomic Lock。此外,不重複任務的條件限制不會被套用到批次任務中的人物上。
有時候,我們可能會想確保某個任務在佇列中一次只能有一個實體。我們可以在 Job 類別上實作 ShouldBeUnique
介面來確保一次只執行一個實體。要實作這個介面,我們需要在 Class 上定義幾個額外的方法:
1<?php23use Illuminate\Contracts\Queue\ShouldQueue;4use Illuminate\Contracts\Queue\ShouldBeUnique;56class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique7{8 ...9}
1<?php23use Illuminate\Contracts\Queue\ShouldQueue;4use Illuminate\Contracts\Queue\ShouldBeUnique;56class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique7{8 ...9}
在上述範例中,UpdateSearchIndex
Job 是不重複的。所以,若佇列中已經有該 Job 的另一個實體且尚未執行完畢,就不會再次分派該 Job。
在某些情況下,我們可能會想指定要用來判斷 Job 是否重複的「索引鍵」,或是我們可能會想指定一個逾時時間,讓這個 Job 在執行超過該逾時後就不再判斷是否重複。為此,可在 Job 類別上定義 uniqueId
與 uniqueFor
屬性或方法:
1<?php23use App\Models\Product;4use Illuminate\Contracts\Queue\ShouldQueue;5use Illuminate\Contracts\Queue\ShouldBeUnique;67class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique8{9 /**10 * The product instance.11 *12 * @var \App\Product13 */14 public $product;1516 /**17 * The number of seconds after which the job's unique lock will be released.18 *19 * @var int20 */21 public $uniqueFor = 3600;2223 /**24 * Get the unique ID for the job.25 */26 public function uniqueId(): string27 {28 return $this->product->id;29 }30}
1<?php23use App\Models\Product;4use Illuminate\Contracts\Queue\ShouldQueue;5use Illuminate\Contracts\Queue\ShouldBeUnique;67class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique8{9 /**10 * The product instance.11 *12 * @var \App\Product13 */14 public $product;1516 /**17 * The number of seconds after which the job's unique lock will be released.18 *19 * @var int20 */21 public $uniqueFor = 3600;2223 /**24 * Get the unique ID for the job.25 */26 public function uniqueId(): string27 {28 return $this->product->id;29 }30}
在上述範例中,UpdateSearchIndex
Job 使用 Product ID 來判斷是否重複。因此,若新分派的 Job 有相同的 Product ID,則直到現存 Job 執行完畢前,這個 Job 都會被忽略。此外,若現有的 Job 在一個小時內都未被處理,這個不重複鎖定會被解除,之後若有另一個具相同重複索引鍵的 Job 將可被分派進佇列中。
若專案會在多個 Web Server 或 Container 上分派任務,則請確保所有的這些 Server 都使用相同的中央 Cache Server,好讓 Laravel 可精準判斷該 Job 是否不重複。
在開始處理 Job 後仍維持讓 Job 不重複
預設情況下,不重複的 Job 會在執行完成或所有嘗試都失敗後「解除鎖定」。不過,有的情況下,我們可能會想在執行完成前就先解除鎖定 Job。為此,不要在該 Job 上實作 ShouldBeUnique
,而是實作 ShouldBeUniqueUntillProcessing
Contract:
1<?php23use App\Models\Product;4use Illuminate\Contracts\Queue\ShouldQueue;5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;67class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing8{9 // ...10}
1<?php23use App\Models\Product;4use Illuminate\Contracts\Queue\ShouldQueue;5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;67class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing8{9 // ...10}
不重複 Job 的鎖定
當分派 ShouldBeUnique
時,Laravel 會在幕後使用 uniqueId
索引鍵來取得一個 Lock。若未能取得 Lock,就不會分派該 Job。當 Job 完成處理或所有嘗試都失敗後,就會解除該 Lock。預設情況下,Laravel 會使用預設的快取 Driver 來取得該 Lock。不過,若想使用其他 Driver 來取得 Lock,可定義一個 uniqueVia
方法,並在該方法中回傳要使用的快取 Driver:
1use Illuminate\Contracts\Cache\Repository;2use Illuminate\Support\Facades\Cache;34class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique5{6 ...78 /**9 * Get the cache driver for the unique job lock.10 */11 public function uniqueVia(): Repository12 {13 return Cache::driver('redis');14 }15}
1use Illuminate\Contracts\Cache\Repository;2use Illuminate\Support\Facades\Cache;34class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique5{6 ...78 /**9 * Get the cache driver for the unique job lock.10 */11 public function uniqueVia(): Repository12 {13 return Cache::driver('redis');14 }15}
若想限制某個 Job 可同時執行的數量,請使用 WithoutOverlapping
Job Middleware 而不是使用 Unique Job。
加密的 Job
Laravel allows you to ensure the privacy and integrity of a job's data via encryption. To get started, simply add the ShouldBeEncrypted
interface to the job class. Once this interface has been added to the class, Laravel will automatically encrypt your job before pushing it onto a queue:
1<?php23use Illuminate\Contracts\Queue\ShouldBeEncrypted;4use Illuminate\Contracts\Queue\ShouldQueue;56class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted7{8 // ...9}
1<?php23use Illuminate\Contracts\Queue\ShouldBeEncrypted;4use Illuminate\Contracts\Queue\ShouldQueue;56class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted7{8 // ...9}
Job Middleware
使用 Job Middleware 就能讓我們將佇列 Job 包裝在一組自定邏輯內執行,讓我們能減少在各個 Job 內撰寫重複的程式碼。舉例來說,假設有下列這個 handle
方法,該方法會使用 Laravel 的 Redis 頻率限制功能,限制每 5 秒只能處理 1 個 Job:
1use Illuminate\Support\Facades\Redis;23/**4 * Execute the job.5 */6public function handle(): void7{8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {9 info('Lock obtained...');1011 // Handle job...12 }, function () {13 // Could not obtain lock...1415 return $this->release(5);16 });17}
1use Illuminate\Support\Facades\Redis;23/**4 * Execute the job.5 */6public function handle(): void7{8 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {9 info('Lock obtained...');1011 // Handle job...12 }, function () {13 // Could not obtain lock...1415 return $this->release(5);16 });17}
雖然我們確實可以這樣寫,但這樣一來 handle
方法的實作就變得很亂,因為我們的程式碼跟 Redis 頻率限制的邏輯混在一起了。此外,這樣的頻率限制邏輯一定也會與其他我們想要作頻率限制的 Job 重複。
我們可以定義一個 Job Middleware 來處理頻率限制,而不用在 handle 方法內處理。Laravel 中沒有預設放置 Job Middleware 的地方,因此我們可以隨意在專案內放置這些 Job Middleware。舉例來說,我們可以把 Middleware 放在 app/Jobs/Middleware
目錄下:
1<?php23namespace App\Jobs\Middleware;45use Closure;6use Illuminate\Support\Facades\Redis;78class RateLimited9{10 /**11 * Process the queued job.12 *13 * @param \Closure(object): void $next14 */15 public function handle(object $job, Closure $next): void16 {17 Redis::throttle('key')18 ->block(0)->allow(1)->every(5)19 ->then(function () use ($job, $next) {20 // Lock obtained...2122 $next($job);23 }, function () use ($job) {24 // Could not obtain lock...2526 $job->release(5);27 });28 }29}
1<?php23namespace App\Jobs\Middleware;45use Closure;6use Illuminate\Support\Facades\Redis;78class RateLimited9{10 /**11 * Process the queued job.12 *13 * @param \Closure(object): void $next14 */15 public function handle(object $job, Closure $next): void16 {17 Redis::throttle('key')18 ->block(0)->allow(1)->every(5)19 ->then(function () use ($job, $next) {20 // Lock obtained...2122 $next($job);23 }, function () use ($job) {24 // Could not obtain lock...2526 $job->release(5);27 });28 }29}
就像這樣,跟 Route Middleware 很像,Job Middleware 會收到正在處理的 Job,以及要繼續執行 Job 時要叫用的回呼。
建立好 Job Middleware 後,我們就可以在 Job 的 middleware
方法內將這個 Middleware 附加上去了。make:job
產生的空 Job 不包含 middleware
方法,所以我們需要手動在 Job 類別中新增這個方法:
1use App\Jobs\Middleware\RateLimited;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new RateLimited];11}
1use App\Jobs\Middleware\RateLimited;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new RateLimited];11}
Job Middleware 也可以被指派給可放入佇列的 Event Listener、Mailable、Notification 等。
頻率限制
雖然我們已經示範了要如何自行撰寫頻率限制的 Job Middleware。不過,其實 Laravel 有內建用來為 Job 做頻率限制的 Middleware。就跟 Route 的 Rate Limiter 一樣,可以使用 RateLimiter
Facade 的 for
方法來定義 Job 的頻率限制。
舉例來說,我們可能會想讓使用者能備份資料,而一般的使用者限制為每小時可備份一次,VIP 使用者則不限次數。若要做這種頻率限制,可以在 AppServiceProvider
中的 boot
方法內定義一個 RateLimiter
:
1use Illuminate\Cache\RateLimiting\Limit;2use Illuminate\Support\Facades\RateLimiter;34/**5 * Bootstrap any application services.6 */7public function boot(): void8{9 RateLimiter::for('backups', function (object $job) {10 return $job->user->vipCustomer()11 ? Limit::none()12 : Limit::perHour(1)->by($job->user->id);13 });14}
1use Illuminate\Cache\RateLimiting\Limit;2use Illuminate\Support\Facades\RateLimiter;34/**5 * Bootstrap any application services.6 */7public function boot(): void8{9 RateLimiter::for('backups', function (object $job) {10 return $job->user->vipCustomer()11 ? Limit::none()12 : Limit::perHour(1)->by($job->user->id);13 });14}
在上述範例中,我們定義了一個每小時的頻率限制。除了以小時來定義頻率限制外,也可以使用 perMinute
方法來以分鐘定義頻率限制。此外,我們也可以傳入任意值給頻率限制的 by
方法。傳給 by
的值通常是用來區分不同使用者的:
1return Limit::perMinute(50)->by($job->user->id);
1return Limit::perMinute(50)->by($job->user->id);
定義好頻率限制後,我們就可以使用 Illuminate\Queue\Middleware\RateLimited
Middleware 來將這個 Rate Limiter 附加到 Job 上。每當這個 Job 超過頻率限制後,這個 Middleware 就會依照頻率限制的間隔,使用適當的延遲時間來將該 Job 放回到佇列中。
1use Illuminate\Queue\Middleware\RateLimited;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new RateLimited('backups')];11}
1use Illuminate\Queue\Middleware\RateLimited;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new RateLimited('backups')];11}
將受頻率限制的 Job 放回佇列後,一樣會增加 Job 的 attemps
總數。若有需要可以在 Job 類別上適當地設定 tries
與 maxExceptions
屬性。或者,也可以使用 retryUntil
方法 來定義不再重新嘗試 Job 的時間。
若不想讓 Job 在遇到頻率限制後重新嘗試,可使用 dontRelease
方法:
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new RateLimited('backups'))->dontRelease()];9}
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new RateLimited('backups'))->dontRelease()];9}
若使用 Redis,可使用 Illuminate\Queue\Middleware\RateLimitedWithRedis
Middleware。這個 Middleware 有為 Redis 做最佳化,比起一般基礎的頻率限制 Middleware 來說會更有效率。
避免 Job 重疊
Laravel 隨附了一個 Illuminate\Queue\Middleware\WithoutOverlapping
Middleware,可讓我們依照任意索引鍵來避免 Job 重疊。使用這個 Middleware 就能避免同一個資源同時被多個佇列 Job 修改。
舉例來說,假設我們有個佇列任務會負責更新使用者的信用分數,而我們想避免兩個更新相同 User ID 的信用分數 Job 重疊。為此,可在 Job 的 middleware
方法中回傳 WithoutOverlapping
Middleware:
1use Illuminate\Queue\Middleware\WithoutOverlapping;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new WithoutOverlapping($this->user->id)];11}
1use Illuminate\Queue\Middleware\WithoutOverlapping;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [new WithoutOverlapping($this->user->id)];11}
每當有重疊的 Job,這些 Job 都會被重新放到佇列中。可以指定這些被重新放回佇列的 Job 在重新嘗試前必須等待多久的秒數:
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];9}
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];9}
若想在 Job 重疊時馬上刪除這些重疊的 Job 來讓這些 Job 不被重試,請使用 dontRelease
方法:
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];9}
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];9}
WithoutOverlapping
Middleware 使用 Laravel 的 Atomic Lock 功能提供。有時候,Job 可能會未預期地失敗或逾時,並可能未正確釋放 Lock。因此,我們可以使用 expireAfter
方法來顯式定義一個 Lock 的有效時間。舉例來說,下列範例會讓 Laravel 在 Job 開始處理的 3 分鐘後釋放 WithoutOverlapping
Lock:
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];9}
1/**2 * Get the middleware the job should pass through.3 *4 * @return array<int, object>5 */6public function middleware(): array7{8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];9}
若要使用 WithoutOverlapping
Middleware,則需要使用支援 [Atomic Lock] 的快取 Driver。目前,memcached
、redis
、dynamodb
、database
、file
、array
等快取 Driver 有支援 Atomic Lock。
在多個 Job 類別中共用 Lock 索引鍵
預設情況下,WithoutOverlapping
Middleware 只會防止相同類別的重疊 Job。因此,即使某兩個不同的 Job 可能使用相同的 Lock 索引鍵,仍然無法防止 Job 重疊。不過,可以使用 shared
方法來指定讓 Laravel 在多個 Job 類別間套用同樣的索引鍵:
1use Illuminate\Queue\Middleware\WithoutOverlapping;23class ProviderIsDown4{5 // ...67 public function middleware(): array8 {9 return [10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),11 ];12 }13}1415class ProviderIsUp16{17 // ...1819 public function middleware(): array20 {21 return [22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),23 ];24 }25}
1use Illuminate\Queue\Middleware\WithoutOverlapping;23class ProviderIsDown4{5 // ...67 public function middleware(): array8 {9 return [10 (new WithoutOverlapping("status:{$this->provider}"))->shared(),11 ];12 }13}1415class ProviderIsUp16{17 // ...1819 public function middleware(): array20 {21 return [22 (new WithoutOverlapping("status:{$this->provider}"))->shared(),23 ];24 }25}
頻率限制的 Exception
Laravel 中隨附了一個 Illuminate\Queue\Middleware\ThrottlesExceptions
Middleware,能讓我們針對 Exception 做頻率限制。每當有 Job 擲回特定數量的 Exception 時,接下來要再次嘗試執行該 Job 前,必須要等待特定的時間過後才能繼續。對於一些使用了不穩定第三方服務的 Job 來說,特別適合使用這個功能。
舉例來說,假設我們有個使用了第三方 API 的佇列 Job,而這個 Job 會擲回 Exception。若要對 Exception 做頻率限制,可以在 Job 的 middleware
方法內回傳 ThrottlesExceptions
Middleware。一般來說,這個 Middleware 應放在實作基於時間的 attempts之 Job 內:
1use DateTime;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [new ThrottlesExceptions(10, 5 * 60)];12}1314/**15 * Determine the time at which the job should timeout.16 */17public function retryUntil(): DateTime18{19 return now()->addMinutes(30);20}
1use DateTime;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [new ThrottlesExceptions(10, 5 * 60)];12}1314/**15 * Determine the time at which the job should timeout.16 */17public function retryUntil(): DateTime18{19 return now()->addMinutes(30);20}
The first constructor argument accepted by the middleware is the number of exceptions the job can throw before being throttled, while the second constructor argument is the number of seconds that should elapse before the job is attempted again once it has been throttled. In the code example above, if the job throws 10 consecutive exceptions, we will wait 5 minutes before attempting the job again, constrained by the 30-minute time limit.
當 Job 擲回 Exception,但還未達到所設定的 Exception 閥值,則一般情況下會馬上重試 Job。不過,也可以在講 Middleware 附加到 Job 上時呼叫 backoff
方法來指定一個以分鐘為單位的數字,來指定 Job 所要延遲的時間:
1use Illuminate\Queue\Middleware\ThrottlesExceptions;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];11}
1use Illuminate\Queue\Middleware\ThrottlesExceptions;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];11}
這個 Middleware 在內部使用了 Laravel 的快取系統來實作頻率限制,並使用了該 Job 的類別名稱來作為快取的「索引鍵」。可以在講 Middleware 附加到 Job 上時呼叫 by
方法來複寫這個索引鍵。當有多個 Job 都使用了同一個第三方服務時,就很適合使用這個方法來讓這些 Job 都共用相同的頻率限制:
1use Illuminate\Queue\Middleware\ThrottlesExceptions;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];11}
1use Illuminate\Queue\Middleware\ThrottlesExceptions;23/**4 * Get the middleware the job should pass through.5 *6 * @return array<int, object>7 */8public function middleware(): array9{10 return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];11}
By default, this middleware will throttle every exception. You can modify this behaviour by invoking the when
method when attaching the middleware to your job. The exception will then only be throttled if closure provided to the when
method returns true
:
1use Illuminate\Http\Client\HttpClientException;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->when(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
1use Illuminate\Http\Client\HttpClientException;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->when(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
If you would like to have the throttled exceptions reported to your application's exception handler, you can do so by invoking the report
method when attaching the middleware to your job. Optionally, you may provide a closure to the report
method and the exception will only be reported if the given closure returns true
:
1use Illuminate\Http\Client\HttpClientException;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->report(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
1use Illuminate\Http\Client\HttpClientException;2use Illuminate\Queue\Middleware\ThrottlesExceptions;34/**5 * Get the middleware the job should pass through.6 *7 * @return array<int, object>8 */9public function middleware(): array10{11 return [(new ThrottlesExceptions(10, 10 * 60))->report(12 fn (Throwable $throwable) => $throwable instanceof HttpClientException13 )];14}
若使用 Redis,可使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
Middleware。該 Middleware 有為 Redis 最佳化,因此會比一般的 Exception 頻率限制 Middleware 還要有效率。
Skipping Jobs
The Skip
middleware allows you to specify that a job should be skipped / deleted without needing to modify the job's logic. The Skip::when
method will delete the job if the given condition evaluates to true
, while the Skip::unless
method will delete the job if the condition evaluates to false
:
1use Illuminate\Queue\Middleware\Skip;23/**4* Get the middleware the job should pass through.5*/6public function middleware(): array7{8 return [9 Skip::when($someCondition),10 ];11}
1use Illuminate\Queue\Middleware\Skip;23/**4* Get the middleware the job should pass through.5*/6public function middleware(): array7{8 return [9 Skip::when($someCondition),10 ];11}
You can also pass a Closure
to the when
and unless
methods for more complex conditional evaluation:
1use Illuminate\Queue\Middleware\Skip;23/**4* Get the middleware the job should pass through.5*/6public function middleware(): array7{8 return [9 Skip::when(function (): bool {10 return $this->shouldSkip();11 }),12 ];13}
1use Illuminate\Queue\Middleware\Skip;23/**4* Get the middleware the job should pass through.5*/6public function middleware(): array7{8 return [9 Skip::when(function (): bool {10 return $this->shouldSkip();11 }),12 ];13}
分派 Job
寫好 Job 類別後,就可以使用 Job 上的 dispatch
方法來分派該 Job。傳給 dispatch
方法的引數會被傳給 Job 的 Constructor:
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // ...2122 ProcessPodcast::dispatch($podcast);2324 return redirect('/podcasts');25 }26}
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // ...2122 ProcessPodcast::dispatch($podcast);2324 return redirect('/podcasts');25 }26}
若想要有條件地分派 Job,可使用 dispatchIf
與
dispatchUnless` 方法:
1ProcessPodcast::dispatchIf($accountActive, $podcast);23ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
1ProcessPodcast::dispatchIf($accountActive, $podcast);23ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 專案中,預設的 Queue Driver 是 sync
Driver。該 Driver 會在目前 Request 中的前景 (Foreground) 同步執行 Job。若想要讓 Job 被真正放進佇列中在背景執行,你需要在專案的 config/queue.php
設定檔中指定一個不同的 Queue Driver。
延遲分派
若不想讓 Job 馬上被 Queue Worker 處理,可在分派 Job 時使用 delay
方法。舉例來說,我們來指定讓一個 Job 在分派的 10 分鐘後才被開始處理:
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // ...2122 ProcessPodcast::dispatch($podcast)23 ->delay(now()->addMinutes(10));2425 return redirect('/podcasts');26 }27}
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // ...2122 ProcessPodcast::dispatch($podcast)23 ->delay(now()->addMinutes(10));2425 return redirect('/podcasts');26 }27}
In some cases, jobs may have a default delay configured. If you need to bypass this delay and dispatch a job for immediate processing, you may use the withoutDelay
method:
1ProcessPodcast::dispatch($podcast)->withoutDelay();
1ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 佇列服務的延遲時間最多只能為 15 分鐘。
Dispatching After the Response is Sent to the Browser
如果你的網頁伺服器使用 FastCGI,則 dispatchAfterResponse
是另一個分派 Job 的方法。該方法會延遲分派 Job,直到 HTTP Response 被傳回使用者瀏覽器後才開始處理Job。這樣一來,在處理佇列 Job 的同時,使用者就能繼續使用我們的網站。一般來說,這種做法應只用於一些只需花費 1 秒鐘的 Job,如寄送 E-Mail 等。由於這些 Job 會在目前的 HTTP Request 中處理,因此使用這種方式分派 Job 就不需要執行 Queue Worker:
1use App\Jobs\SendNotification;23SendNotification::dispatchAfterResponse();
1use App\Jobs\SendNotification;23SendNotification::dispatchAfterResponse();
也可以用 dispatch
分派一個閉包,然後在 dispatch
輔助函式後串上一個 afterResponse
方法來在 HTTP Response 被傳送給瀏覽器後執行這個閉包:
1use App\Mail\WelcomeMessage;2use Illuminate\Support\Facades\Mail;34dispatch(function () {6})->afterResponse();
1use App\Mail\WelcomeMessage;2use Illuminate\Support\Facades\Mail;34dispatch(function () {6})->afterResponse();
同步分派
若想馬上分派 Job (即,同步),則可使用 dispatchSync
方法。在使用這個方法時,所分派的 Job 不會被放入佇列,而會在目前的處理程序中馬上執行:
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatchSync($podcast);2324 return redirect('/podcasts');25 }26}
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatchSync($podcast);2324 return redirect('/podcasts');25 }26}
Job 與資料庫 Transaction
雖然,在資料庫 Transaction 中分派 Job 是完全 OK 的,但應特別注意 Job 能否被正確執行。當我們在 Transaction 中分派 Job 後,這個 Job 很有可能會在上層 Transaction 被 Commit 前就被 Queue Worker 給執行了。這時候,我們在 Transaction 中對 Model 或資料庫記錄所做出的更改都還未反應到資料庫上。而且,在 Transaction 中做建立的 Model 或資料庫記錄也可能還未出現在資料庫中。
幸好,Laravel 提供了數種方法可解決這個狀況。第一種方法,我們可以在 Queue 連線的設定陣列中設定 after_commit
連線選項:
1'redis' => [2 'driver' => 'redis',3 // ...4 'after_commit' => true,5],
1'redis' => [2 'driver' => 'redis',3 // ...4 'after_commit' => true,5],
當 after_commit
設為 true
後,我們就可以在資料庫 Transaction 中分派 Job 了。Laravel 會等到未完成的上層資料庫 Transaction 都被 Commit 後才將 Job 分派出去。不過,當然,若目前沒有正在處理的資料庫 Transaction,這個 Job 會馬上被分派。
若因為 Transaction 中發上 Exception 而造成 Transaction 被 Roll Back,則在這個 Transaction 間所分派的 Job 也會被取消。
將 after_commit
設定選項設為 true
後,所有放入佇列的 Listener、Maillable、Notification、廣播事件……等都會等待到所有資料庫 Transaciton 都 Commit 後才被分派。
內嵌指定 Commit 的分派行為
若未將 after_commit
佇列連線選項設為 true
,則我們還是可以指定讓某個特定的 Job 在所有已開啟的資料庫 Transaction 都被 Commit 後才被分派。若要這麼做,可在分派動作後串上 afterCommit
方法:
1use App\Jobs\ProcessPodcast;23ProcessPodcast::dispatch($podcast)->afterCommit();
1use App\Jobs\ProcessPodcast;23ProcessPodcast::dispatch($podcast)->afterCommit();
同樣地,若 after_commit
選項為 true
,則我們也可以馬上分派某個特定的 Job,而不等待資料庫 Transaction 的 Commit:
1ProcessPodcast::dispatch($podcast)->beforeCommit();
1ProcessPodcast::dispatch($podcast)->beforeCommit();
Job 的串聯
通過 Job 串聯,我們就可以指定一組佇列 Job 的清單,在主要 Job 執行成功後才依序執行這組 Job。若按照順序執行的其中一個 Job 執行失敗,則剩下的 Job 都將不被執行。若要執行佇列的 Job 串聯,可使用 Bus
Facade 中的 chain
方法。Laravel 的 Command Bus 是一個低階的原件,佇列 Job 的分派功能就是使用這個原件製作的:
1use App\Jobs\OptimizePodcast;2use App\Jobs\ProcessPodcast;3use App\Jobs\ReleasePodcast;4use Illuminate\Support\Facades\Bus;56Bus::chain([7 new ProcessPodcast,8 new OptimizePodcast,9 new ReleasePodcast,10])->dispatch();
1use App\Jobs\OptimizePodcast;2use App\Jobs\ProcessPodcast;3use App\Jobs\ReleasePodcast;4use Illuminate\Support\Facades\Bus;56Bus::chain([7 new ProcessPodcast,8 new OptimizePodcast,9 new ReleasePodcast,10])->dispatch();
除了串聯 Job 類別實體,我們也可以串聯閉包:
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 function () {5 Podcast::update(/* ... */);6 },7])->dispatch();
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 function () {5 Podcast::update(/* ... */);6 },7])->dispatch();
在 Job 中使用 $this->delete()
方法來刪除 Job 是沒有辦法讓串聯的 Job 不被執行的。只有當串聯中的 Job 失敗時才會停止執行。
Chain Connection and Queue
若想指定串聯 Job 的連線與佇列,則可使用 onConnection
與 onQueue
方法。除非佇列 Job 有特別指定不同的連線或佇列,否則,這些方法可用來指定要使用的連線名稱與佇列名稱:
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 new ReleasePodcast,5])->onConnection('redis')->onQueue('podcasts')->dispatch();
1Bus::chain([2 new ProcessPodcast,3 new OptimizePodcast,4 new ReleasePodcast,5])->onConnection('redis')->onQueue('podcasts')->dispatch();
Adding Jobs to the Chain
Occasionally, you may need to prepend or append a job to an existing job chain from within another job in that chain. You may accomplish this using the prependToChain
and appendToChain
methods:
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 // Prepend to the current chain, run job immediately after current job...9 $this->prependToChain(new TranscribePodcast);1011 // Append to the current chain, run job at end of chain...12 $this->appendToChain(new TranscribePodcast);13}
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 // Prepend to the current chain, run job immediately after current job...9 $this->prependToChain(new TranscribePodcast);1011 // Append to the current chain, run job at end of chain...12 $this->appendToChain(new TranscribePodcast);13}
串聯失敗
將 Job 串聯起來後,可使用 catch
方法來指定當串聯中有 Job 失敗時要被叫用的閉包。給定的回呼會收到一個導致 Job 失敗的 Throwable
實體:
1use Illuminate\Support\Facades\Bus;2use Throwable;34Bus::chain([5 new ProcessPodcast,6 new OptimizePodcast,7 new ReleasePodcast,8])->catch(function (Throwable $e) {9 // A job within the chain has failed...10})->dispatch();
1use Illuminate\Support\Facades\Bus;2use Throwable;34Bus::chain([5 new ProcessPodcast,6 new OptimizePodcast,7 new ReleasePodcast,8])->catch(function (Throwable $e) {9 // A job within the chain has failed...10})->dispatch();
由於串聯的回呼會被序列化並在稍後由 Laravel 的佇列執行,因此請不要在串聯的回呼中使用 $this
變數。
Customizing the Queue and Connection
Dispatching to a Particular Queue
我們可以將 Job 分門別類放入不同的佇列中,進而分類管理這些 Job,甚至能針對不同佇列設定優先度、指定要有多少個 Worker。不過請記得,放入不同佇列不會將 Job 推送到佇列設定檔中所定義的不同佇列「連線」上,而只會將 Job 推入單一連線中指定的佇列。若要指定佇列,請在分派 Job 時使用 onQueue
方法:
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatch($podcast)->onQueue('processing');2324 return redirect('/podcasts');25 }26}
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatch($podcast)->onQueue('processing');2324 return redirect('/podcasts');25 }26}
或者,也可以在 Job 的 Constructor 中呼叫 onQueue
方法來指定 Job 的佇列:
1<?php23namespace App\Jobs;45 use Illuminate\Contracts\Queue\ShouldQueue;6 use Illuminate\Foundation\Queue\Queueable;78class ProcessPodcast implements ShouldQueue9{10 use Queueable;1112 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onQueue('processing');18 }19}
1<?php23namespace App\Jobs;45 use Illuminate\Contracts\Queue\ShouldQueue;6 use Illuminate\Foundation\Queue\Queueable;78class ProcessPodcast implements ShouldQueue9{10 use Queueable;1112 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onQueue('processing');18 }19}
Dispatching to a Particular Connection
若專案有使用到多個佇列連線,則可以使用 onConnection
方法來指定要將 Job 推送到哪個連線:
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatch($podcast)->onConnection('sqs');2324 return redirect('/podcasts');25 }26}
1<?php23namespace App\Http\Controllers;45use App\Http\Controllers\Controller;6use App\Jobs\ProcessPodcast;7use App\Models\Podcast;8use Illuminate\Http\RedirectResponse;9use Illuminate\Http\Request;1011class PodcastController extends Controller12{13 /**14 * Store a new podcast.15 */16 public function store(Request $request): RedirectResponse17 {18 $podcast = Podcast::create(/* ... */);1920 // Create podcast...2122 ProcessPodcast::dispatch($podcast)->onConnection('sqs');2324 return redirect('/podcasts');25 }26}
也可以將 onConnection
與 onQueue
方法串聯在一起來指定 Job 的連線與佇列:
1ProcessPodcast::dispatch($podcast)2 ->onConnection('sqs')3 ->onQueue('processing');
1ProcessPodcast::dispatch($podcast)2 ->onConnection('sqs')3 ->onQueue('processing');
或者,也可以在 Job 的 Constructor 中呼叫 onConnection
來指定 Job 的連線:
1<?php23namespace App\Jobs;45 use Illuminate\Contracts\Queue\ShouldQueue;6 use Illuminate\Foundation\Queue\Queueable;78class ProcessPodcast implements ShouldQueue9{10 use Queueable;1112 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onConnection('sqs');18 }19}
1<?php23namespace App\Jobs;45 use Illuminate\Contracts\Queue\ShouldQueue;6 use Illuminate\Foundation\Queue\Queueable;78class ProcessPodcast implements ShouldQueue9{10 use Queueable;1112 /**13 * Create a new job instance.14 */15 public function __construct()16 {17 $this->onConnection('sqs');18 }19}
指定最大嘗試次數與逾時
最大嘗試次數
若有某個佇列 Job 遇到錯誤,我們通常不會想讓這個 Job 一直重試。因此,Laravel 提供了多種定義 Job 重試次數的方法。
其中一種指定 Job 最大嘗試次數的方法是在 Artisan 指令列中使用 --tries
開關。使用這種方式指定的嘗試次數會套用到所有該 Worker 處理的 Job,除非 Job 上有指定嘗試次數:
1php artisan queue:work --tries=3
1php artisan queue:work --tries=3
若 Job 嘗試了最大嘗試次數,則這個 Job 會被視為是「執行失敗」。更多有關處理執行失敗 Job 的資訊,請參考 執行失敗 Job 的說明文件。若提供 --tries=0
給 queue:work
指令,則失敗的 Job 會被無限次數重試。
也可以用另一種更仔細的方法,就是在 Job 類別內定義這個 Job 的最大嘗試次數。若有在 Job 中指定最大嘗試次數,定義在 Job 類別內的次數會比指令列中 --tries
的值擁有更高的優先度:
1<?php23namespace App\Jobs;45class ProcessPodcast implements ShouldQueue6{7 /**8 * The number of times the job may be attempted.9 *10 * @var int11 */12 public $tries = 5;13}
1<?php23namespace App\Jobs;45class ProcessPodcast implements ShouldQueue6{7 /**8 * The number of times the job may be attempted.9 *10 * @var int11 */12 public $tries = 5;13}
If you need dynamic control over a particular job's maximum attempts, you may define a tries
method on the job:
1/**2 * Determine number of times the job may be attempted.3 */4public function tries(): int5{6 return 5;7}
1/**2 * Determine number of times the job may be attempted.3 */4public function tries(): int5{6 return 5;7}
基於時間的嘗試限制
除了定義 Job 重試多少次要視為失敗以外,也可以限制 Job 嘗試執行的時間長度。這樣一來,在指定的時間範圍內,Job 就可以不斷重試。若要定義最長可重試時間,請在 Job 類別中定義一個 retryUntil
方法。該方法應回傳 DateTime
實體:
1use DateTime;23/**4 * Determine the time at which the job should timeout.5 */6public function retryUntil(): DateTime7{8 return now()->addMinutes(10);9}
1use DateTime;23/**4 * Determine the time at which the job should timeout.5 */6public function retryUntil(): DateTime7{8 return now()->addMinutes(10);9}
也可以在放入佇列的 Event Listener 中定義一個 tries
屬性或 retryUntil
方法。
最大 Exception 數
有時候,我們可能會想讓 Job 可重試多次,但當出現指定數量的未處理 Exception 後,就視為執行失敗 (與直接使用 release
方法釋放 Job 不同)。若要指定未處理 Exception 數量,可在 Job 類別中定義一個 maxExceptions
屬性:
1<?php23namespace App\Jobs;45use Illuminate\Support\Facades\Redis;67class ProcessPodcast implements ShouldQueue8{9 /**10 * The number of times the job may be attempted.11 *12 * @var int13 */14 public $tries = 25;1516 /**17 * The maximum number of unhandled exceptions to allow before failing.18 *19 * @var int20 */21 public $maxExceptions = 3;2223 /**24 * Execute the job.25 */26 public function handle(): void27 {28 Redis::throttle('key')->allow(10)->every(60)->then(function () {29 // Lock obtained, process the podcast...30 }, function () {31 // Unable to obtain lock...32 return $this->release(10);33 });34 }35}
1<?php23namespace App\Jobs;45use Illuminate\Support\Facades\Redis;67class ProcessPodcast implements ShouldQueue8{9 /**10 * The number of times the job may be attempted.11 *12 * @var int13 */14 public $tries = 25;1516 /**17 * The maximum number of unhandled exceptions to allow before failing.18 *19 * @var int20 */21 public $maxExceptions = 3;2223 /**24 * Execute the job.25 */26 public function handle(): void27 {28 Redis::throttle('key')->allow(10)->every(60)->then(function () {29 // Lock obtained, process the podcast...30 }, function () {31 // Unable to obtain lock...32 return $this->release(10);33 });34 }35}
在這個例子中,這個 Job 會在程式無法在 10 秒內取得 Redis Lock 時被釋放,而這個 Job 在此期間最多可嘗試 25 次。不過,若 Job 中有擲回未處理的 Exception,則會被視為是失敗的 Job。
逾時
通常來說,我們知道某個佇列任務大約需要花多少時間執行。因此,在 Laravel 中,我們可以指定一個「逾時」值。預設情況下,逾時值為 60 秒。若 Job 執行超過逾時值所指定的秒數後,負責處理該 Job 的 Worker 就會以錯誤終止執行。一般來說,Worker 會自動由 Server 上設定的 Process Manager 重新開啟。
可在 Artisan 指令列上使用 --timeout
開關來指定 Job 能執行的最大秒數:
1php artisan queue:work --timeout=30
1php artisan queue:work --timeout=30
若 Job 不斷執行逾時超過其最大重試次數,則該 Job 會被標記為執行失敗。
也可以在 Job 類別中定義該 Job 能執行的最大秒數。若有在 Job 上指定逾時,則在 Job 類別上定義的逾時比在指令列上指定的數字擁有更高的優先度:
1<?php23namespace App\Jobs;45class ProcessPodcast implements ShouldQueue6{7 /**8 * The number of seconds the job can run before timing out.9 *10 * @var int11 */12 public $timeout = 120;13}
1<?php23namespace App\Jobs;45class ProcessPodcast implements ShouldQueue6{7 /**8 * The number of seconds the job can run before timing out.9 *10 * @var int11 */12 public $timeout = 120;13}
有時候,如 Socket 或連外 HTTP 連線等的 IO Blocking Process 可能不適用所指定的逾時設定。因此,若有使用到這些功能,也請在這些功能的 API 上指定逾時。舉例來說,若使用 Guzzle,則可像這樣指定連線與 Request 的逾時值:
The pcntl
PHP extension must be installed in order to specify job timeouts. In addition, a job's "timeout" value should always be less than its "retry after" value. Otherwise, the job may be re-attempted before it has actually finished executing or timed out.
Failing on Timeout
若想讓 Job 在逾時後被標記為執行失敗,可在 Job 類別上定義 $failOnTimeout
屬性:
1/**2 * Indicate if the job should be marked as failed on timeout.3 *4 * @var bool5 */6public $failOnTimeout = true;
1/**2 * Indicate if the job should be marked as failed on timeout.3 *4 * @var bool5 */6public $failOnTimeout = true;
錯誤處理
若在處理 Job 時有擲回 Exception,則這個 Job 會被自動釋放回佇列中,好讓這個 Job 能被重新嘗試。被釋放會佇列的 Job 會繼續被重試,直到重試次數達到專案上所設定的最大次數。最大重試次數可使用 queue:work
Artisan 指令上的 --tries
開關來定義。或者,也可以在 Job 類別上定義最大重試次數。更多有關如何執行 Queue Worker 的資訊可在本文後方找到。
Manually Releasing a Job
有的時候,我們可能會想手動將 Job 釋放會佇列中,好讓這個 Job 能在稍後重試。若要手動釋放 Job,可以呼叫 release
方法:
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 $this->release();9}
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 $this->release();9}
By default, the release
method will release the job back onto the queue for immediate processing. However, you may instruct the queue to not make the job available for processing until a given number of seconds has elapsed by passing an integer or date instance to the release
method:
1$this->release(10);23$this->release(now()->addSeconds(10));
1$this->release(10);23$this->release(now()->addSeconds(10));
Manually Failing a Job
有時候,我們可能需要手動將 Job 標記為「失敗」。若要手動將 Job 標記為失敗,可呼叫 fail
方法:
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 $this->fail();9}
1/**2 * Execute the job.3 */4public function handle(): void5{6 // ...78 $this->fail();9}
若想以所 Catch 到的 Exception 來將 Job 標記為失敗,可將該 Exception 傳入 fail
方法。或者,為了讓開發起來更方便,也可以傳入一個錯誤訊息字串,而該字串會自動被轉為 Exception:
1$this->fail($exception);23$this->fail('Something went wrong.');
1$this->fail($exception);23$this->fail('Something went wrong.');
有關失敗 Job 的更多資訊,請參考有關處理失敗 Job 的說明文件。
批次 Job
Laravel's job batching feature allows you to easily execute a batch of jobs and then perform some action when the batch of jobs has completed executing. Before getting started, you should create a database migration to build a table which will contain meta information about your job batches, such as their completion percentage. This migration may be generated using the make:queue-batches-table
Artisan command:
1php artisan make:queue-batches-table23php artisan migrate
1php artisan make:queue-batches-table23php artisan migrate
定義可批次處理的 Job
若要定義可批次處理的 Job,請先像平常一樣建立可放入佇列的 Job。不過,我們還需要在這個 Job 類別中加上 Illuminate\Bus\Batchable
Trait。這個 Trait 提供了一個 batch
方法,可使用該方法來取得該 Job 所在的批次:
1<?php23namespace App\Jobs;45use Illuminate\Bus\Batchable;6use Illuminate\Contracts\Queue\ShouldQueue;7use Illuminate\Foundation\Queue\Queueable;89class ImportCsv implements ShouldQueue10{11 use Batchable, Queueable;1213 /**14 * Execute the job.15 */16 public function handle(): void17 {18 if ($this->batch()->cancelled()) {19 // Determine if the batch has been cancelled...2021 return;22 }2324 // Import a portion of the CSV file...25 }26}
1<?php23namespace App\Jobs;45use Illuminate\Bus\Batchable;6use Illuminate\Contracts\Queue\ShouldQueue;7use Illuminate\Foundation\Queue\Queueable;89class ImportCsv implements ShouldQueue10{11 use Batchable, Queueable;1213 /**14 * Execute the job.15 */16 public function handle(): void17 {18 if ($this->batch()->cancelled()) {19 // Determine if the batch has been cancelled...2021 return;22 }2324 // Import a portion of the CSV file...25 }26}
分派批次
若要分派一批次的 Job,可使用 Bus
Facade 的 batch
方法。當然,批次功能與完成回呼一起使用時是最有用。因此,可以使用 then
, catch
與 finally
方法來為該批次定義完成回呼。這些回呼都會在被叫用時收到 Illuminate\Bus\Batch
實體。在這個範例中,我們先假設我們正在處理一批次的任務,用來在 CSV 檔中處理給定數量的行:
1use App\Jobs\ImportCsv;2use Illuminate\Bus\Batch;3use Illuminate\Support\Facades\Bus;4use Throwable;56$batch = Bus::batch([7 new ImportCsv(1, 100),8 new ImportCsv(101, 200),9 new ImportCsv(201, 300),10 new ImportCsv(301, 400),11 new ImportCsv(401, 500),12])->before(function (Batch $batch) {13 // The batch has been created but no jobs have been added...14})->progress(function (Batch $batch) {15 // A single job has completed successfully...16})->then(function (Batch $batch) {17 // All jobs completed successfully...18})->catch(function (Batch $batch, Throwable $e) {19 // First batch job failure detected...20})->finally(function (Batch $batch) {21 // The batch has finished executing...22})->dispatch();2324return $batch->id;
1use App\Jobs\ImportCsv;2use Illuminate\Bus\Batch;3use Illuminate\Support\Facades\Bus;4use Throwable;56$batch = Bus::batch([7 new ImportCsv(1, 100),8 new ImportCsv(101, 200),9 new ImportCsv(201, 300),10 new ImportCsv(301, 400),11 new ImportCsv(401, 500),12])->before(function (Batch $batch) {13 // The batch has been created but no jobs have been added...14})->progress(function (Batch $batch) {15 // A single job has completed successfully...16})->then(function (Batch $batch) {17 // All jobs completed successfully...18})->catch(function (Batch $batch, Throwable $e) {19 // First batch job failure detected...20})->finally(function (Batch $batch) {21 // The batch has finished executing...22})->dispatch();2324return $batch->id;
可使用 $batch->id
屬性來取得該批次的 ID。在該批次被分派後,可使用這個 ID 來向 Laravel 的 Command Bus 查詢有關該批次的資訊。
Since batch callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this
variable within the callbacks. In addition, since batched jobs are wrapped within database transactions, database statements that trigger implicit commits should not be executed within the jobs.
為批次命名
若為批次命名,則一些像是 Laravel Horizon 與 Laravel Telescope 之類的工具就可為該批次提供對使用者更友善的偵錯資訊。若要為批次指定任意名稱,可在定義批次時呼叫 name
方法:
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->name('Import CSV')->dispatch();
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->name('Import CSV')->dispatch();
Batch Connection and Queue
若想指定批次 Job 的連線與佇列,可使用 onConnection
與 onQueue
方法。所有的批次 Job 都必須要相同的連線與佇列中執行:
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->onConnection('redis')->onQueue('imports')->dispatch();
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->onConnection('redis')->onQueue('imports')->dispatch();
Chains and Batches
只要將串聯的 Job 放在陣列中,就可以在批次中定義一組串聯的 Job。舉例來說,我們可以平行執行兩個 Job 串聯,並在這兩個 Job 串聯都處理完畢後執行回呼:
1use App\Jobs\ReleasePodcast;2use App\Jobs\SendPodcastReleaseNotification;3use Illuminate\Bus\Batch;4use Illuminate\Support\Facades\Bus;56Bus::batch([7 [8 new ReleasePodcast(1),9 new SendPodcastReleaseNotification(1),10 ],11 [12 new ReleasePodcast(2),13 new SendPodcastReleaseNotification(2),14 ],15])->then(function (Batch $batch) {16 // ...17})->dispatch();
1use App\Jobs\ReleasePodcast;2use App\Jobs\SendPodcastReleaseNotification;3use Illuminate\Bus\Batch;4use Illuminate\Support\Facades\Bus;56Bus::batch([7 [8 new ReleasePodcast(1),9 new SendPodcastReleaseNotification(1),10 ],11 [12 new ReleasePodcast(2),13 new SendPodcastReleaseNotification(2),14 ],15])->then(function (Batch $batch) {16 // ...17})->dispatch();
Conversely, you may run batches of jobs within a chain by defining batches within the chain. For example, you could first run a batch of jobs to release multiple podcasts then a batch of jobs to send the release notifications:
1use App\Jobs\FlushPodcastCache;2use App\Jobs\ReleasePodcast;3use App\Jobs\SendPodcastReleaseNotification;4use Illuminate\Support\Facades\Bus;56Bus::chain([7 new FlushPodcastCache,8 Bus::batch([9 new ReleasePodcast(1),10 new ReleasePodcast(2),11 ]),12 Bus::batch([13 new SendPodcastReleaseNotification(1),14 new SendPodcastReleaseNotification(2),15 ]),16])->dispatch();
1use App\Jobs\FlushPodcastCache;2use App\Jobs\ReleasePodcast;3use App\Jobs\SendPodcastReleaseNotification;4use Illuminate\Support\Facades\Bus;56Bus::chain([7 new FlushPodcastCache,8 Bus::batch([9 new ReleasePodcast(1),10 new ReleasePodcast(2),11 ]),12 Bus::batch([13 new SendPodcastReleaseNotification(1),14 new SendPodcastReleaseNotification(2),15 ]),16])->dispatch();
Adding Jobs to Batches
有時候,若能在批次 Job 中新增其他額外的 Job 會很實用。特別是當我們要在一個 Web Request 中批次處理數千筆 Job 時,會讓 Job 的分派過程變得很耗時。因此,比起直接分派數千筆 Job,我們可以先分派一個初始化的批次,用來作為 Job 的「載入程式」,然後讓這個載入程式再向批次內填入更多的 Job:
1$batch = Bus::batch([2 new LoadImportBatch,3 new LoadImportBatch,4 new LoadImportBatch,5])->then(function (Batch $batch) {6 // All jobs completed successfully...7})->name('Import Contacts')->dispatch();
1$batch = Bus::batch([2 new LoadImportBatch,3 new LoadImportBatch,4 new LoadImportBatch,5])->then(function (Batch $batch) {6 // All jobs completed successfully...7})->name('Import Contacts')->dispatch();
在這個例子中,我們可以使用 LoadImportBatch
Job 來填入其他額外的 Job。若要填入其他 Job,我們可以使用批次實體上的 add
方法。批次實體可使用 Job 的 batch
方法來取得:
1use App\Jobs\ImportContacts;2use Illuminate\Support\Collection;34/**5 * Execute the job.6 */7public function handle(): void8{9 if ($this->batch()->cancelled()) {10 return;11 }1213 $this->batch()->add(Collection::times(1000, function () {14 return new ImportContacts;15 }));16}
1use App\Jobs\ImportContacts;2use Illuminate\Support\Collection;34/**5 * Execute the job.6 */7public function handle(): void8{9 if ($this->batch()->cancelled()) {10 return;11 }1213 $this->batch()->add(Collection::times(1000, function () {14 return new ImportContacts;15 }));16}
我們只能向目前 Job 正在執行的批次新增 Job。
檢查批次
提供給批次處理完成回呼的 Illuminate\Bus\Batch
實體有許多的屬性與方法,可以讓我們處理與取得給定 Job 批次的資訊:
1// The UUID of the batch...2$batch->id;34// The name of the batch (if applicable)...5$batch->name;67// The number of jobs assigned to the batch...8$batch->totalJobs;910// The number of jobs that have not been processed by the queue...11$batch->pendingJobs;1213// The number of jobs that have failed...14$batch->failedJobs;1516// The number of jobs that have been processed thus far...17$batch->processedJobs();1819// The completion percentage of the batch (0-100)...20$batch->progress();2122// Indicates if the batch has finished executing...23$batch->finished();2425// Cancel the execution of the batch...26$batch->cancel();2728// Indicates if the batch has been cancelled...29$batch->cancelled();
1// The UUID of the batch...2$batch->id;34// The name of the batch (if applicable)...5$batch->name;67// The number of jobs assigned to the batch...8$batch->totalJobs;910// The number of jobs that have not been processed by the queue...11$batch->pendingJobs;1213// The number of jobs that have failed...14$batch->failedJobs;1516// The number of jobs that have been processed thus far...17$batch->processedJobs();1819// The completion percentage of the batch (0-100)...20$batch->progress();2122// Indicates if the batch has finished executing...23$batch->finished();2425// Cancel the execution of the batch...26$batch->cancel();2728// Indicates if the batch has been cancelled...29$batch->cancelled();
從 Route 上回傳批次
所有的 Illuminate\Bus\Batch
實體都可被序列化為 JSON,因此我們可以直接在專案的 Route 中回傳批次實體來取得有關該批次資訊的 JSON Payload,其中也包含該批次的完成度。如此一來,我們就能方便地在專案的 UI 上顯示該批次完成度的資訊。
若要使用 ID 來取得批次,可使用 Bus
Facade 的 findBatch
方法:
1use Illuminate\Support\Facades\Bus;2use Illuminate\Support\Facades\Route;34Route::get('/batch/{batchId}', function (string $batchId) {5 return Bus::findBatch($batchId);6});
1use Illuminate\Support\Facades\Bus;2use Illuminate\Support\Facades\Route;34Route::get('/batch/{batchId}', function (string $batchId) {5 return Bus::findBatch($batchId);6});
取消批次
有時候,我們會需要取消給定批次的執行。若要取消執行批次,可在 Illuminate\Bus\Batch
實體上呼叫 cancel
方法:
1/**2 * Execute the job.3 */4public function handle(): void5{6 if ($this->user->exceedsImportLimit()) {7 return $this->batch()->cancel();8 }910 if ($this->batch()->cancelled()) {11 return;12 }13}
1/**2 * Execute the job.3 */4public function handle(): void5{6 if ($this->user->exceedsImportLimit()) {7 return $this->batch()->cancel();8 }910 if ($this->batch()->cancelled()) {11 return;12 }13}
讀者可能已經從上面的範例中注意到,批次的 Job 一般都應該在繼續執行前先判斷自己所在的批次是否已被取消。不過,為了讓開發過程更方便,也可以在 Job 上指定 SkipIfBatchCancelled
Middleware,這樣就不需要手動檢查批次是否已被取消。就像該 Middleware 的名稱一樣,這個 Middleware 會告訴 Laravel,當 Job 對應的批次被取消時,就不要在繼續處理 Job:
1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;23/**4 * Get the middleware the job should pass through.5 */6public function middleware(): array7{8 return [new SkipIfBatchCancelled];9}
1use Illuminate\Queue\Middleware\SkipIfBatchCancelled;23/**4 * Get the middleware the job should pass through.5 */6public function middleware(): array7{8 return [new SkipIfBatchCancelled];9}
批次失敗
若批次中的 Job 執行失敗,則會叫用 catch
回呼 (若有指定的話)。只有在批次中第一個失敗的 Job 才會叫用該回呼。
允許失敗
若在批次中的 Job 執行失敗,Laravel 會自動將該批次標記為「已取消」。若有需要的話,我們可以禁用這個行為,好讓 Job 失敗是不要自動將批次標記為取消。若要禁用此行為,可在分派批次時呼叫 allowFailures
方法:
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->allowFailures()->dispatch();
1$batch = Bus::batch([2 // ...3])->then(function (Batch $batch) {4 // All jobs completed successfully...5})->allowFailures()->dispatch();
重試失敗的批次 Job
Laravel 提供了一個方便的 queue:retry-batch
Artisan 指令,能讓我們輕鬆重試給定批次中所有失敗的 Job。queue:retry-batch
指令的參數為要重試 Job 之批次的 UUID:
1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修建批次
若未修建批次,則 job_batches
資料表很快就會變得很大。為了避免這個狀況,應定期每日執行 queue:prune-batches
Artisan 指令:
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches')->daily();
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches')->daily();
預設情況下,完成超過 24 小時的批次會被修建掉。可以在呼叫該指令時使用 hours
選項來指定批次資料要保留多久。舉例來說,下列指令會刪除完成超過 48 小時前的所有批次:
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48')->daily();
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48')->daily();
有時候,jobs_batches
資料表可能會有一些從未成功完成的批次記錄,如批次中有 Job 失敗且從每次嘗試都失敗的批次。可以在 queue:prune-batxhes
指令的使用 unfinished
選項來修剪這些未完成的批次:
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
類似的,jobs_batches
資料表可能會累積一些已取消批次的批次記錄。可以在 queue:prune-batxhes
指令上使用 cancelled
選項來修剪這些已取消的批次:
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
1use Illuminate\Support\Facades\Schedule;23Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
Storing Batches in DynamoDB
Laravel also provides support for storing batch meta information in DynamoDB instead of a relational database. However, you will need to manually create a DynamoDB table to store all of the batch records.
Typically, this table should be named job_batches
, but you should name the table based on the value of the queue.batching.table
configuration value within your application's queue
configuration file.
DynamoDB Batch Table Configuration
The job_batches
table should have a string primary partition key named application
and a string primary sort key named id
. The application
portion of the key will contain your application's name as defined by the name
configuration value within your application's app
configuration file. Since the application name is part of the DynamoDB table's key, you can use the same table to store job batches for multiple Laravel applications.
In addition, you may define ttl
attribute for your table if you would like to take advantage of automatic batch pruning.
DynamoDB Configuration
Next, install the AWS SDK so that your Laravel application can communicate with Amazon DynamoDB:
1composer require aws/aws-sdk-php
1composer require aws/aws-sdk-php
Then, set the queue.batching.driver
configuration option's value to dynamodb
. In addition, you should define key
, secret
, and region
configuration options within the batching
configuration array. These options will be used to authenticate with AWS. When using the dynamodb
driver, the queue.batching.database
configuration option is unnecessary:
1'batching' => [2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7],
1'batching' => [2 'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7],
Pruning Batches in DynamoDB
When utilizing DynamoDB to store job batch information, the typical pruning commands used to prune batches stored in a relational database will not work. Instead, you may utilize DynamoDB's native TTL functionality to automatically remove records for old batches.
If you defined your DynamoDB table with a ttl
attribute, you may define configuration parameters to instruct Laravel how to prune batch records. The queue.batching.ttl_attribute
configuration value defines the name of the attribute holding the TTL, while the queue.batching.ttl
configuration value defines the number of seconds after which a batch record can be removed from the DynamoDB table, relative to the last time the record was updated:
1'batching' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7 'ttl_attribute' => 'ttl',8 'ttl' => 60 * 60 * 24 * 7, // 7 days...9],
1'batching' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'job_batches',7 'ttl_attribute' => 'ttl',8 'ttl' => 60 * 60 * 24 * 7, // 7 days...9],
將閉包放入佇列
Instead of dispatching a job class to the queue, you may also dispatch a closure. This is great for quick, simple tasks that need to be executed outside of the current request cycle. When dispatching closures to the queue, the closure's code content is cryptographically signed so that it cannot be modified in transit:
1$podcast = App\Podcast::find(1);23dispatch(function () use ($podcast) {4 $podcast->publish();5});
1$podcast = App\Podcast::find(1);23dispatch(function () use ($podcast) {4 $podcast->publish();5});
使用 catch
方法,就能為佇列閉包提供一組要在所有重試次數都失敗的時候執行的閉包:
1use Throwable;23dispatch(function () use ($podcast) {4 $podcast->publish();5})->catch(function (Throwable $e) {6 // This job has failed...7});
1use Throwable;23dispatch(function () use ($podcast) {4 $podcast->publish();5})->catch(function (Throwable $e) {6 // This job has failed...7});
由於 catch
的回呼會被序列化並在稍後由 Laravel 的佇列執行,因此請不要在 catch
的回呼中使用 $this
變數。
Running the Queue Worker
使用 queue:work
指令
Laravel 中隨附了一個 Artisan 指令,可用來開啟 Queue Worker,以在 Job 被推入佇列後處理這些 Job。可以使用 queue:work
Artisan 指令來執行 Queue Worker。請注意,當執行 queue:work
指令後,該指令會持續執行,直到我們手動停止該指令或關閉終端機為止:
1php artisan queue:work
1php artisan queue:work
若要讓 queue:work
處理程序在背景持續執行,請使用如 Supervisor 等的 Process Monitor,以確保 Queue Worker 持續執行。
若想將 Job ID 包含在 queue:work
指令的輸出,則可在呼叫該指令時加上 -v
旗標:
1php artisan queue:work -v
1php artisan queue:work -v
請記得,Queue Worker 是會持續執行的處理程序,且會將已開啟的程式狀態保存在記憶體中。因此,Queue Worker 開始執行後若有更改程式碼,這些 Worker 將不會知道有這些修改。所以,在部署過程中,請確保有重新啟動 Queue Worker。此外,也請注意,在各個 Job 間,也不會自動重設程式所建立或修改的任何靜態狀態。
或者,我們也可以執行 queue:listen
指令。使用 queue:listen
指令時,若有更新程式碼或重設程式的狀態,就不需手動重新啟動 Queue Worker。不過,這個指令比起 queue:work
指令來說比較沒有效率:
1php artisan queue:listen
1php artisan queue:listen
執行多個 Queue Worker
若要指派多個 Worker 給某個 Queue 並同時處理多個 Job,只需要啟動多個 queue:work
處理程序即可。若要啟動多個 queue:work
,在本機上,我們可以開啟多個終端機分頁來執行;若是在正是環境上,則可以使用 Process Manager 的設定來啟動多個 queue:work
。使用 Supervisor 時,可使用 numprocs
設定值。
Specifying the Connection and Queue
也可以指定 Worker 要使用的佇列連線。傳給 work
指令的連線名稱應對影到 config/queue.php
設定檔中所定義的其中一個連線:
1php artisan queue:work redis
1php artisan queue:work redis
預設情況下,queue:work
指令擲回處理給定連線上預設佇列的 Job。不過,我們也可以自定 Queue Worker,以處理給定連線上的特定佇列。舉例來說,若我們把所有的電子郵件都放在 redis
連線的 emails
佇列中執行,則我們可以執行下列指令來啟動一個處理該佇列的 Worker:
1php artisan queue:work redis --queue=emails
1php artisan queue:work redis --queue=emails
Processing a Specified Number of Jobs
可使用 --once
選項來讓 Worker 一次只處理佇列中的一個 Job:
1php artisan queue:work --once
1php artisan queue:work --once
可使用 --max-jobs
選項來讓 Worker 只處理特定數量的 Job,然後就終止執行。該選項適合與 Supervisor 搭配使用,這樣我們就能讓 Worker 在處理特定數量的 Job 後自動重新執行,以釋出該 Worker 所積累的記憶體:
1php artisan queue:work --max-jobs=1000
1php artisan queue:work --max-jobs=1000
Processing All Queued Jobs and Then Exiting
可使用 --stop-when-empty
選項來讓 Worker 處理所有的 Job 然後終止執行。在 Docker Container 中處理 Laravel 佇列時,若在佇列為空時停止關閉 Container,就適合使用該選項:
1php artisan queue:work --stop-when-empty
1php artisan queue:work --stop-when-empty
Processing Jobs for a Given Number of Seconds
--max-time
選項可用來讓 Worker 處理給定秒數的 Job,然後終止執行。該選項是何與 Supervisor 搭配使用,以在處理 Job 給定時間後自動重新啟動 Worker,並釋放期間可能積累的記憶體:
1# Process jobs for one hour and then exit...2php artisan queue:work --max-time=3600
1# Process jobs for one hour and then exit...2php artisan queue:work --max-time=3600
Worker 的休眠期間
若佇列中有 Job,則 Worker 會不間斷地處理這些 Job。不過,使用 sleep
選項可用來讓 Worker 判斷當沒有 Job 時要「休眠」多少秒。當然,在休眠期間,Worker 就不會處理任何新的 Job:
1php artisan queue:work --sleep=3
1php artisan queue:work --sleep=3
Maintenance Mode and Queues
While your application is in maintenance mode, no queued jobs will be handled. The jobs will continue to be handled as normal once the application is out of maintenance mode.
To force your queue workers to process jobs even if maintenance mode is enabled, you may use --force
option:
1php artisan queue:work --force
1php artisan queue:work --force
資源上的考量
Daemon 型的 Queue Worker 並不會在每個 Job 處理後「重新啟動」Laravel。因此,在每個 Job 處理完畢後,請務必釋放任何吃資源的功能。舉例來說,若我們使用了 GD 函式庫來進行圖片處理,則應在處理完圖片後使用 imagedestroy
來釋放記憶體。
佇列的優先度
有時候,我們可能會向調整各個佇列的處理優先度。舉例來說,在 config/queue.php
設定檔中,我們可以把 redis
連線上的預設 queue
設為 low
(低)。不過,有時候,我們可能會想像這樣把 Job 推入 high
(高) 優先度的佇列:
1dispatch((new Job)->onQueue('high'));
1dispatch((new Job)->onQueue('high'));
若要啟動 Worker 以驗證是否所有 high
佇列上的 Job 都比 low
佇列上的 Job 還要早被處理,只需要傳入一組以逗號分隔的佇列名稱列表給 work
指令即可:
1php artisan queue:work --queue=high,low
1php artisan queue:work --queue=high,low
Queue Workers and Deployment
由於 Queue Worker 時持續執行的處理程序,因此除非重啟啟動 Queue Worker,否則 Queue Worker 不會知道程式碼有被修改過。要部署有使用 Queue Worker 的專案,最簡單的做法就是在部署過程中重新啟動 Queue Worker。我們可以執行 queue:restart
指令來重新啟動所有的 Worker:
1php artisan queue:restart
1php artisan queue:restart
該指令會通知所有的 Queue Worker,讓所有的 Worker 在處理完目前 Job 且在現有 Job 不遺失的情況下終止執行 Worker。由於 Queue Worker 會在 queue:restart
指令執行後終止執行,因此請務必使用如 Supervisor 這樣的 Process Manager 來自動重新啟動 Queue Worker。
佇列會使用快取來儲存重新啟動訊號,因此在使用此功能前請先確認專案上是否有設定好正確的快取 Driver。
Job Expirations and Timeouts
Job 的有效期限
在 config/queue.php
設定檔中,每個佇列連線都有定義一個 retry_after
選項。這個選項用來指定在重新嘗試目前處理的 Job 前需要等待多少秒。舉例來說,若 retry_after
設為 90
,則若某個 Job 已被處理 90 秒,且期間沒有被釋放或刪除,則該 Job 會被釋放回佇列中。一般來說,應將 retry_after
的值設為 Job 在合理情況下要完成執行所需的最大秒數。
唯一一個不含 retry_after
值的佇列連線是 Amazon SQS。SQS 會使用預設的 Visibility Timeout 來重試 Job。Visibility Timeout 的值由 AWS Console 中控制。
Worker 的逾時
queue:work
Aritsan 指令有一個 --timeout
選項。預設情況下,--timeout
值為 60 秒。若 Job 執行超過逾時值所指定的秒數後,負責處理該 Job 的 Worker 就會以錯誤終止執行。一般來說,Worker 會自動由 Server 上設定的 Process Manager 重新開啟:
1php artisan queue:work --timeout=60
1php artisan queue:work --timeout=60
雖然 retry_after
設定選項與 --timeout
CLI 選項並不相同,不過這兩個選項會互相配合使用,以確保 Job 不遺失,且 Job 只會成功執行一次。
--timeout
的值必須至少比 retry_after
設定選項短個幾秒,以確保 Worker 在處理到當掉的 Job 時會在重試 Job 前先終止該 Job。若 --timeout
選項比 retry_after
設定值還要長的話,則 Job 就有可能會被處理兩次。
Supervisor 設定
在正式環境中,我們會需要一種能讓 queue:work
處理程序持續執行的方法。queue:work
可能會因為各種原因而停止執行,如 Worker 執行達到逾時值,或是在執行了 queue:restart
指令後等。
因此,我們需要設定一個能偵測到 queue:work
處理程序終止執行,並能自動重新啟動這些 Worker 的 Process Monitor。此外,使用 Process Monitor 還能讓我們指定要同時執行多少個 queue:work
處理程序。Supervisor 時一個常見用於 Linux 環境的 Process Monitor,在本文中接下來的部分我們會來看看要如何設定 Supervisor。
安裝 Supervisor
Supervisor 是一個用於 Linux 作業系統的 Process Monitor,使用 Supervisor 就可以在 queue:work
處理程序執行失敗時自動重新啟動。若要在 Ubuntu 上安裝 Supervisor,可使用下列指令:
1sudo apt-get install supervisor
1sudo apt-get install supervisor
如果你覺得要設定並管理 Supervisor 太難、太複雜的話,可以考慮使用 Laravel Forge。Laravel Forge 會幫你在 Laravel 專案的正式環境上自動安裝並設定 Supervisor。
設定 Supervisor
Supervisor 設定檔一般都存放在 /etc/supervisor/conf.d
目錄下。在該目錄中,我們可以建立任意數量的設定檔,以告訴 Supervisor 要如何監看這些處理程序。舉例來說,我們先建立一個用於啟動並監看 queue:work
處理程序的 laravel-worker.conf
檔案:
1[program:laravel-worker]2process_name=%(program_name)s_%(process_num)02d3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=36004autostart=true5autorestart=true6stopasgroup=true7killasgroup=true8user=forge9numprocs=810redirect_stderr=true11stdout_logfile=/home/forge/app.com/worker.log12stopwaitsecs=3600
1[program:laravel-worker]2process_name=%(program_name)s_%(process_num)02d3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=36004autostart=true5autorestart=true6stopasgroup=true7killasgroup=true8user=forge9numprocs=810redirect_stderr=true11stdout_logfile=/home/forge/app.com/worker.log12stopwaitsecs=3600
在這個範例中,numprocs
指示詞用於告訴 Supervisor 要執行 8 個 queue:work
處理程序,並監看這 8 個處理程序,然後當這些處理程序執行失敗時自動重新啟動。我們可以更改該設定檔中的 command
指示詞,以調整為所需的佇列連線與 Worker 選項。
請務必確保 stopwaitsecs
值比花費時間最多的 Job 所需執行的秒數還要大。若該值設定不對,可能會讓 Supervisor 在 Job 處理完成前就終止該 Job。
開啟 Supervisor
建立好設定檔後,我們就可以使用下列指令更新 Supervisor 設定並開啟我們設定好的處理程序:
1sudo supervisorctl reread23sudo supervisorctl update45sudo supervisorctl start "laravel-worker:*"
1sudo supervisorctl reread23sudo supervisorctl update45sudo supervisorctl start "laravel-worker:*"
更多有關 Supervisor 的資訊,請參考 Supervisor 的說明文件。
處理失敗的 Job
有時候,放入佇列的 Job 可能會執行失敗。請別擔心,計劃永遠趕不上變化!Laravel 中內建了一個方便的方法,可用來指定 Job 要重試的最大次數。若非同步執行的 Job 超過最大嘗試執行次數,則該 Job 會被插入到 failed_jobs
資料庫資料表中。同步分派的 Job 若執行失敗,則不會被保存在該資料表中,而我們的專案則會馬上收到 Job 產生的 Exception。
A migration to create the failed_jobs
table is typically already present in new Laravel applications. However, if your application does not contain a migration for this table, you may use the make:queue-failed-table
command to create the migration:
1php artisan make:queue-failed-table23php artisan migrate
1php artisan make:queue-failed-table23php artisan migrate
在執行 [Queue Worker] 處理程序時,我們可以使用 queue:work
指令上的 --tries
開關來指定某個 Job 所要嘗試執行的最大次數。若為指定 --tries
選項的值,則 Job 就只會嘗試執行一次,或是依照 Job 類別中 $tries
屬性所設定的值作為最大嘗試次數:
1php artisan queue:work redis --tries=3
1php artisan queue:work redis --tries=3
使用 --backoff
選項,就可指定當 Job 遇到 Exception 時,Laravel 要等待多少秒才重新嘗試該 Job。預設情況下,Job 會馬上被釋放回佇列中,以便重新嘗試該 Job:
1php artisan queue:work redis --tries=3 --backoff=3
1php artisan queue:work redis --tries=3 --backoff=3
若想以 Job 為單位來設定當 Job 遇到 Exception 時 Laravel 要等待多少秒才重新嘗試該 Job,則可在 Job 類別上定義 backoff
屬性:
1/**2 * The number of seconds to wait before retrying the job.3 *4 * @var int5 */6public $backoff = 3;
1/**2 * The number of seconds to wait before retrying the job.3 *4 * @var int5 */6public $backoff = 3;
若需要使用更複雜的邏輯來判斷 Job 的 Backoff 時間,可在 Job 類別上定義 backoff
方法:
1/**2* Calculate the number of seconds to wait before retrying the job.3*/4public function backoff(): int5{6 return 3;7}
1/**2* Calculate the number of seconds to wait before retrying the job.3*/4public function backoff(): int5{6 return 3;7}
You may easily configure "exponential" backoffs by returning an array of backoff values from the backoff
method. In this example, the retry delay will be 1 second for the first retry, 5 seconds for the second retry, 10 seconds for the third retry, and 10 seconds for every subsequent retry if there are more attempts remaining:
1/**2* Calculate the number of seconds to wait before retrying the job.3*4* @return array<int, int>5*/6public function backoff(): array7{8 return [1, 5, 10];9}
1/**2* Calculate the number of seconds to wait before retrying the job.3*4* @return array<int, int>5*/6public function backoff(): array7{8 return [1, 5, 10];9}
當 Job 執行失敗後進行清理
若某個特定的 Job 失敗後,我們可能會想傳送通知給使用者,或是恢復這個 Job 中所部分完成的一些動作。為此,我們可以在 Job 類別中定義一個 failed
方法。導致該 Job 失敗的 Throwable
實體會傳入給 failed
方法:
1<?php23namespace App\Jobs;45use App\Models\Podcast;6use App\Services\AudioProcessor;7use Illuminate\Contracts\Queue\ShouldQueue;8use Illuminate\Foundation\Queue\Queueable;9use Throwable;1011class ProcessPodcast implements ShouldQueue12{13 use Queueable;1415 /**16 * Create a new job instance.17 */18 public function __construct(19 public Podcast $podcast,20 ) {}2122 /**23 * Execute the job.24 */25 public function handle(AudioProcessor $processor): void26 {27 // Process uploaded podcast...28 }2930 /**31 * Handle a job failure.32 */33 public function failed(?Throwable $exception): void34 {35 // Send user notification of failure, etc...36 }37}
1<?php23namespace App\Jobs;45use App\Models\Podcast;6use App\Services\AudioProcessor;7use Illuminate\Contracts\Queue\ShouldQueue;8use Illuminate\Foundation\Queue\Queueable;9use Throwable;1011class ProcessPodcast implements ShouldQueue12{13 use Queueable;1415 /**16 * Create a new job instance.17 */18 public function __construct(19 public Podcast $podcast,20 ) {}2122 /**23 * Execute the job.24 */25 public function handle(AudioProcessor $processor): void26 {27 // Process uploaded podcast...28 }2930 /**31 * Handle a job failure.32 */33 public function failed(?Throwable $exception): void34 {35 // Send user notification of failure, etc...36 }37}
叫用 failed
方法前會先初始化該 Job 的一個新。因此,在 handle
方法中對類別屬性做出的更改都將遺失。
重試失敗的 Job
若要檢視所有已插入 failed_jobs
資料表中失敗的 Job,可使用 queue:failed
Artisan 指令:
1php artisan queue:failed
1php artisan queue:failed
queue:failed
指令會列出 Job ID、連線、佇列、失敗時間……等,以及其他有關該 Job 的資訊。可使用 Job ID 來重試失敗的 Job。舉例來說,若要重試 ID 為 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失敗 Job,請執行下列指令:
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
若有需要,可傳入多個 ID 給該指令:
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
也可以嘗試特定佇列中所有失敗的 Job:
1php artisan queue:retry --queue=name
1php artisan queue:retry --queue=name
若要重試所有失敗的 Job,請執行 queue:retry
指令,並傳入 all
作為 ID:
1php artisan queue:retry all
1php artisan queue:retry all
若想刪除失敗的 Job,可使用 queue:forget
指令:
1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
使用 Horizon,請不要使用 queue:forget
指令,請使用 horizon:forget
指令來刪除失敗的 Job。
若要從 failed_jobs
資料表中刪除所有失敗的 Job,可使用 queue:flush
指令:
1php artisan queue:flush
1php artisan queue:flush
忽略不存在的 Model
將 Eloquent Model 插入進 Job 時,該 Model 會自動被序列化再放入佇列中,並在要處理該 Job 時再重新從資料庫中取出。不過,若在該 Job 等待被 Worker 處理的期間刪除了該 Model,則 Job 可能會遇到 ModelNotFoundException
而失敗。
為了方便起見,可將 Job 的 deleteWhenMissingModels
屬性設為 true
,就可以自動刪除有遺失 Model 的 Job。若該選項設為 true
,則 Laravel 會自動默默地在不產生 Exception 的情況下取消該 Job:
1/**2 * Delete the job if its models no longer exist.3 *4 * @var bool5 */6public $deleteWhenMissingModels = true;
1/**2 * Delete the job if its models no longer exist.3 *4 * @var bool5 */6public $deleteWhenMissingModels = true;
修剪失敗的 Job
可以呼叫 queue:prune-failed
Artisan 指令來修剪專案中所有 failed_jobs
資料表中的記錄:
1php artisan queue:prune-failed
1php artisan queue:prune-failed
預設情況下,所有超過 24 小時的失敗 Job 記錄都會被修剪。若有提供 --hours
選項給該指令,則只會保留過去 N 小時中所插入的失敗 Job 記錄。舉例來說,下列指令會刪除所有插入超過 48 小時的失敗 Job 記錄:
1php artisan queue:prune-failed --hours=48
1php artisan queue:prune-failed --hours=48
Storing Failed Jobs in DynamoDB
Laravel also provides support for storing your failed job records in DynamoDB instead of a relational database table. However, you must manually create a DynamoDB table to store all of the failed job records. Typically, this table should be named failed_jobs
, but you should name the table based on the value of the queue.failed.table
configuration value within your application's queue
configuration file.
failed_jobs
資料表應有一個名為 application
的字串主分區索引鍵,以及一個名為 uuid
的字串主排序索引鍵。索引鍵的 application
這個部分會包含專案的名稱,即 app
設定檔中的 name
設定值。由於專案名稱會是 DynamoDB 資料表中索引鍵的一部分,因此,我們可以使用相同的資料表來保存多個 Laravel 專案中的失敗 Job。
此外,也請確保有安裝 AWS SDK,好讓 Laravel 專案能與 Amazon DynamoDB 溝通:
1composer require aws/aws-sdk-php
1composer require aws/aws-sdk-php
接著,請設定 queue.failed.driver
設定選項值為 dynamodb
。此外,也請在失敗 Job 設定陣列中定義 key
、secret
、region
等設定選項。這些選項會用來向 AWS 進行身份驗證。使用 dynamodb
Driver 時,就不需要 queue.failed.database
設定選項:
1'failed' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'failed_jobs',7],
1'failed' => [2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),3 'key' => env('AWS_ACCESS_KEY_ID'),4 'secret' => env('AWS_SECRET_ACCESS_KEY'),5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),6 'table' => 'failed_jobs',7],
不保存失敗的 Job
只要將 queue.failed.driver
設定值設為 null
,就可以讓 Laravel 不保存失敗的 Job 以忽略這些 Job。一般來說,可使用 QUEUE_FAILED_DRIVER
環境變數來調整這個值:
1QUEUE_FAILED_DRIVER=null
1QUEUE_FAILED_DRIVER=null
失敗 Job 的事件
若想註冊一個會在每次 Job 失敗時叫用的 Event Listener,可使用 Queue
Facade 的 failing
方法。舉例來說,我們可以在 Laravel 中內建的 AppServiceProvider
中 boot
方法內將一個閉包附加至該事件上:
1<?php23namespace App\Providers;45use Illuminate\Support\Facades\Queue;6use Illuminate\Support\ServiceProvider;7use Illuminate\Queue\Events\JobFailed;89class AppServiceProvider extends ServiceProvider10{11 /**12 * Register any application services.13 */14 public function register(): void15 {16 // ...17 }1819 /**20 * Bootstrap any application services.21 */22 public function boot(): void23 {24 Queue::failing(function (JobFailed $event) {25 // $event->connectionName26 // $event->job27 // $event->exception28 });29 }30}
1<?php23namespace App\Providers;45use Illuminate\Support\Facades\Queue;6use Illuminate\Support\ServiceProvider;7use Illuminate\Queue\Events\JobFailed;89class AppServiceProvider extends ServiceProvider10{11 /**12 * Register any application services.13 */14 public function register(): void15 {16 // ...17 }1819 /**20 * Bootstrap any application services.21 */22 public function boot(): void23 {24 Queue::failing(function (JobFailed $event) {25 // $event->connectionName26 // $event->job27 // $event->exception28 });29 }30}
在佇列中清理 Job
使用 Horizon,請不要使用 queue:clear
指令,請使用 horizon:clear
指令來清理佇列中的 Job。
若想從預設連線的預設佇列中刪除所有 Job,可使用 queue:clear
Artisan 指令:
1php artisan queue:clear
1php artisan queue:clear
可以提供 connection
引數與 queue
選項來刪除特定連線與佇列中的 Job:
1php artisan queue:clear redis --queue=emails
1php artisan queue:clear redis --queue=emails
目前,只有 SQS、Redis、資料庫等佇列 Driver 能支援清除佇列中的 Job。此外,刪除 SQS Message 可能會需要至多 60 秒的時間,因此在清理佇列的 60 秒後所傳送給 SQS 佇列的 Job 也可能會被刪除。
監控佇列
若佇列突然收到大量的 Job,則佇列可能會有來不及處理,造成 Job 需要更長的等待時間才能完成。若有需要的話,Laravel 可以在佇列 Job 遇到特定閥值時傳送通知。
若要開始監控佇列,請排程設定每 10 分鐘執行 queue:monitor
指令。這個指令接受要監控的佇列名稱,以及所要設定的 Job 數量閥值:
1php artisan queue:monitor redis:default,redis:deployments --max=100
1php artisan queue:monitor redis:default,redis:deployments --max=100
Scheduling this command alone is not enough to trigger a notification alerting you of the queue's overwhelmed status. When the command encounters a queue that has a job count exceeding your threshold, an Illuminate\Queue\Events\QueueBusy
event will be dispatched. You may listen for this event within your application's AppServiceProvider
in order to send a notification to you or your development team:
1use App\Notifications\QueueHasLongWaitTime;2use Illuminate\Queue\Events\QueueBusy;3use Illuminate\Support\Facades\Event;4use Illuminate\Support\Facades\Notification;56/**7 * Bootstrap any application services.8 */9public function boot(): void10{11 Event::listen(function (QueueBusy $event) {13 ->notify(new QueueHasLongWaitTime(14 $event->connection,15 $event->queue,16 $event->size17 ));18 });19}
1use App\Notifications\QueueHasLongWaitTime;2use Illuminate\Queue\Events\QueueBusy;3use Illuminate\Support\Facades\Event;4use Illuminate\Support\Facades\Notification;56/**7 * Bootstrap any application services.8 */9public function boot(): void10{11 Event::listen(function (QueueBusy $event) {13 ->notify(new QueueHasLongWaitTime(14 $event->connection,15 $event->queue,16 $event->size17 ));18 });19}
測試
在測試會分派 Job 的程式時,可以讓 Laravel 不要真的執行該 Job。因為我們可以直接測試 Job,將 Job 與分派該 Job 的程式碼測試分開。當然,若要測試 Job 本身,需要在測試中先初始化一個 Job 實體,並直接呼叫 handle
方法。
You may use the Queue
facade's fake
method to prevent queued jobs from actually being pushed to the queue. After calling the Queue
facade's fake
method, you may then assert that the application attempted to push jobs to the queue:
1<?php23use App\Jobs\AnotherJob;4use App\Jobs\FinalJob;5use App\Jobs\ShipOrder;6use Illuminate\Support\Facades\Queue;78test('orders can be shipped', function () {9 Queue::fake();1011 // Perform order shipping...1213 // Assert that no jobs were pushed...14 Queue::assertNothingPushed();1516 // Assert a job was pushed to a given queue...17 Queue::assertPushedOn('queue-name', ShipOrder::class);1819 // Assert a job was pushed twice...20 Queue::assertPushed(ShipOrder::class, 2);2122 // Assert a job was not pushed...23 Queue::assertNotPushed(AnotherJob::class);2425 // Assert that a Closure was pushed to the queue...26 Queue::assertClosurePushed();2728 // Assert the total number of jobs that were pushed...29 Queue::assertCount(3);30});
1<?php23use App\Jobs\AnotherJob;4use App\Jobs\FinalJob;5use App\Jobs\ShipOrder;6use Illuminate\Support\Facades\Queue;78test('orders can be shipped', function () {9 Queue::fake();1011 // Perform order shipping...1213 // Assert that no jobs were pushed...14 Queue::assertNothingPushed();1516 // Assert a job was pushed to a given queue...17 Queue::assertPushedOn('queue-name', ShipOrder::class);1819 // Assert a job was pushed twice...20 Queue::assertPushed(ShipOrder::class, 2);2122 // Assert a job was not pushed...23 Queue::assertNotPushed(AnotherJob::class);2425 // Assert that a Closure was pushed to the queue...26 Queue::assertClosurePushed();2728 // Assert the total number of jobs that were pushed...29 Queue::assertCount(3);30});
1<?php23namespace Tests\Feature;45use App\Jobs\AnotherJob;6use App\Jobs\FinalJob;7use App\Jobs\ShipOrder;8use Illuminate\Support\Facades\Queue;9use Tests\TestCase;1011class ExampleTest extends TestCase12{13 public function test_orders_can_be_shipped(): void14 {15 Queue::fake();1617 // Perform order shipping...1819 // Assert that no jobs were pushed...20 Queue::assertNothingPushed();2122 // Assert a job was pushed to a given queue...23 Queue::assertPushedOn('queue-name', ShipOrder::class);2425 // Assert a job was pushed twice...26 Queue::assertPushed(ShipOrder::class, 2);2728 // Assert a job was not pushed...29 Queue::assertNotPushed(AnotherJob::class);3031 // Assert that a Closure was pushed to the queue...32 Queue::assertClosurePushed();3334 // Assert the total number of jobs that were pushed...35 Queue::assertCount(3);36 }37}
1<?php23namespace Tests\Feature;45use App\Jobs\AnotherJob;6use App\Jobs\FinalJob;7use App\Jobs\ShipOrder;8use Illuminate\Support\Facades\Queue;9use Tests\TestCase;1011class ExampleTest extends TestCase12{13 public function test_orders_can_be_shipped(): void14 {15 Queue::fake();1617 // Perform order shipping...1819 // Assert that no jobs were pushed...20 Queue::assertNothingPushed();2122 // Assert a job was pushed to a given queue...23 Queue::assertPushedOn('queue-name', ShipOrder::class);2425 // Assert a job was pushed twice...26 Queue::assertPushed(ShipOrder::class, 2);2728 // Assert a job was not pushed...29 Queue::assertNotPushed(AnotherJob::class);3031 // Assert that a Closure was pushed to the queue...32 Queue::assertClosurePushed();3334 // Assert the total number of jobs that were pushed...35 Queue::assertCount(3);36 }37}
可以傳入一個閉包給 assertPushed
或 assertNotPushed
方法,來判斷某個 Job 是否通過給定的「真值測試 (Truth Test)」。若被推入的 Job 中至少有一個 Job 通過給定的真值測試,則該 Assertion 會被視為成功:
1Queue::assertPushed(function (ShipOrder $job) use ($order) {2 return $job->order->id === $order->id;3});
1Queue::assertPushed(function (ShipOrder $job) use ($order) {2 return $job->order->id === $order->id;3});
Faking a Subset of Jobs
若只想模擬特定的 Job,並讓其他 Job 都被正常執行,可以傳入要被模擬的 Job 類別名稱給 fake
方法:
1test('orders can be shipped', function () {2 Queue::fake([3 ShipOrder::class,4 ]);56 // Perform order shipping...78 // Assert a job was pushed twice...9 Queue::assertPushed(ShipOrder::class, 2);10});
1test('orders can be shipped', function () {2 Queue::fake([3 ShipOrder::class,4 ]);56 // Perform order shipping...78 // Assert a job was pushed twice...9 Queue::assertPushed(ShipOrder::class, 2);10});
1public function test_orders_can_be_shipped(): void2{3 Queue::fake([4 ShipOrder::class,5 ]);67 // Perform order shipping...89 // Assert a job was pushed twice...10 Queue::assertPushed(ShipOrder::class, 2);11}
1public function test_orders_can_be_shipped(): void2{3 Queue::fake([4 ShipOrder::class,5 ]);67 // Perform order shipping...89 // Assert a job was pushed twice...10 Queue::assertPushed(ShipOrder::class, 2);11}
也可以使用 fakeExcept
方法來 Fake 除了一組特定 Job 外的所有 Event:
1Queue::fake()->except([2 ShipOrder::class,3]);
1Queue::fake()->except([2 ShipOrder::class,3]);
測試串連的 Job
若要測試串連的 Job,我們需要使用到 Bus
Facade 的模擬功能。Bus
Facade 的 assertChained
方法可用來判斷是否有分派某個串聯的 Job。assertChained
方法接受一組串聯 Job 的陣列作為其第一個引數:
1use App\Jobs\RecordShipment;2use App\Jobs\ShipOrder;3use App\Jobs\UpdateInventory;4use Illuminate\Support\Facades\Bus;56Bus::fake();78// ...910Bus::assertChained([11 ShipOrder::class,12 RecordShipment::class,13 UpdateInventory::class14]);
1use App\Jobs\RecordShipment;2use App\Jobs\ShipOrder;3use App\Jobs\UpdateInventory;4use Illuminate\Support\Facades\Bus;56Bus::fake();78// ...910Bus::assertChained([11 ShipOrder::class,12 RecordShipment::class,13 UpdateInventory::class14]);
就像上述範例中可看到的一樣,串聯 Job 的陣列就是一組包含 Job 類別名稱的陣列。不過,也可以提供一組實際 Job 實體的陣列。當提供的陣列為 Job 實體的陣列時,Laravel 會確保程式所分派的串聯 Job 都具是相同的類別,且擁有相同的屬性值:
1Bus::assertChained([2 new ShipOrder,3 new RecordShipment,4 new UpdateInventory,5]);
1Bus::assertChained([2 new ShipOrder,3 new RecordShipment,4 new UpdateInventory,5]);
可以使用 assertDispatchedWithoutChain
方法來判斷 Job 被推入 Queue 但未包含串聯 Job:
1Bus::assertDispatchedWithoutChain(ShipOrder::class);
1Bus::assertDispatchedWithoutChain(ShipOrder::class);
Testing Chain Modifications
If a chained job prepends or appends jobs to an existing chain, you may use the job's assertHasChain
method to assert that the job has the expected chain of remaining jobs:
1$job = new ProcessPodcast;23$job->handle();45$job->assertHasChain([6 new TranscribePodcast,7 new OptimizePodcast,8 new ReleasePodcast,9]);
1$job = new ProcessPodcast;23$job->handle();45$job->assertHasChain([6 new TranscribePodcast,7 new OptimizePodcast,8 new ReleasePodcast,9]);
The assertDoesntHaveChain
method may be used to assert that the job's remaining chain is empty:
1$job->assertDoesntHaveChain();
1$job->assertDoesntHaveChain();
Testing Chained Batches
If your job chain contains a batch of jobs, you may assert that the chained batch matches your expectations by inserting a Bus::chainedBatch
definition within your chain assertion:
1use App\Jobs\ShipOrder;2use App\Jobs\UpdateInventory;3use Illuminate\Bus\PendingBatch;4use Illuminate\Support\Facades\Bus;56Bus::assertChained([7 new ShipOrder,8 Bus::chainedBatch(function (PendingBatch $batch) {9 return $batch->jobs->count() === 3;10 }),11 new UpdateInventory,12]);
1use App\Jobs\ShipOrder;2use App\Jobs\UpdateInventory;3use Illuminate\Bus\PendingBatch;4use Illuminate\Support\Facades\Bus;56Bus::assertChained([7 new ShipOrder,8 Bus::chainedBatch(function (PendingBatch $batch) {9 return $batch->jobs->count() === 3;10 }),11 new UpdateInventory,12]);
測試 Job 批次
Bus
Facade 的 assertBatched
方法可用來判斷是否有分派[Job 批次]。提供給 assertBatched
方法的閉包會收到 Illuminate\Bus\PendingBatch
的實體,該實體可用來檢查批次中的 Job:
1use Illuminate\Bus\PendingBatch;2use Illuminate\Support\Facades\Bus;34Bus::fake();56// ...78Bus::assertBatched(function (PendingBatch $batch) {9 return $batch->name == 'import-csv' &&10 $batch->jobs->count() === 10;11});
1use Illuminate\Bus\PendingBatch;2use Illuminate\Support\Facades\Bus;34Bus::fake();56// ...78Bus::assertBatched(function (PendingBatch $batch) {9 return $batch->name == 'import-csv' &&10 $batch->jobs->count() === 10;11});
You may use the assertBatchCount
method to assert that a given number of batches were dispatched:
1Bus::assertBatchCount(3);
1Bus::assertBatchCount(3);
You may use assertNothingBatched
to assert that no batches were dispatched:
1Bus::assertNothingBatched();
1Bus::assertNothingBatched();
測試 Job 或批次行為
除此之外,我們有時候也需要測試個別 Job 與其底層批次的互動。舉例來說,我們可能需要測試某個 Job 是否取消了批次中剩下的流程。這時,我們需要使用 withFakeBatch
方法來將一個模擬的 Batch 指派給該 Job。withFakeBatch
方法會回傳一個包含 Job 實體與模擬 Batch 的陣列:
1[$job, $batch] = (new ShipOrder)->withFakeBatch();23$job->handle();45$this->assertTrue($batch->cancelled());6$this->assertEmpty($batch->added);
1[$job, $batch] = (new ShipOrder)->withFakeBatch();23$job->handle();45$this->assertTrue($batch->cancelled());6$this->assertEmpty($batch->added);
Testing Job / Queue Interactions
Sometimes, you may need to test that a queued job releases itself back onto the queue. Or, you may need to test that the job deleted itself. You may test these queue interactions by instantiating the job and invoking the withFakeQueueInteractions
method.
Once the job's queue interactions have been faked, you may invoke the handle
method on the job. After invoking the job, the assertReleased
, assertDeleted
, assertNotDeleted
, assertFailed
, and assertNotFailed
methods may be used to make assertions against the job's queue interactions:
1use App\Jobs\ProcessPodcast;23$job = (new ProcessPodcast)->withFakeQueueInteractions();45$job->handle();67$job->assertReleased(delay: 30);8$job->assertDeleted();9$job->assertNotDeleted();10$job->assertFailed();11$job->assertNotFailed();
1use App\Jobs\ProcessPodcast;23$job = (new ProcessPodcast)->withFakeQueueInteractions();45$job->handle();67$job->assertReleased(delay: 30);8$job->assertDeleted();9$job->assertNotDeleted();10$job->assertFailed();11$job->assertNotFailed();
Job 事件
在 Queue
Facade 上使用 before
與 after
方法,就可以指定要在佇列 Job 處理前後所要執行的回呼。在這些回呼中,我們就有機會能進行記錄額外的日誌、增加主控台上統計數字等動作。一般來說,應在某個 Service Provider 中 boot
方法內呼叫這些方法。舉例來說,我們可以使用 Laravel 中內建的 AppServiceProvider
:
1<?php23namespace App\Providers;45use Illuminate\Support\Facades\Queue;6use Illuminate\Support\ServiceProvider;7use Illuminate\Queue\Events\JobProcessed;8use Illuminate\Queue\Events\JobProcessing;910class AppServiceProvider extends ServiceProvider11{12 /**13 * Register any application services.14 */15 public function register(): void16 {17 // ...18 }1920 /**21 * Bootstrap any application services.22 */23 public function boot(): void24 {25 Queue::before(function (JobProcessing $event) {26 // $event->connectionName27 // $event->job28 // $event->job->payload()29 });3031 Queue::after(function (JobProcessed $event) {32 // $event->connectionName33 // $event->job34 // $event->job->payload()35 });36 }37}
1<?php23namespace App\Providers;45use Illuminate\Support\Facades\Queue;6use Illuminate\Support\ServiceProvider;7use Illuminate\Queue\Events\JobProcessed;8use Illuminate\Queue\Events\JobProcessing;910class AppServiceProvider extends ServiceProvider11{12 /**13 * Register any application services.14 */15 public function register(): void16 {17 // ...18 }1920 /**21 * Bootstrap any application services.22 */23 public function boot(): void24 {25 Queue::before(function (JobProcessing $event) {26 // $event->connectionName27 // $event->job28 // $event->job->payload()29 });3031 Queue::after(function (JobProcessed $event) {32 // $event->connectionName33 // $event->job34 // $event->job->payload()35 });36 }37}
使用 Queue
Facade 的 looping
方法,我們就能指定要在 Worker 嘗試從佇列中取得 Job 前執行的回呼。舉例來說,我們可以註冊一個閉包來回溯前一個失敗 Job 中未關閉的 Transaction:
1use Illuminate\Support\Facades\DB;2use Illuminate\Support\Facades\Queue;34Queue::looping(function () {5 while (DB::transactionLevel() > 0) {6 DB::rollBack();7 }8});
1use Illuminate\Support\Facades\DB;2use Illuminate\Support\Facades\Queue;34Queue::looping(function () {5 while (DB::transactionLevel() > 0) {6 DB::rollBack();7 }8});