翻譯進度
47.13% 已翻譯
更新時間:
2024年6月30日 星期日 上午8:15:00 [UTC]
翻譯人員:
幫我們翻譯此頁

佇列 - Queue

簡介

在製作 Web App 時,有些任務若在 Web Request 中進行會花費太多時間,如解析 CSV 檔並上傳。所幸,在 Laravel 中,要建立在背景執行的佇列任務非常輕鬆。只要將需要花費時間的任務移到佇列中執行,就能加速網站對 Request 的回應速度,並提供更好的使用者經驗給客戶。

Laravel 的佇列為各種不同的佇列後端都提供了統一的 API。這些後端包括 Amazon SQSRedis、甚至是關聯式資料庫。

Laravel 的佇列設定選項保存在專案的 config/queue.php 設定檔中。在這個檔案內,可以看到供各個 Laravel 內建佇列 Driver 使用的連線設定,包含資料庫、Amazon SQSRedisBeanstalkd 等 Driver,還包括一個會即時執行任務的同步佇列 (用於本機開發)。還包含一個 null 佇列,用於忽略佇列任務。

lightbulb

Laravel 現在還提供 Horizon。Horizon 是為 Redis 佇列提供的一個一個漂亮面板。更多資訊請參考完整的 Horizon 說明文件

連線 Vs. 佇列

在開始使用 Laravel 佇列前,我們需要先瞭解「連線(Connection)」與「佇列(Queue)」的差別。在 config/queue.php 設定檔中有個 connections 設定陣列。該選項用於定義連到後端佇列服務的連線,後端佇列服務就是像 Amazon SQS、Beanstalk、Redis 等。不過,一個佇列連線可以有多個「佇列」,我們可以將這些不同的佇列想成是幾個不同堆疊的佇列任務。

可以注意到範例 queue 設定檔中的各個範例連線設定中都包含了一個 queue 屬性。這個 queue 屬性指定的,就是當我們將任務傳給這個連線時預設會被分派的佇列。換句話說,若我們在分派任務時沒有顯式定義要分派到哪個佇列上,這個任務就會被分派到連線設定中 queue 屬性所定義的佇列上:

1use App\Jobs\ProcessPodcast;
2 
3// This job is sent to the default connection's default queue...
4ProcessPodcast::dispatch();
5 
6// This job is sent to the default connection's "emails" queue...
7ProcessPodcast::dispatch()->onQueue('emails');
1use App\Jobs\ProcessPodcast;
2 
3// This job is sent to the default connection's default queue...
4ProcessPodcast::dispatch();
5 
6// This job is sent to the default connection's "emails" queue...
7ProcessPodcast::dispatch()->onQueue('emails');

有的程式沒有要將任務推送到不同佇列的需求,這些程式只需要有單一佇列就好了。不過,因為 Laravel 的 Queue Worker 可調整各個 Queue 的優先處理等級,因此如果想要調整不同任務的優先處理順序,把任務推送到不同佇列就很有用。就來來說,我們如果把任務推送到 high 佇列,我們就可以執行一個 Worker 來讓這個佇列以更高優先級處理:

1php artisan queue:work --queue=high,default
1php artisan queue:work --queue=high,default

Driver 注意事項與前置需求

Database

若要使用 database 佇列 Driver,我們需要先有一個用來存放任務的資料庫資料表。若要產生一個用於建立這個資料表的 Migration,請執行 queue:table Artisan 指令。建立好 Migration 後,就可以使用 migrate 指令來 Migrate 資料庫:

1php artisan queue:table
2 
3php artisan migrate
1php artisan queue:table
2 
3php artisan migrate

最後,別忘了更新專案 .env 檔中的 QUEUE_CONNECTION 變數來讓專案使用 database Driver:

1QUEUE_CONNECTION=database
1QUEUE_CONNECTION=database

Redis

若要使用 redis 佇列 Driver,請在 config/database.php 設定檔中設定 Redis 資料庫連線。

Redis Cluster

若 Redis 佇列要使用 Redis Cluster,則設定的佇列名稱必須包含一個 Key Hash Tag。必須加上 Key Hash Tag,這樣才能確保給定佇列中所有的 Redis 索引鍵都有被放在相同的 Hash Slot 中:

1'redis' => [
2 'driver' => 'redis',
3 'connection' => 'default',
4 'queue' => '{default}',
5 'retry_after' => 90,
6],
1'redis' => [
2 'driver' => 'redis',
3 'connection' => 'default',
4 'queue' => '{default}',
5 'retry_after' => 90,
6],

Blocking

在使用 Redis 佇列時,可使用 block_for 設定選項來指定 Redis Driver 在迭代 Worker 迴圈並重新讀取 Redis 資料庫來等新 Job 進來時要等待多久。

可依據佇列的負載來調整這個值,以避免不斷讀取 Redis 資料庫來尋找新任務,會比較有效率。舉例來說,我們可以將其設為 5,表示 Redis Driver 在等待新任務出現時應先等待 5 秒再查詢 Redis 資料庫:

1'redis' => [
2 'driver' => 'redis',
3 'connection' => 'default',
4 'queue' => 'default',
5 'retry_after' => 90,
6 'block_for' => 5,
7],
1'redis' => [
2 'driver' => 'redis',
3 'connection' => 'default',
4 'queue' => 'default',
5 'retry_after' => 90,
6 'block_for' => 5,
7],
lightbulb

block_for 設為 0 會導致 Queue Worker 在新 Job 出現前一直 Block。也會導致在處理下一個 Job 前都無法處理如 SIGTERM 等訊號 (Signal)。

其他 Driver 的前置要求

下列 Queue Driver 還需要一些相依性套件。可以使用 Composer 套件管理員來安裝這些相依性套件:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 或 phpredis PHP 擴充套件

建立 Job

產生 Job 類別

預設情況下,專案中所有可放入佇列的任務都存放在 app/Jobs 目錄內。若 app/Jobs 目錄不存在,則執行 make:jobs Artisan 指令時會建立該目錄:

1php artisan make:job ProcessPodcast
1php artisan make:job ProcessPodcast

產生的類別會實作 Illuminate\Contracts\Queue\ShouldQueue 介面,這樣 Laravel 就知道該 Job 要被推入佇列並以非同步方式執行。

lightbulb

可以安裝 Stub 來自訂 Job 的 Stub。

類別架構

Job 類別非常簡單,通常只包含了一個 handle 方法,會在佇列處理 Job 時叫用。要開始使用 Job,我們先來看一個範例 Job 類別。在這個範例中,我們先假裝時我們在管理一個 Podcast 上架服務,我們需要在上架前處理上傳的 Podcast 檔案:

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Bus\Queueable;
8use Illuminate\Contracts\Queue\ShouldQueue;
9use Illuminate\Foundation\Bus\Dispatchable;
10use Illuminate\Queue\InteractsWithQueue;
11use Illuminate\Queue\SerializesModels;
12 
13class ProcessPodcast implements ShouldQueue
14{
15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
16 
17 /**
18 * The podcast instance.
19 *
20 * @var \App\Models\Podcast
21 */
22 protected $podcast;
23 
24 /**
25 * Create a new job instance.
26 *
27 * @param App\Models\Podcast $podcast
28 * @return void
29 */
30 public function __construct(Podcast $podcast)
31 {
32 $this->podcast = $podcast;
33 }
34 
35 /**
36 * Execute the job.
37 *
38 * @param App\Services\AudioProcessor $processor
39 * @return void
40 */
41 public function handle(AudioProcessor $processor)
42 {
43 // Process uploaded podcast...
44 }
45}
1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Bus\Queueable;
8use Illuminate\Contracts\Queue\ShouldQueue;
9use Illuminate\Foundation\Bus\Dispatchable;
10use Illuminate\Queue\InteractsWithQueue;
11use Illuminate\Queue\SerializesModels;
12 
13class ProcessPodcast implements ShouldQueue
14{
15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
16 
17 /**
18 * The podcast instance.
19 *
20 * @var \App\Models\Podcast
21 */
22 protected $podcast;
23 
24 /**
25 * Create a new job instance.
26 *
27 * @param App\Models\Podcast $podcast
28 * @return void
29 */
30 public function __construct(Podcast $podcast)
31 {
32 $this->podcast = $podcast;
33 }
34 
35 /**
36 * Execute the job.
37 *
38 * @param App\Services\AudioProcessor $processor
39 * @return void
40 */
41 public function handle(AudioProcessor $processor)
42 {
43 // Process uploaded podcast...
44 }
45}

在這個範例中,可以看到我們直接將 [Eloquent Model] 傳入佇列任務的 Constructor(建構函式) 中。由於該任務有使用 SerializesModels Trait,所以 Eloquent Model 與已載入的關聯 Model 都會被序列化處理,並在處理任務時反序列化。

若佇列任務的 Constructor 中接受 Eloquent Model,則只有 Model 的識別元(Identifier)會被序列化進佇列中。實際要處理任務時,佇列系統會自動從資料庫中重新取得完整的 Model 實體以及已載入的關聯。通過這種序列化 Model 的做法,我們就能縮小傳入佇列 Driver 的任務承載(Payload)

handle 方法的相依性插入

佇列在處理任務時會叫用該任務的 handle 方法。請注意,我們可以在任務的 handle 方法上型別提示(Type-Hint)任何相依性項目。Laravel Service Container 會自動插入這些相依性。

若想完整控制 Container 要如何插入這些相依性到 handle 方法,可使用 Container 的 bindMethod 方法。bindMethod 方法接收一個回呼,該回呼則接收該任務與 Container。我們可以在這個回呼中自行叫用 handle 方法。一般來說,我們應該從 App\Providers\AppServiceProvider Service Providerboot 方法中叫用這個方法:

1use App\Jobs\ProcessPodcast;
2use App\Services\AudioProcessor;
3 
4$this->app->bindMethod([ProcessPodcast::class, 'handle'], function ($job, $app) {
5 return $job->handle($app->make(AudioProcessor::class));
6});
1use App\Jobs\ProcessPodcast;
2use App\Services\AudioProcessor;
3 
4$this->app->bindMethod([ProcessPodcast::class, 'handle'], function ($job, $app) {
5 return $job->handle($app->make(AudioProcessor::class));
6});
lightbulb

二進位資料,如圖片等,應在傳入佇列任務前先使用 base64_encode 函式進行編碼。若未進行編碼,則這些資料在放入佇列時可能無法正確被序列化為 JSON。

佇列中的關聯

由於 Model 上已載入的關聯也會被序列化,因此序列化的任務字串有時候會變得很大。若要防止關聯被序列化,可在設定屬性值時在 Model 上呼叫 withoutRelations 方法。該方法會回傳該 Model 不含已載入關聯的實體:

1/**
2 * Create a new job instance.
3 *
4 * @param \App\Models\Podcast $podcast
5 * @return void
6 */
7public function __construct(Podcast $podcast)
8{
9 $this->podcast = $podcast->withoutRelations();
10}
1/**
2 * Create a new job instance.
3 *
4 * @param \App\Models\Podcast $podcast
5 * @return void
6 */
7public function __construct(Podcast $podcast)
8{
9 $this->podcast = $podcast->withoutRelations();
10}

