From 993b481169eed2f9818f90ef6fbce42bd973b289 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 17:38:01 +0530 Subject: [PATCH 1/5] feat: add .id() method for job deduplication --- README.md | 32 ++++++ src/drivers/fake_adapter.ts | 13 +++ src/drivers/knex_adapter.ts | 16 ++- src/drivers/redis_adapter.ts | 48 ++++++++- src/job_dispatcher.ts | 38 ++++++- src/types/main.ts | 7 ++ tests/_mocks/memory_adapter.ts | 13 +++ tests/_utils/register_driver_test_suite.ts | 112 +++++++++++++++++++- tests/fake_adapter.spec.ts | 59 ++++++++++- tests/job_dispatcher.spec.ts | 113 +++++++++++++++++++++ 10 files changed, 444 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index aafc6cf..2add2d0 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ npm install @boringnode/queue - **Priority Queues**: Process high-priority jobs first - **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once - **Job Grouping**: Organize related jobs for monitoring +- **Job Deduplication**: Prevent duplicate jobs with custom IDs - **Retry with Backoff**: Exponential, linear, or fixed backoff strategies - **Job Timeout**: Fail or retry jobs that exceed a time limit - **Job History**: Retain completed/failed jobs for debugging @@ -131,6 +132,37 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025') The `groupId` is stored with job data and accessible via `job.data.groupId`. 
+## Job Deduplication + +Prevent the same job from being pushed to the queue twice using custom job IDs: + +```typescript +// First dispatch - job is created +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() + +// Second dispatch with same ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +``` + +The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: + +```typescript +// These are two different jobs, no conflict +await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() +``` + +Deduplication is atomic and race-condition-free across all adapters: + +- **Redis**: Uses `HSETNX` (set-if-not-exists) +- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` + +> [!NOTE] +> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`. + +> [!TIP] +> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. 
+ ## Job History & Retention Keep completed and failed jobs for debugging: diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index ddc528a..39bfbb8 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -160,6 +160,11 @@ export class FakeAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return + } + this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) } @@ -169,6 +174,14 @@ export class FakeAdapter implements Adapter { } pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() + + const delayed = this.#delayedJobs.get(queue) + if (delayed?.has(jobData.id)) return Promise.resolve() + } + this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..a421a7c 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -370,13 +370,19 @@ export class KnexAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'pending', data: JSON.stringify(jobData), score, }) + + if (jobData.unique) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushLater(jobData: JobData, delay: number): Promise { @@ -386,13 +392,19 @@ export class KnexAdapter implements Adapter { async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay - await this.#connection(this.#jobsTable).insert({ + const query = this.#connection(this.#jobsTable).insert({ id: 
jobData.id, queue, status: 'delayed', data: JSON.stringify(jobData), execute_at: executeAt, }) + + if (jobData.unique) { + await query.onConflict(['id', 'queue']).ignore() + } else { + await query + } } async pushMany(jobs: JobData[]): Promise { diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index bb66c05..4e4bc4b 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -35,6 +35,26 @@ const PUSH_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a unique job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to pending ZSET if the job was newly created. + */ +const PUSH_UNIQUE_JOB_SCRIPT = ` + local data_key = KEYS[1] + local pending_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local score = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', pending_key, score, job_id) + end + + return added +` + /** * Lua script for pushing a delayed job. * Stores job data in the central hash and adds jobId to delayed ZSET. @@ -52,6 +72,26 @@ const PUSH_DELAYED_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a unique delayed job. + * Uses HSETNX to only store data if the job doesn't already exist. + * Only adds to delayed ZSET if the job was newly created. + */ +const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = ` + local data_key = KEYS[1] + local delayed_key = KEYS[2] + local job_id = ARGV[1] + local job_data = ARGV[2] + local execute_at = tonumber(ARGV[3]) + + local added = redis.call('HSETNX', data_key, job_id, job_data) + if added == 1 then + redis.call('ZADD', delayed_key, execute_at, job_id) + end + + return added +` + /** * Lua script for atomic job acquisition. * 1. Check and process delayed jobs @@ -620,8 +660,10 @@ export class RedisAdapter implements Adapter { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay + const script = jobData.unique ? 
PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + await this.#connection.eval( - PUSH_DELAYED_JOB_SCRIPT, + script, 2, keys.data, keys.delayed, @@ -637,8 +679,10 @@ export class RedisAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) + const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT + await this.#connection.eval( - PUSH_JOB_SCRIPT, + script, 2, keys.data, keys.pending, diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 9198895..e95d05d 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -47,6 +47,7 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string + #id?: string /** * Create a new job dispatcher. @@ -148,6 +149,40 @@ export class JobDispatcher { return this } + /** + * Set a custom job ID for deduplication. + * + * When a custom ID is provided, the adapter will silently skip + * the job if one with the same ID already exists in the queue. + * The ID is automatically prefixed with the job name to prevent + * collisions between different job types. + * + * @param jobId - Custom identifier for this job + * @returns This dispatcher for chaining + * + * @example + * ```typescript + * // Prevent duplicate invoice jobs for the same order + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .id('order-123') + * .run() + * + * // Second dispatch with same ID is silently skipped + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .id('order-123') + * .run() + * ``` + */ + id(jobId: string): this { + if (!jobId) { + throw new Error('Job ID must be a non-empty string') + } + + this.#id = jobId + + return this + } + /** * Use a specific adapter for this job. * @@ -181,7 +216,7 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = randomUUID() + const id = this.#id ? 
`${this.#name}::${this.#id}` : randomUUID() debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -197,6 +232,7 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), + ...(this.#id ? { unique: true } : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } diff --git a/src/types/main.ts b/src/types/main.ts index 4bc68c9..a4e8c5f 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -133,6 +133,13 @@ export interface JobData { * Injected by OTel plugin at dispatch time. */ traceContext?: Record + + /** + * When true, adapters use atomic insert-if-not-exists semantics + * to silently skip duplicate jobs with the same ID. + * Set automatically when a custom job ID is provided via `.id()`. + */ + unique?: boolean } /** diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index 08fbc12..c0a1768 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -51,6 +51,11 @@ export class MemoryAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return + } + if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } @@ -63,6 +68,14 @@ export class MemoryAdapter implements Adapter { } pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + if (jobData.unique) { + const jobs = this.#queues.get(queue) + if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() + + const delayed = this.#delayedJobs.get(queue) + if (delayed?.has(jobData.id)) return Promise.resolve() + } + if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ecf5727..f363830 100644 --- a/tests/_utils/register_driver_test_suite.ts 
+++ b/tests/_utils/register_driver_test_suite.ts @@ -613,7 +613,9 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.isNull(job3) }) - test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => { + test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ + assert, + }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1647,4 +1649,112 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(second!.id, 'medium') assert.equal(third!.id, 'low') }) + + test('pushOn with unique flag should skip duplicate job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('test-queue') + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('pushOn without unique flag should insert normally', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: { data: 'first' }, + attempts: 0, + }) + + await adapter.pushOn('test-queue', { + id: 'job-2', + name: 'TestJob', + payload: { data: 'second' }, + attempts: 0, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 2) + }) + + test('pushLaterOn with unique flag should skip duplicate delayed job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'test-queue', + 
{ + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }, + 60_000 + ) + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }, + 60_000 + ) + + const job = await adapter.getJob('TestJob::delayed-1', 'test-queue') + assert.isNotNull(job) + assert.deepEqual(job!.data.payload, { attempt: 1 }) + }) + + test('pushOn with unique flag should allow same id on different queues', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'a' }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('queue-b', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'b' }, + attempts: 0, + unique: true, + }) + + const sizeA = await adapter.sizeOf('queue-a') + const sizeB = await adapter.sizeOf('queue-b') + assert.equal(sizeA, 1) + assert.equal(sizeB, 1) + }) } diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index a4eae64..f6d1383 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,6 +91,64 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) + test('should skip duplicate pushOn when unique flag is set', async ({ assert }) => { + const adapter = fake()() + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }) + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }) + + const size = await adapter.size() + assert.equal(size, 1) + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + + test('should skip duplicate pushLaterOn when unique flag is set', async ({ assert }) => { + const adapter = fake()() + + await 
adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + unique: true, + }, + 5000 + ) + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + unique: true, + }, + 5000 + ) + + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + test('should support job class matchers', async ({ assert }) => { const adapter = fake()() @@ -128,5 +186,4 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - }) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 7c71311..3027c04 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,6 +317,119 @@ test.group('JobDispatcher | groupId', () => { }) }) +test.group('JobDispatcher | custom id', () => { + test('should throw error when id is empty', async ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', { data: 'test' }).id(''), + 'Job ID must be a non-empty string' + ) + }) + + test('should use custom id prefixed with job name', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + .id('order-123') + .run() + + assert.equal(jobId, 'SendInvoiceJob::order-123') + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.id, 'SendInvoiceJob::order-123') + }) + + test('should set unique flag on job data when custom id is provided', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('UniqueJob', { data: 'test' }).id('my-id').run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isTrue(job!.unique) + }) + + test('should 
not set unique flag when no custom id is provided', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('RegularJob', { data: 'test' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isUndefined(job!.unique) + }) + + test('should silently skip duplicate job with same custom id', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('DedupJob', { attempt: 1 }).id('dedup-1').run() + await new JobDispatcher('DedupJob', { attempt: 2 }).id('dedup-1').run() + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('should allow same custom id for different job names', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('JobA', { type: 'a' }).id('same-id').run() + await new JobDispatcher('JobB', { type: 'b' }).id('same-id').run() + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('should work with other options like priority and queue', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + .id('task-1') + .toQueue('high') + .priority(1) + .run() + + assert.equal(jobId, 'PriorityDedupJob::task-1') + + const job = await sharedAdapter.popFrom('high') + assert.isNotNull(job) + assert.equal(job!.priority, 1) + assert.isTrue(job!.unique) + }) +}) + test.group('JobBatchDispatcher', () => { test('should dispatch 
multiple jobs correctly', async ({ assert }) => { const sharedAdapter = memory()() From 67d73221922520cd31d77b7c6d7a47bf425c66e3 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 18:48:44 +0530 Subject: [PATCH 2/5] fix: remove unused assert parameter in fake_adapter test --- tests/fake_adapter.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index f6d1383..077977f 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -117,7 +117,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushLaterOn when unique flag is set', async ({ assert }) => { + test('should skip duplicate pushLaterOn when unique flag is set', async () => { const adapter = fake()() await adapter.pushLaterOn( From 69c06352e6c7f40ae4ba6faeddefe93e6680cc22 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Fri, 10 Apr 2026 23:09:41 +0530 Subject: [PATCH 3/5] fix: check all job states in dedup guard for fake and memory adapters --- README.md | 3 ++- src/drivers/fake_adapter.ts | 13 +++++-------- tests/_mocks/memory_adapter.ts | 13 +++++-------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 2add2d0..6c9ab86 100644 --- a/README.md +++ b/README.md @@ -152,10 +152,11 @@ await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() ``` -Deduplication is atomic and race-condition-free across all adapters: +Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: - **Redis**: Uses `HSETNX` (set-if-not-exists) - **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` +- **SyncAdapter**: Executes jobs inline and does not support deduplication > [!NOTE] > Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. 
The `.id()` method is only available on single dispatch, not `dispatchMany`. diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index 39bfbb8..c3af06c 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -161,8 +161,8 @@ export class FakeAdapter implements Adapter { async pushOn(queue: string, jobData: JobData): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return + const existing = await this.getJob(jobData.id, queue) + if (existing) return } this.#recordPush(queue, jobData) @@ -173,13 +173,10 @@ export class FakeAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() - - const delayed = this.#delayedJobs.get(queue) - if (delayed?.has(jobData.id)) return Promise.resolve() + const existing = await this.getJob(jobData.id, queue) + if (existing) return } this.#recordPush(queue, jobData, delay) diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index c0a1768..de18210 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -52,8 +52,8 @@ export class MemoryAdapter implements Adapter { async pushOn(queue: string, jobData: JobData): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return + const existing = await this.getJob(jobData.id, queue) + if (existing) return } if (!this.#queues.has(queue)) { @@ -67,13 +67,10 @@ export class MemoryAdapter implements Adapter { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: 
JobData, delay: number): Promise { if (jobData.unique) { - const jobs = this.#queues.get(queue) - if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve() - - const delayed = this.#delayedJobs.get(queue) - if (delayed?.has(jobData.id)) return Promise.resolve() + const existing = await this.getJob(jobData.id, queue) + if (existing) return } if (!this.#delayedJobs.has(queue)) { From b53a6df5035840c8e5cb9ed8832eb83ed0b406a1 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Sat, 11 Apr 2026 00:12:14 +0530 Subject: [PATCH 4/5] refactor: replace .id() with .dedup() API for extensible job deduplication --- .gitignore | 3 ++ README.md | 18 +++++----- src/drivers/fake_adapter.ts | 4 +-- src/drivers/knex_adapter.ts | 4 +-- src/drivers/redis_adapter.ts | 12 +++---- src/job_dispatcher.ts | 40 ++++++++++++---------- src/types/main.ts | 10 ++++-- tests/_mocks/memory_adapter.ts | 4 +-- tests/_utils/register_driver_test_suite.ts | 20 +++++------ tests/fake_adapter.spec.ts | 12 +++---- tests/job_dispatcher.spec.ts | 38 ++++++++++---------- 11 files changed, 87 insertions(+), 78 deletions(-) diff --git a/.gitignore b/.gitignore index 3df4c92..9153b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ coverage npm-debug.log yarn-error.log +# OS specific +.DS_Store + # Editors specific .fleet .idea diff --git a/README.md b/README.md index 6c9ab86..a3a0969 100644 --- a/README.md +++ b/README.md @@ -134,22 +134,22 @@ The `groupId` is stored with job data and accessible via `job.data.groupId`. 
## Job Deduplication -Prevent the same job from being pushed to the queue twice using custom job IDs: +Prevent the same job from being pushed to the queue twice using `.dedup()`: ```typescript // First dispatch - job is created -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() -// Second dispatch with same ID - silently skipped -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() +// Second dispatch with same dedup ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() ``` -The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: +The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: ```typescript // These are two different jobs, no conflict -await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run() -await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run() +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() ``` Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: @@ -159,10 +159,10 @@ Deduplication is atomic and race-condition-free for adapters that support storag - **SyncAdapter**: Executes jobs inline and does not support deduplication > [!NOTE] -> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`. +> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`. > [!TIP] -> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. 
A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. +> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. ## Job History & Retention diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index c3af06c..e5f830c 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -160,7 +160,7 @@ export class FakeAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } @@ -174,7 +174,7 @@ export class FakeAdapter implements Adapter { } async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index a421a7c..5e688af 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -378,7 +378,7 @@ export class KnexAdapter implements Adapter { score, }) - if (jobData.unique) { + if (jobData.dedup) { await query.onConflict(['id', 'queue']).ignore() } else { await query @@ -400,7 +400,7 @@ export class KnexAdapter implements Adapter { execute_at: executeAt, }) - if (jobData.unique) { + if (jobData.dedup) { await query.onConflict(['id', 'queue']).ignore() } else { await query diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 4e4bc4b..054ac9e 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ 
-36,11 +36,11 @@ const PUSH_JOB_SCRIPT = ` ` /** - * Lua script for pushing a unique job. + * Lua script for pushing a dedup job. * Uses HSETNX to only store data if the job doesn't already exist. * Only adds to pending ZSET if the job was newly created. */ -const PUSH_UNIQUE_JOB_SCRIPT = ` +const PUSH_DEDUP_JOB_SCRIPT = ` local data_key = KEYS[1] local pending_key = KEYS[2] local job_id = ARGV[1] @@ -73,11 +73,11 @@ const PUSH_DELAYED_JOB_SCRIPT = ` ` /** - * Lua script for pushing a unique delayed job. + * Lua script for pushing a dedup delayed job. * Uses HSETNX to only store data if the job doesn't already exist. * Only adds to delayed ZSET if the job was newly created. */ -const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = ` +const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` local data_key = KEYS[1] local delayed_key = KEYS[2] local job_id = ARGV[1] @@ -660,7 +660,7 @@ export class RedisAdapter implements Adapter { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay - const script = jobData.unique ? PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT await this.#connection.eval( script, @@ -679,7 +679,7 @@ export class RedisAdapter implements Adapter { const timestamp = Date.now() const score = calculateScore(priority, timestamp) - const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT + const script = jobData.dedup ? 
PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT await this.#connection.eval( script, diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index e95d05d..6a104d6 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -15,11 +15,12 @@ import { parse } from './utils.js' * * ``` * Job.dispatch(payload) - * .toQueue('emails') // optional: target queue - * .priority(1) // optional: 1-10, lower = higher priority - * .in('5m') // optional: delay before processing - * .with('redis') // optional: specific adapter - * .run() // dispatch the job + * .toQueue('emails') // optional: target queue + * .priority(1) // optional: 1-10, lower = higher priority + * .in('5m') // optional: delay before processing + * .dedup({ id: 'order-123' }) // optional: deduplication + * .with('redis') // optional: specific adapter + * .run() // dispatch the job * ``` * * @typeParam T - The payload type for this job @@ -47,7 +48,7 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string - #id?: string + #dedup?: { id: string } /** * Create a new job dispatcher. @@ -150,35 +151,36 @@ export class JobDispatcher { } /** - * Set a custom job ID for deduplication. + * Configure deduplication for this job. * - * When a custom ID is provided, the adapter will silently skip - * the job if one with the same ID already exists in the queue. + * When deduplication is configured, the adapter will silently skip + * the job if one with the same dedup ID already exists in the queue. * The ID is automatically prefixed with the job name to prevent * collisions between different job types. 
* - * @param jobId - Custom identifier for this job + * @param options - Deduplication options + * @param options.id - Unique deduplication key * @returns This dispatcher for chaining * * @example * ```typescript * // Prevent duplicate invoice jobs for the same order * await SendInvoiceJob.dispatch({ orderId: 123 }) - * .id('order-123') + * .dedup({ id: 'order-123' }) * .run() * - * // Second dispatch with same ID is silently skipped + * // Second dispatch with same dedup ID is silently skipped * await SendInvoiceJob.dispatch({ orderId: 123 }) - * .id('order-123') + * .dedup({ id: 'order-123' }) * .run() * ``` */ - id(jobId: string): this { - if (!jobId) { - throw new Error('Job ID must be a non-empty string') + dedup(options: { id: string }): this { + if (!options.id) { + throw new Error('Dedup ID must be a non-empty string') } - this.#id = jobId + this.#dedup = options return this } @@ -216,7 +218,7 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = this.#id ? `${this.#name}::${this.#id}` : randomUUID() + const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID() debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -232,7 +234,7 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), - ...(this.#id ? { unique: true } : {}), + ...(this.#dedup ? { dedup: { id: this.#dedup.id } } : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } diff --git a/src/types/main.ts b/src/types/main.ts index a4e8c5f..599086b 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -135,11 +135,15 @@ export interface JobData { traceContext?: Record /** - * When true, adapters use atomic insert-if-not-exists semantics + * Deduplication configuration for this job. + * When set, adapters use atomic insert-if-not-exists semantics * to silently skip duplicate jobs with the same ID. 
- * Set automatically when a custom job ID is provided via `.id()`. + * Set automatically when `.dedup()` is called on the dispatcher. */ - unique?: boolean + dedup?: { + /** The original dedup key provided by the caller (before name-prefixing). */ + id: string + } } /** diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index de18210..a944610 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -51,7 +51,7 @@ export class MemoryAdapter implements Adapter { } async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } @@ -68,7 +68,7 @@ export class MemoryAdapter implements Adapter { } async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.unique) { + if (jobData.dedup) { const existing = await this.getJob(jobData.id, queue) if (existing) return } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index f363830..ab80635 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -1650,7 +1650,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(third!.id, 'low') }) - test('pushOn with unique flag should skip duplicate job', async ({ assert }) => { + test('pushOn with dedup should skip duplicate job', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1659,7 +1659,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) await adapter.pushOn('test-queue', { @@ -1667,7 +1667,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' 
}, }) const size = await adapter.sizeOf('test-queue') @@ -1677,7 +1677,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.deepEqual(job!.payload, { attempt: 1 }) }) - test('pushOn without unique flag should insert normally', async ({ assert }) => { + test('pushOn without dedup should insert normally', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1699,7 +1699,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(size, 2) }) - test('pushLaterOn with unique flag should skip duplicate delayed job', async ({ assert }) => { + test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1710,7 +1710,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 60_000 ) @@ -1722,7 +1722,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 60_000 ) @@ -1732,7 +1732,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.deepEqual(job!.data.payload, { attempt: 1 }) }) - test('pushOn with unique flag should allow same id on different queues', async ({ assert }) => { + test('pushOn with dedup should allow same id on different queues', async ({ assert }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1741,7 +1741,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { name: 'TestJob', payload: { queue: 'a' }, attempts: 0, - unique: true, + dedup: { id: 'shared-id' }, }) await adapter.pushOn('queue-b', { @@ -1749,7 +1749,7 @@ export function registerDriverTestSuite(options: 
DriverTestSuiteOptions) { name: 'TestJob', payload: { queue: 'b' }, attempts: 0, - unique: true, + dedup: { id: 'shared-id' }, }) const sizeA = await adapter.sizeOf('queue-a') diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index 077977f..94924b6 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,7 +91,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushOn when unique flag is set', async ({ assert }) => { + test('should skip duplicate pushOn when dedup is set', async ({ assert }) => { const adapter = fake()() await adapter.pushOn('default', { @@ -99,7 +99,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) await adapter.pushOn('default', { @@ -107,7 +107,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'order-1' }, }) const size = await adapter.size() @@ -117,7 +117,7 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - test('should skip duplicate pushLaterOn when unique flag is set', async () => { + test('should skip duplicate pushLaterOn when dedup is set', async () => { const adapter = fake()() await adapter.pushLaterOn( @@ -127,7 +127,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 1 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 5000 ) @@ -139,7 +139,7 @@ test.group('FakeAdapter', () => { name: 'TestJob', payload: { attempt: 2 }, attempts: 0, - unique: true, + dedup: { id: 'delayed-1' }, }, 5000 ) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 3027c04..214b4e9 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,15 +317,15 @@ test.group('JobDispatcher | groupId', () => { }) }) -test.group('JobDispatcher | custom id', () => { - test('should throw error when id is empty', async ({ 
assert }) => { +test.group('JobDispatcher | dedup', () => { + test('should throw error when dedup id is empty', async ({ assert }) => { assert.throws( - () => new JobDispatcher('TestJob', { data: 'test' }).id(''), - 'Job ID must be a non-empty string' + () => new JobDispatcher('TestJob', { data: 'test' }).dedup({ id: '' }), + 'Dedup ID must be a non-empty string' ) }) - test('should use custom id prefixed with job name', async ({ assert }) => { + test('should use dedup id prefixed with job name', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -334,7 +334,7 @@ test.group('JobDispatcher | custom id', () => { }) const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) - .id('order-123') + .dedup({ id: 'order-123' }) .run() assert.equal(jobId, 'SendInvoiceJob::order-123') @@ -344,7 +344,7 @@ test.group('JobDispatcher | custom id', () => { assert.equal(job!.id, 'SendInvoiceJob::order-123') }) - test('should set unique flag on job data when custom id is provided', async ({ assert }) => { + test('should set dedup field on job data when dedup is configured', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -352,14 +352,14 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('UniqueJob', { data: 'test' }).id('my-id').run() + await new JobDispatcher('UniqueJob', { data: 'test' }).dedup({ id: 'my-id' }).run() const job = await sharedAdapter.pop() assert.isNotNull(job) - assert.isTrue(job!.unique) + assert.deepEqual(job!.dedup, { id: 'my-id' }) }) - test('should not set unique flag when no custom id is provided', async ({ assert }) => { + test('should not set dedup field when dedup is not configured', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -371,10 +371,10 @@ test.group('JobDispatcher | custom id', () => { const job = await sharedAdapter.pop() assert.isNotNull(job) - 
assert.isUndefined(job!.unique) + assert.isUndefined(job!.dedup) }) - test('should silently skip duplicate job with same custom id', async ({ assert }) => { + test('should silently skip duplicate job with same dedup id', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -382,8 +382,8 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('DedupJob', { attempt: 1 }).id('dedup-1').run() - await new JobDispatcher('DedupJob', { attempt: 2 }).id('dedup-1').run() + await new JobDispatcher('DedupJob', { attempt: 1 }).dedup({ id: 'dedup-1' }).run() + await new JobDispatcher('DedupJob', { attempt: 2 }).dedup({ id: 'dedup-1' }).run() const size = await sharedAdapter.size() assert.equal(size, 1) @@ -392,7 +392,7 @@ test.group('JobDispatcher | custom id', () => { assert.deepEqual(job!.payload, { attempt: 1 }) }) - test('should allow same custom id for different job names', async ({ assert }) => { + test('should allow same dedup id for different job names', async ({ assert }) => { const sharedAdapter = memory()() await QueueManager.init({ @@ -400,8 +400,8 @@ test.group('JobDispatcher | custom id', () => { adapters: { memory: () => sharedAdapter }, }) - await new JobDispatcher('JobA', { type: 'a' }).id('same-id').run() - await new JobDispatcher('JobB', { type: 'b' }).id('same-id').run() + await new JobDispatcher('JobA', { type: 'a' }).dedup({ id: 'same-id' }).run() + await new JobDispatcher('JobB', { type: 'b' }).dedup({ id: 'same-id' }).run() const size = await sharedAdapter.size() assert.equal(size, 2) @@ -416,7 +416,7 @@ test.group('JobDispatcher | custom id', () => { }) const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) - .id('task-1') + .dedup({ id: 'task-1' }) .toQueue('high') .priority(1) .run() @@ -426,7 +426,7 @@ test.group('JobDispatcher | custom id', () => { const job = await sharedAdapter.popFrom('high') assert.isNotNull(job) 
assert.equal(job!.priority, 1) - assert.isTrue(job!.unique) + assert.deepEqual(job!.dedup, { id: 'task-1' }) }) }) From d0820876f4bfc51d6b4b0556cbfafe241056fce3 Mon Sep 17 00:00:00 2001 From: Jignesh Sanghani Date: Wed, 22 Apr 2026 17:17:11 +0530 Subject: [PATCH 5/5] feat: add .dedup() API for job deduplication --- README.md | 96 ++++++--- src/contracts/adapter.ts | 24 ++- src/drivers/fake_adapter.ts | 162 +++++++++++++-- src/drivers/knex_adapter.ts | 224 ++++++++++++++------- src/drivers/redis_adapter.ts | 199 +++++++++++++++--- src/drivers/sync_adapter.ts | 4 + src/job_batch_dispatcher.ts | 1 - src/job_dispatcher.ts | 98 +++++++-- src/queue_config_resolver.ts | 5 +- src/queue_manager.ts | 4 +- src/services/queue_schema.ts | 36 +++- src/types/main.ts | 28 ++- src/types/tracing_channels.ts | 9 +- src/worker.ts | 51 ++++- tests/_mocks/memory_adapter.ts | 157 +++++++++++++-- tests/_utils/register_driver_test_suite.ts | 198 ++++++++++++++++++ tests/adapter.spec.ts | 4 +- tests/job_dispatcher.spec.ts | 155 +++++++++++++- tests/otel.spec.ts | 80 ++++++-- tests/sync_adapter.spec.ts | 4 +- tests/worker.spec.ts | 4 +- 21 files changed, 1299 insertions(+), 244 deletions(-) diff --git a/README.md b/README.md index a3a0969..3cee4fa 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,9 @@ The `groupId` is stored with job data and accessible via `job.data.groupId`. ## Job Deduplication -Prevent the same job from being pushed to the queue twice using `.dedup()`: +Prevent the same job from being pushed multiple times. 
Four modes, all via `.dedup()`: + +### Simple (skip while job exists) ```typescript // First dispatch - job is created @@ -144,25 +146,65 @@ await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() ``` -The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts: +### Throttle (skip within TTL window) ```typescript -// These are two different jobs, no conflict -await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() -await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +// Within 5s, duplicates are skipped. After 5s, a new job is created. +await SendEmailJob.dispatch({ to: 'user@example.com' }) + .dedup({ id: 'welcome-123', ttl: '5s' }) + .run() ``` -Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks: +### Extend (reset TTL on duplicate) -- **Redis**: Uses `HSETNX` (set-if-not-exists) -- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING` -- **SyncAdapter**: Executes jobs inline and does not support deduplication +```typescript +// Each duplicate push resets the TTL timer. +await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run() +``` -> [!NOTE] -> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`. +### Debounce (replace payload + reset TTL) + +```typescript +// Within the 2s window, the latest payload overwrites the previous pending job. +await SaveDraftJob.dispatch({ content: 'latest draft' }) + .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) + .run() +``` + +### Inspecting the outcome + +`DispatchResult` tells you what happened: + +```typescript +const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' 
}) + .dedup({ id: 'draft-42', ttl: '2s', replace: true }) + .run() + +// deduped: 'added' | 'skipped' | 'replaced' | 'extended' +// jobId: the UUID of the job (the existing one when deduped) +``` + +### How it works + +- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key. +- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds. +- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws. +- `replace` only applies to jobs in `pending` or `delayed` state. Active (executing) jobs are left alone; the dispatch returns `{ deduped: 'skipped' }`. +- `replace` swaps the **payload only** — priority, queue, delay, and groupId of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire. +- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped. +- Atomic and race-free: + - **Redis**: `SET + PEXPIRE` under a Lua script with `HSETNX`-style guards. + - **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction. + - **SyncAdapter**: executes inline, no dedup support. + +### Caveats -> [!TIP] -> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally. +- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. +- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field. +- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch. 
+- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned. +- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs. +- Knex concurrent race: two `pushOn` calls with the same dedup id firing at the exact same instant on Postgres (READ COMMITTED) can both succeed (rare). Serialize at the app layer if strict guarantees are required, or use Redis. ## Job History & Retention @@ -569,7 +611,7 @@ import * as boringqueue from '@boringnode/queue' const instrumentation = new QueueInstrumentation({ messagingSystem: 'boringqueue', // default - executionSpanLinkMode: 'link', // or 'parent' + executionSpanLinkMode: 'link', // or 'parent' }) instrumentation.enable() @@ -582,19 +624,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes. -| Attribute | Kind | Description | -| ------------------------------- | ------- | ------------------------------------------ | -| `messaging.system` | Semconv | `'boringqueue'` (configurable) | -| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | -| `messaging.destination.name` | Semconv | Queue name | -| `messaging.message.id` | Semconv | Job ID for single-message spans | -| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | -| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | -| `messaging.job.name` | Custom | Job class name (e.g. 
`SendEmailJob`) | -| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | -| `messaging.job.group_id` | Custom | Queue-specific group identifier | -| `messaging.job.priority` | Custom | Queue-specific job priority | -| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | +| Attribute | Kind | Description | +| ------------------------------- | ------- | --------------------------------------------- | +| `messaging.system` | Semconv | `'boringqueue'` (configurable) | +| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | +| `messaging.destination.name` | Semconv | Queue name | +| `messaging.message.id` | Semconv | Job ID for single-message spans | +| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | +| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | +| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) | +| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | +| `messaging.job.group_id` | Custom | Queue-specific group identifier | +| `messaging.job.priority` | Custom | Queue-specific job priority | +| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | | `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing | ### Trace Context Propagation diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index e05067e..8bd8e12 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -1,4 +1,5 @@ import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -7,6 +8,17 @@ import type { ScheduleListOptions, } from '../types/main.js' +/** + * Result of a push operation when dedup was involved. + * `outcome` tells the dispatcher what happened; `jobId` is the ID of the + * existing job when deduped (skipped/replaced/extended). 
+ */ +export interface PushResult { + outcome: DedupOutcome + /** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */ + jobId: string +} + /** * A job that has been acquired by a worker for processing. * Extends JobData with the timestamp when the job was acquired. @@ -119,24 +131,27 @@ export interface Adapter { * Push a job to the default queue for immediate processing. * * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - push(jobData: JobData): Promise + push(jobData: JobData): Promise /** * Push a job to a specific queue for immediate processing. * * @param queue - The queue name to push to * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushOn(queue: string, jobData: JobData): Promise + pushOn(queue: string, jobData: JobData): Promise /** * Push a job to the default queue with a delay. * * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLater(jobData: JobData, delay: number): Promise + pushLater(jobData: JobData, delay: number): Promise /** * Push a job to a specific queue with a delay. @@ -144,8 +159,9 @@ export interface Adapter { * @param queue - The queue name to push to * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise + pushLaterOn(queue: string, jobData: JobData, delay: number): Promise /** * Push multiple jobs to the default queue for immediate processing. 
diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index e5f830c..3671cc4 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -2,7 +2,7 @@ import assert from 'node:assert/strict' import { randomUUID } from 'node:crypto' import { isDeepStrictEqual } from 'node:util' import { CronExpressionParser } from 'cron-parser' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { JobData, JobClass, @@ -16,6 +16,14 @@ import { DEFAULT_PRIORITY } from '../constants.js' import { parse } from '../utils.js' import { Job } from '../job.js' +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + replace?: boolean + extend?: boolean +} + interface ActiveJob { job: JobData acquiredAt: number @@ -71,6 +79,7 @@ export class FakeAdapter implements Adapter { #pendingTimeouts = new Set() #schedules = new Map() #pushedJobs: FakeJobRecord[] = [] + #dedupIndex = new Map>() #onDispose?: () => void /** @@ -116,6 +125,7 @@ export class FakeAdapter implements Adapter { this.#failedJobs.clear() this.#schedules.clear() this.#pushedJobs = [] + this.#dedupIndex.clear() } assertPushed(matcher: FakeJobMatcher, query?: FakeJobQuery): void { @@ -155,34 +165,36 @@ export class FakeAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.dedup) { - const existing = await this.getJob(jobData.id, queue) - if (existing) return - } + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, 
delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.dedup) { - const existing = await this.getJob(jobData.id, queue) - if (existing) return - } + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -190,6 +202,10 @@ export class FakeAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -236,6 +252,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -254,6 +271,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -308,6 +326,7 @@ export class FakeAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(active.queue, active.job) continue } @@ -536,26 +555,131 @@ export class FakeAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: 
JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age !== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) + } + } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#getDedupEntry(queue, dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + this.#setDedupEntry(queue, dedupId, { + jobId: 
jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + queue: string, + jobId: string + ): { + job: JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if (failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #getDedupEntry(queue: string, dedupId: string): DedupEntry | undefined { + return this.#dedupIndex.get(queue)?.get(dedupId) + } + + #setDedupEntry(queue: string, dedupId: string, entry: DedupEntry): void { + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, entry) + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#getDedupEntry(queue, job.dedup.id) + // Only delete if the entry points to THIS job (replace may have re-pointed it elsewhere) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) } } diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 5e688af..607becc 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -1,8 +1,9 @@ import { randomUUID } from 'node:crypto' import KnexPkg from 'knex' import type { Knex } from 'knex' -import type 
{ Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -117,9 +118,7 @@ export class KnexAdapter implements Adapter { // Update job to active status // For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim - const updateQuery = trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) + const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue) if (!this.#supportsSkipLocked()) { updateQuery.where('status', 'pending') @@ -178,14 +177,11 @@ export class KnexAdapter implements Adapter { const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) - .update({ - status: 'pending', - score, - execute_at: null, - }) + await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({ + status: 'pending', + score, + execute_at: null, + }) } }) } @@ -331,82 +327,175 @@ export class KnexAdapter implements Adapter { if (retryAt && retryAt.getTime() > now) { // Move to delayed - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'delayed', - data: updatedData, - worker_id: null, - acquired_at: null, - score: null, - execute_at: retryAt.getTime(), - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'delayed', + data: updatedData, + worker_id: null, + acquired_at: null, + score: null, + execute_at: retryAt.getTime(), + }) } else { // Move back to pending const priority = jobData.priority ?? 
DEFAULT_PRIORITY const score = calculateScore(priority, now) - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'pending', - data: updatedData, - worker_id: null, - acquired_at: null, - score, - execute_at: null, - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'pending', + data: updatedData, + worker_id: null, + acquired_at: null, + score, + execute_at: null, + }) } } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const priority = jobData.priority ?? DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) - const query = this.#connection(this.#jobsTable).insert({ + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'pending', + data: JSON.stringify(jobData), + score, + }) + } + + await this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'pending', data: JSON.stringify(jobData), score, }) - - if (jobData.dedup) { - await query.onConflict(['id', 'queue']).ignore() - } else { - await query - } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay - const query = this.#connection(this.#jobsTable).insert({ + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'delayed', + data: JSON.stringify(jobData), + execute_at: executeAt, + }) + } + + await 
this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, status: 'delayed', data: JSON.stringify(jobData), execute_at: executeAt, }) + } - if (jobData.dedup) { - await query.onConflict(['id', 'queue']).ignore() - } else { - await query + async #pushWithDedup( + queue: string, + jobData: JobData, + insertRow: Record + ): Promise { + const dedup = jobData.dedup! + + try { + return await this.#pushWithDedupTxn(queue, jobData, insertRow, dedup) + } catch (err) { + if (this.#isMissingDedupColumn(err)) { + throw new Error( + `Dedup columns missing on "${this.#jobsTable}". Run QueueSchemaService.addDedupColumns() on your jobs table before dispatching jobs with .dedup().`, + { cause: err } + ) + } + throw err } } + #isMissingDedupColumn(err: unknown): boolean { + if (!err || typeof err !== 'object') return false + const message = (err as { message?: string }).message + if (!message) return false + // Postgres: 'column "dedup_id" does not exist' + // SQLite: 'no such column: dedup_id' + // MySQL: "Unknown column 'dedup_id' in 'where clause'" + return ( + /dedup_id/.test(message) && /(does not exist|no such column|Unknown column)/i.test(message) + ) + } + + #pushWithDedupTxn( + queue: string, + jobData: JobData, + insertRow: Record, + dedup: NonNullable + ): Promise { + return this.#connection.transaction(async (trx) => { + const existing = await trx(this.#jobsTable) + .where('queue', queue) + .where('dedup_id', dedup.id) + .orderBy('dedup_at', 'desc') + .forUpdate() + .first() + + const now = Date.now() + + if (existing) { + const dedupAt = existing.dedup_at != null ? Number(existing.dedup_at) : null + const dedupTtl = existing.dedup_ttl != null ? 
Number(existing.dedup_ttl) : null + const withinTtl = dedupTtl === null || (dedupAt !== null && now - dedupAt < dedupTtl) + + if (withinTtl) { + const status = existing.status as JobStatus + const replaceable = status === 'pending' || status === 'delayed' + + if (dedup.replace && replaceable) { + const storedData = + typeof existing.data === 'string' ? JSON.parse(existing.data) : existing.data + const newData = { ...storedData, payload: jobData.payload } + const updates: Record = { data: JSON.stringify(newData) } + if (dedup.extend && dedupTtl) { + updates.dedup_at = now + } + await trx(this.#jobsTable).where({ id: existing.id, queue }).update(updates) + return { outcome: 'replaced' as DedupOutcome, jobId: existing.id as string } + } + + if (dedup.extend && dedupTtl) { + await trx(this.#jobsTable).where({ id: existing.id, queue }).update({ dedup_at: now }) + return { outcome: 'extended' as DedupOutcome, jobId: existing.id as string } + } + + return { outcome: 'skipped' as DedupOutcome, jobId: existing.id as string } + } + // TTL expired — fall through to insert a new row. Old row (if retained + // history) is left intact. + } + + await trx(this.#jobsTable).insert({ + ...insertRow, + dedup_id: dedup.id, + dedup_at: now, + dedup_ttl: dedup.ttl ?? 
null, + }) + + return { outcome: 'added' as DedupOutcome, jobId: jobData.id } + }) + } + async pushMany(jobs: JobData[]): Promise { return this.pushManyOn('default', jobs) } @@ -414,6 +503,10 @@ export class KnexAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const now = Date.now() const rows = jobs.map((job) => ({ id: job.id, @@ -470,10 +563,7 @@ export class KnexAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - remove the job - await trx(this.#jobsTable) - .where('id', row.id) - .where('queue', queue) - .delete() + await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete() } else { // Recover: increment stalledCount and put back in pending jobData.stalledCount = currentStalledCount + 1 @@ -546,9 +636,9 @@ export class KnexAdapter implements Adapter { } async getSchedule(id: string): Promise { - const row = (await this.#connection(this.#schedulesTable) - .where('id', id) - .first()) as ScheduleRow | undefined + const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as + | ScheduleRow + | undefined if (!row) return null return this.#rowToScheduleData(row) @@ -577,16 +667,12 @@ export class KnexAdapter implements Adapter { if (updates.runCount !== undefined) data.run_count = updates.runCount if (Object.keys(data).length > 0) { - await this.#connection(this.#schedulesTable) - .where('id', id) - .update(data) + await this.#connection(this.#schedulesTable).where('id', id).update(data) } } async deleteSchedule(id: string): Promise { - await this.#connection(this.#schedulesTable) - .where('id', id) - .delete() + await this.#connection(this.#schedulesTable).where('id', id).delete() } async claimDueSchedule(): Promise { @@ -641,13 +727,11 @@ export class KnexAdapter implements Adapter { } 
// Update atomically - await trx(this.#schedulesTable) - .where('id', row.id) - .update({ - next_run_at: nextRunAt, - last_run_at: now, - run_count: newRunCount, - }) + await trx(this.#schedulesTable).where('id', row.id).update({ + next_run_at: nextRunAt, + last_run_at: now, + run_count: newRunCount, + }) // Return schedule data (before update state for payload) return this.#rowToScheduleData(row) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 054ac9e..90e4a2b 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -2,7 +2,8 @@ import { randomUUID } from 'node:crypto' import { Redis, type RedisOptions } from 'ioredis' import { DEFAULT_PRIORITY } from '../constants.js' import { calculateScore } from '../utils.js' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' +import type { DedupOutcome } from '../types/main.js' import type { JobData, JobRecord, @@ -37,22 +38,55 @@ const PUSH_JOB_SCRIPT = ` /** * Lua script for pushing a dedup job. - * Uses HSETNX to only store data if the job doesn't already exist. - * Only adds to pending ZSET if the job was newly created. + * + * Behavior: + * - If dedup key exists AND job still exists AND within TTL: apply replace/extend, skip insert. + * - If dedup key exists but job data missing (orphan): proceed to insert new. + * - If TTL expired or no prior entry: insert new job, record dedup key with TTL. + * + * Replace only applies to non-active (pending/delayed) jobs. Active jobs are skipped. + * + * Returns {outcome, job_id}: outcome ∈ 'added' | 'skipped' | 'replaced' | 'extended'. 
*/ const PUSH_DEDUP_JOB_SCRIPT = ` local data_key = KEYS[1] local pending_key = KEYS[2] + local dedup_key = KEYS[3] + local active_key = KEYS[4] local job_id = ARGV[1] local job_data = ARGV[2] local score = tonumber(ARGV[3]) - - local added = redis.call('HSETNX', data_key, job_id, job_data) - if added == 1 then - redis.call('ZADD', pending_key, score, job_id) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local has_data = redis.call('HEXISTS', data_key, existing) == 1 + if has_data then + local is_active = redis.call('HEXISTS', active_key, existing) == 1 + if replace == 1 and not is_active then + redis.call('HSET', data_key, existing, job_data) + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'replaced', existing} + end + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + return {'extended', existing} + end + return {'skipped', existing} + end end - return added + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', pending_key, score, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} ` /** @@ -74,22 +108,47 @@ const PUSH_DELAYED_JOB_SCRIPT = ` /** * Lua script for pushing a dedup delayed job. - * Uses HSETNX to only store data if the job doesn't already exist. - * Only adds to delayed ZSET if the job was newly created. + * Same semantics as PUSH_DEDUP_JOB_SCRIPT but adds to delayed ZSET. 
*/ const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` local data_key = KEYS[1] local delayed_key = KEYS[2] + local dedup_key = KEYS[3] + local active_key = KEYS[4] local job_id = ARGV[1] local job_data = ARGV[2] local execute_at = tonumber(ARGV[3]) - - local added = redis.call('HSETNX', data_key, job_id, job_data) - if added == 1 then - redis.call('ZADD', delayed_key, execute_at, job_id) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local has_data = redis.call('HEXISTS', data_key, existing) == 1 + if has_data then + local is_active = redis.call('HEXISTS', active_key, existing) == 1 + if replace == 1 and not is_active then + redis.call('HSET', data_key, existing, job_data) + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'replaced', existing} + end + if extend == 1 and ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + return {'extended', existing} + end + return {'skipped', existing} + end end - return added + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', delayed_key, execute_at, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} ` /** @@ -150,16 +209,27 @@ const ACQUIRE_JOB_SCRIPT = ` /** * Lua script for removing a job completely (no history). + * Also cleans up the dedup key if the job had dedup metadata. */ const REMOVE_JOB_SCRIPT = ` local data_key = KEYS[1] local active_key = KEYS[2] local job_id = ARGV[1] + local dedup_prefix = ARGV[2] if redis.call('HEXISTS', active_key, job_id) == 0 then return 0 end + -- Read job data to extract dedup.id before deleting + local job_data = redis.call('HGET', data_key, job_id) + if job_data then + local ok, job = pcall(cjson.decode, job_data) + if ok and job and job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end + end + redis.call('HDEL', active_key, job_id) redis.call('HDEL', data_key, job_id) @@ -169,6 +239,7 @@ const REMOVE_JOB_SCRIPT = ` /** * Lua script for finalizing a job in history. * Removes from active, stores finalization info, and prunes old records. + * When pruning removes job data, also deletes the associated dedup key. */ const FINALIZE_JOB_SCRIPT = ` local data_key = KEYS[1] @@ -180,6 +251,7 @@ const FINALIZE_JOB_SCRIPT = ` local max_age = tonumber(ARGV[3]) local max_count = tonumber(ARGV[4]) local error_message = ARGV[5] + local dedup_prefix = ARGV[6] -- Verify job is active if redis.call('HEXISTS', active_key, job_id) == 0 then @@ -199,11 +271,24 @@ const FINALIZE_JOB_SCRIPT = ` redis.call('HSET', history_key, job_id, cjson.encode(record)) redis.call('ZADD', index_key, now, job_id) + local function delete_dedup_for(ids) + for i = 1, #ids do + local d = redis.call('HGET', data_key, ids[i]) + if d then + local ok, job = pcall(cjson.decode, d) + if ok and job and job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end + end + end + end + -- Prune by age if max_age and max_age > 0 then local cutoff = now - max_age local expired = redis.call('ZRANGEBYSCORE', index_key, 0, cutoff) if #expired > 0 then + delete_dedup_for(expired) redis.call('ZREM', index_key, unpack(expired)) redis.call('HDEL', history_key, unpack(expired)) redis.call('HDEL', data_key, unpack(expired)) @@ -217,6 +302,7 @@ const FINALIZE_JOB_SCRIPT = ` local excess = size - max_count local stale = redis.call('ZRANGE', index_key, 0, excess - 1) if #stale > 0 then + delete_dedup_for(stale) redis.call('ZREM', index_key, unpack(stale)) redis.call('HDEL', history_key, unpack(stale)) redis.call('HDEL', data_key, unpack(stale)) @@ -290,6 +376,7 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` local now = tonumber(ARGV[1]) local stalled_threshold = tonumber(ARGV[2]) local max_stalled_count = tonumber(ARGV[3]) + local dedup_prefix = ARGV[4] local recovered = 0 local stalled_cutoff = now - stalled_threshold @@ -315,7 +402,10 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` -- Check if job has exceeded max stalled count if current_stalled_count >= max_stalled_count then - -- Job failed permanently, remove data too + -- Job failed permanently, remove data + dedup key + if job.dedup and job.dedup.id then + redis.call('DEL', dedup_prefix .. 
job.dedup.id) + end redis.call('HDEL', data_key, job_id) else -- Recover: increment stalledCount and put back in pending @@ -520,6 +610,14 @@ export class RedisAdapter implements Adapter { } } + #getDedupKey(queue: string, dedupId: string): string { + return `${this.#getDedupPrefix(queue)}${dedupId}` + } + + #getDedupPrefix(queue: string): string { + return `${redisKey}::${queue}::dedup::` + } + setWorkerId(workerId: string): void { this.#workerId = workerId } @@ -558,10 +656,11 @@ export class RedisAdapter implements Adapter { async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -576,7 +675,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - '' + '', + dedupPrefix ) } @@ -587,10 +687,11 @@ export class RedisAdapter implements Adapter { removeOnFail?: JobRetention ): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnFail) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -605,7 +706,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - error?.message || '' + error?.message || '', + dedupPrefix ) } @@ -648,22 +750,39 @@ export class RedisAdapter implements Adapter { return JSON.parse(result as string) } - push(jobData: JobData): Promise { + push(jobData: JobData): Promise { return 
this.pushOn('default', jobData) } - pushLater(jobData: JobData, delay: number): Promise { + pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay - const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_DELAYED_JOB_SCRIPT, + 4, + keys.data, + keys.delayed, + dedupKey, + keys.active, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } await this.#connection.eval( - script, + PUSH_DELAYED_JOB_SCRIPT, 2, keys.data, keys.delayed, @@ -673,16 +792,33 @@ export class RedisAdapter implements Adapter { ) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const keys = this.#getKeys(queue) const priority = jobData.priority ?? DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) - const script = jobData.dedup ? PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 4, + keys.data, + keys.pending, + dedupKey, + keys.active, + jobData.id, + JSON.stringify(jobData), + score.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? 
'1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } await this.#connection.eval( - script, + PUSH_JOB_SCRIPT, 2, keys.data, keys.pending, @@ -699,6 +835,10 @@ export class RedisAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const keys = this.#getKeys(queue) const now = Date.now() const multi = this.#connection.multi() @@ -738,7 +878,8 @@ export class RedisAdapter implements Adapter { keys.pending, now.toString(), stalledThreshold.toString(), - maxStalledCount.toString() + maxStalledCount.toString(), + this.#getDedupPrefix(queue) ) return recovered as number diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index 796db00..d97fa55 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -59,6 +59,10 @@ export class SyncAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } diff --git a/src/job_batch_dispatcher.ts b/src/job_batch_dispatcher.ts index 61864af..35452aa 100644 --- a/src/job_batch_dispatcher.ts +++ b/src/job_batch_dispatcher.ts @@ -165,7 +165,6 @@ export class JobBatchDispatcher { const message: JobDispatchMessage = { jobs, queue: this.#queue } - await dispatchChannel.tracePromise(async () => { await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs)) }, message) diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 6a104d6..d860f19 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -48,7 +48,12 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string - #dedup?: { id: string } + #dedup?: { + id: 
string + ttl?: number + extend?: boolean + replace?: boolean + } /** * Create a new job dispatcher. @@ -153,34 +158,63 @@ export class JobDispatcher { /** * Configure deduplication for this job. * - * When deduplication is configured, the adapter will silently skip - * the job if one with the same dedup ID already exists in the queue. - * The ID is automatically prefixed with the job name to prevent - * collisions between different job types. + * Modes: + * - **Simple** (`{ id }`): skip duplicates while the job exists. + * - **Throttle** (`{ id, ttl }`): skip duplicates within a TTL window. + * - **Debounce** (`{ id, ttl, replace: true }`): replace payload of the existing + * non-active job on duplicate within TTL. + * - **Extend** (`{ id, ttl, extend: true }`): reset the TTL window on each duplicate. + * + * The id is automatically prefixed with the job name to prevent collisions + * between different job types. * - * @param options - Deduplication options * @param options.id - Unique deduplication key - * @returns This dispatcher for chaining + * @param options.ttl - TTL as Duration ('5s', 5000). Required for extend/replace. + * @param options.extend - Reset TTL on duplicate within window. + * @param options.replace - Replace payload of existing non-active job within window. 
* * @example * ```typescript - * // Prevent duplicate invoice jobs for the same order + * // Simple dedup * await SendInvoiceJob.dispatch({ orderId: 123 }) * .dedup({ id: 'order-123' }) - * .run() * - * // Second dispatch with same dedup ID is silently skipped - * await SendInvoiceJob.dispatch({ orderId: 123 }) - * .dedup({ id: 'order-123' }) - * .run() + * // Throttle: 5 second window + * await SendEmailJob.dispatch({ to: 'x' }) + * .dedup({ id: 'welcome', ttl: '5s' }) + * + * // Debounce: replace payload within window + * await SaveDraftJob.dispatch({ content: 'latest' }) + * .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) * ``` */ - dedup(options: { id: string }): this { + dedup(options: { id: string; ttl?: Duration; extend?: boolean; replace?: boolean }): this { if (!options.id) { throw new Error('Dedup ID must be a non-empty string') } - this.#dedup = options + if (options.id.length > 400) { + throw new Error('Dedup ID must be 400 characters or less') + } + + if ((options.extend || options.replace) && options.ttl === undefined) { + throw new Error('dedup.ttl is required when extend or replace is set') + } + + let parsedTtl: number | undefined + if (options.ttl !== undefined) { + parsedTtl = parse(options.ttl) + if (!Number.isFinite(parsedTtl) || parsedTtl < 0) { + throw new Error('dedup.ttl must be a non-negative duration') + } + } + + this.#dedup = { + id: options.id, + ttl: parsedTtl, + extend: options.extend, + replace: options.replace, + } return this } @@ -218,7 +252,8 @@ export class JobDispatcher { * ``` */ async run(): Promise { - const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID() + const id = randomUUID() + const dedupId = this.#dedup ? `${this.#name}::${this.#dedup.id}` : undefined debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -234,19 +269,40 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), - ...(this.#dedup ? 
{ dedup: { id: this.#dedup.id } } : {}), + ...(dedupId + ? { + dedup: { + id: dedupId, + ttl: this.#dedup!.ttl, + extend: this.#dedup!.extend, + replace: this.#dedup!.replace, + }, + } + : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } + let pushResult: { outcome: DispatchResult['deduped']; jobId: string } | undefined await dispatchChannel.tracePromise(async () => { - if (parsedDelay !== undefined) { - await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) - } else { - await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + const result = + parsedDelay !== undefined + ? await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) + : await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + + if (result && typeof result === 'object' && 'outcome' in result) { + pushResult = { outcome: result.outcome, jobId: result.jobId } + message.dedupOutcome = result.outcome } }, message) + if (pushResult && this.#dedup) { + return { + jobId: pushResult.jobId, + deduped: pushResult.outcome, + } + } + return { jobId: id } } diff --git a/src/queue_config_resolver.ts b/src/queue_config_resolver.ts index 1679f85..02e863b 100644 --- a/src/queue_config_resolver.ts +++ b/src/queue_config_resolver.ts @@ -102,10 +102,7 @@ export class QueueConfigResolver { * merge like `retry.maxRetries`. 
*/ #normalizeJobRetryConfig(jobOptions?: JobOptions): RetryConfig | undefined { - if ( - !jobOptions || - (jobOptions.retry === undefined && jobOptions.maxRetries === undefined) - ) { + if (!jobOptions || (jobOptions.retry === undefined && jobOptions.maxRetries === undefined)) { return undefined } diff --git a/src/queue_manager.ts b/src/queue_manager.ts index 76d4377..1ff8f23 100644 --- a/src/queue_manager.ts +++ b/src/queue_manager.ts @@ -7,7 +7,9 @@ import { QueueConfigResolver } from './queue_config_resolver.js' import type { Adapter } from './contracts/adapter.js' import type { AdapterFactory, JobFactory, QueueManagerConfig } from './types/main.js' -const noopInternalOperationWrapper: NonNullable = async (fn) => fn() +const noopInternalOperationWrapper: NonNullable< + QueueManagerConfig['internalOperationWrapper'] +> = async (fn) => fn() const noopExecutionWrapper: NonNullable = async (fn) => fn() type QueueManagerFakeState = { diff --git a/src/services/queue_schema.ts b/src/services/queue_schema.ts index ed53eef..addf3c7 100644 --- a/src/services/queue_schema.ts +++ b/src/services/queue_schema.ts @@ -26,15 +26,46 @@ export class QueueSchemaService { table.bigint('execute_at').unsigned().nullable() table.bigint('finished_at').unsigned().nullable() table.text('error').nullable() + table.string('dedup_id', 510).nullable() + table.bigint('dedup_at').unsigned().nullable() + table.bigint('dedup_ttl').unsigned().nullable() table.primary(['id', 'queue']) table.index(['queue', 'status', 'score']) table.index(['queue', 'status', 'execute_at']) table.index(['queue', 'status', 'finished_at']) + table.index(['queue', 'dedup_id']) extend?.(table) }) } + /** + * Idempotent migration: adds dedup columns (dedup_id, dedup_at, dedup_ttl) + * and a (queue, dedup_id) index to an existing jobs table. + * + * Safe to run multiple times. Uses hasColumn checks so it won't fail on re-runs. + * For large Postgres tables, consider pausing workers during the run. 
+ */ + async addDedupColumns(tableName: string = 'queue_jobs'): Promise { + const hasDedupId = await this.#connection.schema.hasColumn(tableName, 'dedup_id') + const hasDedupAt = await this.#connection.schema.hasColumn(tableName, 'dedup_at') + const hasDedupTtl = await this.#connection.schema.hasColumn(tableName, 'dedup_ttl') + + if (!hasDedupId || !hasDedupAt || !hasDedupTtl) { + await this.#connection.schema.alterTable(tableName, (table) => { + if (!hasDedupId) table.string('dedup_id', 510).nullable() + if (!hasDedupAt) table.bigint('dedup_at').unsigned().nullable() + if (!hasDedupTtl) table.bigint('dedup_ttl').unsigned().nullable() + }) + } + + if (!hasDedupId) { + await this.#connection.schema.alterTable(tableName, (table) => { + table.index(['queue', 'dedup_id']) + }) + } + } + /** * Creates the schedules table with the default schema. * The optional callback allows adding custom columns. @@ -57,10 +88,7 @@ export class QueueSchemaService { table.integer('run_count').unsigned().notNullable().defaultTo(0) table.timestamp('next_run_at').nullable() table.timestamp('last_run_at').nullable() - table - .timestamp('created_at') - .notNullable() - .defaultTo(this.#connection.fn.now()) + table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now()) table.index(['status', 'next_run_at']) extend?.(table) diff --git a/src/types/main.ts b/src/types/main.ts index 599086b..a4ae51c 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -43,9 +43,20 @@ export type JobStatus = 'pending' | 'active' | 'delayed' | 'completed' | 'failed * console.log(`Dispatched job: ${jobId}`) * ``` */ +/** + * Outcome of a dedup-enabled dispatch. 
+ * - `added`: new job was inserted + * - `skipped`: duplicate found within TTL, skipped silently + * - `replaced`: duplicate found within TTL, existing job's payload was replaced + * - `extended`: duplicate found within TTL, TTL window was reset + */ +export type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended' + export interface DispatchResult { /** Unique identifier for this specific job instance */ jobId: string + /** Dedup outcome (only present when `.dedup()` was used). */ + deduped?: DedupOutcome } /** @@ -136,13 +147,24 @@ export interface JobData { /** * Deduplication configuration for this job. - * When set, adapters use atomic insert-if-not-exists semantics - * to silently skip duplicate jobs with the same ID. + * When set, adapters apply dedup semantics keyed on `dedup.id`. * Set automatically when `.dedup()` is called on the dispatcher. */ dedup?: { - /** The original dedup key provided by the caller (before name-prefixing). */ + /** Dedup key, prefixed with the job name (e.g. `SendInvoiceJob::order-123`). */ id: string + /** + * TTL in milliseconds. When set, dedup lock auto-expires after TTL. + * After expiry, the same dedup id produces a brand-new job (coexists with prior). + * 0 or undefined = no TTL; dedup lock persists until the job is removed. + */ + ttl?: number + /** Reset the TTL window when a duplicate arrives within it. */ + extend?: boolean + /** Replace payload of the existing non-active job when a duplicate arrives within TTL. */ + replace?: boolean + /** Timestamp (ms) when this dedup entry was recorded. Set by adapters. 
*/ + createdAt?: number } } diff --git a/src/types/tracing_channels.ts b/src/types/tracing_channels.ts index e2507d8..77d2057 100644 --- a/src/types/tracing_channels.ts +++ b/src/types/tracing_channels.ts @@ -6,7 +6,7 @@ */ import type { AcquiredJob } from '../contracts/adapter.js' -import type { JobData } from './main.js' +import type { DedupOutcome, JobData } from './main.js' /** * Tracing data structure for job dispatch events. @@ -21,6 +21,13 @@ export type JobDispatchMessage = { /** Delay in milliseconds before the job becomes available */ delay?: number + /** + * Deduplication outcome when the job used `.dedup()`. Allows OTel/tracing + * consumers to distinguish added vs skipped/replaced/extended dispatches. + * Populated by the dispatcher after the push call completes. + */ + dedupOutcome?: DedupOutcome + /** Error that caused the dispatch to fail */ error?: Error } diff --git a/src/worker.ts b/src/worker.ts index 5316d00..83cec94 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -7,7 +7,13 @@ import { JobPool } from './job_pool.js' import { JobExecutionRuntime } from './job_runtime.js' import { dispatchChannel, executeChannel } from './tracing_channels.js' import type { Adapter, AcquiredJob } from './contracts/adapter.js' -import type { JobContext, JobOptions, JobRetention, QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { + JobContext, + JobOptions, + JobRetention, + QueueManagerConfig, + WorkerCycle, +} from './types/main.js' import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' import { Locator } from './locator.js' import { DEFAULT_PRIORITY } from './constants.js' @@ -347,11 +353,26 @@ export class Worker { return executeChannel.tracePromise(async () => { try { await runtime.execute(instance, payload, context) - await this.#wrapInternal(() => this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)) + await this.#wrapInternal(() => + this.#adapter.completeJob(job.id, queue, 
retention.removeOnComplete) + ) executeMessage.status = 'completed' - debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, (performance.now() - startTime).toFixed(2)) + debug( + 'worker %s: successfully executed job %s in %dms', + this.#id, + job.id, + (performance.now() - startTime).toFixed(2) + ) } catch (e) { - await this.#handleExecutionFailure({ error: e as Error, job, queue, instance, runtime, retention, executeMessage }) + await this.#handleExecutionFailure({ + error: e as Error, + job, + queue, + instance, + runtime, + retention, + executeMessage, + }) } executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) @@ -377,7 +398,12 @@ export class Worker { if (outcome.type === 'failed') { options.executeMessage.status = 'failed' await this.#wrapInternal(() => - this.#adapter.failJob(options.job.id, options.queue, outcome.storageError, options.retention.removeOnFail) + this.#adapter.failJob( + options.job.id, + options.queue, + outcome.storageError, + options.retention.removeOnFail + ) ) await options.instance.failed?.(outcome.hookError) return @@ -389,8 +415,15 @@ export class Worker { options.executeMessage.nextRetryAt = outcome.retryAt if (outcome.retryAt) { - debug('worker %s: job %s will retry at %s', this.#id, options.job.id, outcome.retryAt.toISOString()) - await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt)) + debug( + 'worker %s: job %s will retry at %s', + this.#id, + options.job.id, + outcome.retryAt.toISOString() + ) + await this.#wrapInternal(() => + this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt) + ) } else { await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue)) } @@ -426,7 +459,9 @@ export class Worker { } catch (error) { debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name) const retention = QueueManager.getConfigResolver().resolveJobOptions(queue) - await 
this.#wrapInternal(() => this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)) + await this.#wrapInternal(() => + this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail) + ) throw error } } diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index a944610..fa64e31 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -1,5 +1,5 @@ import { randomUUID } from 'node:crypto' -import type { Adapter, AcquiredJob } from '../../src/contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../../src/contracts/adapter.js' import type { JobData, JobRecord, @@ -21,6 +21,14 @@ interface DelayedJob { executeAt: number } +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + replace?: boolean + extend?: boolean +} + export function memory() { return () => new MemoryAdapter() } @@ -33,6 +41,7 @@ export class MemoryAdapter implements Adapter { #failedJobs: Map = new Map() #pendingTimeouts: Set = new Set() #schedules: Map = new Map() + #dedupIndex: Map> = new Map() setWorkerId(_workerId: string): void {} @@ -46,32 +55,32 @@ export class MemoryAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { - if (jobData.dedup) { - const existing = await this.getJob(jobData.id, queue) - if (existing) return - } + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } this.#queues.get(queue)!.push(jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return 
this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { - if (jobData.dedup) { - const existing = await this.getJob(jobData.id, queue) - if (existing) return - } + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) @@ -83,12 +92,17 @@ export class MemoryAdapter implements Adapter { const timeout = setTimeout(() => { this.#pendingTimeouts.delete(timeout) this.#delayedJobs.get(queue)?.delete(jobData.id) - void this.pushOn(queue, jobData) + if (!this.#queues.has(queue)) { + this.#queues.set(queue, []) + } + this.#queues.get(queue)!.push(jobData) }, delay) this.#pendingTimeouts.add(timeout) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -96,6 +110,10 @@ export class MemoryAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -142,6 +160,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -160,6 +179,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -214,6 +234,7 @@ export class MemoryAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(queue, active.job) continue } @@ -406,26 
+427,41 @@ export class MemoryAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age !== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) } } @@ -435,4 +471,85 @@ export class MemoryAdapter implements Adapter { return records.find((record) => record.data.id === jobId) ?? 
null } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#dedupIndex.get(queue)?.get(dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, { + jobId: jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + queue: string, + jobId: string + ): { + job: JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if 
(failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#dedupIndex.get(queue)?.get(job.dedup.id) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) + } + } } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ab80635..874956a 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -1757,4 +1757,202 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(sizeA, 1) assert.equal(sizeB, 1) }) + + test('dedup TTL: new job allowed after TTL expires', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('ttl-queue', { + id: 'uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + + const second = await adapter.pushOn('ttl-queue', { + id: 'uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + + await new Promise((r) => setTimeout(r, 150)) + + const third = await adapter.pushOn('ttl-queue', { + id: 'uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'added') + }) + + test('dedup replace: duplicate within TTL swaps payload on pending job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('rep-queue', { + id: 'rep-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true }, + }) + + const second = await 
adapter.pushOn('rep-queue', { + id: 'rep-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'replaced') + + const size = await adapter.sizeOf('rep-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('rep-queue') + assert.deepEqual(job!.payload, { version: 2 }) + }) + + test('dedup extend: duplicate within TTL resets the window', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('ext-queue', { + id: 'ext-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true }, + }) + + await new Promise((r) => setTimeout(r, 60)) + + const second = await adapter.pushOn('ext-queue', { + id: 'ext-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'extended') + + await new Promise((r) => setTimeout(r, 60)) + + // Without extend, 120ms elapsed > 100ms TTL would've expired. 
+ const third = await adapter.pushOn('ext-queue', { + id: 'ext-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'extended') + }) + + test('dedup: cleanup removes dedup entry when job is completed without retention', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('clean-queue', { + id: 'clean-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::clean-1' }, + }) + + const popped = await adapter.popFrom('clean-queue') + await adapter.completeJob(popped!.id, 'clean-queue', true) + + // Dedup should be cleaned — new push should succeed + const second = await adapter.pushOn('clean-queue', { + id: 'clean-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::clean-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + }) + + test('dedup: cleanup removes dedup entry when job fails without retention', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('clean-fail', { + id: 'fail-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + + const popped = await adapter.popFrom('clean-fail') + await adapter.failJob(popped!.id, 'clean-fail', new Error('boom'), true) + + const second = await adapter.pushOn('clean-fail', { + id: 'fail-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + }) + + test('dedup: retryJob preserves dedup entry (new dispatch stays blocked)', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await 
adapter.pushOn('retry-queue', { + id: 'retry-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + + const popped = await adapter.popFrom('retry-queue') + await adapter.retryJob(popped!.id, 'retry-queue') + + // retry puts job back — dedup entry still points to same job + const second = await adapter.pushOn('retry-queue', { + id: 'retry-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + }) + + test('dedup: pushManyOn rejects jobs with dedup', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await assert.rejects( + () => + adapter.pushManyOn('batch-queue', [ + { id: 'a', name: 'TestJob', payload: {}, attempts: 0 }, + { + id: 'b', + name: 'TestJob', + payload: {}, + attempts: 0, + dedup: { id: 'TestJob::batch-1' }, + }, + ]), + /dedup is not supported in batch dispatch/ + ) + }) } diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index 50e6259..7f7a293 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -80,7 +80,9 @@ test.group('Adapter | Redis', (group) => { ) }) - test('deleteSchedule should not leave ghost index under write-failure chaos', async ({ assert }) => { + test('deleteSchedule should not leave ghost index under write-failure chaos', async ({ + assert, + }) => { const adapter = new RedisAdapter(connection) const id = 'chaos-delete-schedule' diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 214b4e9..38243b0 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -325,7 +325,7 @@ test.group('JobDispatcher | dedup', () => { ) }) - test('should use dedup id prefixed with job name', async ({ assert }) => { + test('should store dedup id prefixed with job name', async ({ assert }) => { const sharedAdapter = memory()() await 
QueueManager.init({ @@ -333,15 +333,17 @@ test.group('JobDispatcher | dedup', () => { adapters: { memory: () => sharedAdapter }, }) - const { jobId } = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + const result = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) .dedup({ id: 'order-123' }) .run() - assert.equal(jobId, 'SendInvoiceJob::order-123') + assert.match(result.jobId, /^[0-9a-f-]{36}$/) + assert.equal(result.deduped, 'added') const job = await sharedAdapter.pop() assert.isNotNull(job) - assert.equal(job!.id, 'SendInvoiceJob::order-123') + assert.equal(job!.id, result.jobId) + assert.equal(job!.dedup?.id, 'SendInvoiceJob::order-123') }) test('should set dedup field on job data when dedup is configured', async ({ assert }) => { @@ -356,7 +358,7 @@ test.group('JobDispatcher | dedup', () => { const job = await sharedAdapter.pop() assert.isNotNull(job) - assert.deepEqual(job!.dedup, { id: 'my-id' }) + assert.equal(job!.dedup?.id, 'UniqueJob::my-id') }) test('should not set dedup field when dedup is not configured', async ({ assert }) => { @@ -415,18 +417,155 @@ test.group('JobDispatcher | dedup', () => { adapters: { memory: () => sharedAdapter }, }) - const { jobId } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + const { jobId, deduped } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) .dedup({ id: 'task-1' }) .toQueue('high') .priority(1) .run() - assert.equal(jobId, 'PriorityDedupJob::task-1') + assert.match(jobId, /^[0-9a-f-]{36}$/) + assert.equal(deduped, 'added') const job = await sharedAdapter.popFrom('high') assert.isNotNull(job) assert.equal(job!.priority, 1) - assert.deepEqual(job!.dedup, { id: 'task-1' }) + assert.equal(job!.dedup?.id, 'PriorityDedupJob::task-1') + }) + + test('should throw when extend is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', extend: true }), + 'dedup.ttl is required when extend or replace is 
set' + ) + }) + + test('should throw when replace is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', replace: true }), + 'dedup.ttl is required when extend or replace is set' + ) + }) + + test('should throw when ttl is negative', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', ttl: -1 }), + 'dedup.ttl must be a non-negative duration' + ) + }) + + test('should throw when dedup id exceeds 400 chars', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'a'.repeat(401) }), + 'Dedup ID must be 400 characters or less' + ) + }) + + test('TTL: new job allowed after TTL expires', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ThrottleJob', { n: 1 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(first.deduped, 'added') + + const second = await new JobDispatcher('ThrottleJob', { n: 2 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(second.deduped, 'skipped') + + await setTimeout(150) + + const third = await new JobDispatcher('ThrottleJob', { n: 3 }) + .dedup({ id: 'throttle-1', ttl: 80 }) + .run() + assert.equal(third.deduped, 'added') + assert.notEqual(third.jobId, first.jobId) + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('extend: duplicate within TTL resets the window', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ExtendJob', { n: 1 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(first.deduped, 'added') + + await setTimeout(60) + + const second = await new JobDispatcher('ExtendJob', { n: 2 }) + .dedup({ id: 'ext-1', ttl: 
100, extend: true }) + .run() + assert.equal(second.deduped, 'extended') + assert.equal(second.jobId, first.jobId) + + await setTimeout(60) + + // Without extend, original 100ms TTL would've expired (120ms elapsed). + // With extend, second push reset timer → still within window. + const third = await new JobDispatcher('ExtendJob', { n: 3 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(third.deduped, 'extended') + }) + + test('replace: duplicate within TTL swaps the pending job payload', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ReplaceJob', { version: 1 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(first.deduped, 'added') + + const second = await new JobDispatcher('ReplaceJob', { version: 2 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(second.deduped, 'replaced') + assert.equal(second.jobId, first.jobId) + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { version: 2 }) + }) + + test('replace: active job is not replaced (returns skipped)', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('ActiveReplaceJob', { version: 1 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + await sharedAdapter.pop() // moves to active + + const second = await new JobDispatcher('ActiveReplaceJob', { version: 2 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + assert.equal(second.deduped, 'skipped') }) }) diff --git a/tests/otel.spec.ts b/tests/otel.spec.ts index 4fe4b93..70a7a71 100644 --- a/tests/otel.spec.ts +++ b/tests/otel.spec.ts @@ -22,13 +22,17 @@ function 
makeJob(overrides: Partial = {}): AcquiredJob { * Creates an instrumentation with a fake QueueManager, * captures the injected wrappers from the patched init. */ -async function setupWithWrappers(config: ConstructorParameters[0] = {}) { +async function setupWithWrappers( + config: ConstructorParameters[0] = {} +) { const instrumentation = new QueueInstrumentation(config) instrumentation.enable() let capturedConfig: any const fakeManager = { - init: async (cfg: any) => { capturedConfig = cfg }, + init: async (cfg: any) => { + capturedConfig = cfg + }, } instrumentation.manuallyRegister({ QueueManager: fakeManager }) @@ -37,13 +41,21 @@ async function setupWithWrappers(config: ConstructorParameters(fn: () => Promise, job: AcquiredJob, queue: string) => Promise, - internalOperationWrapper: capturedConfig.internalOperationWrapper as (fn: () => Promise) => Promise, + executionWrapper: capturedConfig.executionWrapper as ( + fn: () => Promise, + job: AcquiredJob, + queue: string + ) => Promise, + internalOperationWrapper: capturedConfig.internalOperationWrapper as ( + fn: () => Promise + ) => Promise, } } test.group('QueueInstrumentation | lifecycle', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('enable() is idempotent', ({ assert }) => { @@ -80,7 +92,9 @@ test.group('QueueInstrumentation | lifecycle', (group) => { }) test.group('QueueInstrumentation | dispatch via DC', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates PRODUCER span when no active span', async ({ assert }) => { @@ -120,7 +134,10 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { assert.isDefined(parentSpan) assert.isDefined(producerSpan) assert.equal(producerSpan!.parentSpanContext?.spanId, parentSpan!.spanContext().spanId) - assert.equal(jobData.traceContext?.traceparent?.split('-')[2], 
producerSpan!.spanContext().spanId) + assert.equal( + jobData.traceContext?.traceparent?.split('-')[2], + producerSpan!.spanContext().spanId + ) instrumentation.disable() }) @@ -188,7 +205,9 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { }) test.group('QueueInstrumentation | execute via executionWrapper', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates CONSUMER span with semconv attributes', async ({ assert }) => { @@ -321,10 +340,21 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { test('queue_time_ms end-to-end via dispatch then execute', async ({ assert }) => { const { instrumentation, executionWrapper } = await setupWithWrappers() - const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0, createdAt: Date.now() } + const jobData: JobData = { + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + payload: {}, + attempts: 0, + createdAt: Date.now(), + } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) - const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', createdAt: jobData.createdAt, traceContext: jobData.traceContext }) + const job = makeJob({ + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + createdAt: jobData.createdAt, + traceContext: jobData.traceContext, + }) const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { @@ -347,11 +377,15 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { - await executionWrapper(async () => { - const tracer = trace.getTracer('test-child') - const childSpan = tracer.startSpan('child-operation') - childSpan.end() - }, job, 'default') + await executionWrapper( + async () => 
{ + const tracer = trace.getTracer('test-child') + const childSpan = tracer.startSpan('child-operation') + childSpan.end() + }, + job, + 'default' + ) }, message) const spans = getFinishedSpans() @@ -367,7 +401,9 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { }) test.group('QueueInstrumentation | trace linking', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('link mode links consumer to dispatch trace', async ({ assert }) => { @@ -394,7 +430,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test('parent mode makes consumer child of dispatch', async ({ assert }) => { - const { instrumentation, executionWrapper } = await setupWithWrappers({ executionSpanLinkMode: 'parent' }) + const { instrumentation, executionWrapper } = await setupWithWrappers({ + executionSpanLinkMode: 'parent', + }) const jobData: JobData = { id: 'parent-1', name: 'ParentedJob', payload: {}, attempts: 0 } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) @@ -417,7 +455,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test.group('QueueInstrumentation | manuallyRegister', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('patches init to inject wrappers', async ({ assert }) => { @@ -448,7 +488,9 @@ test.group('QueueInstrumentation | manuallyRegister', (group) => { }) test.group('QueueInstrumentation | custom config', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('custom messagingSystem attribute', async ({ assert }) => { diff --git a/tests/sync_adapter.spec.ts b/tests/sync_adapter.spec.ts index 5bf0e00..1d05d50 100644 --- a/tests/sync_adapter.spec.ts +++ b/tests/sync_adapter.spec.ts @@ -58,9 +58,7 @@ 
test.group('SyncAdapter', (group) => { assert.deepEqual(contextJobIds, Array(contextJobIds.length).fill(jobId)) }) - test('should log delayed sync job failures without unhandled rejections', async ({ - assert, - }) => { + test('should log delayed sync job failures without unhandled rejections', async ({ assert }) => { const logger = new MemoryLogger() let unhandledError: unknown const onUnhandledRejection = (error: unknown) => { diff --git a/tests/worker.spec.ts b/tests/worker.spec.ts index 226531b..fbe5b65 100644 --- a/tests/worker.spec.ts +++ b/tests/worker.spec.ts @@ -549,7 +549,9 @@ test.group('Worker', () => { const controller = new AbortController() const originalTimeout = AbortSignal.timeout const originalAddEventListener = controller.signal.addEventListener.bind(controller.signal) - const originalRemoveEventListener = controller.signal.removeEventListener.bind(controller.signal) + const originalRemoveEventListener = controller.signal.removeEventListener.bind( + controller.signal + ) let addedAbortListeners = 0 let removedAbortListeners = 0