37 changes: 36 additions & 1 deletion docs/guide/queueing-jobs.md
@@ -27,7 +27,7 @@ The `createJob()` function takes three arguments.
- The first argument is the name of the type of job that you are creating.
- The second argument is optional, but if set must be an array of data and will be passed as a payload parameter to the `run()` function of the worker.
It can also be a (DTO) object that implements `CakeDto\Dto\FromArrayToArrayInterface` or provides `toArray()` method.
-- The third argument is options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`). Either as array or `Queue\Config\JobConfig` class.
+- The third argument is the options (`'notBefore'`, `'priority'`, `'group'`, `'reference'`, `'unique'`), given either as an array or as a `Queue\Config\JobConfig` instance. See [Avoiding parallel (re)queueing](#avoiding-parallel-re-queueing) for the `unique` flag.

> `priority` is sorted ascending, therefore a task with priority 1 will be executed before a task with priority 5

@@ -201,6 +201,41 @@ For more complex use cases, you can manually use `->find()->where()`, of course.

Note that the 2nd argument (job type) is optional, but recommended. If you do not use it, make sure your reference is globally unique.

### Built-in dedup with `unique: true`

For fan-out dispatchers and cron-driven tasks that enqueue many similar
jobs, the manual `isQueued()` guard above is two lines of boilerplate
per enqueue site. The `unique` option folds that into `createJob()`
directly: if a pending (incomplete) job already exists for the same
`(reference, resolved job_task)` pair, the existing entity is returned
and no new row is inserted.

```php
$queuedJobsTable->createJob(
'VolunteerCheckOutReminder',
['account_uuid' => $accountUuid],
[
'reference' => 'volunteer_check_out:' . $accountUuid,
'unique' => true,
],
);
// First call: inserts a new pending job, returns the new entity.
// Subsequent calls while that job is pending: return the *existing*
// entity without inserting. Once the original completes, the next call
// inserts a fresh row.
```

`unique: true` without a `reference` throws `InvalidArgumentException`:
the call fails fast at the call site rather than silently inserting an
undeduped row. Each dedup hit is logged at info level
(`Queue.createJob: dedup hit for <task> reference=<ref> ...`) so
operators can confirm the guard is firing.
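
As a rough illustration of the fail-fast rule, the standalone sketch below separates the `unique` flag from the other options and rejects it when no `reference` is given. The helper `splitUniqueOption()` is hypothetical and not part of the plugin, which handles the flag in `JobConfig::fromArray()` and performs the check in `createJob()`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the plugin): pulls the request-time
 * `unique` flag out of the options and rejects `unique` without `reference`.
 *
 * @param array<string, mixed> $options
 * @return array{0: bool, 1: array<string, mixed>} The flag and the remaining options.
 */
function splitUniqueOption(array $options): array
{
    $unique = (bool)($options['unique'] ?? false);
    // The flag is not persisted, so it is removed from the remaining options.
    unset($options['unique']);

    if ($unique && !isset($options['reference'])) {
        // Fail fast instead of silently inserting an undeduped row.
        throw new InvalidArgumentException('`unique` requires a `reference` to dedupe on.');
    }

    return [$unique, $options];
}

// Valid: the flag is extracted and only `reference` remains in $rest.
[$unique, $rest] = splitUniqueOption(['reference' => 'tenant-1', 'unique' => true]);

// Invalid: no reference to dedupe on, so the helper throws.
try {
    splitUniqueOption(['unique' => true]);
} catch (InvalidArgumentException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

Throwing at the call site keeps a misconfigured dispatcher from running for weeks while quietly stacking duplicates.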

The dedup window lasts until the previous job is completed, that is,
while its `completed` field is still `NULL`: pending, in-progress, and
failed-but-not-completed jobs all block. This suits cron fan-out: if a
previous tick is stuck or still running, the next tick holds back
instead of stacking a duplicate.
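
The window can be modelled with a small in-memory sketch. The `InMemoryQueue` class is a hypothetical stand-in for the `queued_jobs` table and only mirrors the rule described here; the plugin itself queries the database.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the `queued_jobs` table.
 * It models only the dedup rule, nothing else.
 */
final class InMemoryQueue
{
    /** @var array<int, array{id: int, task: string, reference: string, completed: bool}> */
    private array $rows = [];

    private int $nextId = 1;

    /** Returns the job id: the existing one on a dedup hit, otherwise a new one. */
    public function createJob(string $task, string $reference, bool $unique = false): int
    {
        if ($unique) {
            foreach ($this->rows as $row) {
                // Same (reference, task) pair and not yet completed: dedup hit.
                if ($row['task'] === $task && $row['reference'] === $reference && !$row['completed']) {
                    return $row['id'];
                }
            }
        }

        $id = $this->nextId++;
        $this->rows[$id] = ['id' => $id, 'task' => $task, 'reference' => $reference, 'completed' => false];

        return $id;
    }

    /** Marks a job as completed, which closes its dedup window. */
    public function complete(int $id): void
    {
        if (isset($this->rows[$id])) {
            $this->rows[$id]['completed'] = true;
        }
    }
}

$queue = new InMemoryQueue();
$first = $queue->createJob('Foo', 'tenant-1', true);
$second = $queue->createJob('Foo', 'tenant-1', true); // dedup hit: same id as $first
$other = $queue->createJob('Bar', 'tenant-1', true);  // different task: new id

$queue->complete($first);
$third = $queue->createJob('Foo', 'tenant-1', true);  // window closed: new id

var_dump($first === $second, $first !== $other, $first !== $third);
```

Without `unique`, the loop is skipped entirely, so existing callers that reuse a `reference` keep inserting one row per call.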

## Updating progress/status

The `createJob()` method returns the entity. So you can store the ID and at any time ask the queue about the status of this job.
45 changes: 45 additions & 0 deletions src/Config/JobConfig.php
@@ -40,6 +40,11 @@ class JobConfig {
*/
public const FIELD_STATUS = 'status';

/**
* @var string
*/
public const FIELD_UNIQUE = 'unique';

/**
* For camelBacked input/output.
*
@@ -92,6 +97,14 @@ class JobConfig {
*/
protected $status;

/**
* Request-time flag. Not persisted; intentionally outside `_keyMap` so
* it never leaks into `toArray()` output or the `queued_jobs` row.
*
* @var bool
*/
protected bool $unique = false;

/**
* @var array<string, array<string, string>>
*/
@@ -128,6 +141,15 @@ class JobConfig {
public function fromArray(array $data, ?string $type = null) {
$type = $this->keyType($type, static::TYPE_CAMEL);

// `unique` is a request-time flag, not persisted state, so it lives
// outside `_keyMap`. Handle it here so a caller can pass it in the
// array shape (`createJob($task, $data, ['reference' => ..., 'unique' => true])`)
// without tripping the strict `field()` lookup below.
if (array_key_exists('unique', $data)) {
$this->unique = (bool)$data['unique'];
unset($data['unique']);
}

foreach ($data as $field => $value) {
if ($type !== static::TYPE_CAMEL) {
$field = $this->field($field, $type);
@@ -477,4 +499,27 @@ public function hasStatus(): bool {
return $this->status !== null;
}

/**
* Enable dedup for this job: if a pending (incomplete) job already
* exists with the same `reference` and resolved `job_task`,
* `createJob()` returns that existing entity instead of inserting a
* duplicate. Requires `reference` to be set.
*
* @param bool $unique
*
* @return $this
*/
public function setUnique(bool $unique) {
$this->unique = $unique;

return $this;
}

/**
* @return bool
*/
public function isUnique(): bool {
return $this->unique;
}

}
38 changes: 36 additions & 2 deletions src/Model/Table/QueuedJobsTable.php
@@ -10,6 +10,7 @@
use Cake\Event\EventInterface;
use Cake\Event\EventManager;
use Cake\I18n\DateTime;
use Cake\Log\Log;
use Cake\ORM\Query\SelectQuery;
use Cake\ORM\Table;
use Cake\Validation\Validator;
@@ -185,12 +186,19 @@ public function createConfig(): JobConfig {
* - group: Used to group similar QueuedJobs
* - reference: An optional reference string
* - status: To set an initial status text
* - unique: When true (with a `reference`), an existing pending job
* matching `(reference, resolved job_task)` is returned instead of
* inserting a duplicate row. Useful for fan-out dispatchers that
* could otherwise stack up identical per-tenant work when a previous
* run is still pending or stuck.
*
* @param string $jobTask Job task name or FQCN.
* @param object|array<string, mixed>|null $data Array of data or DTO like object.
* @param \Queue\Config\JobConfig|array<string, mixed> $config Config to save along with the job.
*
-* @return \Queue\Model\Entity\QueuedJob Saved job entity
+* @throws \InvalidArgumentException If `unique` is set without a `reference`.
+*
+* @return \Queue\Model\Entity\QueuedJob Saved job entity (or the existing pending entity if `unique` deduped).
*/
public function createJob(string $jobTask, array|object|null $data = null, array|JobConfig $config = []): QueuedJob {
if (!$config instanceof JobConfig) {
@@ -206,8 +214,34 @@ public function createJob(string $jobTask, array|object|null $data = null, array
throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method');
}

$resolvedTask = $this->jobTask($jobTask);

if ($config->isUnique()) {
if (!$config->hasReference()) {
throw new InvalidArgumentException('createJob() with `unique` requires a `reference` to dedupe on.');
}

$existing = $this->find()
->where([
'reference' => $config->getReferenceOrFail(),
'job_task' => $resolvedTask,
'completed IS' => null,
])
->first();
if ($existing !== null) {
Log::info(sprintf(
'Queue.createJob: dedup hit for %s reference=%s (returning existing job_id=%d)',
$resolvedTask,
$config->getReferenceOrFail(),
$existing->id,
));

return $existing;
}
}

$queuedJob = [
-'job_task' => $this->jobTask($jobTask),
+'job_task' => $resolvedTask,
'data' => $data,
'notbefore' => $config->hasNotBefore() ? $this->getDateTime($config->getNotBeforeOrFail()) : null,
'priority' => $config->getPriority(),
103 changes: 103 additions & 0 deletions tests/TestCase/Model/Table/QueuedJobsTableTest.php
@@ -16,6 +16,7 @@
use Cake\I18n\DateTime;
use Cake\ORM\TableRegistry;
use Cake\TestSuite\TestCase;
use InvalidArgumentException;
use Queue\Model\Enum\Priority;
use Queue\Model\Table\QueuedJobsTable;
use Queue\Queue\Task\ExampleTask;
@@ -707,6 +708,108 @@ public function testPriority() {
$this->assertSame(['key' => 'k2'], $data);
}

/**
* @return void
*/
public function testCreateJobUniqueReturnsExistingPendingJob(): void {
$first = $this->QueuedJobs->createJob(
'Foo',
['k' => 'v1'],
['reference' => 'tenant-1', 'unique' => true],
);

$second = $this->QueuedJobs->createJob(
'Foo',
['k' => 'v2'],
['reference' => 'tenant-1', 'unique' => true],
);

$this->assertSame($first->id, $second->id, 'unique should return the existing pending job');
// Original payload is preserved; the second call inserts nothing.
$this->assertSame(['k' => 'v1'], $second->data);
$this->assertSame(1, $this->QueuedJobs->find()->where(['reference' => 'tenant-1'])->count());
}

/**
* Once the first job is completed, a later `unique` call must create a
* fresh row -- otherwise the next scheduled run could never enqueue.
*
* @return void
*/
public function testCreateJobUniqueInsertsAgainAfterCompletion(): void {
$first = $this->QueuedJobs->createJob(
'Foo',
null,
['reference' => 'tenant-2', 'unique' => true],
);

$first->completed = new DateTime();
$this->QueuedJobs->saveOrFail($first);

$second = $this->QueuedJobs->createJob(
'Foo',
null,
['reference' => 'tenant-2', 'unique' => true],
);

$this->assertNotSame($first->id, $second->id);
$this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-2'])->count());
}

/**
* Dedup is scoped to (reference, job_task) -- a different task name with
* the same reference still inserts.
*
* @return void
*/
public function testCreateJobUniqueIsScopedByJobTask(): void {
Configure::write('Queue.skipExistenceCheck', true);

$foo = $this->QueuedJobs->createJob(
'Foo',
null,
['reference' => 'shared-ref', 'unique' => true],
);

$bar = $this->QueuedJobs->createJob(
'Bar',
null,
['reference' => 'shared-ref', 'unique' => true],
);

$this->assertNotSame($foo->id, $bar->id);
$this->assertSame('Foo', $foo->job_task);
$this->assertSame('Bar', $bar->job_task);
}

/**
* Without `unique`, two calls with the same reference both insert -- BC
* preserved for callers that use `reference` for audit but want every
* scheduled run to enqueue independently.
*
* @return void
*/
public function testCreateJobWithoutUniqueDoesNotDedupe(): void {
$first = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']);
$second = $this->QueuedJobs->createJob('Foo', null, ['reference' => 'tenant-3']);

$this->assertNotSame($first->id, $second->id);
$this->assertSame(2, $this->QueuedJobs->find()->where(['reference' => 'tenant-3'])->count());
}

/**
* `unique` without a `reference` is a programming error -- fail fast at
* the call site rather than silently inserting an undeduped row.
*
* @return void
*/
public function testCreateJobUniqueWithoutReferenceThrows(): void {
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessageMatches('/unique.*reference/i');

$this->QueuedJobs->createJob('Foo', null, ['unique' => true]);
}

/**
* @return void
*/