此外,當任務被反序列化,然後 Model 關聯被從資料庫中重新取出時,這些關聯的資料會以完整的關聯取出。這表示,若在 Model 被任務佇列序列化前有對關聯套用任何查詢條件,在反序列化時,這些條件都不會被套用。因此,若只想處理給定關聯中的一部分,應在佇列任務中重新套用這些查詢條件。

不重複 Job

lightbulb

若要使用不重複任務,則需要使用支援 [Atomic Lock] 的快取 Driver。目前,memcachedredisdynamodbdatabasefilearray 等快取 Driver 有支援 Atomic Lock。此外,不重複任務的條件限制(Constraint)不會被套用到批次任務中的人物上。

有時候,我們可能會想確保某個任務在佇列中一次只能有一個實體。我們可以在 Job 類別上實作 ShouldBeUnique 介面來確保一次只執行一個實體。要實作這個介面,我們需要在 Class 上定義幾個額外的方法:

1<?php
2 
3use Illuminate\Contracts\Queue\ShouldQueue;
4use Illuminate\Contracts\Queue\ShouldBeUnique;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
7{
8 ...
9}
1<?php
2 
3use Illuminate\Contracts\Queue\ShouldQueue;
4use Illuminate\Contracts\Queue\ShouldBeUnique;
5 
6class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
7{
8 ...
9}

在上述範例中,UpdateSearchIndex Job 是不重複(Unique)的。所以,若佇列中已經有該 Job 的另一個實體且尚未執行完畢,就不會再次分派該 Job。

在某些情況下,我們可能會想指定要用來判斷 Job 是否重複的「索引鍵」,或是我們可能會想指定一個逾時時間,讓這個 Job 在執行超過該逾時後就不再判斷是否重複。為此,可在 Job 類別上定義 uniqueIduniqueFor 屬性或方法:

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUnique;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
8{
9 /**
10 * The product instance.
11 *
12 * @var \App\Product
13 */
14 public $product;
15 
16 /**
17 * The number of seconds after which the job's unique lock will be released.
18 *
19 * @var int
20 */
21 public $uniqueFor = 3600;
22 
23 /**
24 * The unique ID of the job.
25 *
26 * @return string
27 */
28 public function uniqueId()
29 {
30 return $this->product->id;
31 }
32}
1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUnique;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
8{
9 /**
10 * The product instance.
11 *
12 * @var \App\Product
13 */
14 public $product;
15 
16 /**
17 * The number of seconds after which the job's unique lock will be released.
18 *
19 * @var int
20 */
21 public $uniqueFor = 3600;
22 
23 /**
24 * The unique ID of the job.
25 *
26 * @return string
27 */
28 public function uniqueId()
29 {
30 return $this->product->id;
31 }
32}

在上述範例中,UpdateSearchIndex Job 使用 Product ID 來判斷是否重複。因此,若新分派的 Job 有相同的 Product ID,則直到現存 Job 執行完畢前,這個 Job 都會被忽略。此外,若現有的 Job 在一個小時內都未被處理,這個不重複鎖定會被解除,之後若有另一個具相同重複索引鍵的 Job 將可被分派進佇列中。

在開始處理 Job 後仍維持讓 Job 不重複

預設情況下,不重複的 Job 會在執行完成或所有嘗試都失敗後「解除鎖定」。不過,有的情況下,我們可能會想在執行完成前就先解除鎖定 Job。為此,不要在該 Job 上實作 ShouldBeUnique,而是實作 ShouldBeUniqueUntillProcessing Contract:

1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
8{
9 // ...
10}
1<?php
2 
3use App\Models\Product;
4use Illuminate\Contracts\Queue\ShouldQueue;
5use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
6 
7class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
8{
9 // ...
10}

不重複 Job 的鎖定

當分派 ShouldBeUnique 時,Laravel 會在幕後使用 uniqueId 索引鍵來取得一個 Lock(鎖定)。若未能取得 Lock,就不會分派該 Job。當 Job 完成處理或所有嘗試都失敗後,就會解除該 Lock。預設情況下,Laravel 會使用預設的快取 Driver 來取得該 Lock。不過,若想使用其他 Driver 來取得 Lock,可定義一個 uniqueVia 方法,並在該方法中回傳要使用的快取 Driver:

1use Illuminate\Support\Facades\Cache;
2 
3class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
4{
5 ...
6 
7 /**
8 * Get the cache driver for the unique job lock.
9 *
10 * @return \Illuminate\Contracts\Cache\Repository
11 */
12 public function uniqueVia()
13 {
14 return Cache::driver('redis');
15 }
16}
1use Illuminate\Support\Facades\Cache;
2 
3class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
4{
5 ...
6 
7 /**
8 * Get the cache driver for the unique job lock.
9 *
10 * @return \Illuminate\Contracts\Cache\Repository
11 */
12 public function uniqueVia()
13 {
14 return Cache::driver('redis');
15 }
16}
lightbulb

若想限制某個 Job 可同時(Concurrent)執行的數量,請使用 WithoutOverlapping Job Middleware 而不是使用 Unique Job。

Job Middleware

使用 Job Middleware 就能讓我們將佇列 Job 包裝在一組自定邏輯內執行,讓我們能減少在各個 Job 內撰寫重複的程式碼。舉例來說,假設有下列這個 handle 方法,該方法會使用 Laravel 的 Redis 頻率限制功能,限制每 5 秒只能處理 1 個 Job:

1use Illuminate\Support\Facades\Redis;
2 
3/**
4 * Execute the job.
5 *
6 * @return void
7 */
8public function handle()
9{
10 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
11 info('Lock obtained...');
12 
13 // Handle job...
14 }, function () {
15 // Could not obtain lock...
16 
17 return $this->release(5);
18 });
19}
1use Illuminate\Support\Facades\Redis;
2 
3/**
4 * Execute the job.
5 *
6 * @return void
7 */
8public function handle()
9{
10 Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
11 info('Lock obtained...');
12 
13 // Handle job...
14 }, function () {
15 // Could not obtain lock...
16 
17 return $this->release(5);
18 });
19}

雖然我們確實可以這樣寫,但這樣一來 handle 方法的實作就變得很亂,因為我們的程式碼跟 Redis 頻率限制的邏輯混在一起了。此外,這樣的頻率限制邏輯一定也會與其他我們想要作頻率限制的 Job 重複。

我們可以定義一個 Job Middleware 來處理頻率限制,而不用在 handle 方法內處理。Laravel 中沒有預設放置 Job Middleware 的地方,因此我們可以隨意在專案內放置這些 Job Middleware。舉例來說,我們可以把 Middleware 放在 app/Jobs/Middleware 目錄下:

1<?php
2 
3namespace App\Jobs\Middleware;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class RateLimited
8{
9 /**
10 * Process the queued job.
11 *
12 * @param mixed $job
13 * @param callable $next
14 * @return mixed
15 */
16 public function handle($job, $next)
17 {
18 Redis::throttle('key')
19 ->block(0)->allow(1)->every(5)
20 ->then(function () use ($job, $next) {
21 // Lock obtained...
22 
23 $next($job);
24 }, function () use ($job) {
25 // Could not obtain lock...
26 
27 $job->release(5);
28 });
29 }
30}
1<?php
2 
3namespace App\Jobs\Middleware;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class RateLimited
8{
9 /**
10 * Process the queued job.
11 *
12 * @param mixed $job
13 * @param callable $next
14 * @return mixed
15 */
16 public function handle($job, $next)
17 {
18 Redis::throttle('key')
19 ->block(0)->allow(1)->every(5)
20 ->then(function () use ($job, $next) {
21 // Lock obtained...
22 
23 $next($job);
24 }, function () use ($job) {
25 // Could not obtain lock...
26 
27 $job->release(5);
28 });
29 }
30}

就像這樣,跟 Route Middleware 很像,Job Middleware 會收到正在處理的 Job,以及要繼續執行 Job 時要叫用的回呼。

建立好 Job Middleware 後,我們就可以在 Job 的 middleware 方法內將這個 Middleware 附加上去了。make:job 產生的空 Job 不包含 middleware 方法,所以我們需要手動在 Job 類別中新增這個方法:

1use App\Jobs\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new RateLimited];
11}
1use App\Jobs\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new RateLimited];
11}
lightbulb

Job Middleware 也可以被指派給可放入佇列的 Event Listener、Mailable、Notification 等。

頻率限制

雖然我們已經示範了要如何自行撰寫頻率限制的 Job Middleware。不過,其實 Laravel 有內建用來為 Job 做頻率限制的 Middleware。就跟 Route 的 Rate Limiter 一樣,可以使用 RateLimiter Facade 的 for 方法來定義 Job 的頻率限制。

舉例來說,我們可能會想讓使用者能備份資料,而一般的使用者限制為每小時可備份一次,VIP 使用者則不限次數。若要做這種頻率限制,可以在 AppServiceProvider 中的 boot 方法內定義一個 RateLimiter

1use Illuminate\Cache\RateLimiting\Limit;
2use Illuminate\Support\Facades\RateLimiter;
3 
4/**
5 * Bootstrap any application services.
6 *
7 * @return void
8 */
9public function boot()
10{
11 RateLimiter::for('backups', function ($job) {
12 return $job->user->vipCustomer()
13 ? Limit::none()
14 : Limit::perHour(1)->by($job->user->id);
15 });
16}
1use Illuminate\Cache\RateLimiting\Limit;
2use Illuminate\Support\Facades\RateLimiter;
3 
4/**
5 * Bootstrap any application services.
6 *
7 * @return void
8 */
9public function boot()
10{
11 RateLimiter::for('backups', function ($job) {
12 return $job->user->vipCustomer()
13 ? Limit::none()
14 : Limit::perHour(1)->by($job->user->id);
15 });
16}

在上述範例中,我們定義了一個每小時的頻率限制。除了以小時來定義頻率限制外,也可以使用 perMinute 方法來以分鐘定義頻率限制。此外,我們也可以傳入任意值給頻率限制的 by 方法。傳給 by 的值通常是用來區分不同使用者的:

1return Limit::perMinute(50)->by($job->user->id);
1return Limit::perMinute(50)->by($job->user->id);

定義好頻率限制後,我們就可以使用 Illuminate\Queue\Middleware\RateLimited Middleware 來將這個 Rate Limiter 附加到備份 Job 上。每當這個 Job 超過頻率限制後,這個 Middleware 就會依照頻率限制的間隔,使用適當的延遲時間來將該 Job 放回到佇列中。

1use Illuminate\Queue\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new RateLimited('backups')];
11}
1use Illuminate\Queue\Middleware\RateLimited;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new RateLimited('backups')];
11}

將受頻率限制的 Job 放回佇列後,一樣會增加 Job 的 attemps 總數。若有需要可以在 Job 類別上適當地設定 triesmaxExceptions 屬性。或者,也可以使用 retryUntil 方法 來定義不再重新嘗試 Job 的時間。

若不想讓 Job 在遇到頻率限制後重新嘗試,可使用 dontRelease 方法:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new RateLimited('backups'))->dontRelease()];
9}
1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new RateLimited('backups'))->dontRelease()];
9}
lightbulb

