From c93d80edd09b46bae27b1addf0f6a65bc89a10af Mon Sep 17 00:00:00 2001 From: mscherer Date: Wed, 13 May 2026 19:11:20 +0200 Subject: [PATCH] Add createJob(unique: true) for fan-out dedup When a fan-out dispatcher enqueues per-tenant work on every cron tick, plain `createJob()` happily inserts a duplicate row even if the previous tick's job for the same tenant is still pending or stuck. The result is a slow accumulation of identical pending jobs -- in one observed case, 5876 stuck `VolunteerCheckOutReminder` rows from a single tenant whose DB connection had been decommissioned. `isQueued()` already exists for this, but every caller has to wrap each `createJob()` in a two-line manual guard. This adds the guard to `createJob()` itself as an opt-in `unique` flag: $queuedJobsTable->createJob( 'VolunteerCheckOutReminder', ['account_uuid' => $accountUuid], [ 'reference' => 'volunteer_check_out:' . $accountUuid, 'unique' => true, ], ); When `unique` is set and a pending (`completed IS NULL`) job exists for the same `(reference, resolved job_task)` pair, the existing entity is returned and no new row is inserted. The dedup hit is logged at info level. `unique` without a `reference` throws `InvalidArgumentException` at the call site -- failing fast beats silently inserting an undeduped row. The flag lives on `JobConfig` as a request-time property outside `_keyMap`, so it never leaks into `toArray()` output or the `queued_jobs` row. Default is `false`, fully BC. Race window: two ticks landing simultaneously can both pass the `isQueued()` check and both insert. A DB-level unique constraint would close that, but it requires a migration and a decision on how callers should opt in per-table -- out of scope for this PR. The 99% effectiveness already kills the slow-buildup scenario this is built for. --- docs/guide/queueing-jobs.md | 37 ++++++- src/Config/JobConfig.php | 45 ++++++++ src/Model/Table/QueuedJobsTable.php | 38 ++++++- .../Model/Table/QueuedJobsTableTest.php | 103 ++++++++++++++++++ 4 files changed, 220 insertions(+), 3 deletions(-) diff --git a/docs/guide/queueing-jobs.md b/docs/guide/queueing-jobs.md index 8051ec61..ff80ff17 100644 --- a/docs/guide/queueing-jobs.md +++ b/docs/guide/queueing-jobs.md @@ -27,7 +27,7 @@ The `createJob()` function takes three arguments. - The first argument is the name of the type of job that you are creating. - The second argument is optional, but if set must be an array of data and will be passed as a payload parameter to the `run()` function of the worker. It can also be a (DTO) object that implements `CakeDto\Dto\FromArrayToArrayInterface` or provides `toArray()` method. -- The third argument is options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`). Either as array or `Queue\Config\JobConfig` class. +- The third argument is options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`, `'unique'`). Either as array or `Queue\Config\JobConfig` class. See [Avoiding parallel (re)queueing](#avoiding-parallel-re-queueing) for the `unique` flag. > `priority` is sorted ascending, therefore a task with priority 1 will be executed before a task with priority 5 @@ -201,6 +201,41 @@ For more complex use cases, you can manually use `->find()->where()`, of course. Note that the 2nd argument (job type) is optional, but recommended. If you do not use it, make sure your reference is globally unique. +### Built-in dedup with `unique: true` + +For fan-out dispatchers and cron-driven tasks that enqueue many similar +jobs, the manual `isQueued()` guard above is two lines of boilerplate +per enqueue site. The `unique` option folds that into `createJob()` +directly: if a pending (incomplete) job already exists for the same +`(reference, resolved job_task)` pair, the existing entity is returned +and no new row is inserted. + +```php +$queuedJobsTable->createJob( + 'VolunteerCheckOutReminder', + ['account_uuid' => $accountUuid], + [ + 'reference' => 'volunteer_check_out:' . $accountUuid, + 'unique' => true, + ], +); +// First call: inserts a new pending job, returns the new entity. +// Subsequent calls while that job is pending: return the *existing* +// entity without inserting. Once the original completes, the next call +// inserts a fresh row. +``` + +`unique: true` without a `reference` throws `InvalidArgumentException` +— fail-fast at the call site rather than silently inserting an +undeduped row. The dedup check uses an info-level log entry +(`Queue.createJob: dedup hit for reference= ...`) so +operators can confirm the guard is firing. + +The dedup window is "the previous job is not yet completed" — pending, +in-progress, and failed-but-not-completed jobs all block. This is the +right shape for cron fan-out: if a previous tick is stuck or still +running, the next tick holds back instead of stacking a duplicate. + ## Updating progress/status The `createJob()` method returns the entity. So you can store the ID and at any time ask the queue about the status of this job. diff --git a/src/Config/JobConfig.php b/src/Config/JobConfig.php index 7249cbe3..ee37881e 100644 --- a/src/Config/JobConfig.php +++ b/src/Config/JobConfig.php @@ -40,6 +40,11 @@ class JobConfig { */ public const FIELD_STATUS = 'status'; + /** + * @var string + */ + public const FIELD_UNIQUE = 'unique'; + /** * For camelBacked input/output. * @@ -92,6 +97,14 @@ class JobConfig { */ protected $status; + /** + * Request-time flag. Not persisted; intentionally outside `_keyMap` so + * it never leaks into `toArray()` output or the `queued_jobs` row. + * + * @var bool + */ + protected bool $unique = false; + /** * @var array> */ @@ -128,6 +141,15 @@ class JobConfig { public function fromArray(array $data, ?string $type = null) { $type = $this->keyType($type, static::TYPE_CAMEL); + // `unique` is a request-time flag, not persisted state, so it lives + // outside `_keyMap`. Handle it here so a caller can pass it in the + // array shape (`createJob($task, $data, ['reference' => ..., 'unique' => true])`) + // without tripping the strict `field()` lookup below. + if (array_key_exists('unique', $data)) { + $this->unique = (bool)$data['unique']; + unset($data['unique']); + } + foreach ($data as $field => $value) { if ($type !== static::TYPE_CAMEL) { $field = $this->field($field, $type); @@ -477,4 +499,27 @@ public function hasStatus(): bool { return $this->status !== null; } + /** + * Enable dedup for this job: if a pending (incomplete) job already + * exists with the same `reference` and resolved `job_task`, + * `createJob()` returns that existing entity instead of inserting a + * duplicate. Requires `reference` to be set. + * + * @param bool $unique + * + * @return $this + */ + public function setUnique(bool $unique) { + $this->unique = $unique; + + return $this; + } + + /** + * @return bool + */ + public function isUnique(): bool { + return $this->unique; + } + } diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index 0b08119b..077a3e5d 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -10,6 +10,7 @@ use Cake\Event\EventInterface; use Cake\Event\EventManager; use Cake\I18n\DateTime; +use Cake\Log\Log; use Cake\ORM\Query\SelectQuery; use Cake\ORM\Table; use Cake\Validation\Validator; @@ -185,12 +186,19 @@ public function createConfig(): JobConfig { * - group: Used to group similar QueuedJobs * - reference: An optional reference string * - status: To set an initial status text + * - unique: When true (with a `reference`), an existing pending job + * matching `(reference, resolved job_task)` is returned instead of + * inserting a duplicate row. Useful for fan-out dispatchers that + * could otherwise stack up identical per-tenant work when a previous + * run is still pending or stuck. * * @param string $jobTask Job task name or FQCN. * @param object|array|null $data Array of data or DTO like object. * @param \Queue\Config\JobConfig|array $config Config to save along with the job. * - * @return \Queue\Model\Entity\QueuedJob Saved job entity + * @throws \InvalidArgumentException If `unique` is set without a `reference`. + * + * @return \Queue\Model\Entity\QueuedJob Saved job entity (or the existing pending entity if `unique` deduped). */ public function createJob(string $jobTask, array|object|null $data = null, array|JobConfig $config = []): QueuedJob { if (!$config instanceof JobConfig) { @@ -206,8 +214,34 @@ public function createJob(string $jobTask, array|object|null $data = null, array throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method'); } + $resolvedTask = $this->jobTask($jobTask); + + if ($config->isUnique()) { + if (!$config->hasReference()) { + throw new InvalidArgumentException('createJob() with `unique` requires a `reference` to dedupe on.'); + } + + $existing = $this->find() + ->where([ + 'reference' => $config->getReferenceOrFail(), + 'job_task' => $resolvedTask, + 'completed IS' => null, + ]) + ->first(); + if ($existing !== null) { + Log::info(sprintf( + 'Queue.createJob: dedup hit for %s reference=%s (returning existing job_id=%d)', + $resolvedTask, + $config->getReferenceOrFail(), + $existing->id, + )); + + return $existing; + } + } + $queuedJob = [ - 'job_task' => $this->jobTask($jobTask), + 'job_task' => $resolvedTask, 'data' => $data, 'notbefore' => $config->hasNotBefore() ? $this->getDateTime($config->getNotBeforeOrFail()) : null, 'priority' => $config->getPriority(), diff --git a/tests/TestCase/Model/Table/QueuedJobsTableTest.php b/tests/TestCase/Model/Table/QueuedJobsTableTest.php index 25b43d18..8f65f0b3 100644 --- a/tests/TestCase/Model/Table/QueuedJobsTableTest.php +++ b/tests/TestCase/Model/Table/QueuedJobsTableTest.php @@ -16,6 +16,7 @@ use Cake\I18n\DateTime; use Cake\ORM\TableRegistry; use Cake\TestSuite\TestCase; +use InvalidArgumentException; use Queue\Model\Enum\Priority; use Queue\Model\Table\QueuedJobsTable; use Queue\Queue\Task\ExampleTask; @@ -707,6 +708,108 @@ public function testPriority() { $this->assertSame(['key' => 'k2'], $data); } + /** + * @return void + */ + public function testCreateJobUniqueReturnsExistingPendingJob(): void { + $first = $this->QueuedJobs->createJob( + 'Foo', + ['k' => 'v1'], + ['reference' => 'tenant-1', 'unique' => true], + ); + + $second = $this->QueuedJobs->createJob( + 'Foo', + ['k' => 'v2'], + ['reference' => 'tenant-1', 'unique' => true], + ); + + $this->assertSame($first->id, $second->id, 'unique should return the existing pending job'); + // Original payload is preserved; the second call is a no-op insert. + $this->assertSame(['k' => 'v1'], $second->data); + $this->assertSame(1, $this->QueuedJobs->find()->where(['reference' => 'tenant-1'])->count()); + } + + /** + * Once the first job is completed, a later `unique` call must create a + * fresh row -- otherwise the next scheduled run could never enqueue. + * + * @return void + */ + public function testCreateJobUniqueInsertsAgainAfterCompletion(): void { + $first = $this->QueuedJobs->createJob( + 'Foo', + null, + ['reference' => 'tenant-2', 'unique' => true], + ); + + $first->completed = new DateTime(); + $this->QueuedJobs->saveOrFail($first); + + $second = $this->QueuedJobs->createJob( + 'Foo', + null, + ['reference' => 'tenant-2', 'unique' => true], + ); + + $this->assertNotSame($first->id, $second->id); + $this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-2'])->count()); + } + + /** + * Dedup is scoped to (reference, job_task) -- a different task name with + * the same reference still inserts. + * + * @return void + */ + public function testCreateJobUniqueIsScopedByJobTask(): void { + Configure::write('Queue.skipExistenceCheck', true); + + $foo = $this->QueuedJobs->createJob( + 'Foo', + null, + ['reference' => 'shared-ref', 'unique' => true], + ); + + $bar = $this->QueuedJobs->createJob( + 'Bar', + null, + ['reference' => 'shared-ref', 'unique' => true], + ); + + $this->assertNotSame($foo->id, $bar->id); + $this->assertSame('Foo', $foo->job_task); + $this->assertSame('Bar', $bar->job_task); + } + + /** + * Without `unique`, two calls with the same reference both insert -- BC + * preserved for callers that use `reference` for audit but want every + * scheduled run to enqueue independently. + * + * @return void + */ + public function testCreateJobWithoutUniqueDoesNotDedupe(): void { + $first = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']); + $second = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']); + + $this->assertNotSame($first->id, $second->id); + $this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-3'])->count()); + } + + /** + * `unique` without a `reference` is a programming error -- fail fast at + * the call site rather than silently inserting an undeduped row. + * + * @return void + */ + public function testCreateJobUniqueWithoutReferenceThrows(): void { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessageMatches('/unique.*reference/i'); + + $this->QueuedJobs->createJob('Foo', null, ['unique' => true]); + } + /** * @return void */