若使用 Redis,可使用 Illuminate\Queue\Middleware\RateLimitedWithRedis Middleware。這個 Middleware 有為 Redis 做最佳化,比起一般基礎的頻率限制 Middleware 來說會更有效率。

避免 Job 重疊

Laravel 隨附了一個 Illuminate\Queue\Middleware\WithoutOverlapping Middleware,可讓我們依照任意索引鍵來避免 Job 重疊。使用這個 Middleware 就能避免同一個資源同時被多個佇列 Job 修改。

舉例來說,假設我們有個佇列任務會負責更新使用者的信用分數,而我們想避免兩個更新相同 User ID 的信用分數 Job 重疊。為此,可在 Job 的 middleware 方法中回傳 WithoutOverlapping Middleware:

1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new WithoutOverlapping($this->user->id)];
11}
1use Illuminate\Queue\Middleware\WithoutOverlapping;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new WithoutOverlapping($this->user->id)];
11}

每當有重疊的 Job,這些 Job 都會被重新放到佇列中。我們可以指定一個秒數,讓這些被重新放回佇列的 Job 在重新嘗試前必須等待多久:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
9}
1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
9}

若想在 Job 重疊時馬上刪除這些重疊的 Job 來讓這些 Job 不被重試,請使用 dontRelease 方法:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];
9}
1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->dontRelease()];
9}

WithoutOverlapping Middleware 使用 Laravel 的 Atomic Lock 功能提供。有時候,Job 可能會未預期地失敗或逾時,並可能未正確釋放 Lock。因此,我們可以使用 expireAfter 方法來顯式定義一個 Lock 的有效時間。舉例來說,下列範例會讓 Laravel 在 Job 開始處理的 3 分鐘後釋放 WithoutOverlapping Lock:

1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
9}
1/**
2 * Get the middleware the job should pass through.
3 *
4 * @return array
5 */
6public function middleware()
7{
8 return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
9}
lightbulb

若要使用 WithoutOverlapping Middleware,則需要使用支援 [Atomic Lock] 的快取 Driver。目前,memcachedredisdynamodbdatabasefilearray 等快取 Driver 有支援 Atomic Lock。

頻率限制的 Exception

Laravel 中隨附了一個 Illuminate\Queue\Middleware\ThrottlesExceptions Middleware,能讓我們針對 Exception 做頻率限制。每當有 Job 擲回特定數量的 Exception 時,接下來要再次嘗試執行該 Job 前,必須要等待特定的時間過後才能繼續。對於一些使用了不穩定第三方服務的 Job 來說,特別適合使用這個功能。

舉例來說,假設我們有個使用了第三方 API 的佇列 Job,而這個 Job 會擲回 Exception。若要對 Exception 做頻率限制,可以在 Job 的 middleware 方法內回傳 ThrottlesExceptions Middleware。一般來說,這個 Middleware 應放在實作基於時間的 attempts之 Job 內:

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new ThrottlesExceptions(10, 5)];
11}
12 
13/**
14 * Determine the time at which the job should timeout.
15 *
16 * @return \DateTime
17 */
18public function retryUntil()
19{
20 return now()->addMinutes(5);
21}
1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [new ThrottlesExceptions(10, 5)];
11}
12 
13/**
14 * Determine the time at which the job should timeout.
15 *
16 * @return \DateTime
17 */
18public function retryUntil()
19{
20 return now()->addMinutes(5);
21}

Middleware 的第一個 Constructor 引數為 Exception 的數量,當 Job 擲回這個數量的 Exception 後就會被限制執行。第二個引數則是當被限制執行後,在繼續執行之前所要等待的分鐘數。在上述的範例中,若 Job 在 5 分鐘內擲回了 10 個 Exception,則 Laravel 會等待 5 分鐘,然後再繼續嘗試執行該 Job。

當 Job 擲回 Exception,但還未達到所設定的 Exception 閥值,則一般情況下會馬上重試 Job。不過,也可以在講 Middleware 附加到 Job 上時呼叫 backoff 方法來指定一個以分鐘為單位的數字,來指定 Job 所要延遲的時間:

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [(new ThrottlesExceptions(10, 5))->backoff(5)];
11}
1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [(new ThrottlesExceptions(10, 5))->backoff(5)];
11}

這個 Middleware 在內部使用了 Laravel 的快取系統來實作頻率限制,並使用了該 Job 的類別名稱來作為快取的「索引鍵」。可以在講 Middleware 附加到 Job 上時呼叫 by 方法來複寫這個索引鍵。當有多個 Job 都使用了同一個第三方服務時,就很適合使用這個方法來讓這些 Job 都共用相同的頻率限制:

1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [(new ThrottlesExceptions(10, 10))->by('key')];
11}
1use Illuminate\Queue\Middleware\ThrottlesExceptions;
2 
3/**
4 * Get the middleware the job should pass through.
5 *
6 * @return array
7 */
8public function middleware()
9{
10 return [(new ThrottlesExceptions(10, 10))->by('key')];
11}
lightbulb

若使用 Redis,可使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis Middleware。該 Middleware 有為 Redis 最佳化,因此會比一般的 Exception 頻率限制 Middleware 還要有效率。

分派 Job

寫好 Job 類別後,就可以使用 Job 上的 dispatch 方法來分派該 Job。傳給 dispatch 方法的引數會被傳給 Job 的 Constructor:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // ...
23 
24 ProcessPodcast::dispatch($podcast);
25 }
26}
1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // ...
23 
24 ProcessPodcast::dispatch($podcast);
25 }
26}

若想要有條件地分派 Job,可使用 dispatchIf dispatchUnless` 方法:

1ProcessPodcast::dispatchIf($accountActive, $podcast);
2 
3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
1ProcessPodcast::dispatchIf($accountActive, $podcast);
2 
3ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

延遲分派

若不想讓 Job 馬上被 Queue Worker 處理,可在分派 Job 時使用 delay 方法。舉例來說,我們來指定讓一個 Job 在分派的 10 分鐘後才被開始處理:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // ...
23 
24 ProcessPodcast::dispatch($podcast)
25 ->delay(now()->addMinutes(10));
26 }
27}
1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // ...
23 
24 ProcessPodcast::dispatch($podcast)
25 ->delay(now()->addMinutes(10));
26 }
27}
lightbulb

Amazon SQS 佇列服務的延遲時間最多只能為 15 分鐘。

在 Response 被傳送給瀏覽器後才進行分派

dispatchAfterResponse 則是另一個分派 Job 的方法,該方法延遲分派 Job,直到 HTTP Response 被傳回使用者瀏覽器後才開始處理Job。這樣一來,在處理佇列 Job 的同時,使用者就能繼續使用我們的網站。一般來說,這種做法應只用於一些只需花費 1 秒鐘的 Job,如寄送 E-Mail 鄧。由於這些 Job 會在目前的 HTTP Request 中處理,因此使用這種方式分派 Job 就不需要執行 Queue Worker:

1use App\Jobs\SendNotification;
2 
3SendNotification::dispatchAfterResponse();
1use App\Jobs\SendNotification;
2 
3SendNotification::dispatchAfterResponse();

也可以用 dispatch 分派一個閉包,然後在 dispatch 輔助函式後串上一個 afterResponse 方法來在 HTTP Response 被傳送給瀏覽器後執行這個閉包:

1use App\Mail\WelcomeMessage;
2use Illuminate\Support\Facades\Mail;
3 
4dispatch(function () {
5 Mail::to('taylor@example.com')->send(new WelcomeMessage);
6})->afterResponse();
1use App\Mail\WelcomeMessage;
2use Illuminate\Support\Facades\Mail;
3 
4dispatch(function () {
5 Mail::to('taylor@example.com')->send(new WelcomeMessage);
6})->afterResponse();

同步分派

若想馬上分派 Job (即,同步(Synchronous)),則可使用 dispatchSync 方法。在使用這個方法時,所分派的 Job 不會被放入佇列,而會在目前的處理程序中馬上執行:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatchSync($podcast);
25 }
26}
1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatchSync($podcast);
25 }
26}

Job 與資料庫 Transaction

雖然,在資料庫 Transaction 中分派 Job 是完全 OK 的,但應特別注意 Job 能否被正確執行。當我們在 Transaction 中分派 Job 後,這個 Job 很有可能會在 Transaction 被 Commit 前就被 Queue Worker 給執行了。這時候,我們在 Transaction 中對 Model 或資料庫記錄所做出的更改都還未反應到資料庫上。而且,在 Transaction 中做建立的 Model 或資料庫記錄也可能還未出現在資料庫中。

幸好,Laravel 提供了數種方法可解決這個狀況。第一種方法,我們可以在 Queue 連線的設定陣列中設定 after_commit 連線選項:

1'redis' => [
2 'driver' => 'redis',
3 // ...
4 'after_commit' => true,
5],
1'redis' => [
2 'driver' => 'redis',
3 // ...
4 'after_commit' => true,
5],

after_commit 設為 true 後,我們就可以在資料庫 Transaction 中分派 Job 了。Laravel 會等到所有資料庫 Transaction 都被 Commit 後才將 Job 分派出去。不過,當然,若目前沒有正在處理的資料庫 Transaction,這個 Job 會馬上被分派。

若因為 Transaction 中發上 Exception 而造成 Transaction 被 Roll Back(回滾),則在這個 Transaction 間所分派的 Job 也會被取消。

lightbulb

after_commit 設定選項設為 true 後,所有放入佇列的 Listener、Maillable、Notification、廣播事件……等都會等待到所有資料庫 Transaciton 都 Commit 後才被分派。

內嵌指定 Commit 的分派行為

若未將 after_commit 佇列連線選項設為 true,則我們還是可以指定讓某個特定的 Job 在所有已開啟的資料庫 Transaction 都被 Commit 後才被分派。若要這麼做,可在分派動作後串上 afterCommit 方法:

1use App\Jobs\ProcessPodcast;
2 
3ProcessPodcast::dispatch($podcast)->afterCommit();
1use App\Jobs\ProcessPodcast;
2 
3ProcessPodcast::dispatch($podcast)->afterCommit();

同樣地,若 after_commit 選項為 true,則我們也可以馬上分派某個特定的 Job,而不等待資料庫 Transaction 的 Commit:

1ProcessPodcast::dispatch($podcast)->beforeCommit();
1ProcessPodcast::dispatch($podcast)->beforeCommit();

Job 的串聯

通過 Job 串聯,我們就可以指定一組佇列 Job 的清單,在主要 Job 執行成功後才依序執行這組 Job。若按照順序執行的其中一個 Job 執行失敗,則剩下的 Job 都將不被執行。若要執行佇列的 Job 串聯,可使用 Bus Facade 中的 chain 方法。Laravel 的 Command Bus(指令匯流排) 是一個低階的原件,佇列 Job 的分派功能就是使用這個原件製作的:

1use App\Jobs\OptimizePodcast;
2use App\Jobs\ProcessPodcast;
3use App\Jobs\ReleasePodcast;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new ProcessPodcast,
8 new OptimizePodcast,
9 new ReleasePodcast,
10])->dispatch();
1use App\Jobs\OptimizePodcast;
2use App\Jobs\ProcessPodcast;
3use App\Jobs\ReleasePodcast;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::chain([
7 new ProcessPodcast,
8 new OptimizePodcast,
9 new ReleasePodcast,
10])->dispatch();

除了串聯 Job 類別實體,我們也可以串聯閉包:

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 function () {
5 Podcast::update(...);
6 },
7])->dispatch();
1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 function () {
5 Podcast::update(...);
6 },
7])->dispatch();
lightbulb

在 Job 中使用 $this->delete() 方法來刪除 Job 是沒有辦法讓串聯的 Job 不被執行的。只有當串聯中的 Job 失敗時才會停止執行。

串聯的連線與佇列

若想指定串聯 Job 的連線與佇列,則可使用 onConnectiononQueue 方法。除非佇列 Job 有特別指定不同的連線或佇列,否則,這些方法可用來指定要使用的連線名稱與佇列名稱:

1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 new ReleasePodcast,
5])->onConnection('redis')->onQueue('podcasts')->dispatch();
1Bus::chain([
2 new ProcessPodcast,
3 new OptimizePodcast,
4 new ReleasePodcast,
5])->onConnection('redis')->onQueue('podcasts')->dispatch();

串聯失敗

將 Job 串聯起來後,可使用 catch 方法來指定當串聯中有 Job 失敗時要被叫用的閉包。給定的回呼會收到一個導致 Job 失敗的 Throwable 實體:

1use Illuminate\Support\Facades\Bus;
2use Throwable;
3 
4Bus::chain([
5 new ProcessPodcast,
6 new OptimizePodcast,
7 new ReleasePodcast,
8])->catch(function (Throwable $e) {
9 // A job within the chain has failed...
10})->dispatch();
1use Illuminate\Support\Facades\Bus;
2use Throwable;
3 
4Bus::chain([
5 new ProcessPodcast,
6 new OptimizePodcast,
7 new ReleasePodcast,
8])->catch(function (Throwable $e) {
9 // A job within the chain has failed...
10})->dispatch();

自定佇列與連線

分派至特定的佇列

我們可以將 Job 分門別類放入不同的佇列中,進而分類管理這些 Job,甚至能針對不同佇列設定優先度、指定要有多少個 Worker。不過請記得,放入不同佇列不會將 Job 推送到佇列設定檔中所定義的不同佇列「連線」上,而只會將 Job 推入單一連線中指定的佇列。若要指定佇列,請在分派 Job 時使用 onQueue 方法:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatch($podcast)->onQueue('processing');
25 }
26}
1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatch($podcast)->onQueue('processing');
25 }
26}

或者,也可以在 Job 的 Constructor 中呼叫 onQueue 方法來指定 Job 的佇列:

1<?php
2 
3namespace App\Jobs;
4 
5 use Illuminate\Bus\Queueable;
6 use Illuminate\Contracts\Queue\ShouldQueue;
7 use Illuminate\Foundation\Bus\Dispatchable;
8 use Illuminate\Queue\InteractsWithQueue;
9 use Illuminate\Queue\SerializesModels;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 
15 /**
16 * Create a new job instance.
17 *
18 * @return void
19 */
20 public function __construct()
21 {
22 $this->onQueue('processing');
23 }
24}
1<?php
2 
3namespace App\Jobs;
4 
5 use Illuminate\Bus\Queueable;
6 use Illuminate\Contracts\Queue\ShouldQueue;
7 use Illuminate\Foundation\Bus\Dispatchable;
8 use Illuminate\Queue\InteractsWithQueue;
9 use Illuminate\Queue\SerializesModels;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 
15 /**
16 * Create a new job instance.
17 *
18 * @return void
19 */
20 public function __construct()
21 {
22 $this->onQueue('processing');
23 }
24}

分派至特定連線

若專案有使用到多個佇列連線,則可以使用 onConnection 方法來指定要將 Job 推送到哪個連線:

1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatch($podcast)->onConnection('sqs');
25 }
26}
1<?php
2 
3namespace App\Http\Controllers;
4 
5use App\Http\Controllers\Controller;
6use App\Jobs\ProcessPodcast;
7use App\Models\Podcast;
8use Illuminate\Http\Request;
9 
10class PodcastController extends Controller
11{
12 /**
13 * Store a new podcast.
14 *
15 * @param \Illuminate\Http\Request $request
16 * @return \Illuminate\Http\Response
17 */
18 public function store(Request $request)
19 {
20 $podcast = Podcast::create(...);
21 
22 // Create podcast...
23 
24 ProcessPodcast::dispatch($podcast)->onConnection('sqs');
25 }
26}

也可以將 onConnectiononQueue 方法串聯在一起來指定 Job 的連線與佇列:

1ProcessPodcast::dispatch($podcast)
2 ->onConnection('sqs')
3 ->onQueue('processing');
1ProcessPodcast::dispatch($podcast)
2 ->onConnection('sqs')
3 ->onQueue('processing');

或者,也可以在 Job 的 Constructor 中呼叫 onConnection 來指定 Job 的連線:

1<?php
2 
3namespace App\Jobs;
4 
5 use Illuminate\Bus\Queueable;
6 use Illuminate\Contracts\Queue\ShouldQueue;
7 use Illuminate\Foundation\Bus\Dispatchable;
8 use Illuminate\Queue\InteractsWithQueue;
9 use Illuminate\Queue\SerializesModels;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 
15 /**
16 * Create a new job instance.
17 *
18 * @return void
19 */
20 public function __construct()
21 {
22 $this->onConnection('sqs');
23 }
24}
1<?php
2 
3namespace App\Jobs;
4 
5 use Illuminate\Bus\Queueable;
6 use Illuminate\Contracts\Queue\ShouldQueue;
7 use Illuminate\Foundation\Bus\Dispatchable;
8 use Illuminate\Queue\InteractsWithQueue;
9 use Illuminate\Queue\SerializesModels;
10 
11class ProcessPodcast implements ShouldQueue
12{
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 
15 /**
16 * Create a new job instance.
17 *
18 * @return void
19 */
20 public function __construct()
21 {
22 $this->onConnection('sqs');
23 }
24}

指定最大嘗試次數與逾時

最大嘗試次數

若有某個佇列 Job 遇到錯誤,我們通常不會想讓這個 Job 一直重試。因此,Laravel 提供了多種定義 Job 重試次數的方法。

其中一種指定 Job 最大嘗試次數的方法是在 Artisan 指令列中使用 --tries 開關。使用這種方式指定的嘗試次數會套用到所有該 Worker 處理的 Job,除非 Job 上有特別指定嘗試次數:

1php artisan queue:work --tries=3
1php artisan queue:work --tries=3

若 Job 嘗試了最大嘗試次數,則這個 Job 會被視為是「執行失敗(Failed)」。更多有關處理執行失敗 Job 的資訊,請參考 執行失敗 Job 的說明文件

也可以用另一種更仔細的方法,就是在 Job 類別內定義這個 Job 的最大嘗試次數。若有在 Job 中指定最大嘗試次數,定義在 Job 類別內的次數會比指令列中 --tries 的值擁有更高的優先度:

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of times the job may be attempted.
9 *
10 * @var int
11 */
12 public $tries = 5;
13}
1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of times the job may be attempted.
9 *
10 * @var int
11 */
12 public $tries = 5;
13}

基於時間的嘗試限制

除了定義 Job 重試多少次要視為失敗以外,也可以限制 Job 嘗試執行的時間長度。這樣一來,在指定的時間範圍內,Job 就可以不斷重試。若要定義最長可重試時間,請在 Job 類別中定義一個 retryUntil 方法。該方法應回傳 DateTime 實體:

1/**
2 * Determine the time at which the job should timeout.
3 *
4 * @return \DateTime
5 */
6public function retryUntil()
7{
8 return now()->addMinutes(10);
9}
1/**
2 * Determine the time at which the job should timeout.
3 *
4 * @return \DateTime
5 */
6public function retryUntil()
7{
8 return now()->addMinutes(10);
9}
lightbulb

也可以在放入佇列的 Event Listener 中定義一個 tries 屬性或 retryUntil 方法。

最大 Exception 數

有時候,我們可能會想讓 Job 可重試多次,但當出現指定數量的未處理 Exception 後,就視為執行失敗 (與直接使用 release 方法釋放 Job 不同)。若要指定未處理 Exception 數量,可在 Job 類別中定義一個 maxExceptions 屬性:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class ProcessPodcast implements ShouldQueue
8{
9 /**
10 * The number of times the job may be attempted.
11 *
12 * @var int
13 */
14 public $tries = 25;
15 
16 /**
17 * The maximum number of unhandled exceptions to allow before failing.
18 *
19 * @var int
20 */
21 public $maxExceptions = 3;
22 
23 /**
24 * Execute the job.
25 *
26 * @return void
27 */
28 public function handle()
29 {
30 Redis::throttle('key')->allow(10)->every(60)->then(function () {
31 // Lock obtained, process the podcast...
32 }, function () {
33 // Unable to obtain lock...
34 return $this->release(10);
35 });
36 }
37}
1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Support\Facades\Redis;
6 
7class ProcessPodcast implements ShouldQueue
8{
9 /**
10 * The number of times the job may be attempted.
11 *
12 * @var int
13 */
14 public $tries = 25;
15 
16 /**
17 * The maximum number of unhandled exceptions to allow before failing.
18 *
19 * @var int
20 */
21 public $maxExceptions = 3;
22 
23 /**
24 * Execute the job.
25 *
26 * @return void
27 */
28 public function handle()
29 {
30 Redis::throttle('key')->allow(10)->every(60)->then(function () {
31 // Lock obtained, process the podcast...
32 }, function () {
33 // Unable to obtain lock...
34 return $this->release(10);
35 });
36 }
37}

在這個例子中,這個 Job 會在程式無法在 10 秒內取得 Redis Lock 時被釋放,而這個 Job 在此期間最多可嘗試 25 次。不過,若 Job 中有擲回未處理的 Exception,則會被視為是失敗的 Job。

逾時

lightbulb

必須安裝 pcntl PHP 擴充程式才可指定 Job 的逾時。

通常來說,我們知道某個佇列任務大約需要花多少時間執行。因此,在 Laravel 中,我們可以指定一個「逾時」值。若 Job 執行超過逾時值所指定的秒數後,負責處理該 Job 的 Worker 就會以錯誤終止執行。一般來說,Worker 會自動由 Server 上設定的 Process Manager 重新開啟。

可在 Artisan 指令列上使用 --timeout 開關來指定 Job 能執行的最大秒數:

1php artisan queue:work --timeout=30
1php artisan queue:work --timeout=30

若 Job 不斷執行逾時超過其最大重試次數,則該 Job 會被標記為執行失敗。

也可以在 Job 類別中定義該 Job 能執行的最大秒數。若有在 Job 上指定逾時,則在 Job 類別上定義的逾時比在指令列上指定的數字擁有更高的優先度:

1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of seconds the job can run before timing out.
9 *
10 * @var int
11 */
12 public $timeout = 120;
13}
1<?php
2 
3namespace App\Jobs;
4 
5class ProcessPodcast implements ShouldQueue
6{
7 /**
8 * The number of seconds the job can run before timing out.
9 *
10 * @var int
11 */
12 public $timeout = 120;
13}

有時候,如 Socket 或連外 HTTP 連線等的 IO Blocking Process 可能不適用所指定的逾時設定。因此,若有使用到這些功能,也請在這些功能的 API 上指定逾時。舉例來說,若使用 Guzzle,則可像這樣指定連線與 Request 的逾時值:

逾時後視為失敗

若想讓 Job 在逾時後被標記為執行失敗,可在 Job 類別上定義 $failOnTimeout 屬性:

1/**
2 * Indicate if the job should be marked as failed on timeout.
3 *
4 * @var bool
5 */
6public $failOnTimeout = true;
1/**
2 * Indicate if the job should be marked as failed on timeout.
3 *
4 * @var bool
5 */
6public $failOnTimeout = true;

錯誤處理

若在處理 Job 時有擲回 Exception,則這個 Job 會被自動釋放回佇列中,好讓這個 Job 能被重新嘗試。被釋放會佇列的 Job 會繼續被重試,直到重試次數達到專案上所設定的最大次數。最大重試次數可使用 queue:work Artisan 指令上的 --tries 開關來定義。或者,也可以在 Job 類別上定義最大重試次數。更多有關如何執行 Queue Worker 的資訊可在本文後方找到

手動釋放 Job

有的時候,我們可能會想手動將 Job 釋放會佇列中,好讓這個 Job 能在稍後重試。若要手動釋放 Job,可以呼叫 release 方法:

1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 // ...
9 
10 $this->release();
11}
1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 // ...
9 
10 $this->release();
11}

預設情況下,release 方法會將 Job 釋放會佇列中並立即處理。不過,若傳入一個整數給 release 方法,就可以指定讓佇列等待給定秒數後才開始處理該 Job:

1$this->release(10);
1$this->release(10);

手動讓 Job 失敗

有時候,我們可能需要手動將 Job 標記為「失敗」。若要手動將 Job 標記為失敗,可呼叫 fail 方法:

1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 // ...
9 
10 $this->fail();
11}
1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 // ...
9 
10 $this->fail();
11}

若要在 Catch 到 Exception 時將 Job 標記為失敗,可將這個 Exception 傳給 fail 方法:

1$this->fail($exception);
1$this->fail($exception);
lightbulb

有關失敗 Job 的更多資訊,請參考有關處理失敗 Job 的說明文件

批次 Job

使用 Laravel 的批次 Job 功能,就可以輕鬆地批次執行多個 Job,並在批次 Job 執行完成後進行一些動作。在開始使用批次 Job 之前,我們需要先建立一個資料庫 Migration,以建立用來保存有關批次 Job 詮釋資訊(Meta Information)的資料表,如批次 Job 的完成度等。可以使用 queue:batches-table Artisan 指令來建立這個 Migration:

1php artisan queue:batches-table
2 
3php artisan migrate
1php artisan queue:batches-table
2 
3php artisan migrate

定義可批次處理的 Job

若要定義可批次處理的 Job,請先像平常一樣建立可放入佇列的 Job。不過,我們還需要在這個 Job 類別中加上 Illuminate\Bus\Batchable Trait。這個 Trait 提供了一個 batch 方法,可使用該方法來取得該 Job 所在的批次:

1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Bus\Batchable;
6use Illuminate\Bus\Queueable;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Bus\Dispatchable;
9use Illuminate\Queue\InteractsWithQueue;
10use Illuminate\Queue\SerializesModels;
11 
12class ImportCsv implements ShouldQueue
13{
14 use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
15 
16 /**
17 * Execute the job.
18 *
19 * @return void
20 */
21 public function handle()
22 {
23 if ($this->batch()->cancelled()) {
24 // Determine if the batch has been cancelled...
25 
26 return;
27 }
28 
29 // Import a portion of the CSV file...
30 }
31}
1<?php
2 
3namespace App\Jobs;
4 
5use Illuminate\Bus\Batchable;
6use Illuminate\Bus\Queueable;
7use Illuminate\Contracts\Queue\ShouldQueue;
8use Illuminate\Foundation\Bus\Dispatchable;
9use Illuminate\Queue\InteractsWithQueue;
10use Illuminate\Queue\SerializesModels;
11 
12class ImportCsv implements ShouldQueue
13{
14 use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
15 
16 /**
17 * Execute the job.
18 *
19 * @return void
20 */
21 public function handle()
22 {
23 if ($this->batch()->cancelled()) {
24 // Determine if the batch has been cancelled...
25 
26 return;
27 }
28 
29 // Import a portion of the CSV file...
30 }
31}

分派批次

若要分派一批次的 Job,可使用 Bus Facade 的 batch 方法。當然,批次功能與完成回呼一起使用時是最有用。因此,可以使用 then, catchfinally 方法來為該批次定義完成回呼。這些回呼都會在被叫用時收到 Illuminate\Bus\Batch 實體。在這個範例中,我們先假設我們正在處理一批次的任務,用來在 CSV 檔中處理給定數量的行:

1use App\Jobs\ImportCsv;
2use Illuminate\Bus\Batch;
3use Illuminate\Support\Facades\Bus;
4use Throwable;
5 
6$batch = Bus::batch([
7 new ImportCsv(1, 100),
8 new ImportCsv(101, 200),
9 new ImportCsv(201, 300),
10 new ImportCsv(301, 400),
11 new ImportCsv(401, 500),
12])->then(function (Batch $batch) {
13 // All jobs completed successfully...
14})->catch(function (Batch $batch, Throwable $e) {
15 // First batch job failure detected...
16})->finally(function (Batch $batch) {
17 // The batch has finished executing...
18})->dispatch();
19 
20return $batch->id;
1use App\Jobs\ImportCsv;
2use Illuminate\Bus\Batch;
3use Illuminate\Support\Facades\Bus;
4use Throwable;
5 
6$batch = Bus::batch([
7 new ImportCsv(1, 100),
8 new ImportCsv(101, 200),
9 new ImportCsv(201, 300),
10 new ImportCsv(301, 400),
11 new ImportCsv(401, 500),
12])->then(function (Batch $batch) {
13 // All jobs completed successfully...
14})->catch(function (Batch $batch, Throwable $e) {
15 // First batch job failure detected...
16})->finally(function (Batch $batch) {
17 // The batch has finished executing...
18})->dispatch();
19 
20return $batch->id;

可使用 $batch->id 屬性來取得該批次的 ID。在該批次被分派後,可使用這個 ID 來向 Laravel 的 Command Bus 查詢有關該批次的資訊。

lightbulb

由於批次的回呼會被序列化並在稍後由 Laravel 的佇列執行,因此請不要在回呼中使用 $this 變數。

為批次命名

若為批次命名,則一些像是 Laravel Horizon 與 Laravel Telescope 之類的工具就可為該批次提供對使用者更友善的偵錯資訊。若要為批次指定任意名稱,可在定義批次時呼叫 name 方法:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->name('Import CSV')->dispatch();
1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->name('Import CSV')->dispatch();

批次的連線與佇列

若想指定批次 Job 的連線與佇列,可使用 onConnectiononQueue 方法。所有的批次 Job 都必須要相同的連線與佇列中執行:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->onConnection('redis')->onQueue('imports')->dispatch();
1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->onConnection('redis')->onQueue('imports')->dispatch();

在批次中串聯

只要將串聯的 Job 放在陣列中,就可以在批次中定義一組串聯的 Job。舉例來說,我們可以平行執行兩個 Job 串聯,並在這兩個 Job 串聯都處理完畢後執行回呼:

1use App\Jobs\ReleasePodcast;
2use App\Jobs\SendPodcastReleaseNotification;
3use Illuminate\Bus\Batch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::batch([
7 [
8 new ReleasePodcast(1),
9 new SendPodcastReleaseNotification(1),
10 ],
11 [
12 new ReleasePodcast(2),
13 new SendPodcastReleaseNotification(2),
14 ],
15])->then(function (Batch $batch) {
16 // ...
17})->dispatch();
1use App\Jobs\ReleasePodcast;
2use App\Jobs\SendPodcastReleaseNotification;
3use Illuminate\Bus\Batch;
4use Illuminate\Support\Facades\Bus;
5 
6Bus::batch([
7 [
8 new ReleasePodcast(1),
9 new SendPodcastReleaseNotification(1),
10 ],
11 [
12 new ReleasePodcast(2),
13 new SendPodcastReleaseNotification(2),
14 ],
15])->then(function (Batch $batch) {
16 // ...
17})->dispatch();

將 Job 加入批次

有時候,若能在批次 Job 中新增其他額外的 Job 會很實用。特別是當我們要在一個 Web Request 中批次處理數千筆 Job 時,會讓 Job 的分派過程變得很耗時。因此,比起直接分派數千筆 Job,我們可以先分派一個初始化的批次,用來作為 Job 的「載入程式」,然後讓這個載入程式再向批次內填入更多的 Job:

1$batch = Bus::batch([
2 new LoadImportBatch,
3 new LoadImportBatch,
4 new LoadImportBatch,
5])->then(function (Batch $batch) {
6 // All jobs completed successfully...
7})->name('Import Contacts')->dispatch();
1$batch = Bus::batch([
2 new LoadImportBatch,
3 new LoadImportBatch,
4 new LoadImportBatch,
5])->then(function (Batch $batch) {
6 // All jobs completed successfully...
7})->name('Import Contacts')->dispatch();

在這個例子中,我們可以使用 LoadImportBatch Job 來填入其他額外的 Job。若要填入其他 Job,我們可以使用批次實體上的 add 方法。批次實體可使用 Job 的 batch 方法來取得:

1use App\Jobs\ImportContacts;
2use Illuminate\Support\Collection;
3 
4/**
5 * Execute the job.
6 *
7 * @return void
8 */
9public function handle()
10{
11 if ($this->batch()->cancelled()) {
12 return;
13 }
14 
15 $this->batch()->add(Collection::times(1000, function () {
16 return new ImportContacts;
17 }));
18}
1use App\Jobs\ImportContacts;
2use Illuminate\Support\Collection;
3 
4/**
5 * Execute the job.
6 *
7 * @return void
8 */
9public function handle()
10{
11 if ($this->batch()->cancelled()) {
12 return;
13 }
14 
15 $this->batch()->add(Collection::times(1000, function () {
16 return new ImportContacts;
17 }));
18}
lightbulb

我們只能向目前 Job 正在執行的批次新增 Job。

檢查批次

提供給批次處理完成回呼的 Illuminate\Bus\Batch 實體有許多的屬性與方法,可以讓我們處理與取得給定 Job 批次的資訊:

1// The UUID of the batch...
2$batch->id;
3 
4// The name of the batch (if applicable)...
5$batch->name;
6 
7// The number of jobs assigned to the batch...
8$batch->totalJobs;
9 
10// The number of jobs that have not been processed by the queue...
11$batch->pendingJobs;
12 
13// The number of jobs that have failed...
14$batch->failedJobs;
15 
16// The number of jobs that have been processed thus far...
17$batch->processedJobs();
18 
19// The completion percentage of the batch (0-100)...
20$batch->progress();
21 
22// Indicates if the batch has finished executing...
23$batch->finished();
24 
25// Cancel the execution of the batch...
26$batch->cancel();
27 
28// Indicates if the batch has been cancelled...
29$batch->cancelled();
1// The UUID of the batch...
2$batch->id;
3 
4// The name of the batch (if applicable)...
5$batch->name;
6 
7// The number of jobs assigned to the batch...
8$batch->totalJobs;
9 
10// The number of jobs that have not been processed by the queue...
11$batch->pendingJobs;
12 
13// The number of jobs that have failed...
14$batch->failedJobs;
15 
16// The number of jobs that have been processed thus far...
17$batch->processedJobs();
18 
19// The completion percentage of the batch (0-100)...
20$batch->progress();
21 
22// Indicates if the batch has finished executing...
23$batch->finished();
24 
25// Cancel the execution of the batch...
26$batch->cancel();
27 
28// Indicates if the batch has been cancelled...
29$batch->cancelled();

從 Route 上回傳批次

所有的 Illuminate\Bus\Batch 實體都可被序列化為 JSON,因此我們可以直接在專案的 Route 中回傳批次實體來取得有關該批次資訊的 JSON Payload,其中也包含該批次的完成度。如此一來,我們就能方便地在專案的 UI 上顯示該批次完成度的資訊。

若要使用 ID 來取得批次,可使用 Bus Facade 的 findBatch 方法:

1use Illuminate\Support\Facades\Bus;
2use Illuminate\Support\Facades\Route;
3 
4Route::get('/batch/{batchId}', function (string $batchId) {
5 return Bus::findBatch($batchId);
6});
1use Illuminate\Support\Facades\Bus;
2use Illuminate\Support\Facades\Route;
3 
4Route::get('/batch/{batchId}', function (string $batchId) {
5 return Bus::findBatch($batchId);
6});

取消批次

有時候,我們會需要取消給定批次的執行。若要取消執行批次,可在 Illuminate\Bus\Batch 實體上呼叫 cancel 方法:

1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 if ($this->user->exceedsImportLimit()) {
9 return $this->batch()->cancel();
10 }
11 
12 if ($this->batch()->cancelled()) {
13 return;
14 }
15}
1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 if ($this->user->exceedsImportLimit()) {
9 return $this->batch()->cancel();
10 }
11 
12 if ($this->batch()->cancelled()) {
13 return;
14 }
15}

雖然讀者可能已經在前面的範例中注意到了,不過,批次 Job 一般都應在其 handle 方法的最前方檢查該批次是否已被取消:

1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 if ($this->batch()->cancelled()) {
9 return;
10 }
11 
12 // Continue processing...
13}
1/**
2 * Execute the job.
3 *
4 * @return void
5 */
6public function handle()
7{
8 if ($this->batch()->cancelled()) {
9 return;
10 }
11 
12 // Continue processing...
13}

批次失敗

若批次中的 Job 執行失敗,則會叫用 catch 回呼 (若有指定的話)。只有在批次中第一個失敗的 Job 才會叫用該回呼。

允許失敗

若在批次中的 Job 執行失敗,Laravel 會自動將該批次標記為「已取消(Cancelled)」。若有需要的話,我們可以禁用這個行為,好讓 Job 失敗是不要自動將批次標記為取消。若要禁用此行為,可在分派批次時呼叫 allowFailures 方法:

1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->allowFailures()->dispatch();
1$batch = Bus::batch([
2 // ...
3])->then(function (Batch $batch) {
4 // All jobs completed successfully...
5})->allowFailures()->dispatch();

重試失敗的批次 Job

Laravel 提供了一個方便的 queue:retry-batch Artisan 指令,能讓我們輕鬆重試給定批次中所有失敗的 Job。queue:retry-batch 指令的參數為要重試 Job 之批次的 UUID:

1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
1php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修建批次

若未修建(Prune)批次,則 job_batches 資料表很快就會變得很大。為了避免這個狀況,應定期每日執行 queue:prune-batches Artisan 指令:

1$schedule->command('queue:prune-batches')->daily();
1$schedule->command('queue:prune-batches')->daily();

預設情況下,完成超過 24 小時的批次會被修建掉。可以在呼叫該指令時使用 hours 選項來指定批次資料要保留多久。舉例來說,下列指令會刪除完成超過 48 小時前的所有批次:

1$schedule->command('queue:prune-batches --hours=48')->daily();
1$schedule->command('queue:prune-batches --hours=48')->daily();

有時候,jobs_batches 資料表可能會有一些從未成功完成的批次記錄,如批次中有 Job 失敗且從每次嘗試都失敗的批次。可以在 queue:prune-batxhes 指令的使用 unfinished 選項來修剪這些未完成的批次:

1$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();
1$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();

將閉包放入佇列

除了將 Job 類別分派進佇列外,我們也可以分派閉包。分派閉包對於一些要在目前 Request 週期外執行的快速、簡單任務來說很好用。把閉包放入佇列時,該閉包的程式碼內容會以密碼學的方式進行簽署,以避免其程式碼在傳輸過程中遭到篡改:

1$podcast = App\Podcast::find(1);
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5});
1$podcast = App\Podcast::find(1);
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5});

使用 catch 方法,就能為佇列閉包提供一組要在所有重試次數都失敗的時候執行的閉包:

1use Throwable;
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5})->catch(function (Throwable $e) {
6 // This job has failed...
7});
1use Throwable;
2 
3dispatch(function () use ($podcast) {
4 $podcast->publish();
5})->catch(function (Throwable $e) {
6 // This job has failed...
7});

執行 Queue Worker

使用 queue:work 指令

Laravel 中隨附了一個 Artisan 指令,可用來開啟 Queue Worker(佇列背景工作角色),以在 Job 被推入佇列後處理這些 Job。可以使用 queue:work Artisan 指令來執行 Queue Worker。請注意,當執行 queue:work 指令後,該指令會持續執行,直到我們手動停止該指令或關閉終端機為止:

1php artisan queue:work
1php artisan queue:work
lightbulb

若要讓 queue:work 處理程序在背景持續執行,請使用如 Supervisor 等的 Process Monitor(處理程序監看程式),以確保 Queue Worker 持續執行。

請記得,Queue Worker 是會持續執行的處理程序,且會將已開啟的程式狀態保存在記憶體中。因此,Queue Worker 開始執行後若有更改程式碼,這些 Worker 將不會知道有這些修改。所以,在部署過程中,請確保有重新啟動 Queue Worker。此外,也請注意,在各個 Job 間,也不會自動重設程式所建立或修改的任何靜態狀態(Static State)

或者,我們也可以執行 queue:listen 指令。使用 queue:listen 指令時,若有更新程式碼或重設程式的狀態,就不需手動重新啟動 Queue Worker。不過,這個指令比起 queue:work 指令來說比較沒有效率:

1php artisan queue:listen
1php artisan queue:listen

執行多個 Queue Worker

若要指派多個 Worker 給某個 Queue 並同時處理多個 Job,只需要啟動多個 queue:work 處理程序即可。若要啟動多個 queue:work,在本機上,我們可以開啟多個終端機分頁來執行;若是在正是環境上,則可以使用 Process Manager 的設定來啟動多個 queue:work使用 Supervisor 時,可使用 numprocs 設定值。

指定連線與佇列

也可以指定 Worker 要使用的佇列連線。傳給 work 指令的連線名稱應對影到 config/queue.php 設定檔中所定義的其中一個連線:

1php artisan queue:work redis
1php artisan queue:work redis

預設情況下,queue:work 指令擲回處理給定連線上預設佇列的 Job。不過,我們也可以自定 Queue Worker,以處理給定連線上的特定佇列。舉例來說,若我們把所有的電子郵件都放在 redis 連線的 emails 佇列中執行,則我們可以執行下列指令來啟動一個處理該佇列的 Worker:

1php artisan queue:work redis --queue=emails
1php artisan queue:work redis --queue=emails

處理指定數量的 Job

可使用 --once 選項來讓 Worker 一次只處理佇列中的一個 Job:

1php artisan queue:work --once
1php artisan queue:work --once

可使用 --max-jobs 選項來讓 Worker 只處理特定數量的 Job,然後就終止執行。該選項適合與 Supervisor 搭配使用,這樣我們就能讓 Worker 在處理特定數量的 Job 後自動重新執行,以釋出該 Worker 所積累的記憶體:

1php artisan queue:work --max-jobs=1000
1php artisan queue:work --max-jobs=1000

處理所有放入佇列的 Job 然後終止執行

可使用 --stop-when-empty 選項來讓 Worker 處理所有的 Job 然後終止執行。在 Docker Container 中處理 Laravel 佇列時,若在佇列為空時停止關閉 Container,就適合使用該選項:

1php artisan queue:work --stop-when-empty
1php artisan queue:work --stop-when-empty

在給定秒數內處理 Job

--max-time 選項可用來讓 Worker 處理給定秒數的 Job,然後終止執行。該選項是何與 Supervisor 搭配使用,以在處理 Job 給定時間後自動重新啟動 Worker,並釋放期間可能積累的記憶體:

1// Process jobs for one hour and then exit...
2php artisan queue:work --max-time=3600
1// Process jobs for one hour and then exit...
2php artisan queue:work --max-time=3600

Worker 的休眠期間

若佇列中有 Job,則 Worker 會不間斷地處理這些 Job。不過,使用 sleep 選項可用來讓 Worker 判斷當沒有新 Job 時要「休眠」多少秒。在休眠期間,Worker 不會處理任何新的 Job。當 Worker 喚醒後,才會開始處理這些 Job。

1php artisan queue:work --sleep=3
1php artisan queue:work --sleep=3

資源上的考量

Daemon 型的 Queue Worker 並不會在每個 Job 處理後「重新啟動」Laravel。因此,在每個 Job 處理完畢後,請務必釋放任何吃資源的功能。舉例來說,若我們使用了 GD 函式庫來進行圖片處理,則應在處理完圖片後使用 imagedestroy 來釋放記憶體。

佇列的優先度

有時候,我們可能會向調整各個佇列的處理優先度。舉例來說,在 config/queue.php 設定檔中,我們可以把 redis 連線上的預設 queue 設為 low (低)。不過,有時候,我們可能會想像這樣把 Job 推入 high (高) 優先度的佇列:

1dispatch((new Job)->onQueue('high'));
1dispatch((new Job)->onQueue('high'));

若要啟動 Worker 以驗證是否所有 high 佇列上的 Job 都比 low 佇列上的 Job 還要早被處理,只需要傳入一組以逗號分隔的佇列名稱列表給 work 指令即可:

1php artisan queue:work --queue=high,low
1php artisan queue:work --queue=high,low

Queue Worker 與部署

由於 Queue Worker 時持續執行的處理程序,因此除非重啟啟動 Queue Worker,否則 Queue Worker 不會知道程式碼有被修改過。要部署有使用 Queue Worker 的專案,最簡單的做法就是在部署過程中重新啟動 Queue Worker。我們可以執行 queue:restart 指令來重新啟動所有的 Worker:

1php artisan queue:restart
1php artisan queue:restart

該指令會通知所有的 Queue Worker,讓所有的 Worker 在處理完目前 Job 且在現有 Job 不遺失的情況下終止執行 Worker。由於 Queue Worker 會在 queue:restart 指令執行後終止執行,因此請務必使用如 Supervisor 這樣的 Process Manager 來自動重新啟動 Queue Worker。

lightbulb

佇列會使用快取來儲存重新啟動訊號,因此在使用此功能前請先確認專案上是否有設定好正確的快取 Driver。

Job 的有效期限與逾時

Job 的有效期限

config/queue.php 設定檔中,每個佇列連線都有定義一個 retry_after 選項。這個選項用來指定在重新嘗試目前處理的 Job 前需要等待多少秒。舉例來說,若 retry_after 設為 90,則若某個 Job 已被處理 90 秒,且期間沒有被釋放或刪除,則該 Job 會被釋放回佇列中。一般來說,應將 retry_after 的值設為 Job 在合理情況下要完成執行所需的最大秒數。

lightbulb

唯一一個不含 retry_after 值的佇列連線是 Amazon SQS。SQS 會使用預設的 Visibility Timeout 來重試 Job。Visibility Timeout 的值由 AWS Console 中控制。

Worker 的逾時

queue:work Aritsan 指令有一個 --timeout 選項。若 Job 執行超過逾時值所指定的秒數後,負責處理該 Job 的 Worker 就會以錯誤終止執行。一般來說,Worker 會自動由 Server 上設定的 Process Manager 重新開啟:

1php artisan queue:work --timeout=60
1php artisan queue:work --timeout=60

雖然 retry_after 設定選項與 --timeout CLI 選項並不相同,不過這兩個選項會互相配合使用,以確保 Job 不遺失,且 Job 只會成功執行一次。

lightbulb

--timeout 的值必須至少比 retry_after 設定選項短個幾秒,以確保 Worker 在處理到當掉的 Job 時會在重試 Job 前先終止該 Job。若 --timeout 選項比 retry_after 設定值還要長的話,則 Job 就有可能會被處理兩次。

Supervisor 設定

在正式環境中,我們會需要一種能讓 queue:work 處理程序持續執行的方法。queue:work 可能會因為各種原因而停止執行,如 Worker 執行達到逾時值,或是在執行了 queue:restart 指令後等。

因此,我們需要設定一個能偵測到 queue:work 處理程序終止執行,並能自動重新啟動這些 Worker 的 Process Monitor。此外,使用 Process Monitor 還能讓我們指定要同時執行多少個 queue:work 處理程序。Supervisor 時一個常見用於 Linux 環境的 Process Monitor,在本文中接下來的部分我們會來看看要如何設定 Supervisor。

安裝 Supervisor

Supervisor 是一個用於 Linux 作業系統的 Process Monitor,使用 Supervisor 就可以在 queue:work 處理程序執行失敗時自動重新啟動。若要在 Ubuntu 上安裝 Supervisor,可使用下列指令:

1sudo apt-get install supervisor
1sudo apt-get install supervisor
lightbulb

如果你覺得要設定並管理 Supervisor 太難、太複雜的話,可以考慮使用 Laravel Forge。Laravel Forge 會幫你在 Laravel 專案的正式環境上自動安裝並設定 Supervisor。

設定 Supervisor

Supervisor 設定檔一般都存放在 /etc/supervisor/conf.d 目錄下。在該目錄中,我們可以建立任意數量的設定檔,以告訴 Supervisor 要如何監看這些處理程序。舉例來說,我們先建立一個用於啟動並監看 queue:work 處理程序的 laravel-worker.conf 檔案:

1[program:laravel-worker]
2process_name=%(program_name)s_%(process_num)02d
3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
4autostart=true
5autorestart=true
6stopasgroup=true
7killasgroup=true
8user=forge
9numprocs=8
10redirect_stderr=true
11stdout_logfile=/home/forge/app.com/worker.log
12stopwaitsecs=3600
1[program:laravel-worker]
2process_name=%(program_name)s_%(process_num)02d
3command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
4autostart=true
5autorestart=true
6stopasgroup=true
7killasgroup=true
8user=forge
9numprocs=8
10redirect_stderr=true
11stdout_logfile=/home/forge/app.com/worker.log
12stopwaitsecs=3600

在這個範例中,numprocs 指示詞用於告訴 Supervisor 要執行 8 個 queue:work 處理程序,並監看這 8 個處理程序,然後當這些處理程序執行失敗時自動重新啟動。我們可以更改該設定檔中的 command 指示詞,以調整為所需的佇列連線與 Worker 選項。

lightbulb

請務必確保 stopwaitsecs 值比花費時間最多的 Job 所需執行的秒數還要大。若該值設定不對,可能會讓 Supervisor 在 Job 處理完成前就終止該 Job。

開啟 Supervisor

建立好設定檔後,我們就可以使用下列指令更新 Supervisor 設定並開啟我們設定好的處理程序:

1sudo supervisorctl reread
2 
3sudo supervisorctl update
4 
5sudo supervisorctl start laravel-worker:*
1sudo supervisorctl reread
2 
3sudo supervisorctl update
4 
5sudo supervisorctl start laravel-worker:*

更多有關 Supervisor 的資訊,請參考 Supervisor 的說明文件

處理失敗的 Job

有時候,放入佇列的 Job 可能會執行失敗。請別擔心,計劃永遠趕不上變化!Laravel 中內建了一個方便的方法,可用來指定 Job 要重試的最大次數。若某個 Job 已經達到重試次數的最大限制,則該 Job 就會被插入 failed_jobs 資料庫資料表中。當然,若沒有這個資料表的話,我們需要先建立該資料表。若要為 failed_jobs 資料表建立 Migration,可使用 queue:failed-tables 指令:

1php artisan queue:failed-table
2 
3php artisan migrate
1php artisan queue:failed-table
2 
3php artisan migrate

在執行 [Queue Worker] 處理程序時,我們可以使用 queue:work 指令上的 --tries 開關來指定某個 Job 所要嘗試執行的最大次數。若為指定 --tries 選項的值,則 Job 就只會嘗試執行一次,或是依照 Job 類別中 $tries 屬性所設定的值作為最大嘗試次數:

1php artisan queue:work redis --tries=3
1php artisan queue:work redis --tries=3

使用 --backoff 選項,就可指定當 Job 遇到 Exception 時,Laravel 要等待多少秒才重新嘗試該 Job。預設情況下,Job 會馬上被釋放回佇列中,以便重新嘗試該 Job:

1php artisan queue:work redis --tries=3 --backoff=3
1php artisan queue:work redis --tries=3 --backoff=3

若想以 Job 為單位來設定當 Job 遇到 Exception 時 Laravel 要等待多少秒才重新嘗試該 Job,則可在 Job 類別上定義 backoff 屬性:

1/**
2 * The number of seconds to wait before retrying the job.
3 *
4 * @var int
5 */
6public $backoff = 3;
1/**
2 * The number of seconds to wait before retrying the job.
3 *
4 * @var int
5 */
6public $backoff = 3;

若需要使用更複雜的邏輯來判斷 Job 的 Backoff 時間,可在 Job 類別上定義 backoff 方法:

1/**
2* Calculate the number of seconds to wait before retrying the job.
3*
4* @return int
5*/
6public function backoff()
7{
8 return 3;
9}
1/**
2* Calculate the number of seconds to wait before retrying the job.
3*
4* @return int
5*/
6public function backoff()
7{
8 return 3;
9}

只要在 backoff 方法中回傳一組包含 Backoff 值的陣列,就能輕鬆地設定「指數級」的 Backoff。在這個例子中,第一次重試的延遲為 1 秒,第二次重試為 5 秒,第三次重試為 10 秒:

1/**
2* Calculate the number of seconds to wait before retrying the job.
3*
4* @return array
5*/
6public function backoff()
7{
8 return [1, 5, 10];
9}
1/**
2* Calculate the number of seconds to wait before retrying the job.
3*
4* @return array
5*/
6public function backoff()
7{
8 return [1, 5, 10];
9}

當 Job 執行失敗後進行清理

若某個特定的 Job 失敗後,我們可能會想傳送通知給使用者,或是恢復這個 Job 中所部分完成的一些動作。為此,我們可以在 Job 類別中定義一個 failed 方法。導致該 Job 失敗的 Throwable 實體會傳入給 failed 方法:

1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Bus\Queueable;
8use Illuminate\Contracts\Queue\ShouldQueue;
9use Illuminate\Queue\InteractsWithQueue;
10use Illuminate\Queue\SerializesModels;
11use Throwable;
12 
13class ProcessPodcast implements ShouldQueue
14{
15 use InteractsWithQueue, Queueable, SerializesModels;
16 
17 /**
18 * The podcast instance.
19 *
20 * @var \App\Podcast
21 */
22 protected $podcast;
23 
24 /**
25 * Create a new job instance.
26 *
27 * @param \App\Models\Podcast $podcast
28 * @return void
29 */
30 public function __construct(Podcast $podcast)
31 {
32 $this->podcast = $podcast;
33 }
34 
35 /**
36 * Execute the job.
37 *
38 * @param \App\Services\AudioProcessor $processor
39 * @return void
40 */
41 public function handle(AudioProcessor $processor)
42 {
43 // Process uploaded podcast...
44 }
45 
46 /**
47 * Handle a job failure.
48 *
49 * @param \Throwable $exception
50 * @return void
51 */
52 public function failed(Throwable $exception)
53 {
54 // Send user notification of failure, etc...
55 }
56}
1<?php
2 
3namespace App\Jobs;
4 
5use App\Models\Podcast;
6use App\Services\AudioProcessor;
7use Illuminate\Bus\Queueable;
8use Illuminate\Contracts\Queue\ShouldQueue;
9use Illuminate\Queue\InteractsWithQueue;
10use Illuminate\Queue\SerializesModels;
11use Throwable;
12 
13class ProcessPodcast implements ShouldQueue
14{
15 use InteractsWithQueue, Queueable, SerializesModels;
16 
17 /**
18 * The podcast instance.
19 *
20 * @var \App\Podcast
21 */
22 protected $podcast;
23 
24 /**
25 * Create a new job instance.
26 *
27 * @param \App\Models\Podcast $podcast
28 * @return void
29 */
30 public function __construct(Podcast $podcast)
31 {
32 $this->podcast = $podcast;
33 }
34 
35 /**
36 * Execute the job.
37 *
38 * @param \App\Services\AudioProcessor $processor
39 * @return void
40 */
41 public function handle(AudioProcessor $processor)
42 {
43 // Process uploaded podcast...
44 }
45 
46 /**
47 * Handle a job failure.
48 *
49 * @param \Throwable $exception
50 * @return void
51 */
52 public function failed(Throwable $exception)
53 {
54 // Send user notification of failure, etc...
55 }
56}
lightbulb

叫用 failed 方法前會先初始化該 Job 的一個新。因此,在 handle 方法中對類別屬性做出的更改都將遺失。

重試失敗的 Job

若要檢視所有已插入 failed_jobs 資料表中失敗的 Job,可使用 queue:failed Artisan 指令:

1php artisan queue:failed
1php artisan queue:failed

queue:failed 指令會列出 Job ID、連線、佇列、失敗時間……等,以及其他有關該 Job 的資訊。可使用 Job ID 來重試失敗的 Job。舉例來說,若要重試 ID 為 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失敗 Job,請執行下列指令:

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

若有需要,可傳入多個 ID 給該指令:

1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
1php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

也可以嘗試特定佇列中所有失敗的 Job:

1php artisan queue:retry --queue=name
1php artisan queue:retry --queue=name

若要重試所有失敗的 Job,請執行 queue:retry 指令,並傳入 all 作為 ID:

1php artisan queue:retry all
1php artisan queue:retry all

若想刪除失敗的 Job,可使用 queue:forget 指令:

1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
1php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

使用 Horizon,請不要使用 queue:forget 指令,請使用 horizon:forget 指令來刪除失敗的 Job。

若要從 failed_jobs 資料表中刪除所有失敗的 Job,可使用 queue:flush 指令:

1php artisan queue:flush
1php artisan queue:flush

忽略不存在的 Model

將 Eloquent Model 插入進 Job 時,該 Model 會自動被序列化再放入佇列中,並在要處理該 Job 時再重新從資料庫中取出。不過,若在該 Job 等待被 Worker 處理的期間刪除了該 Model,則 Job 可能會遇到 ModelNotFoundException 而失敗。

為了方便起見,可將 Job 的 deleteWhenMissingModels 屬性設為 true,就可以自動刪除有遺失 Model 的 Job。若該選項設為 true,則 Laravel 會自動默默地在不產生 Exception 的情況下取消該 Job:

1/**
2 * Delete the job if its models no longer exist.
3 *
4 * @var bool
5 */
6public $deleteWhenMissingModels = true;
1/**
2 * Delete the job if its models no longer exist.
3 *
4 * @var bool
5 */
6public $deleteWhenMissingModels = true;

修剪失敗的 Job

可以呼叫 queue:prune-failed Artisan 指令來刪除專案中所有 failed_jobs 資料表中的記錄:

1php artisan queue:prune-failed
1php artisan queue:prune-failed

若有提供 --hours 選項給該指令,則只會保留過去 N 小時中所插入的失敗 Job 記錄。舉例來說,下列指令會刪除所有插入超過 48 小時的失敗 Job 記錄:

1php artisan queue:prune-failed --hours=48
1php artisan queue:prune-failed --hours=48

排序 DynamoDB 中的失敗 Job

出了將失敗 Job 記錄保存在關聯式資料庫資料表意外,在 Laravel 中,也支援將失敗 Job 記錄保存在 DynamoDB 中。不過,若要保存在 DynamoDB 資料表中,我們需要先建立一個 DynamoDB 資料表來保存失敗的 Job 記錄。一般來說,這個資料表的名稱應為 failed_jobs,不過,請依照專案的 queue 設定檔中 queue.failed.table 設定值來命名資料表。

failed_jobs 資料表應有一個名為 application 的字串主分區索引鍵(Primary Partition Key),以及一個名為 uuid 的字串主排序索引鍵(Primary Sort Key)。索引鍵的 application 這個部分會包含專案的名稱,即 app 設定檔中的 name 設定值。由於專案名稱會是 DynamoDB 資料表中索引鍵的一部分,因此,我們可以使用相同的資料表來保存多個 Laravel 專案中的失敗 Job。

此外,也請確保有安裝 AWS SDK,好讓 Laravel 專案能與 Amazon DynamoDB 溝通:

1composer require aws/aws-sdk-php
1composer require aws/aws-sdk-php

接著,請設定 queue.failed.driver 設定選項值為 dynamodb。此外,也請在失敗 Job 設定陣列中定義 keysecretregion 等設定選項。這些選項會用來向 AWS 進行身份驗證。使用 dynamodb Driver 時,就不需要 queue.failed.database 設定選項:

1'failed' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'failed_jobs',
7],
1'failed' => [
2 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
3 'key' => env('AWS_ACCESS_KEY_ID'),
4 'secret' => env('AWS_SECRET_ACCESS_KEY'),
5 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
6 'table' => 'failed_jobs',
7],

不保存失敗的 Job

只要將 queue.failed.driver 設定值設為 null,就可以讓 Laravel 不保存失敗的 Job 以忽略這些 Job。一般來說,可使用 QUEUE_FAILED_DRIVER 環境變數來調整這個值:

1QUEUE_FAILED_DRIVER=null
1QUEUE_FAILED_DRIVER=null

失敗 Job 的事件

若想註冊一個會在每次 Job 失敗時叫用的 Event Listener,可使用 Queue Facade 的 failing 方法。舉例來說,我們可以在 Laravel 中內建的 AppServiceProviderboot 方法內將一個閉包附加至該事件上:

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobFailed;
8 
9class AppServiceProvider extends ServiceProvider
10{
11 /**
12 * Register any application services.
13 *
14 * @return void
15 */
16 public function register()
17 {
18 //
19 }
20 
21 /**
22 * Bootstrap any application services.
23 *
24 * @return void
25 */
26 public function boot()
27 {
28 Queue::failing(function (JobFailed $event) {
29 // $event->connectionName
30 // $event->job
31 // $event->exception
32 });
33 }
34}
1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobFailed;
8 
9class AppServiceProvider extends ServiceProvider
10{
11 /**
12 * Register any application services.
13 *
14 * @return void
15 */
16 public function register()
17 {
18 //
19 }
20 
21 /**
22 * Bootstrap any application services.
23 *
24 * @return void
25 */
26 public function boot()
27 {
28 Queue::failing(function (JobFailed $event) {
29 // $event->connectionName
30 // $event->job
31 // $event->exception
32 });
33 }
34}

在佇列中清理 Job

lightbulb

使用 Horizon,請不要使用 queue:clear 指令,請使用 horizon:clear 指令來清理佇列中的 Job。

若想從預設連線的預設佇列中刪除所有 Job,可使用 queue:clear Artisan 指令:

1php artisan queue:clear
1php artisan queue:clear

可以提供 connection 引數與 queue 選項來刪除特定連線與佇列中的 Job:

1php artisan queue:clear redis --queue=emails
1php artisan queue:clear redis --queue=emails
lightbulb

目前,只有 SQS、Redis、資料庫等佇列 Driver 能支援清除佇列中的 Job。此外,刪除 SQS Message 可能會需要至多 60 秒的時間,因此在清理佇列的 60 秒後所傳送給 SQS 佇列的 Job 也可能會被刪除。

監控佇列

若佇列突然收到大量的 Job,則佇列可能會有來不及處理,造成 Job 需要更長的等待時間才能完成。若有需要的話,Laravel 可以在佇列 Job 遇到特定閥值時傳送通知。

若要開始監控佇列,請排程設定每 10 分鐘執行 queue:monitor 指令。這個指令接受要監控的佇列名稱,以及所要設定的 Job 數量閥值:

1php artisan queue:monitor redis:default,redis:deployments --max=100
1php artisan queue:monitor redis:default,redis:deployments --max=100

若只排程執行這個指令,當佇列的負載過高時還不會觸發通知。當這個指令遇到有佇列超過指定閥值量的 Job 數時,會分派一個 Illuminate\Queue\Events\QueueBusy 事件。我們可以在專案的 EventServiceProvider 內監聽這個事件,以傳送通知給開發團隊:

1use App\Notifications\QueueHasLongWaitTime;
2use Illuminate\Queue\Events\QueueBusy;
3use Illuminate\Support\Facades\Event;
4use Illuminate\Support\Facades\Notification;
5 
6/**
7 * Register any other events for your application.
8 *
9 * @return void
10 */
11public function boot()
12{
13 Event::listen(function (QueueBusy $event) {
14 Notification::route('mail', 'dev@example.com')
15 ->notify(new QueueHasLongWaitTime(
16 $event->connection,
17 $event->queue,
18 $event->size
19 ));
20 });
21}
1use App\Notifications\QueueHasLongWaitTime;
2use Illuminate\Queue\Events\QueueBusy;
3use Illuminate\Support\Facades\Event;
4use Illuminate\Support\Facades\Notification;
5 
6/**
7 * Register any other events for your application.
8 *
9 * @return void
10 */
11public function boot()
12{
13 Event::listen(function (QueueBusy $event) {
14 Notification::route('mail', 'dev@example.com')
15 ->notify(new QueueHasLongWaitTime(
16 $event->connection,
17 $event->queue,
18 $event->size
19 ));
20 });
21}

Job 事件

Queue Facade 上使用 beforeafter 方法,就可以指定要在佇列 Job 處理前後所要執行的回呼。在這些回呼中,我們就有機會能進行記錄額外的日誌、增加主控台上統計數字等動作。一般來說,應在某個 Service Providerboot 方法內呼叫這些方法。舉例來說,我們可以使用 Laravel 中內建的 AppServiceProvider

1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobProcessed;
8use Illuminate\Queue\Events\JobProcessing;
9 
10class AppServiceProvider extends ServiceProvider
11{
12 /**
13 * Register any application services.
14 *
15 * @return void
16 */
17 public function register()
18 {
19 //
20 }
21 
22 /**
23 * Bootstrap any application services.
24 *
25 * @return void
26 */
27 public function boot()
28 {
29 Queue::before(function (JobProcessing $event) {
30 // $event->connectionName
31 // $event->job
32 // $event->job->payload()
33 });
34 
35 Queue::after(function (JobProcessed $event) {
36 // $event->connectionName
37 // $event->job
38 // $event->job->payload()
39 });
40 }
41}
1<?php
2 
3namespace App\Providers;
4 
5use Illuminate\Support\Facades\Queue;
6use Illuminate\Support\ServiceProvider;
7use Illuminate\Queue\Events\JobProcessed;
8use Illuminate\Queue\Events\JobProcessing;
9 
10class AppServiceProvider extends ServiceProvider
11{
12 /**
13 * Register any application services.
14 *
15 * @return void
16 */
17 public function register()
18 {
19 //
20 }
21 
22 /**
23 * Bootstrap any application services.
24 *
25 * @return void
26 */
27 public function boot()
28 {
29 Queue::before(function (JobProcessing $event) {
30 // $event->connectionName
31 // $event->job
32 // $event->job->payload()
33 });
34 
35 Queue::after(function (JobProcessed $event) {
36 // $event->connectionName
37 // $event->job
38 // $event->job->payload()
39 });
40 }
41}

使用 Queue Facadelooping 方法,我們就能指定要在 Worker 嘗試從佇列中取得 Job 前執行的回呼。舉例來說,我們可以註冊一個閉包來回溯前一個失敗 Job 中未關閉的 Transaction:

1use Illuminate\Support\Facades\DB;
2use Illuminate\Support\Facades\Queue;
3 
4Queue::looping(function () {
5 while (DB::transactionLevel() > 0) {
6 DB::rollBack();
7 }
8});
1use Illuminate\Support\Facades\DB;
2use Illuminate\Support\Facades\Queue;
3 
4Queue::looping(function () {
5 while (DB::transactionLevel() > 0) {
6 DB::rollBack();
7 }
8});
翻譯進度
47.13% 已翻譯
更新時間:
2024年6月30日 星期日 上午8:15:00 [UTC]
翻譯人員:
幫我們翻譯此頁

留言

尚無留言

“Laravel” is a Trademark of Taylor Otwell.
The source documentation is released under MIT license. See laravel/docs on GitHub for details.
The translated documentations are released under MIT license. See cornch/laravel-docs-l10n on GitHub for details.